Skip to content

Commit

Permalink
Merge pull request #264 from linebender/async
Browse files Browse the repository at this point in the history
Experimental async wiring
  • Loading branch information
raphlinus authored Jan 27, 2023
2 parents ca79d5c + db018da commit 6a18424
Show file tree
Hide file tree
Showing 5 changed files with 624 additions and 338 deletions.
21 changes: 20 additions & 1 deletion examples/with_winit/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{borrow::Cow, path::PathBuf, time::Instant};

use clap::Parser;
use vello::{
block_on_wgpu,
kurbo::{Affine, Vec2},
util::RenderContext,
Renderer, Scene, SceneBuilder,
Expand Down Expand Up @@ -187,6 +188,24 @@ async fn run(event_loop: EventLoop<UserEvent>, window: Window, args: Args) {
.surface
.get_current_texture()
.expect("failed to get surface texture");
#[cfg(not(target_arch = "wasm32"))]
{
block_on_wgpu(
&device_handle.device,
renderer.render_to_surface_async(
&device_handle.device,
&device_handle.queue,
&scene,
&surface_texture,
width,
height,
),
)
.expect("failed to render to surface");
}
// Note: in the wasm case, we're currently not running the robust
// pipeline, as it requires more async wiring for the readback.
#[cfg(target_arch = "wasm32")]
renderer
.render_to_surface(
&device_handle.device,
Expand All @@ -198,7 +217,7 @@ async fn run(event_loop: EventLoop<UserEvent>, window: Window, args: Args) {
)
.expect("failed to render to surface");
surface_texture.present();
device_handle.device.poll(wgpu::Maintain::Wait);
device_handle.device.poll(wgpu::Maintain::Poll);
}
Event::UserEvent(event) => match event {
#[cfg(not(target_arch = "wasm32"))]
Expand Down
184 changes: 98 additions & 86 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@

use std::{
borrow::Cow,
collections::{hash_map::Entry, HashMap},
collections::{hash_map::Entry, HashMap, HashSet},
num::{NonZeroU32, NonZeroU64},
sync::atomic::{AtomicU64, Ordering},
};

use futures_intrusive::channel::shared::GenericOneshotReceiver;
use parking_lot::RawMutex;
use wgpu::{
util::DeviceExt, BindGroup, BindGroupLayout, Buffer, BufferAsyncError, BufferSlice,
BufferUsages, BufferView, ComputePipeline, Device, Queue, Texture, TextureAspect,
TextureFormat, TextureUsages, TextureView, TextureViewDimension,
util::DeviceExt, BindGroup, BindGroupLayout, Buffer, BufferUsages, ComputePipeline, Device,
Queue, Texture, TextureAspect, TextureFormat, TextureUsages, TextureView, TextureViewDimension,
};

pub type Error = Box<dyn std::error::Error>;
Expand All @@ -42,6 +39,8 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
pub struct Engine {
    /// Compiled compute shaders; indexed by the `usize` inside a `ShaderId`
    /// (see the `Dispatch` handling, which does `self.shaders[shader_id.0]`).
    shaders: Vec<Shader>,
    /// Pool of reusable GPU buffers, keyed by size/usage properties so freed
    /// buffers can be handed back out by `get_buf`.
    pool: ResourcePool,
    /// Live GPU resources (buffers and images) keyed by proxy id; persists
    /// across recordings so resources can be reused between runs.
    bind_map: BindMap,
    /// MAP_READ staging buffers produced by `Command::Download`, keyed by the
    /// id of the source buffer proxy; retrieved via `get_download`.
    downloads: HashMap<Id, Buffer>,
}

struct Shader {
Expand Down Expand Up @@ -96,11 +95,8 @@ pub enum Command {
Dispatch(ShaderId, (u32, u32, u32), Vec<ResourceProxy>),
Download(BufProxy),
Clear(BufProxy, u64, Option<NonZeroU64>),
}

#[derive(Default)]
pub struct Downloads {
buf_map: HashMap<Id, Buffer>,
FreeBuf(BufProxy),
FreeImage(ImageProxy),
}

/// The type of resource that will be bound to a slot in a shader.
Expand Down Expand Up @@ -149,6 +145,8 @@ impl Engine {
Engine {
shaders: vec![],
pool: Default::default(),
bind_map: Default::default(),
downloads: Default::default(),
}
}

Expand Down Expand Up @@ -249,28 +247,32 @@ impl Engine {
queue: &Queue,
recording: &Recording,
external_resources: &[ExternalResource],
) -> Result<Downloads, Error> {
let mut bind_map = BindMap::default();
let mut downloads = Downloads::default();
) -> Result<(), Error> {
let mut free_bufs: HashSet<Id> = Default::default();
let mut free_images: HashSet<Id> = Default::default();

let mut encoder = device.create_command_encoder(&Default::default());
for command in &recording.commands {
match command {
Command::Upload(buf_proxy, bytes) => {
let usage =
BufferUsages::COPY_SRC | BufferUsages::COPY_DST | BufferUsages::STORAGE;
let buf = self.pool.get_buf(buf_proxy, usage, device);
let buf = self
.pool
.get_buf(buf_proxy.size, buf_proxy.name, usage, device);
// TODO: if buffer is newly created, might be better to make it mapped at creation
// and copy. However, we expect reuse will be most common.
queue.write_buffer(&buf, 0, bytes);
bind_map.insert_buf(buf_proxy, buf);
self.bind_map.insert_buf(buf_proxy, buf);
}
Command::UploadUniform(buf_proxy, bytes) => {
let usage = BufferUsages::UNIFORM | BufferUsages::COPY_DST;
// Same consideration as above
let buf = self.pool.get_buf(buf_proxy, usage, device);
let buf = self
.pool
.get_buf(buf_proxy.size, buf_proxy.name, usage, device);
queue.write_buffer(&buf, 0, bytes);
bind_map.insert_buf(buf_proxy, buf);
self.bind_map.insert_buf(buf_proxy, buf);
}
Command::UploadImage(image_proxy, bytes) => {
let buf = device.create_buffer_init(&wgpu::util::BufferInitDescriptor {
Expand Down Expand Up @@ -322,12 +324,13 @@ impl Engine {
depth_or_array_layers: 1,
},
);
bind_map.insert_image(image_proxy.id, texture, texture_view)
self.bind_map
.insert_image(image_proxy.id, texture, texture_view)
}
Command::Dispatch(shader_id, wg_size, bindings) => {
// println!("dispatching {:?} with {} bindings", wg_size, bindings.len());
let shader = &self.shaders[shader_id.0];
let bind_group = bind_map.create_bind_group(
let bind_group = self.bind_map.create_bind_group(
device,
&shader.bind_group_layout,
bindings,
Expand All @@ -340,18 +343,20 @@ impl Engine {
cpass.dispatch_workgroups(wg_size.0, wg_size.1, wg_size.2);
}
Command::Download(proxy) => {
let src_buf = bind_map.buf_map.get(&proxy.id).ok_or("buffer not in map")?;
let buf = device.create_buffer(&wgpu::BufferDescriptor {
label: Some(proxy.name),
size: proxy.size,
usage: wgpu::BufferUsages::MAP_READ | wgpu::BufferUsages::COPY_DST,
mapped_at_creation: false,
});
let src_buf = self
.bind_map
.buf_map
.get(&proxy.id)
.ok_or("buffer not in map")?;
let usage = BufferUsages::MAP_READ | BufferUsages::COPY_DST;
let buf = self.pool.get_buf(proxy.size, "download", usage, device);
encoder.copy_buffer_to_buffer(&src_buf.buffer, 0, &buf, 0, proxy.size);
downloads.buf_map.insert(proxy.id, buf);
self.downloads.insert(proxy.id, buf);
}
Command::Clear(proxy, offset, size) => {
let buffer = bind_map.get_or_create(*proxy, device, &mut self.pool)?;
let buffer = self
.bind_map
.get_or_create(*proxy, device, &mut self.pool)?;
#[cfg(not(target_arch = "wasm32"))]
encoder.clear_buffer(buffer, *offset, *size);
#[cfg(target_arch = "wasm32")]
Expand All @@ -366,11 +371,42 @@ impl Engine {
queue.write_buffer(buffer, *offset, &zeros);
}
}
Command::FreeBuf(proxy) => {
free_bufs.insert(proxy.id);
}
Command::FreeImage(proxy) => {
free_images.insert(proxy.id);
}
}
}
queue.submit(Some(encoder.finish()));
self.pool.reap_bindmap(bind_map);
Ok(downloads)
for id in free_bufs {
if let Some(buf) = self.bind_map.buf_map.remove(&id) {
let props = BufferProperties {
size: buf.buffer.size(),
usages: buf.buffer.usage(),
#[cfg(feature = "buffer_labels")]
name: buf.label,
};
self.pool.bufs.entry(props).or_default().push(buf.buffer);
}
}
for id in free_images {
if let Some((texture, view)) = self.bind_map.image_map.remove(&id) {
// TODO: have a pool to avoid needless re-allocation
drop(texture);
drop(view);
}
}
Ok(())
}

pub fn get_download(&self, buf: BufProxy) -> Option<&Buffer> {
self.downloads.get(&buf.id)
}

/// Discard the staging buffer for `buf`'s download, if one exists.
/// Dropping it releases the mapping/backing storage.
pub fn free_download(&mut self, buf: BufProxy) {
    let _removed = self.downloads.remove(&buf.id);
}
}

Expand Down Expand Up @@ -418,13 +454,32 @@ impl Recording {
));
}

/// Prepare a buffer for downloading.
///
/// Currently this records a copy into a dedicated download buffer, so the
/// original buffer can be freed immediately after this command.
pub fn download(&mut self, buf: BufProxy) {
    let cmd = Command::Download(buf);
    self.push(cmd);
}

/// Record a clear of the entire contents of `buf` (offset 0, unbounded size).
pub fn clear_all(&mut self, buf: BufProxy) {
    let whole_buffer = Command::Clear(buf, 0, None);
    self.push(whole_buffer);
}

/// Mark `buf` as free; when the recording runs, its backing buffer is
/// returned to the resource pool for reuse.
pub fn free_buf(&mut self, buf: BufProxy) {
    let cmd = Command::FreeBuf(buf);
    self.push(cmd);
}

/// Mark `image` as free; when the recording runs, the texture and its view
/// are dropped (images are not yet pooled — see TODO in `run_recording`).
pub fn free_image(&mut self, image: ImageProxy) {
    let cmd = Command::FreeImage(image);
    self.push(cmd);
}

/// Free a resource of either kind, dispatching to `free_buf` or `free_image`.
pub fn free_resource(&mut self, resource: ResourceProxy) {
    match resource {
        ResourceProxy::Image(image) => self.free_image(image),
        ResourceProxy::Buf(buffer) => self.free_buf(buffer),
    }
}
}

impl BufProxy {
Expand Down Expand Up @@ -565,7 +620,7 @@ impl BindMap {
if let Entry::Vacant(v) = self.buf_map.entry(proxy.id) {
let usage =
BufferUsages::COPY_SRC | BufferUsages::COPY_DST | BufferUsages::STORAGE;
let buf = pool.get_buf(&proxy, usage, device);
let buf = pool.get_buf(proxy.size, proxy.name, usage, device);
v.insert(BindMapBuffer {
buffer: buf,
label: proxy.name,
Expand Down Expand Up @@ -647,7 +702,7 @@ impl BindMap {
Entry::Occupied(occupied) => Ok(&occupied.into_mut().buffer),
Entry::Vacant(vacant) => {
let usage = BufferUsages::COPY_SRC | BufferUsages::COPY_DST | BufferUsages::STORAGE;
let buf = pool.get_buf(&proxy, usage, device);
let buf = pool.get_buf(proxy.size, proxy.name, usage, device);
Ok(&vacant
.insert(BindMapBuffer {
buffer: buf,
Expand All @@ -659,53 +714,23 @@ impl BindMap {
}
}

/// Download buffers for which an async map has been requested, keyed by
/// buffer id.
///
/// Each entry pairs the buffer slice with the oneshot receiver that resolves
/// once the `map_async` callback has fired.
pub struct DownloadsMapped<'a>(
    HashMap<
        Id,
        (
            BufferSlice<'a>,
            GenericOneshotReceiver<RawMutex, Result<(), BufferAsyncError>>,
        ),
    >,
);

impl Downloads {
    // Discussion: should API change so we get one buffer, rather than mapping all?
    /// Request a read mapping for every download buffer.
    ///
    /// For each buffer, this calls `map_async` with a oneshot channel whose
    /// sender fires in the map callback; await the result via
    /// `DownloadsMapped::get_mapped`.
    // NOTE(review): map_async callbacks only run when the device is polled
    // (e.g. `device.poll`) — callers presumably must poll before awaiting;
    // confirm against the call sites.
    pub fn map(&self) -> DownloadsMapped {
        let mut map = HashMap::new();
        for (id, buf) in &self.buf_map {
            let buf_slice = buf.slice(..);
            let (sender, receiver) = futures_intrusive::channel::shared::oneshot_channel();
            buf_slice.map_async(wgpu::MapMode::Read, move |v| sender.send(v).unwrap());
            map.insert(*id, (buf_slice, receiver));
        }
        DownloadsMapped(map)
    }
}

impl<'a> DownloadsMapped<'a> {
    /// Await completion of the map for `proxy`'s buffer and return a view of
    /// its mapped contents.
    ///
    /// Errors if the proxy was never downloaded ("buffer not in map"), the
    /// mapping failed (`BufferAsyncError`), or the oneshot channel closed
    /// without delivering a result.
    pub async fn get_mapped(&self, proxy: BufProxy) -> Result<BufferView, Error> {
        let (slice, recv) = self.0.get(&proxy.id).ok_or("buffer not in map")?;
        if let Some(recv_result) = recv.receive().await {
            recv_result?;
        } else {
            return Err("channel was closed".into());
        }
        Ok(slice.get_mapped_range())
    }
}

/// Granularity parameter passed to `ResourcePool::size_class` when rounding a
/// requested buffer size up to its pool size class.
const SIZE_CLASS_BITS: u32 = 1;

impl ResourcePool {
/// Get a buffer from the pool or create one.
fn get_buf(&mut self, proxy: &BufProxy, usage: BufferUsages, device: &Device) -> Buffer {
let rounded_size = Self::size_class(proxy.size, SIZE_CLASS_BITS);
fn get_buf(
&mut self,
size: u64,
name: &'static str,
usage: BufferUsages,
device: &Device,
) -> Buffer {
let rounded_size = Self::size_class(size, SIZE_CLASS_BITS);
let props = BufferProperties {
size: rounded_size,
usages: usage,
#[cfg(feature = "buffer_labels")]
name: proxy.name,
name: name,
};
if let Some(buf_vec) = self.bufs.get_mut(&props) {
if let Some(buf) = buf_vec.pop() {
Expand All @@ -714,7 +739,7 @@ impl ResourcePool {
}
device.create_buffer(&wgpu::BufferDescriptor {
#[cfg(feature = "buffer_labels")]
label: Some(proxy.name),
label: Some(name),
#[cfg(not(feature = "buffer_labels"))]
label: None,
size: rounded_size,
Expand All @@ -723,19 +748,6 @@ impl ResourcePool {
})
}

fn reap_bindmap(&mut self, bind_map: BindMap) {
for (_id, buf) in bind_map.buf_map {
let size = buf.buffer.size();
let props = BufferProperties {
size,
usages: buf.buffer.usage(),
#[cfg(feature = "buffer_labels")]
name: buf.label,
};
self.bufs.entry(props).or_default().push(buf.buffer);
}
}

/// Quantize a size up to the nearest size class.
fn size_class(x: u64, bits: u32) -> u64 {
if x > 1 << bits {
Expand Down
Loading

0 comments on commit 6a18424

Please sign in to comment.