Skip to content

Commit 022cf79

Browse files
Fix various cases in which we were still using the old codec
1 parent 098b9ae commit 022cf79

File tree

5 files changed

+19
-393
lines changed

5 files changed

+19
-393
lines changed

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 0 additions & 382 deletions
Original file line numberDiff line numberDiff line change
@@ -84,388 +84,6 @@ use log::debug;
8484
use protobuf::physical_expr_node::ExprType;
8585
use protobuf::physical_plan_node::PhysicalPlanType;
8686

87-
impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
88-
type Error = BallistaError;
89-
90-
fn try_into(self) -> Result<Arc<dyn ExecutionPlan>, Self::Error> {
91-
let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
92-
proto_error(format!(
93-
"physical_plan::from_proto() Unsupported physical plan '{:?}'",
94-
self
95-
))
96-
})?;
97-
match plan {
98-
PhysicalPlanType::Projection(projection) => {
99-
let input: Arc<dyn ExecutionPlan> =
100-
convert_box_required!(projection.input)?;
101-
let exprs = projection
102-
.expr
103-
.iter()
104-
.zip(projection.expr_name.iter())
105-
.map(|(expr, name)| Ok((expr.try_into()?, name.to_string())))
106-
.collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>, Self::Error>>(
107-
)?;
108-
Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
109-
}
110-
PhysicalPlanType::Filter(filter) => {
111-
let input: Arc<dyn ExecutionPlan> = convert_box_required!(filter.input)?;
112-
let predicate = filter
113-
.expr
114-
.as_ref()
115-
.ok_or_else(|| {
116-
BallistaError::General(
117-
"filter (FilterExecNode) in PhysicalPlanNode is missing."
118-
.to_owned(),
119-
)
120-
})?
121-
.try_into()?;
122-
Ok(Arc::new(FilterExec::try_new(predicate, input)?))
123-
}
124-
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
125-
scan.base_conf.as_ref().unwrap().try_into()?,
126-
scan.has_header,
127-
str_to_byte(&scan.delimiter)?,
128-
))),
129-
PhysicalPlanType::ParquetScan(scan) => {
130-
Ok(Arc::new(ParquetExec::new(
131-
scan.base_conf.as_ref().unwrap().try_into()?,
132-
// TODO predicate should be de-serialized
133-
None,
134-
)))
135-
}
136-
PhysicalPlanType::AvroScan(scan) => Ok(Arc::new(AvroExec::new(
137-
scan.base_conf.as_ref().unwrap().try_into()?,
138-
))),
139-
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
140-
let input: Arc<dyn ExecutionPlan> =
141-
convert_box_required!(coalesce_batches.input)?;
142-
Ok(Arc::new(CoalesceBatchesExec::new(
143-
input,
144-
coalesce_batches.target_batch_size as usize,
145-
)))
146-
}
147-
PhysicalPlanType::Merge(merge) => {
148-
let input: Arc<dyn ExecutionPlan> = convert_box_required!(merge.input)?;
149-
Ok(Arc::new(CoalescePartitionsExec::new(input)))
150-
}
151-
PhysicalPlanType::Repartition(repart) => {
152-
let input: Arc<dyn ExecutionPlan> = convert_box_required!(repart.input)?;
153-
match repart.partition_method {
154-
Some(PartitionMethod::Hash(ref hash_part)) => {
155-
let expr = hash_part
156-
.hash_expr
157-
.iter()
158-
.map(|e| e.try_into())
159-
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
160-
161-
Ok(Arc::new(RepartitionExec::try_new(
162-
input,
163-
Partitioning::Hash(
164-
expr,
165-
hash_part.partition_count.try_into().unwrap(),
166-
),
167-
)?))
168-
}
169-
Some(PartitionMethod::RoundRobin(partition_count)) => {
170-
Ok(Arc::new(RepartitionExec::try_new(
171-
input,
172-
Partitioning::RoundRobinBatch(
173-
partition_count.try_into().unwrap(),
174-
),
175-
)?))
176-
}
177-
Some(PartitionMethod::Unknown(partition_count)) => {
178-
Ok(Arc::new(RepartitionExec::try_new(
179-
input,
180-
Partitioning::UnknownPartitioning(
181-
partition_count.try_into().unwrap(),
182-
),
183-
)?))
184-
}
185-
_ => Err(BallistaError::General(
186-
"Invalid partitioning scheme".to_owned(),
187-
)),
188-
}
189-
}
190-
PhysicalPlanType::GlobalLimit(limit) => {
191-
let input: Arc<dyn ExecutionPlan> = convert_box_required!(limit.input)?;
192-
Ok(Arc::new(GlobalLimitExec::new(input, limit.limit as usize)))
193-
}
194-
PhysicalPlanType::LocalLimit(limit) => {
195-
let input: Arc<dyn ExecutionPlan> = convert_box_required!(limit.input)?;
196-
Ok(Arc::new(LocalLimitExec::new(input, limit.limit as usize)))
197-
}
198-
PhysicalPlanType::Window(window_agg) => {
199-
let input: Arc<dyn ExecutionPlan> =
200-
convert_box_required!(window_agg.input)?;
201-
let input_schema = window_agg
202-
.input_schema
203-
.as_ref()
204-
.ok_or_else(|| {
205-
BallistaError::General(
206-
"input_schema in WindowAggrNode is missing.".to_owned(),
207-
)
208-
})?
209-
.clone();
210-
let physical_schema: SchemaRef =
211-
SchemaRef::new((&input_schema).try_into()?);
212-
213-
let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
214-
.window_expr
215-
.iter()
216-
.zip(window_agg.window_expr_name.iter())
217-
.map(|(expr, name)| {
218-
let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
219-
proto_error("Unexpected empty window physical expression")
220-
})?;
221-
222-
match expr_type {
223-
ExprType::WindowExpr(window_node) => Ok(create_window_expr(
224-
&convert_required!(window_node.window_function)?,
225-
name.to_owned(),
226-
&[convert_box_required!(window_node.expr)?],
227-
&[],
228-
&[],
229-
Some(WindowFrame::default()),
230-
&physical_schema,
231-
)?),
232-
_ => Err(BallistaError::General(
233-
"Invalid expression for WindowAggrExec".to_string(),
234-
)),
235-
}
236-
})
237-
.collect::<Result<Vec<_>, _>>()?;
238-
239-
Ok(Arc::new(WindowAggExec::try_new(
240-
physical_window_expr,
241-
input,
242-
Arc::new((&input_schema).try_into()?),
243-
)?))
244-
}
245-
PhysicalPlanType::HashAggregate(hash_agg) => {
246-
let input: Arc<dyn ExecutionPlan> =
247-
convert_box_required!(hash_agg.input)?;
248-
let mode = protobuf::AggregateMode::from_i32(hash_agg.mode).ok_or_else(|| {
249-
proto_error(format!(
250-
"Received a HashAggregateNode message with unknown AggregateMode {}",
251-
hash_agg.mode
252-
))
253-
})?;
254-
let agg_mode: AggregateMode = match mode {
255-
protobuf::AggregateMode::Partial => AggregateMode::Partial,
256-
protobuf::AggregateMode::Final => AggregateMode::Final,
257-
protobuf::AggregateMode::FinalPartitioned => {
258-
AggregateMode::FinalPartitioned
259-
}
260-
};
261-
let group = hash_agg
262-
.group_expr
263-
.iter()
264-
.zip(hash_agg.group_expr_name.iter())
265-
.map(|(expr, name)| {
266-
expr.try_into().map(|expr| (expr, name.to_string()))
267-
})
268-
.collect::<Result<Vec<_>, _>>()?;
269-
270-
let input_schema = hash_agg
271-
.input_schema
272-
.as_ref()
273-
.ok_or_else(|| {
274-
BallistaError::General(
275-
"input_schema in HashAggregateNode is missing.".to_owned(),
276-
)
277-
})?
278-
.clone();
279-
let physical_schema: SchemaRef =
280-
SchemaRef::new((&input_schema).try_into()?);
281-
282-
let physical_aggr_expr: Vec<Arc<dyn AggregateExpr>> = hash_agg
283-
.aggr_expr
284-
.iter()
285-
.zip(hash_agg.aggr_expr_name.iter())
286-
.map(|(expr, name)| {
287-
let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
288-
proto_error("Unexpected empty aggregate physical expression")
289-
})?;
290-
291-
match expr_type {
292-
ExprType::AggregateExpr(agg_node) => {
293-
let aggr_function =
294-
protobuf::AggregateFunction::from_i32(
295-
agg_node.aggr_function,
296-
)
297-
.ok_or_else(
298-
|| {
299-
proto_error(format!(
300-
"Received an unknown aggregate function: {}",
301-
agg_node.aggr_function
302-
))
303-
},
304-
)?;
305-
306-
Ok(create_aggregate_expr(
307-
&aggr_function.into(),
308-
false,
309-
&[convert_box_required!(agg_node.expr)?],
310-
&physical_schema,
311-
name.to_string(),
312-
)?)
313-
}
314-
_ => Err(BallistaError::General(
315-
"Invalid aggregate expression for HashAggregateExec"
316-
.to_string(),
317-
)),
318-
}
319-
})
320-
.collect::<Result<Vec<_>, _>>()?;
321-
322-
Ok(Arc::new(HashAggregateExec::try_new(
323-
agg_mode,
324-
group,
325-
physical_aggr_expr,
326-
input,
327-
Arc::new((&input_schema).try_into()?),
328-
)?))
329-
}
330-
PhysicalPlanType::HashJoin(hashjoin) => {
331-
let left: Arc<dyn ExecutionPlan> = convert_box_required!(hashjoin.left)?;
332-
let right: Arc<dyn ExecutionPlan> =
333-
convert_box_required!(hashjoin.right)?;
334-
let on: Vec<(Column, Column)> = hashjoin
335-
.on
336-
.iter()
337-
.map(|col| {
338-
let left = into_required!(col.left)?;
339-
let right = into_required!(col.right)?;
340-
Ok((left, right))
341-
})
342-
.collect::<Result<_, Self::Error>>()?;
343-
let join_type = protobuf::JoinType::from_i32(hashjoin.join_type)
344-
.ok_or_else(|| {
345-
proto_error(format!(
346-
"Received a HashJoinNode message with unknown JoinType {}",
347-
hashjoin.join_type
348-
))
349-
})?;
350-
351-
let partition_mode =
352-
protobuf::PartitionMode::from_i32(hashjoin.partition_mode)
353-
.ok_or_else(|| {
354-
proto_error(format!(
355-
"Received a HashJoinNode message with unknown PartitionMode {}",
356-
hashjoin.partition_mode
357-
))
358-
})?;
359-
let partition_mode = match partition_mode {
360-
protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
361-
protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
362-
};
363-
Ok(Arc::new(HashJoinExec::try_new(
364-
left,
365-
right,
366-
on,
367-
&join_type.into(),
368-
partition_mode,
369-
&hashjoin.null_equals_null,
370-
)?))
371-
}
372-
PhysicalPlanType::CrossJoin(crossjoin) => {
373-
let left: Arc<dyn ExecutionPlan> = convert_box_required!(crossjoin.left)?;
374-
let right: Arc<dyn ExecutionPlan> =
375-
convert_box_required!(crossjoin.right)?;
376-
Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
377-
}
378-
PhysicalPlanType::ShuffleWriter(shuffle_writer) => {
379-
let input: Arc<dyn ExecutionPlan> =
380-
convert_box_required!(shuffle_writer.input)?;
381-
382-
let output_partitioning = parse_protobuf_hash_partitioning(
383-
shuffle_writer.output_partitioning.as_ref(),
384-
)?;
385-
386-
Ok(Arc::new(ShuffleWriterExec::try_new(
387-
shuffle_writer.job_id.clone(),
388-
shuffle_writer.stage_id as usize,
389-
input,
390-
"".to_string(), // this is intentional but hacky - the executor will fill this in
391-
output_partitioning,
392-
)?))
393-
}
394-
PhysicalPlanType::ShuffleReader(shuffle_reader) => {
395-
let schema = Arc::new(convert_required!(shuffle_reader.schema)?);
396-
let partition_location: Vec<Vec<PartitionLocation>> = shuffle_reader
397-
.partition
398-
.iter()
399-
.map(|p| {
400-
p.location
401-
.iter()
402-
.map(|l| l.clone().try_into())
403-
.collect::<Result<Vec<_>, _>>()
404-
})
405-
.collect::<Result<Vec<_>, BallistaError>>()?;
406-
let shuffle_reader =
407-
ShuffleReaderExec::try_new(partition_location, schema)?;
408-
Ok(Arc::new(shuffle_reader))
409-
}
410-
PhysicalPlanType::Empty(empty) => {
411-
let schema = Arc::new(convert_required!(empty.schema)?);
412-
Ok(Arc::new(EmptyExec::new(empty.produce_one_row, schema)))
413-
}
414-
PhysicalPlanType::Sort(sort) => {
415-
let input: Arc<dyn ExecutionPlan> = convert_box_required!(sort.input)?;
416-
let exprs = sort
417-
.expr
418-
.iter()
419-
.map(|expr| {
420-
let expr = expr.expr_type.as_ref().ok_or_else(|| {
421-
proto_error(format!(
422-
"physical_plan::from_proto() Unexpected expr {:?}",
423-
self
424-
))
425-
})?;
426-
if let protobuf::physical_expr_node::ExprType::Sort(sort_expr) = expr {
427-
let expr = sort_expr
428-
.expr
429-
.as_ref()
430-
.ok_or_else(|| {
431-
proto_error(format!(
432-
"physical_plan::from_proto() Unexpected sort expr {:?}",
433-
self
434-
))
435-
})?
436-
.as_ref();
437-
Ok(PhysicalSortExpr {
438-
expr: expr.try_into()?,
439-
options: SortOptions {
440-
descending: !sort_expr.asc,
441-
nulls_first: sort_expr.nulls_first,
442-
},
443-
})
444-
} else {
445-
Err(BallistaError::General(format!(
446-
"physical_plan::from_proto() {:?}",
447-
self
448-
)))
449-
}
450-
})
451-
.collect::<Result<Vec<_>, _>>()?;
452-
Ok(Arc::new(SortExec::try_new(exprs, input)?))
453-
}
454-
PhysicalPlanType::Unresolved(unresolved_shuffle) => {
455-
let schema = Arc::new(convert_required!(unresolved_shuffle.schema)?);
456-
Ok(Arc::new(UnresolvedShuffleExec {
457-
stage_id: unresolved_shuffle.stage_id as usize,
458-
schema,
459-
input_partition_count: unresolved_shuffle.input_partition_count
460-
as usize,
461-
output_partition_count: unresolved_shuffle.output_partition_count
462-
as usize,
463-
}))
464-
}
465-
}
466-
}
467-
}
468-
46987
impl From<&protobuf::PhysicalColumn> for Column {
47088
fn from(c: &protobuf::PhysicalColumn) -> Column {
47189
Column::new(&c.name, c.index as usize)

0 commit comments

Comments
 (0)