929 changes: 485 additions & 444 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions Cargo.toml
@@ -25,19 +25,19 @@ version = "0.1.0"
 edition = "2021"
 readme = "README.md"
 license = "Apache-2.0"
-rust-version = "1.76"
+rust-version = "1.85"
 build = "build.rs"
 
 [dependencies]
 anyhow = "1"
-arrow = { version = "53.3", features = ["pyarrow", "ipc"] }
-arrow-flight = "53.3"
+arrow = { version = "54", features = ["pyarrow", "ipc"] }
+arrow-flight = "54"
 async-stream = "0.3"
 async-channel = "2.3"
 bytesize = "1.3"
-datafusion = { version = "43.0", features = ["pyarrow", "avro"] }
-datafusion-python = { version = "43.1" }
-datafusion-proto = "43.0"
+datafusion = { version = "45", features = ["pyarrow", "avro"] }
+datafusion-python = { version = "45" }
+datafusion-proto = "45"
 env_logger = "0.11"
 futures = "0.3"
 glob = "0.3.1"
@@ -52,15 +52,15 @@ object_store = { version = "0.11.0", features = [
 ] }
 parking_lot = { version = "0.12", features = ["deadlock_detection"] }
 prost = "0.13"
-pyo3 = { version = "0.22.6", features = [
+pyo3 = { version = "0.23", features = [
     "extension-module",
     "abi3",
     "abi3-py38",
 ] }
-pyo3-async-runtimes = { version = "0.22", features = ["tokio-runtime"] }
+pyo3-async-runtimes = { version = "0.23", features = ["tokio-runtime"] }
 pyo3-pylogger = "0.3.0"
 rust_decimal = "1.36"
-tokio = { version = "1.40", features = [
+tokio = { version = "1.43", features = [
     "macros",
     "rt",
     "rt-multi-thread",
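Note: the pyo3 bump from 0.22 to 0.23 is the most invasive of these upgrades, since 0.23 drops the long-deprecated GIL-Refs API in favor of the Bound<'py, T> smart-pointer API used throughout the code below. A minimal sketch of a module definition under that API (the demo/double names are illustrative, not part of this PR):

use pyo3::prelude::*;

// Hypothetical example function, not from this PR.
#[pyfunction]
fn double(x: i64) -> i64 {
    x * 2
}

// In pyo3 0.23 the module-init function receives &Bound<'_, PyModule>
// rather than a GIL-ref &PyModule.
#[pymodule]
fn demo(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(double, m)?)?;
    Ok(())
}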
56 changes: 45 additions & 11 deletions src/dataframe.rs
@@ -31,15 +31,19 @@ use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 use datafusion::prelude::DataFrame;
+use datafusion_python::errors::PyDataFusionError;
 use datafusion_python::physical_plan::PyExecutionPlan;
 use datafusion_python::sql::logical::PyLogicalPlan;
 use datafusion_python::utils::wait_for_future;
 use futures::stream::StreamExt;
 use itertools::Itertools;
 use log::trace;
+use pyo3::exceptions::PyStopAsyncIteration;
+use pyo3::exceptions::PyStopIteration;
 use pyo3::prelude::*;
 use std::borrow::Cow;
 use std::sync::Arc;
+use tokio::sync::Mutex;
 
 use crate::isolator::PartitionIsolatorExec;
 use crate::max_rows::MaxRowsExec;
@@ -428,9 +432,12 @@ impl PyDataFrameStage {
     }
 }
 
+// PyRecordBatch and PyRecordBatchStream are borrowed, with slight
+// modifications, from datafusion-python; they are not publicly exposed there.
+
 #[pyclass]
 pub struct PyRecordBatch {
-    batch: RecordBatch,
+    pub batch: RecordBatch,
 }
 
 #[pymethods]
@@ -448,31 +455,58 @@ impl From<RecordBatch> for PyRecordBatch {
 
 #[pyclass]
 pub struct PyRecordBatchStream {
-    stream: SendableRecordBatchStream,
+    stream: Arc<Mutex<SendableRecordBatchStream>>,
 }
 
 impl PyRecordBatchStream {
     pub fn new(stream: SendableRecordBatchStream) -> Self {
-        Self { stream }
+        Self {
+            stream: Arc::new(Mutex::new(stream)),
+        }
     }
 }
 
 #[pymethods]
 impl PyRecordBatchStream {
-    fn next(&mut self, py: Python) -> PyResult<Option<PyObject>> {
-        let result = self.stream.next();
-        match wait_for_future(py, result) {
-            None => Ok(None),
-            Some(Ok(b)) => Ok(Some(b.to_pyarrow(py)?)),
-            Some(Err(e)) => Err(e.into()),
-        }
+    fn next(&mut self, py: Python) -> PyResult<PyObject> {
+        let stream = self.stream.clone();
+        wait_for_future(py, next_stream(stream, true)).and_then(|b| b.to_pyarrow(py))
     }
 
-    fn __next__(&mut self, py: Python) -> PyResult<Option<PyObject>> {
+    fn __next__(&mut self, py: Python) -> PyResult<PyObject> {
         self.next(py)
     }
 
+    fn __anext__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let stream = self.stream.clone();
+        pyo3_async_runtimes::tokio::future_into_py(py, next_stream(stream, false))
+    }
+
     fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
         slf
     }
+
+    fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
 }
+
+async fn next_stream(
+    stream: Arc<Mutex<SendableRecordBatchStream>>,
+    sync: bool,
+) -> PyResult<PyRecordBatch> {
+    let mut stream = stream.lock().await;
+    match stream.next().await {
+        Some(Ok(batch)) => Ok(batch.into()),
+        Some(Err(e)) => Err(PyDataFusionError::from(e))?,
+        None => {
+            // Depending on whether the iteration is sync or not, we raise
+            // either a StopIteration or a StopAsyncIteration.
+            if sync {
+                Err(PyStopIteration::new_err("stream exhausted"))
+            } else {
+                Err(PyStopAsyncIteration::new_err("stream exhausted"))
+            }
+        }
+    }
+}
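The shape of the change above: the stream moves behind Arc<Mutex<_>> so that every __next__/__anext__ call can clone a handle and build an owned future over the shared state; the sync path blocks on that future via wait_for_future, while the async path hands it to future_into_py as a Python awaitable. A self-contained sketch of that sharing pattern, with a toy integer stream standing in for SendableRecordBatchStream (SharedStream and next_item are illustrative names, not from this PR):

use std::pin::Pin;
use std::sync::Arc;

use futures::stream::{self, Stream, StreamExt};
use tokio::sync::Mutex;

type SharedStream = Arc<Mutex<Pin<Box<dyn Stream<Item = i32> + Send>>>>;

// Each caller clones the Arc and builds an owned future, mirroring how
// `next` and `__anext__` above both call `next_stream` with a cloned handle.
async fn next_item(stream: SharedStream) -> Option<i32> {
    let mut guard = stream.lock().await;
    guard.next().await
}

#[tokio::main]
async fn main() {
    let inner: Pin<Box<dyn Stream<Item = i32> + Send>> = Box::pin(stream::iter(1..=3));
    let shared: SharedStream = Arc::new(Mutex::new(inner));

    // A sync caller would block on this future; an async caller would
    // surface it to Python as an awaitable instead.
    while let Some(item) = next_item(shared.clone()).await {
        println!("{item}");
    }
}

Using tokio's Mutex rather than std's is what makes this sound: the lock is held across the .await on the stream, which an async-aware mutex permits.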