Skip to content

Commit 21f286a

Browse files
committed
Refactor Arrow stream creation in PyDataFrame to use PyCapsule directly
1 parent d91ecfa commit 21f286a

File tree

1 file changed

+7
-47
lines changed

1 file changed

+7
-47
lines changed

src/dataframe.rs

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use std::collections::HashMap;
19-
use std::ffi::{c_void, CString};
19+
use std::ffi::CString;
2020
use std::sync::Arc;
2121

2222
use arrow::array::{new_null_array, RecordBatch, RecordBatchReader};
@@ -963,52 +963,12 @@ impl PyDataFrame {
963963
};
964964
let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
965965

966-
// Create a stream and transfer it to a raw pointer. The capsule takes
967-
// ownership and is responsible for freeing the stream unless PyArrow
968-
// steals it. PyArrow will set the capsule's pointer to NULL when it
969-
// imports the stream, signaling that it now owns the resources.
970-
let raw_stream = Box::into_raw(Box::new(FFI_ArrowArrayStream::new(reader)));
971-
972-
// Name used both for capsule creation and lookup in the destructor.
973-
const STREAM_NAME: &[u8] = b"arrow_array_stream\0";
974-
975-
unsafe extern "C" fn drop_stream_capsule(capsule: *mut pyo3::ffi::PyObject) {
976-
// Attempt to recover the raw stream pointer. If PyArrow imported the
977-
// stream it will have set the capsule pointer to NULL, in which case
978-
// `PyCapsule_GetPointer` returns NULL and we simply clear the error.
979-
let ptr = pyo3::ffi::PyCapsule_GetPointer(capsule, STREAM_NAME.as_ptr() as *const _)
980-
as *mut FFI_ArrowArrayStream;
981-
982-
if ptr.is_null() {
983-
// Ignore any exception raised by `PyCapsule_GetPointer` when the
984-
// pointer is already NULL.
985-
pyo3::ffi::PyErr_Clear();
986-
} else {
987-
// Reconstruct the Box and drop it so resources are released.
988-
drop(Box::from_raw(ptr));
989-
}
990-
}
991-
992-
let capsule_ptr = unsafe {
993-
pyo3::ffi::PyCapsule_New(
994-
raw_stream as *mut c_void,
995-
STREAM_NAME.as_ptr() as *const std::ffi::c_char,
996-
Some(drop_stream_capsule),
997-
)
998-
};
999-
1000-
if capsule_ptr.is_null() {
1001-
// Reclaim ownership to avoid leaking on failure
1002-
unsafe {
1003-
drop(Box::from_raw(raw_stream));
1004-
}
1005-
return Err(PyErr::fetch(py).into());
1006-
}
1007-
1008-
// Safety: `capsule_ptr` is a new reference from `PyCapsule_New`
1009-
let capsule = unsafe {
1010-
Bound::from_owned_ptr(py, capsule_ptr).downcast_into_unchecked::<PyCapsule>()
1011-
};
966+
// Create the Arrow stream and wrap it in a PyCapsule. The default
967+
// destructor provided by PyO3 will drop the stream unless ownership is
968+
// transferred to PyArrow during import.
969+
let stream = FFI_ArrowArrayStream::new(reader);
970+
let name = CString::new("arrow_array_stream").unwrap();
971+
let capsule = PyCapsule::new(py, stream, Some(name))?;
1012972
Ok(capsule)
1013973
}
1014974

0 commit comments

Comments
 (0)