Skip to content

Commit 4994eda

Browse files
mingmwang
and
Wang
authored
Refactor ExecutionContext and related conf to support multi-tenancy configurations - Part 1 (#1987)
Co-authored-by: Wang <[email protected]>
1 parent a166c51 commit 4994eda

File tree

134 files changed

+1678
-1452
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

134 files changed

+1678
-1452
lines changed

ballista/rust/client/src/context.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion::datasource::TableProvider;
3434
use datafusion::error::{DataFusionError, Result};
3535
use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
3636
use datafusion::prelude::{
37-
AvroReadOptions, CsvReadOptions, ExecutionConfig, ExecutionContext,
37+
AvroReadOptions, CsvReadOptions, SessionConfig, SessionContext,
3838
};
3939
use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement};
4040

@@ -304,8 +304,8 @@ impl BallistaContext {
304304
// the show tables、 show columns sql can not run at scheduler because the tables is store at client
305305
if is_show {
306306
let state = self.state.lock();
307-
ctx = ExecutionContext::with_config(
308-
ExecutionConfig::new().with_information_schema(
307+
ctx = SessionContext::with_config(
308+
SessionConfig::new().with_information_schema(
309309
state.config.default_with_information_schema(),
310310
),
311311
);

ballista/rust/core/src/execution_plans/distributed_query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use datafusion::physical_plan::{
4242

4343
use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
4444
use async_trait::async_trait;
45-
use datafusion::execution::runtime_env::RuntimeEnv;
45+
use datafusion::execution::context::TaskContext;
4646
use futures::future;
4747
use futures::StreamExt;
4848
use log::{error, info};
@@ -150,7 +150,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
150150
async fn execute(
151151
&self,
152152
partition: usize,
153-
_runtime: Arc<RuntimeEnv>,
153+
_context: Arc<TaskContext>,
154154
) -> Result<SendableRecordBatchStream> {
155155
assert_eq!(0, partition);
156156

ballista/rust/core/src/execution_plans/shuffle_reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use crate::utils::WrappedStream;
2525
use async_trait::async_trait;
2626
use datafusion::arrow::datatypes::SchemaRef;
2727

28-
use datafusion::execution::runtime_env::RuntimeEnv;
2928
use datafusion::physical_plan::expressions::PhysicalSortExpr;
3029
use datafusion::physical_plan::metrics::{
3130
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
@@ -39,6 +38,7 @@ use datafusion::{
3938
};
4039
use futures::{future, StreamExt};
4140

41+
use datafusion::execution::context::TaskContext;
4242
use log::info;
4343

4444
/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
@@ -106,7 +106,7 @@ impl ExecutionPlan for ShuffleReaderExec {
106106
async fn execute(
107107
&self,
108108
partition: usize,
109-
_runtime: Arc<RuntimeEnv>,
109+
_context: Arc<TaskContext>,
110110
) -> Result<SendableRecordBatchStream> {
111111
info!("ShuffleReaderExec::execute({})", partition);
112112

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
4242

4343
use datafusion::arrow::record_batch::RecordBatch;
4444
use datafusion::error::{DataFusionError, Result};
45-
use datafusion::execution::runtime_env::RuntimeEnv;
4645
use datafusion::physical_plan::common::IPCWriter;
4746
use datafusion::physical_plan::hash_utils::create_hashes;
4847
use datafusion::physical_plan::memory::MemoryStream;
@@ -55,6 +54,7 @@ use datafusion::physical_plan::{
5554
};
5655
use futures::StreamExt;
5756

57+
use datafusion::execution::context::TaskContext;
5858
use log::{debug, info};
5959

6060
/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
@@ -138,11 +138,11 @@ impl ShuffleWriterExec {
138138
pub async fn execute_shuffle_write(
139139
&self,
140140
input_partition: usize,
141-
runtime: Arc<RuntimeEnv>,
141+
context: Arc<TaskContext>,
142142
) -> Result<Vec<ShuffleWritePartition>> {
143143
let now = Instant::now();
144144

145-
let mut stream = self.plan.execute(input_partition, runtime).await?;
145+
let mut stream = self.plan.execute(input_partition, context).await?;
146146

147147
let mut path = PathBuf::from(&self.work_dir);
148148
path.push(&self.job_id);
@@ -358,9 +358,9 @@ impl ExecutionPlan for ShuffleWriterExec {
358358
async fn execute(
359359
&self,
360360
partition: usize,
361-
runtime: Arc<RuntimeEnv>,
361+
context: Arc<TaskContext>,
362362
) -> Result<SendableRecordBatchStream> {
363-
let part_loc = self.execute_shuffle_write(partition, runtime).await?;
363+
let part_loc = self.execute_shuffle_write(partition, context).await?;
364364

365365
// build metadata result batch
366366
let num_writers = part_loc.len();
@@ -448,11 +448,13 @@ mod tests {
448448
use datafusion::physical_plan::expressions::Column;
449449

450450
use datafusion::physical_plan::memory::MemoryExec;
451+
use datafusion::prelude::SessionContext;
451452
use tempfile::TempDir;
452453

453454
#[tokio::test]
454455
async fn test() -> Result<()> {
455-
let runtime = Arc::new(RuntimeEnv::default());
456+
let session_ctx = SessionContext::new();
457+
let task_ctx = session_ctx.task_ctx();
456458

457459
let input_plan = Arc::new(CoalescePartitionsExec::new(create_input_plan()?));
458460
let work_dir = TempDir::new()?;
@@ -463,7 +465,7 @@ mod tests {
463465
work_dir.into_path().to_str().unwrap().to_owned(),
464466
Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
465467
)?;
466-
let mut stream = query_stage.execute(0, runtime).await?;
468+
let mut stream = query_stage.execute(0, task_ctx).await?;
467469
let batches = utils::collect_stream(&mut stream)
468470
.await
469471
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -506,7 +508,8 @@ mod tests {
506508

507509
#[tokio::test]
508510
async fn test_partitioned() -> Result<()> {
509-
let runtime = Arc::new(RuntimeEnv::default());
511+
let session_ctx = SessionContext::new();
512+
let task_ctx = session_ctx.task_ctx();
510513

511514
let input_plan = create_input_plan()?;
512515
let work_dir = TempDir::new()?;
@@ -517,7 +520,7 @@ mod tests {
517520
work_dir.into_path().to_str().unwrap().to_owned(),
518521
Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
519522
)?;
520-
let mut stream = query_stage.execute(0, runtime).await?;
523+
let mut stream = query_stage.execute(0, task_ctx).await?;
521524
let batches = utils::collect_stream(&mut stream)
522525
.await
523526
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

ballista/rust/core/src/execution_plans/unresolved_shuffle.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121
use async_trait::async_trait;
2222
use datafusion::arrow::datatypes::SchemaRef;
2323
use datafusion::error::{DataFusionError, Result};
24-
use datafusion::execution::runtime_env::RuntimeEnv;
24+
use datafusion::execution::context::TaskContext;
2525
use datafusion::physical_plan::expressions::PhysicalSortExpr;
2626
use datafusion::physical_plan::{
2727
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
@@ -104,7 +104,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
104104
async fn execute(
105105
&self,
106106
_partition: usize,
107-
_runtime: Arc<RuntimeEnv>,
107+
_context: Arc<TaskContext>,
108108
) -> Result<SendableRecordBatchStream> {
109109
Err(DataFusionError::Plan(
110110
"Ballista UnresolvedShuffleExec does not support execution".to_owned(),

ballista/rust/core/src/serde/logical_plan/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use datafusion::logical_plan::{
3636
Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan,
3737
LogicalPlanBuilder, Repartition, TableScan, Values,
3838
};
39-
use datafusion::prelude::ExecutionContext;
39+
use datafusion::prelude::SessionContext;
4040

4141
use prost::bytes::BufMut;
4242
use prost::Message;
@@ -70,7 +70,7 @@ impl AsLogicalPlan for LogicalPlanNode {
7070

7171
fn try_into_logical_plan(
7272
&self,
73-
ctx: &ExecutionContext,
73+
ctx: &SessionContext,
7474
extension_codec: &dyn LogicalExtensionCodec,
7575
) -> Result<LogicalPlan, BallistaError> {
7676
let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
@@ -920,7 +920,7 @@ mod roundtrip_tests {
920920
roundtrip_test!($initial_struct, protobuf::LogicalPlanNode, $struct_type);
921921
};
922922
($initial_struct:ident) => {
923-
let ctx = ExecutionContext::new();
923+
let ctx = SessionContext::new();
924924
let codec: BallistaCodec<
925925
protobuf::LogicalPlanNode,
926926
protobuf::PhysicalPlanNode,
@@ -1252,7 +1252,7 @@ mod roundtrip_tests {
12521252

12531253
#[tokio::test]
12541254
async fn roundtrip_logical_plan_custom_ctx() -> Result<()> {
1255-
let ctx = ExecutionContext::new();
1255+
let ctx = SessionContext::new();
12561256
let codec: BallistaCodec<protobuf::LogicalPlanNode, protobuf::PhysicalPlanNode> =
12571257
BallistaCodec::default();
12581258
let custom_object_store = Arc::new(TestObjectStore {});

ballista/rust/core/src/serde/mod.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
3030

3131
use datafusion::logical_plan::plan::Extension;
3232
use datafusion::physical_plan::ExecutionPlan;
33-
use datafusion::prelude::ExecutionContext;
33+
use datafusion::prelude::SessionContext;
3434
use prost::Message;
3535

3636
// include the generated protobuf source as a submodule
@@ -67,7 +67,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
6767

6868
fn try_into_logical_plan(
6969
&self,
70-
ctx: &ExecutionContext,
70+
ctx: &SessionContext,
7171
extension_codec: &dyn LogicalExtensionCodec,
7272
) -> Result<LogicalPlan, BallistaError>;
7373

@@ -130,7 +130,7 @@ pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
130130

131131
fn try_into_physical_plan(
132132
&self,
133-
ctx: &ExecutionContext,
133+
ctx: &SessionContext,
134134
extension_codec: &dyn PhysicalExtensionCodec,
135135
) -> Result<Arc<dyn ExecutionPlan>, BallistaError>;
136136

@@ -345,8 +345,7 @@ mod tests {
345345
use datafusion::arrow::datatypes::SchemaRef;
346346
use datafusion::datasource::object_store::local::LocalFileSystem;
347347
use datafusion::error::DataFusionError;
348-
use datafusion::execution::context::{ExecutionContextState, QueryPlanner};
349-
use datafusion::execution::runtime_env::RuntimeEnv;
348+
use datafusion::execution::context::{QueryPlanner, SessionState, TaskContext};
350349
use datafusion::logical_plan::plan::Extension;
351350
use datafusion::logical_plan::{
352351
col, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode,
@@ -357,7 +356,7 @@ mod tests {
357356
DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalPlanner,
358357
SendableRecordBatchStream, Statistics,
359358
};
360-
use datafusion::prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext};
359+
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
361360
use prost::Message;
362361
use std::any::Any;
363362

@@ -512,7 +511,7 @@ mod tests {
512511
async fn execute(
513512
&self,
514513
_partition: usize,
515-
_runtime: Arc<RuntimeEnv>,
514+
_context: Arc<TaskContext>,
516515
) -> datafusion::error::Result<SendableRecordBatchStream> {
517516
Err(DataFusionError::NotImplemented(
518517
"not implemented".to_string(),
@@ -548,7 +547,7 @@ mod tests {
548547
node: &dyn UserDefinedLogicalNode,
549548
logical_inputs: &[&LogicalPlan],
550549
physical_inputs: &[Arc<dyn ExecutionPlan>],
551-
_ctx_state: &ExecutionContextState,
550+
_session_state: &SessionState,
552551
) -> datafusion::error::Result<Option<Arc<dyn ExecutionPlan>>> {
553552
Ok(
554553
if let Some(topk_node) = node.as_any().downcast_ref::<TopKPlanNode>() {
@@ -575,7 +574,7 @@ mod tests {
575574
async fn create_physical_plan(
576575
&self,
577576
logical_plan: &LogicalPlan,
578-
ctx_state: &ExecutionContextState,
577+
session_state: &SessionState,
579578
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
580579
// Teach the default physical planner how to plan TopK nodes.
581580
let physical_planner =
@@ -584,7 +583,7 @@ mod tests {
584583
)]);
585584
// Delegate most work of physical planning to the default physical planner
586585
physical_planner
587-
.create_physical_plan(logical_plan, ctx_state)
586+
.create_physical_plan(logical_plan, session_state)
588587
.await
589588
}
590589
}
@@ -694,9 +693,9 @@ mod tests {
694693
async fn test_extension_plan() -> crate::error::Result<()> {
695694
let store = Arc::new(LocalFileSystem {});
696695
let config =
697-
ExecutionConfig::new().with_query_planner(Arc::new(TopKQueryPlanner {}));
696+
SessionConfig::new().with_query_planner(Arc::new(TopKQueryPlanner {}));
698697

699-
let ctx = ExecutionContext::with_config(config);
698+
let ctx = SessionContext::with_config(config);
700699

701700
let scan = LogicalPlanBuilder::scan_csv(
702701
store,

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,10 @@ use crate::serde::{from_proto_binary_op, proto_error, protobuf};
2626
use crate::{convert_box_required, convert_required};
2727
use chrono::{TimeZone, Utc};
2828

29-
use datafusion::catalog::catalog::{CatalogList, MemoryCatalogList};
3029
use datafusion::datasource::object_store::local::LocalFileSystem;
31-
use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry, SizedFile};
30+
use datafusion::datasource::object_store::{FileMeta, SizedFile};
3231
use datafusion::datasource::PartitionedFile;
33-
use datafusion::execution::context::{
34-
ExecutionConfig, ExecutionContextState, ExecutionProps,
35-
};
36-
use datafusion::execution::runtime_env::RuntimeEnv;
32+
use datafusion::execution::context::SessionState;
3733

3834
use datafusion::physical_plan::file_format::FileScanConfig;
3935

@@ -157,22 +153,12 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
157153
.map(|x| x.try_into())
158154
.collect::<Result<Vec<_>, _>>()?;
159155

160-
let catalog_list =
161-
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
162-
163-
let ctx_state = ExecutionContextState {
164-
catalog_list,
165-
scalar_functions: Default::default(),
166-
aggregate_functions: Default::default(),
167-
config: ExecutionConfig::new(),
168-
execution_props: ExecutionProps::new(),
169-
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
170-
runtime_env: Arc::new(RuntimeEnv::default()),
171-
};
156+
// TODO Do not create new the SessionState
157+
let session_state = SessionState::new();
172158

173159
let fun_expr = functions::create_physical_fun(
174160
&(&scalar_function).into(),
175-
&ctx_state.execution_props,
161+
&session_state.execution_props,
176162
)?;
177163

178164
Arc::new(ScalarFunctionExpr::new(

ballista/rust/core/src/serde/physical_plan/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
5656
use datafusion::physical_plan::{
5757
AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
5858
};
59-
use datafusion::prelude::ExecutionContext;
59+
use datafusion::prelude::SessionContext;
6060
use prost::bytes::BufMut;
6161
use prost::Message;
6262
use std::convert::TryInto;
@@ -87,7 +87,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
8787

8888
fn try_into_physical_plan(
8989
&self,
90-
ctx: &ExecutionContext,
90+
ctx: &SessionContext,
9191
extension_codec: &dyn PhysicalExtensionCodec,
9292
) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
9393
let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
@@ -883,7 +883,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
883883

884884
fn decode_scan_config(
885885
proto: &protobuf::FileScanExecConf,
886-
ctx: &ExecutionContext,
886+
ctx: &SessionContext,
887887
) -> Result<FileScanConfig, BallistaError> {
888888
let schema = Arc::new(convert_required!(proto.schema)?);
889889
let projection = proto
@@ -940,7 +940,7 @@ mod roundtrip_tests {
940940
use datafusion::datasource::object_store::local::LocalFileSystem;
941941
use datafusion::datasource::PartitionedFile;
942942
use datafusion::physical_plan::sorts::sort::SortExec;
943-
use datafusion::prelude::ExecutionContext;
943+
use datafusion::prelude::SessionContext;
944944
use datafusion::{
945945
arrow::{
946946
compute::kernels::sort::SortOptions,
@@ -969,7 +969,7 @@ mod roundtrip_tests {
969969
use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
970970

971971
fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
972-
let ctx = ExecutionContext::new();
972+
let ctx = SessionContext::new();
973973
let codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
974974
BallistaCodec::default();
975975
let proto: protobuf::PhysicalPlanNode =

0 commit comments

Comments
 (0)