Skip to content

Commit abee72c

Browse files
committed
resolve review comments
1 parent 2e2b090 commit abee72c

File tree

6 files changed

+31
-16
lines changed

6 files changed

+31
-16
lines changed

ballista/rust/core/src/serde/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ mod tests {
708708
let store = Arc::new(LocalFileSystem {});
709709
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
710710
let session_state =
711-
SessionState::with_config(SessionConfig::new(), runtime.clone())
711+
SessionState::with_config_rt(SessionConfig::new(), runtime.clone())
712712
.with_query_planner(Arc::new(TopKQueryPlanner {}));
713713

714714
let ctx = SessionContext::with_state(session_state);

ballista/rust/core/src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ pub fn create_df_ctx_with_ballista_query_planner<T: 'static + AsLogicalPlan>(
238238
let session_config = SessionConfig::new()
239239
.with_target_partitions(config.default_shuffle_partitions())
240240
.with_information_schema(true);
241-
let mut session_state = SessionState::with_config(
241+
let mut session_state = SessionState::with_config_rt(
242242
session_config,
243243
Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()),
244244
)

datafusion/src/execution/context.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ impl SessionContext {
164164

165165
/// Creates a new session context using the provided configuration and RuntimeEnv.
166166
pub fn with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
167-
let state = SessionState::with_config(config, runtime);
167+
let state = SessionState::with_config_rt(config, runtime);
168168
Self {
169169
session_id: state.session_id.clone(),
170170
session_start_time: chrono::Utc::now(),
@@ -1010,15 +1010,15 @@ pub struct SessionState {
10101010

10111011
/// Default session builder using the provided configuration
10121012
pub fn default_session_builder(config: SessionConfig) -> SessionState {
1013-
SessionState::with_config(
1013+
SessionState::with_config_rt(
10141014
config,
10151015
Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()),
10161016
)
10171017
}
10181018

10191019
impl SessionState {
10201020
/// Returns new SessionState using the provided configuration and runtime
1021-
pub fn with_config(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
1021+
pub fn with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
10221022
let session_id = Uuid::new_v4().to_string();
10231023

10241024
let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
@@ -1287,17 +1287,17 @@ pub enum TaskProperties {
12871287
/// Task Execution Context
12881288
pub struct TaskContext {
12891289
/// Session Id
1290-
pub session_id: String,
1290+
session_id: String,
12911291
/// Optional Task Identify
1292-
pub task_id: Option<String>,
1292+
task_id: Option<String>,
12931293
/// Task properties
1294-
pub properties: TaskProperties,
1294+
properties: TaskProperties,
12951295
/// Scalar functions associated with this task context
1296-
pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
1296+
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
12971297
/// Aggregate functions associated with this task context
1298-
pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
1298+
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
12991299
/// Runtime environment associated with this task context
1300-
pub runtime: Arc<RuntimeEnv>,
1300+
runtime: Arc<RuntimeEnv>,
13011301
}
13021302

13031303
impl TaskContext {
@@ -1351,6 +1351,21 @@ impl TaskContext {
13511351
TaskProperties::SessionConfig(session_config) => session_config.clone(),
13521352
}
13531353
}
1354+
1355+
/// Return the session_id of this [TaskContext]
1356+
pub fn session_id(&self) -> String {
1357+
self.session_id.clone()
1358+
}
1359+
1360+
/// Return the task_id of this [TaskContext]
1361+
pub fn task_id(&self) -> Option<String> {
1362+
self.task_id.clone()
1363+
}
1364+
1365+
/// Return the [RuntimeEnv] associated with this [TaskContext]
1366+
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
1367+
self.runtime.clone()
1368+
}
13541369
}
13551370

13561371
/// Create a new task context instance from SessionContext
@@ -2961,7 +2976,7 @@ mod tests {
29612976
#[tokio::test]
29622977
async fn custom_query_planner() -> Result<()> {
29632978
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
2964-
let session_state = SessionState::with_config(SessionConfig::new(), runtime)
2979+
let session_state = SessionState::with_config_rt(SessionConfig::new(), runtime)
29652980
.with_query_planner(Arc::new(MyQueryPlanner {}));
29662981
let ctx = SessionContext::with_state(session_state);
29672982

datafusion/src/physical_plan/planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1466,7 +1466,7 @@ mod tests {
14661466

14671467
fn make_session_state() -> SessionState {
14681468
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
1469-
SessionState::with_config(SessionConfig::new(), runtime)
1469+
SessionState::with_config_rt(SessionConfig::new(), runtime)
14701470
}
14711471

14721472
async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {

datafusion/src/physical_plan/sorts/sort.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,9 +582,9 @@ async fn do_sort(
582582
expr,
583583
metrics_set,
584584
Arc::new(context.session_config()),
585-
context.runtime.clone(),
585+
context.runtime_env(),
586586
);
587-
context.runtime.register_requester(sorter.id());
587+
context.runtime_env().register_requester(sorter.id());
588588
while let Some(batch) = input.next().await {
589589
let batch = batch?;
590590
sorter.insert_batch(batch).await?;

datafusion/tests/user_defined_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ async fn topk_plan() -> Result<()> {
245245
fn make_topk_context() -> SessionContext {
246246
let config = SessionConfig::new().with_target_partitions(48);
247247
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
248-
let state = SessionState::with_config(config, runtime)
248+
let state = SessionState::with_config_rt(config, runtime)
249249
.with_query_planner(Arc::new(TopKQueryPlanner {}))
250250
.add_optimizer_rule(Arc::new(TopKOptimizerRule {}));
251251
SessionContext::with_state(state)

0 commit comments

Comments
 (0)