Skip to content

Commit cb00a2d

Browse files
authored
ballista: add missing aggr_expr to PhysicalExprNode. (#1989)
1 parent b702e08 commit cb00a2d

File tree

1 file changed

+67
-0
lines changed

1 file changed

+67
-0
lines changed

ballista/rust/core/src/serde/physical_plan/to_proto.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
5151
type Error = BallistaError;
5252

5353
fn try_into(self) -> Result<protobuf::PhysicalExprNode, Self::Error> {
54+
use datafusion::physical_plan::expressions;
5455
use datafusion_proto::protobuf::AggregateFunction;
5556
let aggr_function = if self.as_any().downcast_ref::<Avg>().is_some() {
5657
Ok(AggregateFunction::Avg.into())
@@ -62,6 +63,72 @@ impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
6263
Ok(AggregateFunction::Min.into())
6364
} else if self.as_any().downcast_ref::<Max>().is_some() {
6465
Ok(AggregateFunction::Max.into())
66+
} else if self
67+
.as_any()
68+
.downcast_ref::<expressions::ApproxDistinct>()
69+
.is_some()
70+
{
71+
Ok(AggregateFunction::ApproxDistinct.into())
72+
} else if self
73+
.as_any()
74+
.downcast_ref::<expressions::ArrayAgg>()
75+
.is_some()
76+
{
77+
Ok(AggregateFunction::ArrayAgg.into())
78+
} else if self
79+
.as_any()
80+
.downcast_ref::<expressions::Variance>()
81+
.is_some()
82+
{
83+
Ok(AggregateFunction::Variance.into())
84+
} else if self
85+
.as_any()
86+
.downcast_ref::<expressions::VariancePop>()
87+
.is_some()
88+
{
89+
Ok(AggregateFunction::VariancePop.into())
90+
} else if self
91+
.as_any()
92+
.downcast_ref::<expressions::Covariance>()
93+
.is_some()
94+
{
95+
Ok(AggregateFunction::Covariance.into())
96+
} else if self
97+
.as_any()
98+
.downcast_ref::<expressions::CovariancePop>()
99+
.is_some()
100+
{
101+
Ok(AggregateFunction::CovariancePop.into())
102+
} else if self
103+
.as_any()
104+
.downcast_ref::<expressions::Stddev>()
105+
.is_some()
106+
{
107+
Ok(AggregateFunction::Stddev.into())
108+
} else if self
109+
.as_any()
110+
.downcast_ref::<expressions::StddevPop>()
111+
.is_some()
112+
{
113+
Ok(AggregateFunction::StddevPop.into())
114+
} else if self
115+
.as_any()
116+
.downcast_ref::<expressions::Correlation>()
117+
.is_some()
118+
{
119+
Ok(AggregateFunction::Correlation.into())
120+
} else if self
121+
.as_any()
122+
.downcast_ref::<expressions::ApproxPercentileCont>()
123+
.is_some()
124+
{
125+
Ok(AggregateFunction::ApproxPercentileCont.into())
126+
} else if self
127+
.as_any()
128+
.downcast_ref::<expressions::ApproxMedian>()
129+
.is_some()
130+
{
131+
Ok(AggregateFunction::ApproxMedian.into())
65132
} else {
66133
Err(BallistaError::NotImplemented(format!(
67134
"Aggregate function not supported: {:?}",

0 commit comments

Comments
 (0)