diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs
new file mode 100644
index 0000000000000..a4db6eebbbbc7
--- /dev/null
+++ b/datafusion-examples/examples/thread_pools.rs
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example shows how to use a separate thread pool (tokio [`Runtime`]) to
+//! run the CPU intensive parts of DataFusion plans.
+//!
+//! Running DataFusion plans that perform I/O (such as reading parquet files
+//! directly from remote object storage, e.g. AWS S3) without care will run
+//! CPU intensive work on the same thread pool as that I/O, which can lead to
+//! the issues described in the [Architecture section], such as throttled
+//! bandwidth due to congestion control and increased latencies for processing
+//! network messages.
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::error::Result;
+use datafusion::execution::dedicated_executor::DedicatedExecutor;
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
+use datafusion::prelude::*;
+use futures::stream::StreamExt;
+use object_store::http::HttpBuilder;
+use object_store::ObjectStore;
+use std::sync::Arc;
+use url::Url;
+
+/// Normally, you don't need to worry about the details of the tokio runtime,
+/// but for this example it is important to understand how the [`Runtime`]s work.
+///
+/// There is a "current" runtime that is installed in a thread local variable
+/// that is used by the `tokio::spawn` function.
+///
+/// The `#[tokio::main]` macro actually creates a [`Runtime`] and installs it
+/// as the "current" runtime (on which any `async` futures, streams and tasks
+/// are run).
+#[tokio::main]
+async fn main() -> Result<()> {
+    // The first example only does local file IO. Enable the URL table so we
+    // can select directly from filenames in SQL.
+    let sql = format!(
+        "SELECT * FROM '{}/alltypes_plain.parquet'",
+        datafusion::test_util::parquet_test_data()
+    );
+
+    // Run the query on the current runtime. Note that calling `await` here
+    // will effectively run the future (in this case the `async` function) on
+    // the current runtime
+    same_runtime(&sql).await?;
+
+    // Run the same query on a different runtime, including remote IO
+    different_runtime_advanced().await?;
+
+    Ok(())
+}
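+
+// As a point of reference, the `#[tokio::main]` attribute above is (roughly,
+// and assuming default settings) equivalent to building the runtime by hand
+// and blocking on the `main` future -- a minimal sketch:
+//
+// fn main() -> Result<()> {
+//     tokio::runtime::Builder::new_multi_thread()
+//         .enable_all()
+//         .build()?
+//         .block_on(async { /* body of main */ Ok(()) })
+// }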
+
+/// Run queries directly on the current tokio `Runtime`
+///
+/// This is how most examples in DataFusion are written and works well for
+/// development and local query processing.
+async fn same_runtime(sql: &str) -> Result<()> {
+    let ctx = SessionContext::new().enable_url_table();
+
+    // Calling .sql is an async function as it may also do network
+    // I/O, for example to contact a remote catalog or do an object store LIST
+    let df = ctx.sql(sql).await?;
+
+    // While many examples call `collect` or `show()`, those methods buffer the
+    // results. Internally, DataFusion generates output one RecordBatch at a time
+
+    // Calling `execute_stream` on a DataFrame returns a
+    // `SendableRecordBatchStream`. Depending on the plan, this may also do
+    // network I/O, for example to begin reading a parquet file from a remote
+    // object store. It is also possible that this function call spawns tasks
+    // that begin doing CPU intensive work
+    let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
+
+    // Calling `next()` drives the plan, producing new `RecordBatch`es using the
+    // current runtime (and typically also the current thread).
+    //
+    // Perhaps somewhat non-obviously, calling `next()` often will also
+    // result in other tasks being spawned on the current runtime (e.g. for
+    // `RepartitionExec` to read data from each of its input partitions in
+    // parallel).
+    //
+    // Executing the plan like this results in all CPU intensive work
+    // running on the same (default) Runtime.
+    while let Some(batch) = stream.next().await {
+        println!("{}", pretty_format_batches(&[batch?]).unwrap());
+    }
+    Ok(())
+}
+
+/// Demonstrates how to run queries on a **different** runtime than the current one
+///
+async fn different_runtime_advanced() -> Result<()> {
+    // In this example, we will configure access to a remote object store
+    // over the network during planning and execution
+
+    let dedicated_executor = DedicatedExecutor::builder().build();
+
+    // setup http object store
+    let base_url = Url::parse("https://github.com").unwrap();
+    let http_store: Arc<dyn ObjectStore> =
+        Arc::new(HttpBuilder::new().with_url(base_url.clone()).build()?);
+
+    // By default, the object store will use the "current runtime" for IO
+    // operations. If we use a dedicated executor to run the plan, the eventual
+    // object store requests will also use the dedicated executor's runtime.
+    //
+    // To avoid this, we can wrap the object store to run on the "IO" runtime
+    //
+    // (if we don't do this the example fails with an error like
+    //
+    // ctx.register_object_store(&base_url, http_store);
+    // A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.)
+    //let http_store = dedicated_executor.wrap_object_store(http_store);
+
+    // we must also register the dedicated executor with the runtime
+    let runtime_env = RuntimeEnvBuilder::new()
+        .with_dedicated_executor(dedicated_executor.clone())
+        .build_arc()?;
+
+    // Tell datafusion to process http:// urls with this object store
+    runtime_env.register_object_store(&base_url, http_store);
+
+    let ctx = SessionContext::from(
+        SessionStateBuilder::new()
+            .with_runtime_env(runtime_env)
+            .with_default_features()
+            .build(),
+    )
+    .enable_url_table();
+
+    // Plan (and execute) the query on the dedicated runtime
+    // TODO it would be great to figure out how to run this as part of `ctx.sql`
+    let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv";
+    let df = ctx
+        .sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5"))
+        .await?;
+    let stream: SendableRecordBatchStream = df.execute_stream().await?;
+
+    // We have now planned the query on the dedicated runtime, yay! But we
+    // still need to drive the stream (aka call `next()`) to get the results.
+
+    // However, as mentioned above, calling `next()` resolves the Stream (and
+    // any work it may do) on a thread in the current (default) runtime.
+    //
+    // To drive the stream on the dedicated runtime, we need to wrap it using
+    // the `DedicatedExecutor::run_sendable_record_batch_stream` function
+    //
+    // Note if you don't do this you will likely see a panic about `No IO runtime registered.`
+    // because the threads in the current (main) tokio runtime have not had the IO runtime
+    // installed
+    let mut stream = dedicated_executor.run_sendable_record_batch_stream(stream);
+
+    // Note you can run other streams on the DedicatedExecutor as well using
+    // the `DedicatedExecutor::run_stream` function. This is helpful, for
+    // example, if you need to do non-trivial CPU work on the results of the
+    // stream (e.g. calling a FlightDataEncoder to convert the results to
+    // Flight and send them over the network).
+
+    while let Some(batch) = stream.next().await {
+        println!("{}", pretty_format_batches(&[batch?]).unwrap());
+    }
+
+    Ok(())
+}
+
+// TODO add an example of how to call IO / CPU bound work directly using DedicatedExecutor
+// (e.g. to create a listing table directly)
diff --git a/datafusion/core/src/datasource/dynamic_file.rs b/datafusion/core/src/datasource/dynamic_file.rs
index 6654d0871c3f6..eababe70435d0 100644
--- a/datafusion/core/src/datasource/dynamic_file.rs
+++ b/datafusion/core/src/datasource/dynamic_file.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use datafusion_catalog::{SessionStore, UrlTableFactory};
-use datafusion_common::plan_datafusion_err;
+use datafusion_common::{internal_err, plan_datafusion_err};
 
 use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
 use crate::datasource::TableProvider;
@@ -55,33 +55,42 @@ impl UrlTableFactory for DynamicListTableFactory {
             return Ok(None);
         };
 
-        let state = &self
+        let session = self
             .session_store()
             .get_session()
             .upgrade()
-            .and_then(|session| {
-                session
-                    .read()
-                    .as_any()
-                    .downcast_ref::<SessionState>()
-                    .cloned()
-            })
             .ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;
 
-        match ListingTableConfig::new(table_url.clone())
-            .infer_options(state)
+        let runtime_env = Arc::clone(&session.read().runtime_env());
+
+        let Some(state) = session
+            .read()
+            .as_any()
+            .downcast_ref::<SessionState>()
+            .cloned()
+        else {
+            return internal_err!("Expected SessionState, got something else");
+        };
+
+        // Do remote catalog operations on a different runtime
+        runtime_env
+            .spawn_io(async move {
+                match ListingTableConfig::new(table_url.clone())
+                    .infer_options(&state)
+                    .await
+                {
+                    Ok(cfg) => {
+                        let cfg = cfg
+                            .infer_partitions_from_path(&state)
+                            .await?
+                            .infer_schema(&state)
+                            .await?;
+                        ListingTable::try_new(cfg)
+                            .map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
+                    }
+                    Err(_) => Ok(None),
+                }
+            })
             .await
-        {
-            Ok(cfg) => {
-                let cfg = cfg
-                    .infer_partitions_from_path(state)
-                    .await?
-                    .infer_schema(state)
-                    .await?;
-                ListingTable::try_new(cfg)
-                    .map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
-            }
-            Err(_) => Ok(None),
-        }
     }
 }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index ffe49dd2ba116..89ca3d8a54cc9 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -679,7 +679,7 @@ impl ListingOptions {
 /// # Ok(())
 /// # }
 /// ```
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ListingTable {
     table_paths: Vec<ListingTableUrl>,
     /// File fields only
@@ -843,10 +843,22 @@ impl TableProvider for ListingTable {
         });
         // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
         let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
-        let (mut partitioned_file_lists, statistics) = self
-            .list_files_for_scan(session_state, &partition_filters, limit)
+        // TODO avoid these clones when possible.
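+        // Note: `spawn_io` takes a `'static` future that is sent to another
+        // runtime, so it cannot borrow `self` or the local stack; hence the
+        // clones captured below.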
+        let session_state_captured = session_state.clone();
+        let partition_filters_captured = partition_filters.clone();
+        let self_captured = self.clone();
+        let (mut partitioned_file_lists, statistics) = state
+            .runtime_env()
+            .spawn_io(async move {
+                self_captured
+                    .list_files_for_scan(
+                        &session_state_captured,
+                        &partition_filters_captured,
+                        limit,
+                    )
+                    .await
+            })
+            .await?;
-
         // if no files need to be read, return an `EmptyExec`
         if partitioned_file_lists.is_empty() {
             let projected_schema = project_schema(&self.schema(), projection)?;
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 67236c9a6bd2c..8d661a4e316ab 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -608,10 +608,14 @@ impl SessionContext {
         sql: &str,
         options: SQLOptions,
     ) -> Result<DataFrame> {
-        let plan = self.state().create_logical_plan(sql).await?;
-        options.verify_plan(&plan)?;
+        self.runtime_env()
+            .spawn_cpu2(async {
+                let plan = self.state().create_logical_plan(sql).await?;
+                options.verify_plan(&plan)?;
 
-        self.execute_logical_plan(plan).await
+                self.execute_logical_plan(plan).await
+            })
+            .await
     }
 
     /// Creates logical expressions from SQL query text.
diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml
index ac1eb729b6ff8..72cf722c88aca 100644
--- a/datafusion/execution/Cargo.toml
+++ b/datafusion/execution/Cargo.toml
@@ -37,6 +37,7 @@ path = "src/lib.rs"
 [dependencies]
 arrow = { workspace = true }
+async-trait = { workspace = true }
 chrono = { workspace = true }
 dashmap = { workspace = true }
 datafusion-common = { workspace = true, default-features = true }
@@ -45,6 +46,12 @@
 futures = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true }
 parking_lot = { workspace = true }
+pin-project-lite = "^0.2.7"
 rand = { workspace = true }
 tempfile = { workspace = true }
 url = { workspace = true }
+tokio = { workspace = true }
+# todo figure out if we need to use tokio_stream / could use record batch receiver stream
+tokio-stream = { version = "0.1" }
diff --git a/datafusion/execution/src/cross_rt_stream.rs b/datafusion/execution/src/cross_rt_stream.rs
new file mode 100644
index 0000000000000..85d63765f9748
--- /dev/null
+++ b/datafusion/execution/src/cross_rt_stream.rs
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [CrossRtStream] runs [`Stream`]s in a different tokio runtime:
+//! tooling to pull [`Stream`]s from one tokio runtime into another.
+//!
+//! Originally from [InfluxDB 3.0]
+//!
+//! [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/blob/6fcbb004232738d55655f32f4ad2385523d10696/iox_query/src/exec/cross_rt_stream.rs#L1
+//!
+//! This is critical so that CPU-heavy loads are not run on the same runtime as IO handling
+
+// TODO: figure out where to pull this code (not in physical plan...)
+// maybe its own crate or maybe in common-runtime ??
+
+use std::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use crate::dedicated_executor::{DedicatedExecutor, JobError};
+use datafusion_common::DataFusionError;
+use futures::{future::BoxFuture, ready, FutureExt, Stream, StreamExt};
+use tokio::sync::mpsc::{channel, Sender};
+use tokio_stream::wrappers::ReceiverStream;
+
+/// [`Stream`] that is calculated by one tokio runtime but can safely be pulled
+/// from another w/o stalling (esp. when the calculating runtime is
+/// CPU-blocked).
+///
+/// See XXX in the architecture documentation for more details
+pub struct CrossRtStream<T> {
+    /// Future that drives the underlying stream.
+    ///
+    /// This is actually wrapped into [`DedicatedExecutor::spawn_cpu`] so it can be safely polled by the receiving runtime.
+    driver: BoxFuture<'static, ()>,
+
+    /// Flags if the [driver](Self::driver) returned [`Poll::Ready`].
+    driver_ready: bool,
+
+    /// Receiving stream.
+    ///
+    /// This one can be polled from the receiving runtime.
+    inner: ReceiverStream<T>,
+
+    /// Signals that [`inner`](Self::inner) finished.
+    ///
+    /// Note that we must also drive the [driver](Self::driver) even when the stream finished to allow proper state clean-ups.
+    inner_done: bool,
+}
+
+impl<T> std::fmt::Debug for CrossRtStream<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CrossRtStream")
+            .field("driver", &"...")
+            .field("driver_ready", &self.driver_ready)
+            .field("inner", &"...")
+            .field("inner_done", &self.inner_done)
+            .finish()
+    }
+}
+
+impl<T> CrossRtStream<T> {
+    /// Create new stream by producing a future that sends its state to the given [`Sender`].
+    ///
+    /// This is an internal method. `f` should always be wrapped into [`DedicatedExecutor::spawn_cpu`] (except for testing purposes).
+    fn new_with_tx<F, Fut>(f: F) -> Self
+    where
+        F: FnOnce(Sender<T>) -> Fut,
+        Fut: Future<Output = ()> + Send + 'static,
+    {
+        let (tx, rx) = channel(1);
+        let driver = f(tx).boxed();
+        Self {
+            driver,
+            driver_ready: false,
+            inner: ReceiverStream::new(rx),
+            inner_done: false,
+        }
+    }
+}
+
+impl<X, E> CrossRtStream<Result<X, E>>
+where
+    X: Send + 'static,
+    E: Send + 'static,
+{
+    /// Create new stream based on an existing stream that transports [`Result`]s.
+    ///
+    /// Also receives an executor that actually executes the underlying stream as well as a converter that converts
+    /// [`JobError`] to the error type of the stream (so we can send potential crashes/panics).
+    pub fn new_with_error_stream<S, C>(
+        stream: S,
+        exec: DedicatedExecutor,
+        converter: C,
+    ) -> Self
+    where
+        S: Stream<Item = Result<X, E>> + Send + 'static,
+        C: Fn(JobError) -> E + Send + 'static,
+    {
+        Self::new_with_tx(|tx| {
+            // future to be run in the other runtime
+            let tx_captured = tx.clone();
+            let fut = async move {
+                tokio::pin!(stream);
+
+                while let Some(res) = stream.next().await {
+                    if tx_captured.send(res).await.is_err() {
+                        // receiver gone
+                        return;
+                    }
+                }
+            };
+
+            // future for this runtime (likely the tokio/tonic/web driver)
+            async move {
+                if let Err(e) = exec.spawn_cpu(fut).await {
+                    let e = converter(e);
+
+                    // last message, so we don't care about the receiver side
+                    tx.send(Err(e)).await.ok();
+                }
+            }
+        })
+    }
+}
+
+impl<X> CrossRtStream<Result<X, DataFusionError>>
+where
+    X: Send + 'static,
+{
+    /// Create new stream based on an existing stream that transports
+    /// [`Result`]s w/ [`DataFusionError`]s.
+    ///
+    /// Also receives an executor that actually executes the underlying stream.
+    pub fn new_with_df_error_stream<S>(stream: S, exec: DedicatedExecutor) -> Self
+    where
+        S: Stream<Item = Result<X, DataFusionError>> + Send + 'static,
+    {
+        Self::new_with_error_stream(stream, exec, |e| {
+            DataFusionError::Context(
+                "Join Error (panic)".to_string(),
+                Box::new(DataFusionError::External(e.into())),
+            )
+        })
+    }
+}
+
+impl<T> Stream for CrossRtStream<T> {
+    type Item = T;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+
+        if !this.driver_ready {
+            let res = this.driver.poll_unpin(cx);
+
+            if res.is_ready() {
+                this.driver_ready = true;
+            }
+        }
+
+        if this.inner_done {
+            if this.driver_ready {
+                Poll::Ready(None)
+            } else {
+                Poll::Pending
+            }
+        } else {
+            match ready!(this.inner.poll_next_unpin(cx)) {
+                None => {
+                    this.inner_done = true;
+                    if this.driver_ready {
+                        Poll::Ready(None)
+                    } else {
+                        Poll::Pending
+                    }
+                }
+                Some(x) => Poll::Ready(Some(x)),
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::dedicated_executor::DedicatedExecutorBuilder;
+    use std::sync::OnceLock;
+    use std::{sync::Arc, time::Duration};
+    use tokio::runtime::{Handle, RuntimeFlavor};
+
+    // Don't create many different runtimes for testing to avoid thread creation/destruction overhead
+    fn testing_executor() -> DedicatedExecutor {
+        TESTING_EXECUTOR
+            .get_or_init(|| {
+                DedicatedExecutorBuilder::new()
+                    .with_name("cross_rt_stream")
+                    .build()
+            })
+            .clone()
+    }
+    static TESTING_EXECUTOR: OnceLock<DedicatedExecutor> = OnceLock::new();
+
+    #[tokio::test]
+    async fn test_async_block() {
+        let exec = testing_executor();
+        let barrier1 = Arc::new(tokio::sync::Barrier::new(2));
+        let barrier1_captured = Arc::clone(&barrier1);
+        let barrier2 = Arc::new(tokio::sync::Barrier::new(2));
+        let barrier2_captured = Arc::clone(&barrier2);
+        let mut stream = CrossRtStream::<Result<u8, JobError>>::new_with_error_stream(
+            futures::stream::once(async move {
+                barrier1_captured.wait().await;
+                barrier2_captured.wait().await;
+                Ok(1)
+            }),
+            exec,
+            std::convert::identity,
+        );
+
+        let mut f = stream.next();
+
+        ensure_pending(&mut f).await;
+        barrier1.wait().await;
+        ensure_pending(&mut f).await;
+        barrier2.wait().await;
+
+        let res = f.await.expect("streamed data");
+        assert_eq!(res.unwrap(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_sync_block() {
+        // This would deadlock if the stream payload ran within the same tokio
+        // runtime. To prevent any cheating
+        // (e.g. via channels), we ensure that the current runtime only has a single thread:
+        assert_eq!(
+            RuntimeFlavor::CurrentThread,
+            Handle::current().runtime_flavor()
+        );
+
+        let exec = testing_executor();
+        let barrier1 = Arc::new(std::sync::Barrier::new(2));
+        let barrier1_captured = Arc::clone(&barrier1);
+        let barrier2 = Arc::new(std::sync::Barrier::new(2));
+        let barrier2_captured = Arc::clone(&barrier2);
+        let mut stream = CrossRtStream::<Result<u8, JobError>>::new_with_error_stream(
+            futures::stream::once(async move {
+                barrier1_captured.wait();
+                barrier2_captured.wait();
+                Ok(1)
+            }),
+            exec,
+            std::convert::identity,
+        );
+
+        let mut f = stream.next();
+
+        ensure_pending(&mut f).await;
+        barrier1.wait();
+        ensure_pending(&mut f).await;
+        barrier2.wait();
+
+        let res = f.await.expect("streamed data");
+        assert_eq!(res.unwrap(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_panic() {
+        let exec = testing_executor();
+        let mut stream = CrossRtStream::<Result<u8, JobError>>::new_with_error_stream(
+            futures::stream::once(async { panic!("foo") }),
+            exec,
+            std::convert::identity,
+        );
+
+        let e = stream
+            .next()
+            .await
+            .expect("stream not finished")
+            .unwrap_err();
+        assert_eq!(e.to_string(), "Panic: foo");
+
+        let none = stream.next().await;
+        assert!(none.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_cancel_future() {
+        let exec = testing_executor();
+        let barrier1 = Arc::new(tokio::sync::Barrier::new(2));
+        let barrier1_captured = Arc::clone(&barrier1);
+        let barrier2 = Arc::new(tokio::sync::Barrier::new(2));
+        let barrier2_captured = Arc::clone(&barrier2);
+        let mut stream = CrossRtStream::<Result<u8, JobError>>::new_with_error_stream(
+            futures::stream::once(async move {
+                barrier1_captured.wait().await;
+                barrier2_captured.wait().await;
+                Ok(1)
+            }),
+            exec,
+            std::convert::identity,
+        );
+
+        let mut f = stream.next();
+
+        // fire up stream
+        ensure_pending(&mut f).await;
+        barrier1.wait().await;
+
+        // cancel
+        drop(f);
+
+        barrier2.wait().await;
+        let res = stream.next().await.expect("streamed data");
+        assert_eq!(res.unwrap(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_cancel_stream() {
+        let exec = testing_executor();
+        let barrier = Arc::new(tokio::sync::Barrier::new(2));
+        let barrier_captured = Arc::clone(&barrier);
+        let mut stream = CrossRtStream::<Result<u8, JobError>>::new_with_error_stream(
+            futures::stream::once(async move {
+                barrier_captured.wait().await;
+
+                // block forever
+                futures::future::pending::<()>().await;
+
+                // keep barrier Arc alive
+                drop(barrier_captured);
+                unreachable!()
+            }),
+            exec,
+            std::convert::identity,
+        );
+
+        let mut f = stream.next();
+
+        // fire up stream
+        ensure_pending(&mut f).await;
+        barrier.wait().await;
+        assert_eq!(Arc::strong_count(&barrier), 2);
+
+        // cancel
+        drop(f);
+        drop(stream);
+
+        tokio::time::timeout(Duration::from_secs(5), async {
+            loop {
+                if Arc::strong_count(&barrier) == 1 {
+                    return;
+                }
+
+                tokio::time::sleep(Duration::from_millis(10)).await;
+            }
+        })
+        .await
+        .unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_inner_future_driven_to_completion_after_stream_ready() {
+        let barrier = Arc::new(tokio::sync::Barrier::new(2));
+        let barrier_captured = Arc::clone(&barrier);
+
+        let mut stream = CrossRtStream::<u8>::new_with_tx(|tx| async move {
+            tx.send(1).await.ok();
+            drop(tx);
+            barrier_captured.wait().await;
+        });
+
+        let handle = tokio::spawn(async move { barrier.wait().await });
+
+        assert_eq!(stream.next().await, Some(1));
+        handle.await.unwrap();
+    }
+
+    async fn ensure_pending<F>(f: &mut F)
+    where
+        F: Future + Send + Unpin,
+    {
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
+            _ = f => { panic!("not pending") },
+        }
+    }
+}
diff --git a/datafusion/execution/src/dedicated_executor.rs b/datafusion/execution/src/dedicated_executor.rs
new file mode 100644
index 0000000000000..1d42a995aaece
--- /dev/null
+++ b/datafusion/execution/src/dedicated_executor.rs
@@ -0,0 +1,1004 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio runtime.
+//!
+//! Originally from [InfluxDB 3.0]
+//!
+//! [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
+use crate::cross_rt_stream::CrossRtStream;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::SendableRecordBatchStream;
+use datafusion_common::DataFusionError;
+use futures::{
+    future::{BoxFuture, Shared},
+    Future, FutureExt, Stream, TryFutureExt,
+};
+use log::{info, warn};
+use parking_lot::RwLock;
+use std::cell::RefCell;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{fmt::Display, sync::Arc, time::Duration};
+use tokio::runtime::Builder;
+use tokio::task::JoinHandle;
+use tokio::{
+    runtime::Handle,
+    sync::{oneshot::error::RecvError, Notify},
+    task::JoinSet,
+};
+
+impl From<Builder> for DedicatedExecutorBuilder {
+    fn from(value: Builder) -> Self {
+        Self::new_from_builder(value)
+    }
+}
+
+/// Manages a separate tokio [`Runtime`] (thread pool) for executing tasks such
+/// as DataFusion `ExecutionPlans`.
+///
+/// See [`DedicatedExecutorBuilder`] for creating a new instance.
+///
+/// A `DedicatedExecutor` makes it easier to avoid running IO and CPU bound
+/// tasks on the same threadpool by running futures (and any `tasks` that are
+/// `tokio::task::spawn`ed by them) on a separate tokio [`Executor`].
+///
+/// DedicatedExecutor can be `clone`d and all clones share the same threadpool.
+///
+/// TODO add note about `io_thread`
+///
+/// TODO: things we use in InfluxData
+/// 1. Testing mode (so we can make a bunch of DedicatedExecutors) -- maybe we can wrap DedicatedExecutors like IOxDedicatedExecutors
+/// 2. Some sort of hook to install tokio metrics
+///
+/// When [`DedicatedExecutorBuilder::build`] is called, the "current" tokio
+/// runtime will be registered for IO, via [`register_io_runtime`], on all
+/// threads spawned by the executor. Any I/O done by threads in this
+/// [`DedicatedExecutor`] should use [`spawn_io`], which will run it on the I/O
+/// runtime.
+///
+/// ## TODO examples
+///
+/// # Background
+///
+/// Tokio has the notion of the "current" runtime, which runs the current future
+/// and any tasks spawned by it.
+/// Typically, this is the runtime created by `tokio::main` and is used for
+/// the main application logic and I/O handling.
+///
+/// For CPU bound work, such as DataFusion plan execution, it is important to
+/// run on a separate thread pool to avoid blocking the I/O handling for
+/// extended periods of time, in order to avoid long poll latencies (which
+/// decrease the throughput of small requests under concurrent load).
+///
+/// # IO Scheduling
+///
+/// I/O, such as network calls, should not be performed on the runtime managed
+/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running
+/// CPU tasks will not be preempted and can therefore starve servicing of other
+/// tasks. This manifests in long poll-latencies, where a task is ready to run
+/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as
+/// there is no external party waiting on a response; however, for I/O tasks,
+/// long poll latencies can prevent timely servicing of IO, which can have a
+/// significant detrimental effect.
+///
+/// # Details
+///
+/// The worker thread priority is set to low so that such tasks do
+/// not starve other more important tasks (such as answering health checks).
+///
+/// Follows the example from stack overflow and spawns a new
+/// thread to install a Tokio runtime "context"
+///
+/// # Troubleshooting:
+///
+/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread!"
+///
+/// This means that IO was attempted on a tokio runtime that was not registered
+/// for IO. One solution is to run the task using [DedicatedExecutor::spawn_cpu].
+///
+/// ## "Cannot drop a runtime in a context where blocking is not allowed"
+///
+/// If you try to use this structure from an async context you will see a panic
+/// like "thread 'test_builder_plan' panicked at 'Cannot drop a runtime in a
+/// context where blocking is not allowed. This happens when a runtime is
+/// dropped from within an asynchronous context.',
+/// .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21", which means a
+/// runtime was dropped from within an asynchronous context.
+///
+/// TODO: make this an Arc<..> rather than a cloneable thing (to follow the same
+/// pattern as the rest of the system)
+#[derive(Clone, Debug)]
+pub struct DedicatedExecutor {
+    state: Arc<RwLock<State>>,
+}
+
+impl DedicatedExecutor {
+    /// Create a new builder to create a [`DedicatedExecutor`]
+    pub fn builder() -> DedicatedExecutorBuilder {
+        DedicatedExecutorBuilder::new()
+    }
+
+    /// Runs the specified [`Future`] (and any tasks it spawns) on the thread
+    /// pool managed by this `DedicatedExecutor`.
+    ///
+    /// # TODO: make this wait (aka so the API doesn't start a new background task or whatever)
+    ///
+    /// # Notes
+    ///
+    /// This task is run on a dedicated Tokio runtime that purposely does not
+    /// have IO enabled. If your future makes any IO calls, you have to
+    /// explicitly run them via [`DedicatedExecutor::spawn_io`].
+    ///
+    /// If you see a message like this
+    ///
+    /// (Panic { msg: "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
+    ///
+    /// it means some work that was meant to be done on the IO runtime was done
+    /// on the CPU runtime.
+    ///
+    /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when
+    /// it is dropped. Thus, you need to ensure the returned future lives until
+    /// it completes (call `await`) or you wish to cancel it.
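+    ///
+    /// # Example
+    ///
+    /// A minimal sketch of the intended usage (shown as an ignored doc
+    /// example rather than a runnable doctest):
+    ///
+    /// ```ignore
+    /// let exec = DedicatedExecutor::builder().build();
+    /// // CPU-heavy work runs on the dedicated runtime; `await`ing the
+    /// // returned future from the current runtime yields a `Result`
+    /// let answer = exec.spawn_cpu(async { 21 * 2 }).await.unwrap();
+    /// assert_eq!(answer, 42);
+    /// ```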
+    ///
+    /// All spawned tasks are added to the tokio executor immediately and
+    /// compete for the threadpool's resources.
+    pub fn spawn_cpu<T>(
+        &self,
+        task: T,
+    ) -> impl Future<Output = Result<T::Output, JobError>>
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        let handle = {
+            let state = self.state.read();
+            state.handle.clone()
+        };
+
+        let Some(handle) = handle else {
+            return futures::future::err(JobError::WorkerGone).boxed();
+        };
+
+        // use JoinSet to implement "cancel on drop"
+        let mut join_set = JoinSet::new();
+        join_set.spawn_on(task, &handle);
+        async move {
+            join_set
+                .join_next()
+                .await
+                .expect("just spawned task")
+                .map_err(|e| match e.try_into_panic() {
+                    Ok(e) => {
+                        let s = if let Some(s) = e.downcast_ref::<String>() {
+                            s.clone()
+                        } else if let Some(s) = e.downcast_ref::<&str>() {
+                            s.to_string()
+                        } else {
+                            "unknown internal error".to_string()
+                        };
+
+                        JobError::Panic { msg: s }
+                    }
+                    Err(_) => JobError::WorkerGone,
+                })
+        }
+        .boxed()
+    }
+
+    /// Runs the specified work on the dedicated executor and returns the result
+    ///
+    /// Note the future is not 'static (aka it can have internal references)
+    pub fn spawn_cpu2<'a, T>(&self, task: T) -> impl Future<Output = T::Output>
+    where
+        T: Future + Send + 'a,
+        T::Output: Send,
+    {
+        // If we can figure out how to make this work, then
+        // we could integrate it nicely into DataFusion
+        async { todo!() }
+    }
+
+    /// signals shutdown of this executor and any Clones
+    pub fn shutdown(&self) {
+        // hang up the channel which will cause the dedicated thread
+        // to quit
+        let mut state = self.state.write();
+        state.handle = None;
+        state.start_shutdown.notify_one();
+    }
+
+    /// Stops all subsequent task executions, and waits for the worker
+    /// thread to complete. Note this will shutdown all clones of this
+    /// `DedicatedExecutor` as well.
+    ///
+    /// Only the first call to `join` will actually wait for the
+    /// executing thread to complete. All other calls to join will
+    /// complete immediately.
+    ///
+    /// # Panic / Drop
+    /// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call
+    /// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic.
+    pub async fn join(&self) {
+        self.shutdown();
+
+        // get the handle while the mutex is held
+        let handle = {
+            let state = self.state.read();
+            state.completed_shutdown.clone()
+        };
+
+        // wait for completion while not holding the mutex to avoid
+        // deadlocks
+        handle.await.expect("Thread died?")
+    }
+
+    /// Returns a SendableRecordBatchStream that will run on this executor's thread pool
+    pub fn run_sendable_record_batch_stream(
+        &self,
+        stream: SendableRecordBatchStream,
+    ) -> SendableRecordBatchStream {
+        let schema = stream.schema();
+        let cross_rt_stream =
+            CrossRtStream::new_with_df_error_stream(stream, self.clone());
+        Box::pin(RecordBatchStreamAdapter::new(schema, cross_rt_stream))
+    }
+
+    /// Runs a stream that produces `Result`s on the executor's thread pool
+    ///
+    /// The stream must produce `Result`s so that any errors on the dedicated
+    /// executor (like a panic or shutdown) can be communicated back.
+    ///
+    /// # Arguments:
+    /// - stream: the stream to run on this dedicated executor
+    /// - converter: a function that converts a [`JobError`] to the error type of the stream
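+    ///
+    /// # Example
+    ///
+    /// A sketch of wrapping an arbitrary fallible stream (shown as an ignored
+    /// doc example; `my_stream` is a placeholder):
+    ///
+    /// ```ignore
+    /// // convert executor errors (panic / shutdown) into the stream's error type
+    /// let stream = exec.run_stream(my_stream, |job_error| {
+    ///     DataFusionError::External(Box::new(job_error))
+    /// });
+    /// ```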
+    pub fn run_stream<X, E, S, C>(
+        &self,
+        stream: S,
+        converter: C,
+    ) -> impl Stream<Item = Result<X, E>> + Send + 'static
+    where
+        X: Send + 'static,
+        E: Send + 'static,
+        S: Stream<Item = Result<X, E>> + Send + 'static,
+        C: Fn(JobError) -> E + Send + 'static,
+    {
+        CrossRtStream::new_with_error_stream(stream, self.clone(), converter)
+    }
+
+    /// Registers `handle` as the IO runtime for this thread
+    ///
+    /// This sets a thread-local variable
+    ///
+    /// See [`spawn_io`](Self::spawn_io) for more details
+    pub fn register_io_runtime(handle: Option<Handle>) {
+        IO_RUNTIME.set(handle)
+    }
+
+    /// Registers the "current" `handle` as the IO runtime for this thread
+    ///
+    /// This is useful for testing purposes.
+    ///
+    /// # Panics
+    ///
+    /// Panics if no current handle is available (aka not running in a tokio
+    /// runtime)
+    pub fn register_current_runtime_for_io() {
+        Self::register_io_runtime(Some(Handle::current()))
+    }
+
+    /// Runs `fut` on the runtime registered by [`register_io_runtime`] if any,
+    /// otherwise panics.
+    ///
+    /// # Panic
+    /// Needs an IO runtime [registered](register_io_runtime).
+    pub async fn spawn_io<Fut>(fut: Fut) -> Fut::Output
+    where
+        Fut: Future + Send + 'static,
+        Fut::Output: Send,
+    {
+        let h = IO_RUNTIME.with_borrow(|h| h.clone()).expect(
+            "No IO runtime registered. If you hit this panic, it likely \
+            means a DataFusion plan or other CPU bound work is running on a \
+            tokio threadpool used for IO. Try spawning the work using \
+            `DedicatedExecutor::spawn_cpu` or for tests `DedicatedExecutor::register_current_runtime_for_io`",
+        );
+        DropGuard(h.spawn(fut)).await
+    }
+}
+
+thread_local! {
+    /// Tokio runtime `Handle` for doing network (I/O) operations, see [`spawn_io`]
+    pub static IO_RUNTIME: RefCell<Option<Handle>> = const { RefCell::new(None) };
+}
+
+struct DropGuard<T>(JoinHandle<T>);
+impl<T> Drop for DropGuard<T> {
+    fn drop(&mut self) {
+        self.0.abort()
+    }
+}
+
+impl<T> Future for DropGuard<T> {
+    type Output = T;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        Poll::Ready(match std::task::ready!(self.0.poll_unpin(cx)) {
+            Ok(v) => v,
+            Err(e) if e.is_cancelled() => panic!("IO runtime was shut down"),
+            Err(e) => std::panic::resume_unwind(e.into_panic()),
+        })
+    }
+}
+
+/// Runs futures (and any `tasks` that are `tokio::task::spawn`ed by
+/// them) on a separate tokio Executor.
+///
+/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for
+/// [`start_shutdown`](Self::start_shutdown) and signals the completion via
+/// [`completed_shutdown`](Self::completed_shutdown) (for which it owns the sender side).
+#[derive(Debug)]
+struct State {
+    /// Runtime handle.
+    ///
+    /// This is `None` when the executor is shutting down.
+    handle: Option<Handle>,
+
+    /// If notified, the executor tokio runtime will begin to shutdown.
+    ///
+    /// We could implement this by checking `handle.is_none()` in regular intervals, but that requires regular
+    /// wake-ups and locking of the state. Just using a proper async signal is nicer.
+    start_shutdown: Arc<Notify>,
+
+    /// Receiver side indicating that shutdown is complete.
+    completed_shutdown: Shared<BoxFuture<'static, Result<(), Arc<RecvError>>>>,
+
+    /// The inner thread that can be used to join during drop.
+    thread: Option<std::thread::JoinHandle<()>>,
+}
+
+/// IMPORTANT: Implement `Drop` for [`State`], NOT for [`DedicatedExecutor`],
+/// because the executor can be cloned and clones share their inner state.
+impl Drop for State {
+    fn drop(&mut self) {
+        if self.handle.is_some() {
+            warn!("DedicatedExecutor dropped without calling shutdown()");
+            self.handle = None;
+            self.start_shutdown.notify_one();
+        }
+
+        // do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575
+        if !std::thread::panicking()
+            && self.completed_shutdown.clone().now_or_never().is_none()
+        {
+            warn!("DedicatedExecutor dropped without waiting for worker termination");
+        }
+
+        // join thread but don't care about the results
+        self.thread.take().expect("not dropped yet").join().ok();
+    }
+}
+
+const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5);
+
+/// Potential error returned when polling [`DedicatedExecutor::spawn_cpu`].
+#[derive(Debug)]
+pub enum JobError {
+    WorkerGone,
+    Panic { msg: String },
+}
+
+impl Display for JobError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            JobError::WorkerGone => {
+                write!(f, "Worker thread gone, executor was likely shut down")
+            }
+            JobError::Panic { msg } => write!(f, "Panic: {}", msg),
+        }
+    }
+}
+
+impl std::error::Error for JobError {}
+
+/// Builder for [`DedicatedExecutor`]
+pub struct DedicatedExecutorBuilder {
+    /// Name given to all execution threads. Defaults to "DedicatedExecutor"
+    name: String,
+    /// Builder for tokio runtime. Defaults to multi-threaded builder
+    runtime_builder: Builder,
+}
+
+impl From<JobError> for DataFusionError {
+    fn from(value: JobError) -> Self {
+        DataFusionError::External(Box::new(value))
+            .context("JobError from DedicatedExecutor")
+    }
+}
+
+impl DedicatedExecutorBuilder {
+    /// Create a new `DedicatedExecutorBuilder` with default values
+    ///
+    /// Note that by default this `DedicatedExecutor` will not be able to
+    /// perform network I/O.
+    pub fn new() -> Self {
+        Self {
+            name: String::from("DedicatedExecutor"),
+            runtime_builder: Builder::new_multi_thread(),
+        }
+    }
+
+    /// Create a new `DedicatedExecutorBuilder` from a pre-existing tokio
+    /// runtime [`Builder`].
+    ///
+    /// This method permits customizing the tokio [`Executor`] used for the
+    /// [`DedicatedExecutor`]
+    pub fn new_from_builder(runtime_builder: Builder) -> Self {
+        Self {
+            name: String::from("DedicatedExecutor"),
+            runtime_builder,
+        }
+    }
+
+    /// Set the name of the dedicated executor (appears in the name of each
+    /// thread).
+    ///
+    /// Defaults to "DedicatedExecutor"
+    pub fn with_name(mut self, name: impl Into<String>) -> Self {
+        self.name = name.into();
+        self
+    }
+
+    /// Set the number of worker threads. Defaults to the tokio default (the
+    /// number of virtual CPUs)
+    pub fn with_worker_threads(mut self, num_threads: usize) -> Self {
+        self.runtime_builder.worker_threads(num_threads);
+        self
+    }
+
+    /// Creates a new `DedicatedExecutor` with a dedicated tokio
+    /// executor that is separate from the thread pool created via
+    /// [`tokio::main`] or similar.
+    ///
+    /// Note: If [`DedicatedExecutorBuilder::build`] is called from an existing
+    /// tokio runtime, it will assume that the existing runtime should be used
+    /// for I/O.
+    ///
+    /// See the documentation on [`DedicatedExecutor`] for more details.
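+    ///
+    /// # Example
+    ///
+    /// A sketch of typical configuration (shown as an ignored doc example):
+    ///
+    /// ```ignore
+    /// let exec = DedicatedExecutorBuilder::new()
+    ///     .with_name("query-pool")
+    ///     .with_worker_threads(4)
+    ///     .build();
+    /// ```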
+    pub fn build(self) -> DedicatedExecutor {
+        let Self {
+            name,
+            runtime_builder,
+        } = self;
+
+        let notify_shutdown = Arc::new(Notify::new());
+        let notify_shutdown_captured = Arc::clone(&notify_shutdown);
+
+        let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel();
+        let (tx_handle, rx_handle) = std::sync::mpsc::channel();
+
+        let io_handle = Handle::try_current().ok();
+        let thread = std::thread::Builder::new()
+            .name(format!("{name} driver"))
+            .spawn(move || {
+                // also register the IO runtime for the current thread, since it might be used as well (esp. for the
+                // current thread RT)
+                DedicatedExecutor::register_io_runtime(io_handle.clone());
+
+                info!("Creating DedicatedExecutor");
+
+                let mut runtime_builder = runtime_builder;
+                let runtime = runtime_builder
+                    .on_thread_start(move || {
+                        DedicatedExecutor::register_io_runtime(io_handle.clone())
+                    })
+                    .build()
+                    .expect("Creating tokio runtime");
+
+                runtime.block_on(async move {
+                    // Enable the "notified" receiver BEFORE sending the runtime handle back to the constructor thread
+                    // (i.e. the one that runs `new`) to avoid the potential (but unlikely) race that the shutdown is
+                    // started right after the constructor finishes and the new runtime calls
+                    // `notify_shutdown_captured.notified().await`.
+                    //
+                    // Tokio provides an API for that by calling `enable` on the `notified` future (this requires
+                    // pinning though).
+                    let shutdown = notify_shutdown_captured.notified();
+                    let mut shutdown = std::pin::pin!(shutdown);
+                    shutdown.as_mut().enable();
+
+                    if tx_handle.send(Handle::current()).is_err() {
+                        return;
+                    }
+                    shutdown.await;
+                });
+
+                runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
+
+                // send shutdown "done" signal
+                tx_shutdown.send(()).ok();
+            })
+            .expect("executor setup");
+
+        let handle = rx_handle.recv().expect("driver started");
+
+        let state = State {
+            handle: Some(handle),
+            start_shutdown: notify_shutdown,
+            completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(),
+            thread: Some(thread),
+        };
+
+        DedicatedExecutor {
+            state: Arc::new(RwLock::new(state)),
+        }
+    }
+}
+
+#[cfg(test)]
+#[allow(unused_qualifications)]
+mod tests {
+    use super::*;
+    use std::{
+        panic::panic_any,
+        sync::{Arc, Barrier},
+        time::Duration,
+    };
+    use tokio::{net::TcpListener, sync::Barrier as AsyncBarrier};
+
+    /// Wait for the barrier and then return `result`
+    async fn do_work(result: usize, barrier: Arc<Barrier>) -> usize {
+        barrier.wait();
+        result
+    }
+
+    /// Wait for the barrier and then return `result`
+    async fn do_work_async(result: usize, barrier: Arc<AsyncBarrier>) -> usize {
+        barrier.wait().await;
+        result
+    }
+
+    fn exec() -> DedicatedExecutor {
+        exec_with_threads(1)
+    }
+
+    fn exec2() -> DedicatedExecutor {
+        exec_with_threads(2)
+    }
+
+    fn exec_with_threads(threads: usize) -> DedicatedExecutor {
+        let mut runtime_builder = Builder::new_multi_thread();
+        runtime_builder.worker_threads(threads);
+        runtime_builder.enable_all();
+
+        DedicatedExecutorBuilder::from(runtime_builder)
+            .with_name("Test DedicatedExecutor")
+            .build()
+    }
+
+    async fn test_io_runtime_multi_thread_impl(dedicated: DedicatedExecutor) {
+        let io_runtime_id = std::thread::current().id();
+        dedicated
+            .spawn_cpu(async move {
+                let dedicated_id = std::thread::current().id();
+                let spawned = DedicatedExecutor::spawn_io(async move {
+                    std::thread::current().id()
+                })
+                .await;
+
+                assert_ne!(dedicated_id, spawned);
+                assert_eq!(io_runtime_id, spawned);
+            })
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn basic() {
+        let barrier = Arc::new(Barrier::new(2));
+
+        let exec = exec();
+        let dedicated_task = exec.spawn_cpu(do_work(42, Arc::clone(&barrier)));
+
+        // Note the dedicated task will never complete if it runs on
+        // the main tokio thread (as this test is not using the
+        // 'multithreaded' version of the executor and the call to
+        // barrier.wait actually blocks the tokio thread)
+        barrier.wait();
+
+        // should be able to get the result
+        assert_eq!(dedicated_task.await.unwrap(), 42);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn basic_clone() {
+        let barrier = Arc::new(Barrier::new(2));
+        let exec = exec();
+        // Run task on clone should work fine
+        let dedicated_task = exec.clone().spawn_cpu(do_work(42, Arc::clone(&barrier)));
+        barrier.wait();
+        assert_eq!(dedicated_task.await.unwrap(), 42);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn drop_empty_exec() {
+        exec();
+    }
+
+    #[tokio::test]
+    async fn drop_clone() {
+        let barrier = Arc::new(Barrier::new(2));
+        let exec = exec();
+
+        drop(exec.clone());
+
+        let task = exec.spawn_cpu(do_work(42, Arc::clone(&barrier)));
+        barrier.wait();
+        assert_eq!(task.await.unwrap(), 42);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "foo")]
+    async fn just_panic() {
+        struct S(DedicatedExecutor);
+
+        impl Drop for S {
+            fn drop(&mut self) {
+                self.0.join().now_or_never();
+            }
+        }
+
+        let exec = exec();
+        let _s = S(exec);
+
+        // this must not lead to a double-panic and SIGILL
+        panic!("foo")
+    }
+
+    #[tokio::test]
+    async fn multi_task() {
+        let barrier = Arc::new(Barrier::new(3));
+
+        // make an executor with two threads
+        let exec = exec2();
+        let dedicated_task1 = exec.spawn_cpu(do_work(11, Arc::clone(&barrier)));
+        let dedicated_task2 = exec.spawn_cpu(do_work(42, Arc::clone(&barrier)));
+
+        // block main thread until completion of other two tasks
+        barrier.wait();
+
+        // should be able to get the result
+        assert_eq!(dedicated_task1.await.unwrap(), 11);
+        assert_eq!(dedicated_task2.await.unwrap(), 42);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn tokio_spawn() {
+        let exec = exec2();
+
+        // spawn a task that spawns to other tasks and ensure they run on the dedicated
+        // executor
+        let dedicated_task = exec.spawn_cpu(async move {
+            // spawn separate tasks
+            let t1 = tokio::task::spawn(async { 25usize });
+            t1.await.unwrap()
+        });
+
+        // Validate the inner task ran to completion (aka it did not panic)
+        assert_eq!(dedicated_task.await.unwrap(), 25);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn panic_on_executor_str() {
+        let exec = exec();
+        let dedicated_task = exec.spawn_cpu(async move {
+            if true {
+                panic!("At the disco, on the dedicated task scheduler");
+            } else {
+                42
+            }
+        });
+
+        // should not be able to get the result
+        let err = dedicated_task.await.unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Panic: At the disco, on the dedicated task scheduler",
+        );
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn panic_on_executor_string() {
+        let exec = exec();
+        let dedicated_task = exec.spawn_cpu(async move {
+            if true {
+                panic!("{} {}", 1, 2);
+            } else {
+                42
+            }
+        });
+
+        // should not be able to get the result
+        let err = dedicated_task.await.unwrap_err();
+        assert_eq!(err.to_string(), "Panic: 1 2",);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn panic_on_executor_other() {
+        let exec = exec();
+        let dedicated_task = exec.spawn_cpu(async move {
+            if true {
+                panic_any(1)
+            } else {
+                42
+            }
+        });
+
+        // should not be able to get the result
+        let err = dedicated_task.await.unwrap_err();
+        assert_eq!(err.to_string(), "Panic: unknown internal error",);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn executor_shutdown_while_task_running() {
+        let barrier_1 = Arc::new(Barrier::new(2));
+        let captured_1 = Arc::clone(&barrier_1);
+        let barrier_2 = Arc::new(Barrier::new(2));
+        let captured_2 = Arc::clone(&barrier_2);
+
+        let exec = exec();
+        let dedicated_task = exec.spawn_cpu(async move {
+            captured_1.wait();
+            do_work(42, captured_2).await
+        });
+        barrier_1.wait();
+
+        exec.shutdown();
+        // block main thread until completion of the outstanding task
+        barrier_2.wait();
+
+        // task should complete successfully
+        assert_eq!(dedicated_task.await.unwrap(), 42);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn executor_submit_task_after_shutdown() {
+        let exec = exec();
+
+        // Simulate trying to submit tasks once executor has shutdown
+        exec.shutdown();
+        let dedicated_task = exec.spawn_cpu(async { 11 });
+
+        // task should complete, but return an error
+        let err = dedicated_task.await.unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Worker thread gone, executor was likely shut down"
+        );
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn executor_submit_task_after_clone_shutdown() {
+        let exec = exec();
+
+        // shutdown the clone (but not the exec)
+        exec.clone().join().await;
+
+        // Simulate trying to submit tasks once executor has shutdown
+        let dedicated_task = exec.spawn_cpu(async { 11 });
+
+        // task should complete, but return an error
+        let err = dedicated_task.await.unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Worker thread gone, executor was likely shut down"
+        );
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn executor_join() {
+        let exec = exec();
+        // test it doesn't hang
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn executor_join2() {
+        let exec = exec();
+        // test it doesn't hang
+        exec.join().await;
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    #[allow(clippy::redundant_clone)]
+    async fn executor_clone_join() {
+        let exec = exec();
+        // test it doesn't hang
+        exec.clone().join().await;
+        exec.clone().join().await;
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn drop_receiver() {
+        // create empty executor
+        let exec = exec();
+
+        // create first blocked task
+        let barrier1_pre = Arc::new(AsyncBarrier::new(2));
+        let barrier1_pre_captured = Arc::clone(&barrier1_pre);
+        let barrier1_post = Arc::new(AsyncBarrier::new(2));
+        let barrier1_post_captured = Arc::clone(&barrier1_post);
+        let dedicated_task1 = exec.spawn_cpu(async move {
+            barrier1_pre_captured.wait().await;
+            do_work_async(11, barrier1_post_captured).await
+        });
+        barrier1_pre.wait().await;
+
+        // create second blocked task
+        let barrier2_pre = Arc::new(AsyncBarrier::new(2));
+        let barrier2_pre_captured = Arc::clone(&barrier2_pre);
+        let barrier2_post = Arc::new(AsyncBarrier::new(2));
+        let barrier2_post_captured = Arc::clone(&barrier2_post);
+        let dedicated_task2 = exec.spawn_cpu(async move {
+            barrier2_pre_captured.wait().await;
+            do_work_async(22, barrier2_post_captured).await
+        });
+        barrier2_pre.wait().await;
+
+        // cancel task
+        drop(dedicated_task1);
+
+        // cancellation might take a short while
+        tokio::time::timeout(Duration::from_secs(1), async {
+            loop {
+                if Arc::strong_count(&barrier1_post) == 1 {
+                    return;
+                }
+                tokio::time::sleep(Duration::from_millis(10)).await
+            }
+        })
+        .await
+        .unwrap();
+
+        // unblock other task
+        barrier2_post.wait().await;
+        assert_eq!(dedicated_task2.await.unwrap(), 22);
+
+        tokio::time::timeout(Duration::from_secs(1), async {
+            loop {
+                if Arc::strong_count(&barrier2_post) == 1 {
+                    return;
+                }
+                tokio::time::sleep(Duration::from_millis(10)).await
+            }
+        })
+        .await
+        .unwrap();
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn test_io_runtime_multi_thread() {
+        let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
+        runtime_builder.worker_threads(1);
+
+        let dedicated = DedicatedExecutorBuilder::from(runtime_builder)
+            .with_name("Test DedicatedExecutor")
+            .build();
+        test_io_runtime_multi_thread_impl(dedicated).await;
+    }
+
+    #[tokio::test]
+    async fn test_io_runtime_current_thread() {
+        let runtime_builder = tokio::runtime::Builder::new_current_thread();
+
+        let dedicated = DedicatedExecutorBuilder::new_from_builder(runtime_builder)
+            .with_name("Test DedicatedExecutor")
+            .build();
+        test_io_runtime_multi_thread_impl(dedicated).await;
+    }
+
+    #[tokio::test]
+    async fn test_that_default_executor_prevents_io() {
+        let exec = DedicatedExecutorBuilder::new().build();
+
+        let io_disabled = exec
+            .spawn_cpu(async move {
+                // the only way (I've found) to test whether IO is enabled is to
+                // use it and observe whether tokio panics
+                TcpListener::bind("127.0.0.1:0")
+                    .catch_unwind()
+                    .await
+                    .is_err()
+            })
+            .await
+            .unwrap();
+
+        assert!(io_disabled)
+    }
+
+    #[tokio::test]
+    async fn test_happy_path() {
+        let rt_io = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .enable_all()
+            .build()
+            .unwrap();
+
+        let io_thread_id = rt_io
+            .spawn(async move { std::thread::current().id() })
+            .await
+            .unwrap();
+        let parent_thread_id = std::thread::current().id();
+        assert_ne!(io_thread_id, parent_thread_id);
+
+        DedicatedExecutor::register_io_runtime(Some(rt_io.handle().clone()));
+
+        let measured_thread_id =
+            DedicatedExecutor::spawn_io(async move { std::thread::current().id() }).await;
+        assert_eq!(measured_thread_id, io_thread_id);
+
+        rt_io.shutdown_background();
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "IO runtime registered")]
+    async fn test_panic_if_no_runtime_registered() {
+        DedicatedExecutor::spawn_io(futures::future::ready(())).await;
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "IO runtime was shut down")]
+    async fn test_io_runtime_down() {
+        let rt_io = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .enable_all()
+            .build()
+            .unwrap();
+
+        DedicatedExecutor::register_io_runtime(Some(rt_io.handle().clone()));
+
+        tokio::task::spawn_blocking(move || {
+            rt_io.shutdown_timeout(Duration::from_secs(1));
+        })
+        .await
+        .unwrap();
+
+        DedicatedExecutor::spawn_io(futures::future::ready(())).await;
+    }
+}
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs
index 317bd3203ab1b..9565a85c9fb09 100644
--- a/datafusion/execution/src/lib.rs
+++ b/datafusion/execution/src/lib.rs
@@ -22,11 +22,13 @@
 pub mod cache;
 pub mod config;
+pub mod cross_rt_stream;
+pub mod dedicated_executor;
 pub mod disk_manager;
 pub mod memory_pool;
 pub mod object_store;
 pub mod runtime_env;
-mod stream;
+pub mod stream;
 mod task;
 
 pub mod registry {
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index 5420080efd3e3..b91bba381d2a3 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -27,8 +27,10 @@ use crate::{
 };
 
 use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
+use crate::dedicated_executor::DedicatedExecutor;
 use datafusion_common::{DataFusionError, Result};
 use object_store::ObjectStore;
+use std::future::Future;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::{
@@ -67,6 +69,8 @@ use url::Url;
 /// .build()
 /// .unwrap();
 /// ```
+///
+/// TODO examples for spawning IO / CPU bound work
 pub struct RuntimeEnv {
     /// Runtime memory management
     pub memory_pool: Arc<dyn MemoryPool>,
@@ -76,6 +80,8 @@ pub struct RuntimeEnv {
     pub cache_manager: Arc<CacheManager>,
     /// Object Store Registry
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
+    /// Optional dedicated executor
+    pub dedicated_executor: Option<DedicatedExecutor>,
 }
 
 impl Debug for RuntimeEnv {
@@ -155,6 +161,79 @@ impl RuntimeEnv {
             .get_store(url.as_ref())
             .map_err(DataFusionError::from)
     }
+
+    /// Return the current DedicatedExecutor
+    pub fn dedicated_executor(&self) -> Option<&DedicatedExecutor> {
+        self.dedicated_executor.as_ref()
+    }
+
+    /// Run an async future that will do IO operations on the IO thread pool
+    /// if there is a [`DedicatedExecutor`] registered
+    ///
+    /// If no DedicatedExecutor is registered, runs the operation on the
+    /// current thread pool
+    ///
+    /// See [`DedicatedExecutor`] for more details
+    pub async fn spawn_io<Fut>(&self, fut: Fut) -> Fut::Output
+    where
+        Fut: Future + Send + 'static,
+        Fut::Output: Send,
+    {
+        if self.dedicated_executor().is_some() {
+            log::debug!("Running IO on dedicated executor");
+            // TODO it is strange that the io thread is tied directly to a thread
+            // local rather than bound to an instance
+            DedicatedExecutor::spawn_io(fut).await
+        } else {
+            // otherwise run on the current runtime
+            log::debug!("Running IO on current runtime");
+            fut.await
+        }
+    }
+
+    /// Run an async future that will do CPU operations on the CPU task pool
+    /// if there is a [`DedicatedExecutor`] registered
+    ///
+    /// If no DedicatedExecutor is registered, runs the operation on the
+    /// current thread pool
+    ///
+    /// See [`DedicatedExecutor`] for more details
+    pub async fn spawn_cpu<Fut>(&self, fut: Fut) -> Result<Fut::Output>
+    where
+        Fut: Future + Send + 'static,
+        Fut::Output: Send,
+    {
+        if let Some(dedicated_executor) = self.dedicated_executor() {
+            log::debug!("Running CPU on dedicated executor");
+            dedicated_executor.spawn_cpu(fut).await.map_err(|e| {
+                DataFusionError::Context(
+                    "Join Error (panic)".to_string(),
+                    Box::new(DataFusionError::External(e.into())),
+                )
+            })
+        } else {
+            // otherwise run on the current runtime
+            log::debug!("Running CPU on current runtime");
+            Ok(fut.await)
+        }
+    }
+
+    /// TODO: Figure out the signature necessary to run futures with references
+    /// in them on a different thread pool.
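+    ///
+    /// The goal is to support scoped usage like the following sketch (shown
+    /// as an ignored doc example; it mirrors `SessionContext::sql_with_options`
+    /// in this PR, where the future borrows from the surrounding scope and is
+    /// therefore not `'static`):
+    ///
+    /// ```ignore
+    /// let plan = self.state().create_logical_plan(sql).await?; // borrows `self`
+    /// self.runtime_env()
+    ///     .spawn_cpu2(async { options.verify_plan(&plan) })
+    ///     .await?;
+    /// ```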
+    pub async fn spawn_cpu2<'a, Fut>(&self, fut: Fut) -> Fut::Output
+    where
+        Fut: Future + Send + 'a,
+        Fut::Output: Send,
+    {
+        if let Some(dedicated_executor) = self.dedicated_executor() {
+            log::debug!("Running CPU on dedicated executor (spawn_cpu2)");
+            dedicated_executor.spawn_cpu2(fut).await
+        } else {
+            // otherwise run on the current runtime
+            log::debug!("Running CPU on current runtime (spawn_cpu2)");
+            fut.await
+        }
+    }
 }
 
 impl Default for RuntimeEnv {
@@ -183,6 +262,8 @@ pub struct RuntimeEnvBuilder {
     pub cache_manager: CacheManagerConfig,
     /// ObjectStoreRegistry to get object store based on url
     pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
+    /// Optional dedicated executor
+    pub dedicated_executor: Option<DedicatedExecutor>,
 }
 
 impl Default for RuntimeEnvBuilder {
@@ -199,6 +280,7 @@ impl RuntimeEnvBuilder {
             memory_pool: Default::default(),
             cache_manager: Default::default(),
             object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
+            dedicated_executor: None,
         }
     }
 
@@ -229,6 +311,15 @@ impl RuntimeEnvBuilder {
         self
     }
 
+    /// Customize the [`DedicatedExecutor`] to be used for running queries
+    pub fn with_dedicated_executor(
+        mut self,
+        dedicated_executor: DedicatedExecutor,
+    ) -> Self {
+        self.dedicated_executor = Some(dedicated_executor);
+        self
+    }
+
     /// Specify the total memory to use while running the DataFusion
     /// plan to `max_memory * memory_fraction` in bytes.
     ///
@@ -255,6 +346,7 @@ impl RuntimeEnvBuilder {
             memory_pool,
             cache_manager,
             object_store_registry,
+            dedicated_executor,
         } = self;
         let memory_pool =
            memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
@@ -264,6 +356,7 @@ impl RuntimeEnvBuilder {
             disk_manager: DiskManager::try_new(disk_manager)?,
             cache_manager: CacheManager::try_new(&cache_manager)?,
             object_store_registry,
+            dedicated_executor,
         })
     }
diff --git a/datafusion/execution/src/stream.rs b/datafusion/execution/src/stream.rs
index f3eb7b77e03cc..9dcf6eba90595 100644
--- a/datafusion/execution/src/stream.rs
+++ b/datafusion/execution/src/stream.rs
@@ -18,7 +18,10 @@
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion_common::Result;
 use futures::Stream;
+use pin_project_lite::pin_project;
 use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
 
 /// Trait for types that stream [RecordBatch]
 ///
@@ -51,3 +54,53 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
 /// [`Stream`]s there is no mechanism to prevent callers polling so returning
 /// `Ready(None)` is recommended.
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
+
+pin_project! {
+    /// Combines a [`Stream`] with a [`SchemaRef`] implementing
+    /// [`RecordBatchStream`] for the combination
+    pub struct RecordBatchStreamAdapter<S> {
+        schema: SchemaRef,
+
+        #[pin]
+        stream: S,
+    }
+}
+
+impl<S> RecordBatchStreamAdapter<S> {
+    /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream
+    pub fn new(schema: SchemaRef, stream: S) -> Self {
+        Self { schema, stream }
+    }
+}
+
+impl<S> std::fmt::Debug for RecordBatchStreamAdapter<S> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RecordBatchStreamAdapter")
+            .field("schema", &self.schema)
+            .finish()
+    }
+}
+
+impl<S> Stream for RecordBatchStreamAdapter<S>
+where
+    S: Stream<Item = Result<RecordBatch>>,
+{
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.project().stream.poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.stream.size_hint()
+    }
+}
+
+impl<S> RecordBatchStream for RecordBatchStreamAdapter<S>
+where
+    S: Stream<Item = Result<RecordBatch>>,
+{
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index bb0e21fdfd158..a1917985eb018 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -65,6 +65,10 @@ parking_lot = { workspace = true }
 pin-project-lite = "^0.2.7"
 rand = { workspace = true }
 tokio = { workspace = true }
+# todo figure out if we need to use tokio_stream / could use record batch receiver stream
+tokio-stream = { version = "0.1" }
+object_store = { workspace = true }
+
 
 [dev-dependencies]
 criterion = { version = "0.5", features = ["async_futures"] }
diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs
index b3054299b7f73..0be003f393219 100644
--- a/datafusion/physical-plan/src/stream.rs
+++ b/datafusion/physical-plan/src/stream.rs
@@ -33,10 +33,12 @@ use datafusion_execution::TaskContext;
 use futures::stream::BoxStream;
 use futures::{Future, Stream, StreamExt};
 use log::debug;
-use pin_project_lite::pin_project;
 use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::task::JoinSet;
 
+// Backwards compatibility
+pub use datafusion_execution::stream::RecordBatchStreamAdapter;
+
 /// Creates a stream from a collection of producing tasks, routing panics to the stream.
 ///
 /// Note that this is similar to [`ReceiverStream` from tokio-stream], with the differences being:
@@ -335,56 +337,6 @@ impl RecordBatchReceiverStream {
     }
 }
 
-pin_project! {
-    /// Combines a [`Stream`] with a [`SchemaRef`] implementing
-    /// [`RecordBatchStream`] for the combination
-    pub struct RecordBatchStreamAdapter<S> {
-        schema: SchemaRef,
-
-        #[pin]
-        stream: S,
-    }
-}
-
-impl<S> RecordBatchStreamAdapter<S> {
-    /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream
-    pub fn new(schema: SchemaRef, stream: S) -> Self {
-        Self { schema, stream }
-    }
-}
-
-impl<S> std::fmt::Debug for RecordBatchStreamAdapter<S> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("RecordBatchStreamAdapter")
-            .field("schema", &self.schema)
-            .finish()
-    }
-}
-
-impl<S> Stream for RecordBatchStreamAdapter<S>
-where
-    S: Stream<Item = Result<RecordBatch>>,
-{
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.project().stream.poll_next(cx)
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.stream.size_hint()
-    }
-}
-
-impl<S> RecordBatchStream for RecordBatchStreamAdapter<S>
-where
-    S: Stream<Item = Result<RecordBatch>>,
-{
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-}
-
 /// `EmptyRecordBatchStream` can be used to create a [`RecordBatchStream`]
 /// that will produce no results
 pub struct EmptyRecordBatchStream {