Skip to content

Commit e1f0ae7

Browse files
More clippy issues and use global context everywehere
1 parent 7b2d6ab commit e1f0ae7

File tree

2 files changed

+16
-16
lines changed

2 files changed

+16
-16
lines changed

ballista/rust/scheduler/src/lib.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -746,19 +746,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
746746
) -> std::result::Result<Response<ExecuteQueryResult>, tonic::Status> {
747747
if let ExecuteQueryParams {
748748
query: Some(query),
749-
settings,
749+
settings: _,
750750
} = request.into_inner()
751751
{
752752
// parse config
753-
let mut config_builder = BallistaConfig::builder();
754-
for kv_pair in &settings {
755-
config_builder = config_builder.set(&kv_pair.key, &kv_pair.value);
756-
}
757-
let config = config_builder.build().map_err(|e| {
758-
let msg = format!("Could not parse configs: {}", e);
759-
error!("{}", msg);
760-
tonic::Status::internal(msg)
761-
})?;
753+
// let mut config_builder = BallistaConfig::builder();
754+
// for kv_pair in &settings {
755+
// config_builder = config_builder.set(&kv_pair.key, &kv_pair.value);
756+
// }
757+
// let config = config_builder.build().map_err(|e| {
758+
// let msg = format!("Could not parse configs: {}", e);
759+
// error!("{}", msg);
760+
// tonic::Status::internal(msg)
761+
// })?;
762762

763763
let plan = match query {
764764
Query::LogicalPlan(message) => {
@@ -817,9 +817,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
817817
Some(self.scheduler_env.as_ref().unwrap().tx_job.clone())
818818
}
819819
};
820+
let datafusion_ctx = self.ctx.read().await.clone();
820821
tokio::spawn(async move {
821822
// create physical plan using DataFusion
822-
let datafusion_ctx = create_datafusion_context(&config);
823823
macro_rules! fail_job {
824824
($code :expr) => {{
825825
match $code {

ballista/rust/scheduler/src/state/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ impl<T: 'static + AsExecutionPlan> SchedulerState<T> {
299299
)));
300300
}
301301
let value = T::try_decode(value.as_slice())?;
302-
let plan = value.try_into_physical_plan(&ctx)?;
302+
let plan = value.try_into_physical_plan(ctx)?;
303303
// let value: PhysicalPlanNode = decode_protobuf(value)?;
304304
// Ok((&value).try_into()?)
305305
Ok(plan)
@@ -366,7 +366,7 @@ impl<T: 'static + AsExecutionPlan> SchedulerState<T> {
366366
ctx: &ExecutionContext,
367367
) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
368368
let tasks = self.get_all_tasks().await?;
369-
self.assign_next_schedulable_task_inner(executor_id, tasks, &ctx)
369+
self.assign_next_schedulable_task_inner(executor_id, tasks, ctx)
370370
.await
371371
}
372372

@@ -377,7 +377,7 @@ impl<T: 'static + AsExecutionPlan> SchedulerState<T> {
377377
ctx: &ExecutionContext,
378378
) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
379379
let job_tasks = self.get_job_tasks(job_id).await?;
380-
self.assign_next_schedulable_task_inner(executor_id, job_tasks, &ctx)
380+
self.assign_next_schedulable_task_inner(executor_id, job_tasks, ctx)
381381
.await
382382
}
383383

@@ -387,7 +387,7 @@ impl<T: 'static + AsExecutionPlan> SchedulerState<T> {
387387
tasks: HashMap<String, TaskStatus>,
388388
ctx: &ExecutionContext,
389389
) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
390-
match self.get_next_schedulable_task(tasks, &ctx).await? {
390+
match self.get_next_schedulable_task(tasks, ctx).await? {
391391
Some((status, plan)) => {
392392
let mut status = status.clone();
393393
status.status = Some(task_status::Status::Running(RunningTask {
@@ -413,7 +413,7 @@ impl<T: 'static + AsExecutionPlan> SchedulerState<T> {
413413
if status.status.is_none() {
414414
let partition = status.partition_id.as_ref().unwrap();
415415
let plan = self
416-
.get_stage_plan(&partition.job_id, partition.stage_id as usize, &ctx)
416+
.get_stage_plan(&partition.job_id, partition.stage_id as usize, ctx)
417417
.await?;
418418

419419
// Let's try to resolve any unresolved shuffles we find

0 commit comments

Comments
 (0)