Skip to content

Commit

Permalink
adopt review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
sungwy committed Aug 19, 2024
1 parent 2a3bdb3 commit 3308626
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 80 deletions.
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ crate-type = ["cdylib"]

[dependencies]
iceberg = { path = "../../crates/iceberg" }
pyo3 = { version = "0.22", features = ["extension-module"] }
arrow = { version = "52.2.0", features = ["ffi"] }
pyo3 = { version = "0.21.1", features = ["extension-module"] }
arrow = { version = "52.2.0", features = ["pyarrow"] }
libc = "0.2"
16 changes: 14 additions & 2 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,21 @@ fn hello_world() -> PyResult<String> {
}

#[pymodule]
fn pyiceberg_core_rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
fn submodule(_py: Python, module: &Bound<'_, PyModule>) -> PyResult<()> {
use transform::bucket_transform;

module.add_wrapped(wrap_pyfunction!(bucket_transform))?;
Ok(())
}

#[pymodule]
fn pyiceberg_core_rust(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(hello_world, m)?)?;
m.add_function(wrap_pyfunction!(bucket_transform, m)?)?;

// https://github.com/PyO3/pyo3/issues/759
let child_module = PyModule::new_bound(py, "pyiceberg_core.transform")?;
submodule(py, &child_module)?;
m.add("transform", child_module.clone())?;
py.import_bound("sys")?.getattr("modules")?.set_item("pyiceberg_core.transform", child_module)?;
Ok(())
}
83 changes: 8 additions & 75 deletions bindings/python/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use std::{error, fmt, sync::Arc};
use std::{error, fmt};
use iceberg::spec::Transform;
use iceberg::transform::create_transform_function;
use iceberg::Error;

use arrow::{
array::{make_array, Array, ArrayData, ArrayRef},
error::ArrowError,
ffi::{from_ffi, to_ffi},
array::{make_array, Array, ArrayData},
};
use libc::uintptr_t;
use pyo3::{exceptions::PyOSError, exceptions::PyValueError, prelude::*};

#[derive(Debug)]
enum PyO3ArrowError {
ArrowError(ArrowError),
}

impl fmt::Display for PyO3ArrowError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
PyO3ArrowError::ArrowError(ref e) => e.fmt(f),
}
}
}

impl error::Error for PyO3ArrowError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match *self {
// The cause is the underlying implementation error type. Is implicitly
// cast to the trait object `&error::Error`. This works because the
// underlying type already implements the `Error` trait.
PyO3ArrowError::ArrowError(ref e) => Some(e),
}
}
}

impl From<ArrowError> for PyO3ArrowError {
fn from(err: ArrowError) -> PyO3ArrowError {
PyO3ArrowError::ArrowError(err)
}
}

impl From<PyO3ArrowError> for PyErr {
fn from(err: PyO3ArrowError) -> PyErr {
PyOSError::new_err(err.to_string())
}
}
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use pyo3::{exceptions::PyValueError, prelude::*};

#[derive(Debug)]
enum PyO3IcebergError {
Expand Down Expand Up @@ -100,43 +62,14 @@ impl From<PyO3IcebergError> for PyErr {
}
}

fn to_rust_arrow_array(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
// prepare a pointer to receive the Array struct
let (array, schema) = to_ffi(&ArrayData::new_empty(&arrow::datatypes::DataType::Null))
.map_err(PyO3ArrowError::from)?;
let array_pointer = &array as *const _ as uintptr_t;
let schema_pointer = &schema as *const _ as uintptr_t;

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
ob.call_method1(py, "_export_to_c", (array_pointer, schema_pointer))?;

let array = unsafe { from_ffi(array, &schema) }.map_err(PyO3ArrowError::from)?;
let array = make_array(array);
Ok(array)
}

fn to_pyarrow_array(array: ArrayRef, py: Python) -> PyResult<PyObject> {
let (array, schema) = to_ffi(&array.to_data()).map_err(PyO3ArrowError::from)?;
let array_pointer = &array as *const _ as uintptr_t;
let schema_pointer = &schema as *const _ as uintptr_t;

let pa = py.import_bound("pyarrow")?;

let array = pa.getattr("Array")?.call_method1(
"_import_from_c",
(array_pointer as uintptr_t, schema_pointer as uintptr_t),
)?;
Ok(array.to_object(py))
}

#[pyfunction]
pub fn bucket_transform(array: PyObject, num_buckets: u32, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust_arrow_array(array, py)?;
let array = ArrayData::from_pyarrow_bound(array.bind(py))?;
let array = make_array(array);
let bucket = create_transform_function(&Transform::Bucket(num_buckets)).map_err(PyO3IcebergError::from)?;
let array = bucket.transform(array).map_err(PyO3IcebergError::from)?;
let array = Arc::new(array);
// export
to_pyarrow_array(array, py)
let array = array.into_data();
array.to_pyarrow(py)
}
2 changes: 1 addition & 1 deletion bindings/python/tests/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

from pyiceberg_core import bucket_transform
from pyiceberg_core.transform import bucket_transform

import pytest
import pyarrow as pa
Expand Down

0 comments on commit 3308626

Please sign in to comment.