Skip to content

Commit b7bdea2

Browse files
author
Wang
committed
Add SessionBuilder fn type to SchedulerServer to allow customized SessionContext creation
1 parent 5e3d695 commit b7bdea2

File tree

5 files changed

+46
-7
lines changed

5 files changed

+46
-7
lines changed

ballista/rust/client/src/prelude.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
2020
pub use crate::context::BallistaContext;
2121
pub use ballista_core::config::BallistaConfig;
22-
pub use ballista_core::config::BALLISTA_DEFAULT_SHUFFLE_PARTITIONS;
2322
pub use ballista_core::config::BALLISTA_DEFAULT_BATCH_SIZE;
23+
pub use ballista_core::config::BALLISTA_DEFAULT_SHUFFLE_PARTITIONS;
2424
pub use ballista_core::error::{BallistaError, Result};
2525

2626
pub use futures::StreamExt;

ballista/rust/scheduler/src/lib.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,14 @@ use ballista_core::serde::protobuf::execute_query_params::OptionalSessionId;
105105
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
106106
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
107107
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
108+
use datafusion::execution::context::{default_session_builder, SessionState};
108109
use datafusion::prelude::{SessionConfig, SessionContext};
109110
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
110111
use tokio::sync::{mpsc, RwLock};
111112
use tonic::transport::Channel;
112113

113114
type ExecutorsClient = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;
115+
type SessionBuilder = fn(SessionConfig) -> SessionState;
114116

115117
#[derive(Clone)]
116118
pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
@@ -120,6 +122,8 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
120122
scheduler_loop: Option<SchedulerLoop>,
121123
executors_client: Option<ExecutorsClient>,
122124
codec: BallistaCodec<T, U>,
125+
/// Function used to build the `SessionState` from a `SessionConfig` when a new session context is created
126+
session_builder: SessionBuilder,
123127
/// DataFusion session contexts that are registered within the SchedulerServer
124128
session_context_registry: Arc<SessionContextRegistry>,
125129
}
@@ -134,13 +138,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
134138
config: Arc<dyn ConfigBackendClient>,
135139
namespace: String,
136140
codec: BallistaCodec<T, U>,
141+
) -> Self {
142+
SchedulerServer::new_with_builder(
143+
config,
144+
namespace,
145+
codec,
146+
default_session_builder,
147+
)
148+
}
149+
150+
pub fn new_with_builder(
151+
config: Arc<dyn ConfigBackendClient>,
152+
namespace: String,
153+
codec: BallistaCodec<T, U>,
154+
session_builder: SessionBuilder,
137155
) -> Self {
138156
SchedulerServer::new_with_policy(
139157
config,
140158
namespace,
141159
TaskSchedulingPolicy::PullStaged,
142160
None,
143161
codec,
162+
session_builder,
144163
)
145164
}
146165

@@ -150,6 +169,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
150169
policy: TaskSchedulingPolicy,
151170
scheduler_loop: Option<SchedulerLoop>,
152171
codec: BallistaCodec<T, U>,
172+
session_builder: SessionBuilder,
153173
) -> Self {
154174
let state = Arc::new(SchedulerState::new(config, namespace, codec.clone()));
155175

@@ -168,6 +188,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
168188
scheduler_loop,
169189
executors_client,
170190
codec,
191+
session_builder,
171192
session_context_registry: Arc::new(SessionContextRegistry::default()),
172193
}
173194
}
@@ -771,7 +792,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
771792
update_datafusion_session_context(session_ctx, &config)
772793
}
773794
_ => {
774-
let df_session = create_datafusion_session_context(&config);
795+
let df_session =
796+
create_datafusion_session_context(&config, self.session_builder);
775797
let session_id = df_session.session_id.clone();
776798
self.session_context_registry
777799
.register_session(session_id, df_session.clone())
@@ -972,7 +994,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
972994
error!("{}", msg);
973995
tonic::Status::internal(msg)
974996
})?;
975-
let df_session = create_datafusion_session_context(&config);
997+
let df_session =
998+
create_datafusion_session_context(&config, self.session_builder);
976999
let session_id = df_session.session_id.clone();
9771000
self.session_context_registry
9781001
.register_session(session_id.clone(), df_session.clone())
@@ -1000,15 +1023,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
10001023
}
10011024

10021025
/// Create a new DataFusion session context from Ballista Configuration
1003-
pub fn create_datafusion_session_context(config: &BallistaConfig) -> Arc<SessionContext> {
1026+
pub fn create_datafusion_session_context(
1027+
config: &BallistaConfig,
1028+
session_builder: SessionBuilder,
1029+
) -> Arc<SessionContext> {
10041030
let config = SessionConfig::new()
10051031
.with_target_partitions(config.default_shuffle_partitions())
10061032
.with_batch_size(config.default_batch_size())
10071033
.with_repartition_joins(config.repartition_joins())
10081034
.with_repartition_aggregations(config.repartition_aggregations())
10091035
.with_repartition_windows(config.repartition_windows())
10101036
.with_parquet_pruning(config.parquet_pruning());
1011-
Arc::new(SessionContext::with_config(config))
1037+
let session_state = session_builder(config);
1038+
Arc::new(SessionContext::with_state(session_state))
10121039
}
10131040

10141041
/// Update the existing DataFusion session context with Ballista Configuration

ballista/rust/scheduler/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ mod config {
6363
}
6464

6565
use config::prelude::*;
66+
use datafusion::execution::context::default_session_builder;
6667

6768
async fn start_server(
6869
config_backend: Arc<dyn ConfigBackendClient>,
@@ -90,6 +91,7 @@ async fn start_server(
9091
policy,
9192
Some(SchedulerLoop { tx_job }),
9293
BallistaCodec::default(),
94+
default_session_builder,
9395
);
9496
let task_scheduler =
9597
TaskScheduler::new(Arc::new(scheduler_server.clone()));

benchmarks/src/bin/tpch.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ use std::{
3030
};
3131

3232
use ballista::context::BallistaContext;
33-
use ballista::prelude::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_DEFAULT_BATCH_SIZE};
33+
use ballista::prelude::{
34+
BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
35+
};
3436

3537
use datafusion::datasource::{MemTable, TableProvider};
3638
use datafusion::error::{DataFusionError, Result};

datafusion/src/execution/context.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1151,8 +1151,16 @@ pub struct SessionState {
11511151
pub runtime: Arc<RuntimeEnv>,
11521152
}
11531153

1154+
/// Default session builder: creates a `SessionState` from the provided configuration and a `RuntimeEnv` built from `RuntimeConfig::default()`
1155+
pub fn default_session_builder(config: SessionConfig) -> SessionState {
1156+
SessionState::with_config(
1157+
config,
1158+
Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()),
1159+
)
1160+
}
1161+
11541162
impl SessionState {
1155-
/// Returns new SessionState using the provided configuration
1163+
/// Returns new SessionState using the provided configuration and runtime
11561164
pub fn with_config(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
11571165
let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
11581166

0 commit comments

Comments
 (0)