diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index d6db4b988a283..813351a7f5238 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -452,12 +452,19 @@ mod tests { event::ManualEventReader, schedule::{LogLevel, ScheduleBuildSettings}, }; - use bevy_log::LogPlugin; + use bevy_log::{error, LogPlugin}; use bevy_reflect::TypePath; + use bevy_tasks::AsyncComputeTaskPool; use bevy_utils::{Duration, HashMap}; use futures_lite::AsyncReadExt; use serde::{Deserialize, Serialize}; - use std::{path::Path, sync::Arc}; + use std::{ + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + }; use thiserror::Error; #[derive(Asset, TypePath, Debug, Default)] @@ -1481,9 +1488,48 @@ mod tests { ..Default::default() }); }); + } + + #[test] + fn stream_io() { + // The particular usage of GatedReader in this test will cause deadlocking if running single-threaded + #[cfg(not(feature = "multi_threaded"))] + panic!("This test requires the \"multi_threaded\" feature, otherwise it will deadlock.\ncargo test --package bevy_asset --features multi_threaded"); + + let dir = Dir::default(); + + let a_path = "a.cool.ron"; + let a_ron = r#" +( + text: "a", + dependencies: [ + "foo/b.cool.ron", + "c.cool.ron", + ], + embedded_dependencies: [], + sub_texts: [], +)"#; + dir.insert_asset_text(Path::new(a_path), a_ron); // running schedule does not error on ambiguity between the 2 uses_assets systems - app.world_mut().run_schedule(Update); + let (mut app, gate) = test_app(dir); + let asset_server = app.world().resource::().clone(); + let lock = Arc::new(AtomicBool::new(false)); + let lock_check = lock.clone(); + let final_check = lock.clone(); + gate.open(a_path); + AsyncComputeTaskPool::get() + .spawn(async move { + let mut reader = Vec::new(); + asset_server.read_stream(a_path, &mut reader).await.unwrap(); + assert_eq!(reader, a_ron.as_bytes()); + lock.store(true, Ordering::Release); + }) + .detach(); + run_app_until(&mut app, |_| { + lock_check.load(Ordering::Acquire).then_some(()) + }); + assert!(final_check.load(Ordering::Acquire)); } // validate the Asset derive macro for various asset types diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index 99ce54b374af3..ee2b5462967d9 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -5,7 +5,8 @@ use crate::{ folder::LoadedFolder, io::{ AssetReaderError, AssetSource, AssetSourceEvent, AssetSourceId, AssetSources, - ErasedAssetReader, MissingAssetSourceError, MissingProcessedAssetReaderError, Reader, + AssetWriterError, ErasedAssetReader, MissingAssetSourceError, MissingAssetWriterError, + MissingProcessedAssetReaderError, MissingProcessedAssetWriterError, Reader, }, loader::{AssetLoader, ErasedAssetLoader, LoadContext, LoadedAsset}, meta::{ @@ -22,6 +23,7 @@ use bevy_tasks::IoTaskPool; use bevy_utils::tracing::{error, info}; use bevy_utils::{CowArc, HashSet}; use crossbeam_channel::{Receiver, Sender}; +use futures_io::{AsyncRead, AsyncWrite}; use futures_lite::StreamExt; use info::*; use loaders::*; @@ -1104,6 +1106,60 @@ impl AssetServer { }) }) } + + /// Low level function that reads a file from an [`AssetSource`] into an [`AsyncWrite`]. + /// + /// # Example + /// + /// ``` + /// # use bevy_asset::AssetServer; + /// # async fn read(asset_server: &AssetServer) { + /// let mut vec = Vec::new(); + /// asset_server.read_stream("config/config.json", &mut vec).await; + /// # } + /// ``` + pub async fn read_stream( + &self, + path: impl Into>, + sink: impl AsyncWrite, + ) -> Result<(), AssetReadError> { + let path: AssetPath = path.into(); + let source = self.get_source(path.source())?; + let asset_reader = match self.data.mode { + AssetServerMode::Unprocessed { .. } => source.reader(), + AssetServerMode::Processed { .. } => source.processed_reader()?, + }; + let reader = asset_reader.read(path.path()).await?; + futures_lite::io::copy(reader, sink).await?; + Ok(()) + } + + /// Low level function that writes to a file from an [`AssetSource`] from an [`AsyncRead`]. + /// + /// # Example + /// + /// ``` + /// # use bevy_asset::AssetServer; + /// # async fn write(asset_server: &AssetServer) { + /// let mut json = br#"{"bevy": "awesome"}"#; + /// asset_server.write_stream("config/config.json", json.as_slice()).await; + /// # } + /// ``` + pub async fn write_stream( + &self, + path: impl Into>, + stream: impl AsyncRead, + ) -> Result<(), AssetWriteError> { + let path: AssetPath = path.into(); + let source = self.get_source(path.source())?; + let asset_writer = match self.data.mode { + AssetServerMode::Unprocessed { .. } => source.writer()?, + AssetServerMode::Processed { .. } => source.processed_writer()?, + }; + let writer = asset_writer.write(path.path()).await?; + futures_lite::io::copy(stream, writer).await?; + Ok(()) + } } /// A system that manages internal [`AssetServer`] events, such as finalizing asset loads. @@ -1338,6 +1394,34 @@ pub enum AssetLoadError { }, } +/// An error that occurs during an [`AssetServer::read_stream`]. +#[derive(Error, Debug)] +pub enum AssetReadError { + #[error(transparent)] + MissingAssetSourceError(#[from] MissingAssetSourceError), + #[error(transparent)] + MissingProcessedAssetReaderError(#[from] MissingProcessedAssetReaderError), + #[error(transparent)] + AssetReaderError(#[from] AssetReaderError), + #[error(transparent)] + IoError(#[from] std::io::Error), +} + +/// An error that occurs during an [`AssetServer::write_stream`]. +#[derive(Error, Debug)] +pub enum AssetWriteError { + #[error(transparent)] + MissingAssetSourceError(#[from] MissingAssetSourceError), + #[error(transparent)] + MissingAssetWriterError(#[from] MissingAssetWriterError), + #[error(transparent)] + MissingProcessedAssetWriterError(#[from] MissingProcessedAssetWriterError), + #[error(transparent)] + AssetWriterError(#[from] AssetWriterError), + #[error(transparent)] + IoError(#[from] std::io::Error), +} + #[derive(Error, Debug, Clone)] #[error("Failed to load asset '{path}' with asset loader '{loader_name}': {error}")] pub struct AssetLoaderError {