
Commit a104661

milenkovicm and alamb authored
feat: add resolved target to DmlStatement (to eliminate need for table lookup after deserialization) (#14631)
* feat: serialize table source to DML proto

* Update datafusion/core/src/dataframe/mod.rs

Co-authored-by: Andrew Lamb <[email protected]>

* remove redundant comment

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 25b2a92 commit a104661
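
The change below makes LogicalPlanBuilder::insert_into take a resolved Arc<dyn TableSource> instead of a &Schema, so the target table is looked up once when the plan is built and then travels inside the DmlStatement. As orientation before the per-file diffs, here is a minimal caller-side sketch; the SessionContext, the table name "t", the input plan, and the exact import paths are illustrative assumptions, not part of this commit:

use std::sync::Arc;

use datafusion::datasource::DefaultTableSource;
use datafusion::error::Result;
use datafusion::logical_expr::{dml::InsertOp, LogicalPlan, LogicalPlanBuilder};
use datafusion::prelude::SessionContext;

// Sketch only: builds an INSERT plan against an already registered table "t".
async fn build_insert_plan(ctx: &SessionContext, input: LogicalPlan) -> Result<LogicalPlan> {
    // Resolve the target table up front, at plan-build time...
    let provider = ctx.table_provider("t").await?;
    // ...and wrap it as a TableSource so the logical plan carries the target itself.
    let target = Arc::new(DefaultTableSource::new(provider));
    // New signature: the TableSource target replaces the old `&Schema` argument.
    LogicalPlanBuilder::insert_into(input, "t", target, InsertOp::Append)?.build()
}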

File tree: 13 files changed (+147, −57 lines)

datafusion/core/src/dataframe/mod.rs

Lines changed: 14 additions & 4 deletions
@@ -25,7 +25,9 @@ use crate::arrow::util::pretty;
 use crate::datasource::file_format::csv::CsvFormatFactory;
 use crate::datasource::file_format::format_as_file_type;
 use crate::datasource::file_format::json::JsonFormatFactory;
-use crate::datasource::{provider_as_source, MemTable, TableProvider};
+use crate::datasource::{
+    provider_as_source, DefaultTableSource, MemTable, TableProvider,
+};
 use crate::error::Result;
 use crate::execution::context::{SessionState, TaskContext};
 use crate::execution::FunctionRegistry;
@@ -62,6 +64,7 @@ use datafusion_functions_aggregate::expr_fn::{
 
 use async_trait::async_trait;
 use datafusion_catalog::Session;
+use datafusion_sql::TableReference;
 
 /// Contains options that control how data is
 /// written out from a DataFrame
@@ -1526,8 +1529,6 @@ impl DataFrame {
         table_name: &str,
         write_options: DataFrameWriteOptions,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
-        let arrow_schema = Schema::from(self.schema());
-
         let plan = if write_options.sort_by.is_empty() {
             self.plan
         } else {
@@ -1536,10 +1537,19 @@ impl DataFrame {
                 .build()?
         };
 
+        let table_ref: TableReference = table_name.into();
+        let table_schema = self.session_state.schema_for_ref(table_ref.clone())?;
+        let target = match table_schema.table(table_ref.table()).await? {
+            Some(ref provider) => Ok(Arc::clone(provider)),
+            _ => plan_err!("No table named '{table_name}'"),
+        }?;
+
+        let target = Arc::new(DefaultTableSource::new(target));
+
         let plan = LogicalPlanBuilder::insert_into(
             plan,
             table_name.to_owned(),
-            &arrow_schema,
+            target,
             write_options.insert_op,
         )?
         .build()?;

datafusion/core/src/datasource/listing/table.rs

Lines changed: 4 additions & 2 deletions
@@ -1195,7 +1195,7 @@ mod tests {
     use crate::datasource::file_format::json::JsonFormat;
     #[cfg(feature = "parquet")]
     use crate::datasource::file_format::parquet::ParquetFormat;
-    use crate::datasource::{provider_as_source, MemTable};
+    use crate::datasource::{provider_as_source, DefaultTableSource, MemTable};
    use crate::execution::options::ArrowReadOptions;
     use crate::prelude::*;
     use crate::{
@@ -2065,6 +2065,8 @@ mod tests {
         session_ctx.register_table("source", source_table.clone())?;
         // Convert the source table into a provider so that it can be used in a query
         let source = provider_as_source(source_table);
+        let target = session_ctx.table_provider("t").await?;
+        let target = Arc::new(DefaultTableSource::new(target));
         // Create a table scan logical plan to read from the source table
         let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
             .filter(filter_predicate)?
@@ -2073,7 +2075,7 @@ mod tests {
         // Therefore, we will have 8 partitions in the final plan.
         // Create an insert plan to insert the source data into the initial table
         let insert_into_table =
-            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
+            LogicalPlanBuilder::insert_into(scan_plan, "t", target, InsertOp::Append)?
                 .build()?;
         // Create a physical plan from the insert plan
         let plan = session_ctx

datafusion/core/src/datasource/memory.rs

Lines changed: 3 additions & 2 deletions
@@ -390,7 +390,7 @@ impl DataSink for MemSink {
 mod tests {
 
     use super::*;
-    use crate::datasource::provider_as_source;
+    use crate::datasource::{provider_as_source, DefaultTableSource};
     use crate::physical_plan::collect;
     use crate::prelude::SessionContext;
 
@@ -640,6 +640,7 @@ mod tests {
     // Create and register the initial table with the provided schema and data
     let initial_table = Arc::new(MemTable::try_new(schema.clone(), initial_data)?);
     session_ctx.register_table("t", initial_table.clone())?;
+    let target = Arc::new(DefaultTableSource::new(initial_table.clone()));
     // Create and register the source table with the provided schema and inserted data
     let source_table = Arc::new(MemTable::try_new(schema.clone(), inserted_data)?);
     session_ctx.register_table("source", source_table.clone())?;
@@ -649,7 +650,7 @@ mod tests {
     let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
     // Create an insert plan to insert the source data into the initial table
     let insert_into_table =
-        LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
+        LogicalPlanBuilder::insert_into(scan_plan, "t", target, InsertOp::Append)?
             .build()?;
     // Create a physical plan from the insert plan
     let plan = session_ctx

datafusion/core/src/physical_planner.rs

Lines changed: 9 additions & 6 deletions
@@ -24,7 +24,7 @@ use std::sync::Arc;
 use crate::datasource::file_format::file_type_to_format;
 use crate::datasource::listing::ListingTableUrl;
 use crate::datasource::physical_plan::FileSinkConfig;
-use crate::datasource::source_as_provider;
+use crate::datasource::{source_as_provider, DefaultTableSource};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
@@ -541,19 +541,22 @@ impl DefaultPhysicalPlanner {
                     .await?
             }
             LogicalPlan::Dml(DmlStatement {
-                table_name,
+                target,
                 op: WriteOp::Insert(insert_op),
                 ..
             }) => {
-                let name = table_name.table();
-                let schema = session_state.schema_for_ref(table_name.clone())?;
-                if let Some(provider) = schema.table(name).await? {
+                if let Some(provider) =
+                    target.as_any().downcast_ref::<DefaultTableSource>()
+                {
                     let input_exec = children.one()?;
                     provider
+                        .table_provider
                         .insert_into(session_state, input_exec, *insert_op)
                         .await?
                 } else {
-                    return exec_err!("Table '{table_name}' does not exist");
+                    return exec_err!(
+                        "Table source can't be downcasted to DefaultTableSource"
+                    );
                 }
             }
             LogicalPlan::Window(Window { window_expr, .. }) => {
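
This hunk is the heart of the change: at physical planning time the planner no longer resolves table_name through the session catalog, it downcasts the TableSource stored on the DmlStatement back to DefaultTableSource and uses its TableProvider directly. A minimal standalone sketch of that downcast step (the helper function name and import paths are illustrative assumptions; the commit performs this inline as shown above):

use std::sync::Arc;

use datafusion::datasource::{DefaultTableSource, TableProvider};
use datafusion::logical_expr::TableSource;

// Recover the concrete TableProvider from a DmlStatement's `target`, if the target
// was built from a DefaultTableSource; any other TableSource implementation yields None.
fn provider_of(target: &Arc<dyn TableSource>) -> Option<Arc<dyn TableProvider>> {
    target
        .as_any()
        .downcast_ref::<DefaultTableSource>()
        .map(|source| Arc::clone(&source.table_provider))
}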

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 2 additions & 4 deletions
@@ -384,14 +384,12 @@ impl LogicalPlanBuilder {
     pub fn insert_into(
         input: LogicalPlan,
         table_name: impl Into<TableReference>,
-        table_schema: &Schema,
+        target: Arc<dyn TableSource>,
         insert_op: InsertOp,
     ) -> Result<Self> {
-        let table_schema = table_schema.clone().to_dfschema_ref()?;
-
         Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
             table_name.into(),
-            table_schema,
+            target,
             WriteOp::Insert(insert_op),
             Arc::new(input),
         ))))

datafusion/expr/src/logical_plan/dml.rs

Lines changed: 39 additions & 6 deletions
@@ -25,7 +25,7 @@ use arrow::datatypes::{DataType, Field, Schema};
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{DFSchemaRef, TableReference};
 
-use crate::LogicalPlan;
+use crate::{LogicalPlan, TableSource};
 
 /// Operator that copies the contents of a database to file(s)
 #[derive(Clone)]
@@ -91,31 +91,64 @@ impl Hash for CopyTo {
 
 /// The operator that modifies the content of a database (adapted from
 /// substrait WriteRel)
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Clone)]
 pub struct DmlStatement {
     /// The table name
     pub table_name: TableReference,
-    /// The schema of the table (must align with Rel input)
-    pub table_schema: DFSchemaRef,
+    /// The target table to insert into
+    pub target: Arc<dyn TableSource>,
     /// The type of operation to perform
     pub op: WriteOp,
     /// The relation that determines the tuples to add/remove/modify the schema must match with table_schema
     pub input: Arc<LogicalPlan>,
     /// The schema of the output relation
     pub output_schema: DFSchemaRef,
 }
+impl Eq for DmlStatement {}
+impl Hash for DmlStatement {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.table_name.hash(state);
+        self.target.schema().hash(state);
+        self.op.hash(state);
+        self.input.hash(state);
+        self.output_schema.hash(state);
+    }
+}
+
+impl PartialEq for DmlStatement {
+    fn eq(&self, other: &Self) -> bool {
+        self.table_name == other.table_name
+            && self.target.schema() == other.target.schema()
+            && self.op == other.op
+            && self.input == other.input
+            && self.output_schema == other.output_schema
+    }
+}
+
+impl Debug for DmlStatement {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("DmlStatement")
+            .field("table_name", &self.table_name)
+            .field("target", &"...")
+            .field("target_schema", &self.target.schema())
+            .field("op", &self.op)
+            .field("input", &self.input)
+            .field("output_schema", &self.output_schema)
+            .finish()
+    }
+}
 
 impl DmlStatement {
     /// Creates a new DML statement with the output schema set to a single `count` column.
     pub fn new(
         table_name: TableReference,
-        table_schema: DFSchemaRef,
+        target: Arc<dyn TableSource>,
         op: WriteOp,
         input: Arc<LogicalPlan>,
     ) -> Self {
         Self {
             table_name,
-            table_schema,
+            target,
             op,
             input,
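
The derive of Debug, PartialEq, Eq and Hash is dropped above because Arc<dyn TableSource> provides none of them; the hand-written impls fall back to comparing and hashing the target's schema. A standalone sketch of that pattern with a hypothetical wrapper type (not part of the commit), just to show the idea in isolation:

use std::hash::{Hash, Hasher};
use std::sync::Arc;

use datafusion::logical_expr::TableSource;

// Hypothetical holder of a dyn TableSource, mirroring DmlStatement's `target` field.
struct TargetRef {
    target: Arc<dyn TableSource>,
}

impl PartialEq for TargetRef {
    fn eq(&self, other: &Self) -> bool {
        // Trait objects are not directly comparable, so compare the schemas they expose.
        self.target.schema() == other.target.schema()
    }
}

impl Eq for TargetRef {}

impl Hash for TargetRef {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Keep Hash consistent with PartialEq: equal values hash identically.
        self.target.schema().hash(state);
    }
}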

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 2 additions & 2 deletions
@@ -784,15 +784,15 @@ impl LogicalPlan {
             }
             LogicalPlan::Dml(DmlStatement {
                 table_name,
-                table_schema,
+                target,
                 op,
                 ..
             }) => {
                 self.assert_no_expressions(expr)?;
                 let input = self.only_input(inputs)?;
                 Ok(LogicalPlan::Dml(DmlStatement::new(
                     table_name.clone(),
-                    Arc::clone(table_schema),
+                    Arc::clone(target),
                     op.clone(),
                     Arc::new(input),
                 )))

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 2 additions & 2 deletions
@@ -228,14 +228,14 @@ impl TreeNode for LogicalPlan {
             }),
             LogicalPlan::Dml(DmlStatement {
                 table_name,
-                table_schema,
+                target,
                 op,
                 input,
                 output_schema,
             }) => input.map_elements(f)?.update_data(|input| {
                 LogicalPlan::Dml(DmlStatement {
                     table_name,
-                    table_schema,
+                    target,
                     op,
                     input,
                     output_schema,

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 1 deletion
@@ -278,7 +278,7 @@ message DmlNode{
   Type dml_type = 1;
   LogicalPlanNode input = 2;
   TableReference table_name = 3;
-  datafusion_common.DfSchema schema = 4;
+  LogicalPlanNode target = 5;
 }
 
 message UnnestNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 12 additions & 12 deletions
(Generated file; diff not rendered.)

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 2 deletions
(Generated file; diff not rendered.)
