diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index cfd6a8381..01806d937 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -161,7 +161,7 @@ impl<'a> Transaction<'a> {
}
/// Commit transaction.
- pub async fn commit(self, catalog: &impl Catalog) -> Result<Table> {
+ pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
let table_commit = TableCommit::builder()
.ident(self.table.identifier().clone())
.updates(self.updates)
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 596228f7c..5fa9bc96a 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -218,6 +218,17 @@ pub struct ParquetWriter {
current_row_num: usize,
}
+impl std::fmt::Debug for ParquetWriter {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ParquetWriter")
+ .field("schema", &self.schema)
+ .field("out_file", &self.out_file)
+ .field("written_size", &self.written_size)
+ .field("current_row_num", &self.current_row_num)
+ .finish()
+ }
+}
+
/// Used to aggregate min and max value of each column.
struct MinMaxColAggregator {
lower_bounds: HashMap<i32, Datum>,
@@ -441,6 +452,7 @@ impl CurrentFileStatus for ParquetWriter {
/// # NOTES
///
/// We keep this wrapper for internal use only.
+#[derive(Debug)]
struct AsyncFileWriter<W: FileWrite>(W);
impl<W: FileWrite> AsyncFileWriter<W> {
diff --git a/crates/iceberg/src/writer/file_writer/track_writer.rs b/crates/iceberg/src/writer/file_writer/track_writer.rs
index 6c60a1aa7..c74f884fc 100644
--- a/crates/iceberg/src/writer/file_writer/track_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/track_writer.rs
@@ -29,6 +29,12 @@ pub(crate) struct TrackWriter {
written_size: Arc<AtomicU64>,
}
+impl std::fmt::Debug for TrackWriter {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("TrackWriter").finish()
+ }
+}
+
impl TrackWriter {
pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicU64>) -> Self {
Self {
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index b7b927fdd..4ed3c4355 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -26,3 +26,5 @@ mod schema;
mod table;
pub use table::table_provider_factory::IcebergTableProviderFactory;
pub use table::*;
+
+mod sink;
diff --git a/crates/integrations/datafusion/src/sink/mod.rs b/crates/integrations/datafusion/src/sink/mod.rs
new file mode 100644
index 000000000..2f9968ee1
--- /dev/null
+++ b/crates/integrations/datafusion/src/sink/mod.rs
@@ -0,0 +1,118 @@
+use std::any::Any;
+use std::sync::atomic::AtomicU64;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_plan::insert::DataSink;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::DisplayAs;
+use futures::StreamExt;
+use iceberg::table::Table;
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
+use iceberg::writer::file_writer::location_generator::{
+ DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::IcebergWriter;
+use iceberg::Catalog;
+use tokio::sync::Mutex;
+
+#[derive(Debug)]
+pub struct IcebergSink {
+ data_file_writer: Mutex<
+ DataFileWriter<ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>>,
+ >,
+ row_count: AtomicU64,
+ table: Table,
+ catalog: Arc<dyn Catalog>,
+}
+
+impl IcebergSink {
+ pub(crate) fn new(
+ data_file_writer: DataFileWriter<
+ ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
+ >,
+ catalog: Arc<dyn Catalog>,
+ table: Table,
+ ) -> Self {
+ Self {
+ data_file_writer: Mutex::new(data_file_writer),
+ row_count: AtomicU64::new(0),
+ table,
+ catalog,
+ }
+ }
+
+ pub(crate) async fn commit(&self) -> Result<()> {
+ let mut writer = self.data_file_writer.lock().await;
+ let data_files = writer
+ .close()
+ .await
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+ let mut append_action = Transaction::new(&self.table)
+ .fast_append(None, vec![])
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ append_action
+ .add_data_files(data_files)
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ let tx = append_action
+ .apply()
+ .await
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ tx.commit(self.catalog.as_ref())
+ .await
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ Ok(())
+ }
+}
+
+impl DisplayAs for IcebergSink {
+ fn fmt_as(
+ &self,
+ _t: datafusion::physical_plan::DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ f.write_str("IcebergSink")
+ }
+}
+
+#[async_trait]
+impl DataSink for IcebergSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ // # TODO: Implement metrics for IcebergSink
+ None
+ }
+
+ async fn write_all(
+ &self,
+ mut data: SendableRecordBatchStream,
+ _context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ while let Some(data_chunk) = data.as_mut().next().await {
+ if let Ok(data_chunk) = data_chunk {
+ let row_num = data_chunk.num_rows();
+ self.data_file_writer
+ .lock()
+ .await
+ .write(data_chunk)
+ .await
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ self.row_count
+ .fetch_add(row_num as u64, std::sync::atomic::Ordering::Relaxed);
+ } else {
+ // TODO: log a warning (or propagate the error) instead of
+ // silently dropping record batches that arrive as errors.
+ }
+ }
+ self.commit().await?;
+ Ok(self.row_count.load(std::sync::atomic::Ordering::Relaxed))
+ }
+}
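For reviewers, a sketch (not part of the patch) of driving the sink directly with an in-memory stream. Since `sink` is a private module, this would have to live inside the crate; `drive_sink` and the single `id` column are purely illustrative, and the IcebergSink itself is assumed to have been built from a real table, catalog, and data file writer, exactly as `insert_into` does in table/mod.rs:

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::memory::MemoryStream;

use crate::sink::IcebergSink;

async fn drive_sink(sink: IcebergSink) -> Result<u64> {
    // Build a single-column batch and wrap it in a MemoryStream.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;
    let stream: SendableRecordBatchStream =
        Box::pin(MemoryStream::try_new(vec![batch], schema, None)?);
    // write_all drains the stream, closes the data file writer, and commits a
    // fast-append snapshot; it returns the number of rows written (3 here).
    sink.write_all(stream, &Arc::new(TaskContext::default())).await
}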
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 00c9e1322..f672d10a0 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -24,19 +24,33 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::catalog::Session;
use datafusion::datasource::{TableProvider, TableType};
-use datafusion::error::Result as DFResult;
+use datafusion::error::{DataFusionError as DFError, Result as DFResult};
+use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_plan::insert::DataSinkExec;
use datafusion::physical_plan::ExecutionPlan;
use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::spec::DataFileFormat;
use iceberg::table::Table;
+use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::location_generator::{
+ DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::IcebergWriterBuilder;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use crate::physical_plan::scan::IcebergTableScan;
+use crate::sink::IcebergSink;
/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
/// managing access to a [`Table`].
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
+ /// Catalog backing this table provider.
+ /// It is optional and only needed for insert operations; providers
+ /// constructed without a catalog are read-only.
+ catalog: Option<Arc<dyn Catalog>>,
/// A table in the catalog.
table: Table,
/// Table snapshot id that will be queried via this provider.
@@ -48,11 +62,13 @@ pub struct IcebergTableProvider {
impl IcebergTableProvider {
pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
IcebergTableProvider {
+ catalog: None,
table,
snapshot_id: None,
schema,
}
}
+
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using the given client and table name to fetch an actual [`Table`]
/// in the provided namespace.
@@ -67,6 +83,7 @@ impl IcebergTableProvider {
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
Ok(IcebergTableProvider {
+ catalog: Some(client),
table,
snapshot_id: None,
schema,
@@ -78,6 +95,7 @@ impl IcebergTableProvider {
pub async fn try_new_from_table(table: Table) -> Result<Self> {
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
Ok(IcebergTableProvider {
+ catalog: None,
table,
snapshot_id: None,
schema,
@@ -102,6 +120,7 @@ impl IcebergTableProvider {
let schema = snapshot.schema(table.metadata())?;
let schema = Arc::new(schema_to_arrow_schema(&schema)?);
Ok(IcebergTableProvider {
+ catalog: None,
table,
snapshot_id: Some(snapshot_id),
schema,
@@ -147,6 +166,50 @@ impl TableProvider for IcebergTableProvider {
// Push down all filters; as the single source of truth, the scanner will drop any filter that couldn't be pushed down
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}
+
+ async fn insert_into(
+ &self,
+ _state: &dyn Session,
+ input: Arc<dyn ExecutionPlan>,
+ insert_op: InsertOp,
+ ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ let Some(catalog) = &self.catalog else {
+ return Err(DFError::Internal(
+ "Static table can't not be inserted".to_string(),
+ ));
+ };
+
+ if !matches!(insert_op, InsertOp::Append) {
+ return Err(DFError::Internal(
+ "Only support append only table".to_string(),
+ ));
+ }
+
+ // create data file writer
+ let parquet_writer_builder = ParquetWriterBuilder::new(
+ WriterProperties::default(),
+ self.table.metadata().current_schema().clone(),
+ self.table.file_io().clone(),
+ DefaultLocationGenerator::new(self.table.metadata().clone())
+ .map_err(|err| DFError::External(Box::new(err)))?,
+ DefaultFileNameGenerator::new("".to_string(), None, DataFileFormat::Parquet),
+ );
+ let data_file_writer = DataFileWriterBuilder::new(parquet_writer_builder, None)
+ .build()
+ .await
+ .map_err(|err| DFError::External(Box::new(err)))?;
+
+ let iceberg_sink = IcebergSink::new(data_file_writer, catalog.clone(), self.table.clone());
+
+ Ok(Arc::new(DataSinkExec::new(
+ input,
+ Arc::new(iceberg_sink),
+ schema_to_arrow_schema(self.table.metadata().current_schema())
+ .map_err(|err| DFError::External(Box::new(err)))?
+ .into(),
+ None,
+ )))
+ }
}
#[cfg(test)]
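Taken together with the catalog-aware constructor above, the new insert path can be exercised end to end roughly as sketched below (illustrative only: the registration name, column values, and `append_rows` helper are made up, and `provider` is assumed to be an IcebergTableProvider that carries a catalog, e.g. one built by `try_new`). DataFusion plans the INSERT through `insert_into`, wraps the IcebergSink in a DataSinkExec, and `write_all` commits a fast-append snapshot:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use iceberg_datafusion::IcebergTableProvider;

// `provider` must carry a catalog; providers built via try_new_from_table or
// try_new_from_table_snapshot have `catalog: None` and will reject INSERT.
async fn append_rows(provider: Arc<IcebergTableProvider>) -> Result<()> {
    let ctx = SessionContext::new();
    let _ = ctx.register_table("t", provider)?;
    // Each INSERT statement drives IcebergSink::write_all and commits one snapshot.
    ctx.sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')").await?.collect().await?;
    Ok(())
}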