Skip to content

Commit 8057fa1

Browse files
Remove unused code and add test case for logical plan round trip with custom object store
1 parent 71a42bb commit 8057fa1

File tree

4 files changed

+123
-310
lines changed

4 files changed

+123
-310
lines changed

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 0 additions & 302 deletions
Original file line numberDiff line numberDiff line change
@@ -51,308 +51,6 @@ use std::{
5151
unimplemented,
5252
};
5353

54-
impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
55-
type Error = BallistaError;
56-
57-
fn try_into(self) -> Result<LogicalPlan, Self::Error> {
58-
let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
59-
proto_error(format!(
60-
"logical_plan::from_proto() Unsupported logical plan '{:?}'",
61-
self
62-
))
63-
})?;
64-
match plan {
65-
LogicalPlanType::Values(values) => {
66-
let n_cols = values.n_cols as usize;
67-
let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
68-
Ok(Vec::new())
69-
} else if values.values_list.len() % n_cols != 0 {
70-
Err(BallistaError::General(format!(
71-
"Invalid values list length, expect {} to be divisible by {}",
72-
values.values_list.len(),
73-
n_cols
74-
)))
75-
} else {
76-
values
77-
.values_list
78-
.chunks_exact(n_cols)
79-
.map(|r| {
80-
r.iter()
81-
.map(|v| v.try_into())
82-
.collect::<Result<Vec<_>, _>>()
83-
})
84-
.collect::<Result<Vec<_>, _>>()
85-
}?;
86-
LogicalPlanBuilder::values(values)?
87-
.build()
88-
.map_err(|e| e.into())
89-
}
90-
LogicalPlanType::Projection(projection) => {
91-
let input: LogicalPlan = convert_box_required!(projection.input)?;
92-
let x: Vec<Expr> = projection
93-
.expr
94-
.iter()
95-
.map(|expr| expr.try_into())
96-
.collect::<Result<Vec<_>, _>>()?;
97-
LogicalPlanBuilder::from(input)
98-
.project_with_alias(
99-
x,
100-
projection.optional_alias.as_ref().map(|a| match a {
101-
protobuf::projection_node::OptionalAlias::Alias(alias) => {
102-
alias.clone()
103-
}
104-
}),
105-
)?
106-
.build()
107-
.map_err(|e| e.into())
108-
}
109-
LogicalPlanType::Selection(selection) => {
110-
let input: LogicalPlan = convert_box_required!(selection.input)?;
111-
let expr: Expr = selection
112-
.expr
113-
.as_ref()
114-
.ok_or_else(|| {
115-
BallistaError::General("expression required".to_string())
116-
})?
117-
.try_into()?;
118-
LogicalPlanBuilder::from(input)
119-
.filter(expr)?
120-
.build()
121-
.map_err(|e| e.into())
122-
}
123-
LogicalPlanType::Window(window) => {
124-
let input: LogicalPlan = convert_box_required!(window.input)?;
125-
let window_expr = window
126-
.window_expr
127-
.iter()
128-
.map(|expr| expr.try_into())
129-
.collect::<Result<Vec<Expr>, _>>()?;
130-
LogicalPlanBuilder::from(input)
131-
.window(window_expr)?
132-
.build()
133-
.map_err(|e| e.into())
134-
}
135-
LogicalPlanType::Aggregate(aggregate) => {
136-
let input: LogicalPlan = convert_box_required!(aggregate.input)?;
137-
let group_expr = aggregate
138-
.group_expr
139-
.iter()
140-
.map(|expr| expr.try_into())
141-
.collect::<Result<Vec<Expr>, _>>()?;
142-
let aggr_expr = aggregate
143-
.aggr_expr
144-
.iter()
145-
.map(|expr| expr.try_into())
146-
.collect::<Result<Vec<Expr>, _>>()?;
147-
LogicalPlanBuilder::from(input)
148-
.aggregate(group_expr, aggr_expr)?
149-
.build()
150-
.map_err(|e| e.into())
151-
}
152-
LogicalPlanType::ListingScan(scan) => {
153-
let schema: Schema = convert_required!(scan.schema)?;
154-
155-
let mut projection = None;
156-
if let Some(columns) = &scan.projection {
157-
let column_indices = columns
158-
.columns
159-
.iter()
160-
.map(|name| schema.index_of(name))
161-
.collect::<Result<Vec<usize>, _>>()?;
162-
projection = Some(column_indices);
163-
}
164-
165-
let filters = scan
166-
.filters
167-
.iter()
168-
.map(|e| e.try_into())
169-
.collect::<Result<Vec<_>, _>>()?;
170-
171-
let file_format: Arc<dyn FileFormat> =
172-
match scan.file_format_type.as_ref().ok_or_else(|| {
173-
proto_error(format!(
174-
"logical_plan::from_proto() Unsupported file format '{:?}'",
175-
self
176-
))
177-
})? {
178-
&FileFormatType::Parquet(protobuf::ParquetFormat {
179-
enable_pruning,
180-
}) => Arc::new(
181-
ParquetFormat::default().with_enable_pruning(enable_pruning),
182-
),
183-
FileFormatType::Csv(protobuf::CsvFormat {
184-
has_header,
185-
delimiter,
186-
}) => Arc::new(
187-
CsvFormat::default()
188-
.with_has_header(*has_header)
189-
.with_delimiter(str_to_byte(delimiter)?),
190-
),
191-
FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
192-
};
193-
194-
let options = ListingOptions {
195-
file_extension: scan.file_extension.clone(),
196-
format: file_format,
197-
table_partition_cols: scan.table_partition_cols.clone(),
198-
collect_stat: scan.collect_stat,
199-
target_partitions: scan.target_partitions as usize,
200-
};
201-
202-
let provider = ListingTable::new(
203-
Arc::new(LocalFileSystem {}),
204-
scan.path.clone(),
205-
Arc::new(schema),
206-
options,
207-
);
208-
209-
LogicalPlanBuilder::scan_with_filters(
210-
&scan.table_name,
211-
Arc::new(provider),
212-
projection,
213-
filters,
214-
)?
215-
.build()
216-
.map_err(|e| e.into())
217-
}
218-
LogicalPlanType::Sort(sort) => {
219-
let input: LogicalPlan = convert_box_required!(sort.input)?;
220-
let sort_expr: Vec<Expr> = sort
221-
.expr
222-
.iter()
223-
.map(|expr| expr.try_into())
224-
.collect::<Result<Vec<Expr>, _>>()?;
225-
LogicalPlanBuilder::from(input)
226-
.sort(sort_expr)?
227-
.build()
228-
.map_err(|e| e.into())
229-
}
230-
LogicalPlanType::Repartition(repartition) => {
231-
use datafusion::logical_plan::Partitioning;
232-
let input: LogicalPlan = convert_box_required!(repartition.input)?;
233-
use protobuf::repartition_node::PartitionMethod;
234-
let pb_partition_method = repartition.partition_method.clone().ok_or_else(|| {
235-
BallistaError::General(String::from(
236-
"Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'",
237-
))
238-
})?;
239-
240-
let partitioning_scheme = match pb_partition_method {
241-
PartitionMethod::Hash(protobuf::HashRepartition {
242-
hash_expr: pb_hash_expr,
243-
partition_count,
244-
}) => Partitioning::Hash(
245-
pb_hash_expr
246-
.iter()
247-
.map(|pb_expr| pb_expr.try_into())
248-
.collect::<Result<Vec<_>, _>>()?,
249-
partition_count as usize,
250-
),
251-
PartitionMethod::RoundRobin(partition_count) => {
252-
Partitioning::RoundRobinBatch(partition_count as usize)
253-
}
254-
};
255-
256-
LogicalPlanBuilder::from(input)
257-
.repartition(partitioning_scheme)?
258-
.build()
259-
.map_err(|e| e.into())
260-
}
261-
LogicalPlanType::EmptyRelation(empty_relation) => {
262-
LogicalPlanBuilder::empty(empty_relation.produce_one_row)
263-
.build()
264-
.map_err(|e| e.into())
265-
}
266-
LogicalPlanType::CreateExternalTable(create_extern_table) => {
267-
let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
268-
BallistaError::General(String::from(
269-
"Protobuf deserialization error, CreateExternalTableNode was missing required field schema.",
270-
))
271-
})?;
272-
273-
let pb_file_type: protobuf::FileType =
274-
create_extern_table.file_type.try_into()?;
275-
276-
Ok(LogicalPlan::CreateExternalTable(CreateExternalTable {
277-
schema: pb_schema.try_into()?,
278-
name: create_extern_table.name.clone(),
279-
location: create_extern_table.location.clone(),
280-
file_type: pb_file_type.into(),
281-
has_header: create_extern_table.has_header,
282-
}))
283-
}
284-
LogicalPlanType::Analyze(analyze) => {
285-
let input: LogicalPlan = convert_box_required!(analyze.input)?;
286-
LogicalPlanBuilder::from(input)
287-
.explain(analyze.verbose, true)?
288-
.build()
289-
.map_err(|e| e.into())
290-
}
291-
LogicalPlanType::Explain(explain) => {
292-
let input: LogicalPlan = convert_box_required!(explain.input)?;
293-
LogicalPlanBuilder::from(input)
294-
.explain(explain.verbose, false)?
295-
.build()
296-
.map_err(|e| e.into())
297-
}
298-
LogicalPlanType::Limit(limit) => {
299-
let input: LogicalPlan = convert_box_required!(limit.input)?;
300-
LogicalPlanBuilder::from(input)
301-
.limit(limit.limit as usize)?
302-
.build()
303-
.map_err(|e| e.into())
304-
}
305-
LogicalPlanType::Join(join) => {
306-
let left_keys: Vec<Column> =
307-
join.left_join_column.iter().map(|i| i.into()).collect();
308-
let right_keys: Vec<Column> =
309-
join.right_join_column.iter().map(|i| i.into()).collect();
310-
let join_type =
311-
protobuf::JoinType::from_i32(join.join_type).ok_or_else(|| {
312-
proto_error(format!(
313-
"Received a JoinNode message with unknown JoinType {}",
314-
join.join_type
315-
))
316-
})?;
317-
let join_constraint = protobuf::JoinConstraint::from_i32(
318-
join.join_constraint,
319-
)
320-
.ok_or_else(|| {
321-
proto_error(format!(
322-
"Received a JoinNode message with unknown JoinConstraint {}",
323-
join.join_constraint
324-
))
325-
})?;
326-
327-
let builder = LogicalPlanBuilder::from(convert_box_required!(join.left)?);
328-
let builder = match join_constraint.into() {
329-
JoinConstraint::On => builder.join(
330-
&convert_box_required!(join.right)?,
331-
join_type.into(),
332-
(left_keys, right_keys),
333-
)?,
334-
JoinConstraint::Using => builder.join_using(
335-
&convert_box_required!(join.right)?,
336-
join_type.into(),
337-
left_keys,
338-
)?,
339-
};
340-
341-
builder.build().map_err(|e| e.into())
342-
}
343-
LogicalPlanType::CrossJoin(crossjoin) => {
344-
let left = convert_box_required!(crossjoin.left)?;
345-
let right = convert_box_required!(crossjoin.right)?;
346-
347-
LogicalPlanBuilder::from(left)
348-
.cross_join(&right)?
349-
.build()
350-
.map_err(|e| e.into())
351-
}
352-
}
353-
}
354-
}
355-
35654
impl From<&protobuf::Column> for Column {
35755
fn from(c: &protobuf::Column) -> Column {
35856
let c = c.clone();

0 commit comments

Comments
 (0)