4444//! | `pyarrow.Array` | [ArrayData] |
4545//! | `pyarrow.RecordBatch` | [RecordBatch] |
4646//! | `pyarrow.RecordBatchReader` | [ArrowArrayStreamReader] / `Box<dyn RecordBatchReader + Send>` (1) |
47+ //! | `pyarrow.Table` | [Table] (2) |
4748//!
4849//! (1) `pyarrow.RecordBatchReader` can be imported as [ArrowArrayStreamReader]. Either
4950//! [ArrowArrayStreamReader] or `Box<dyn RecordBatchReader + Send>` can be exported
5051//! as `pyarrow.RecordBatchReader`. (`Box<dyn RecordBatchReader + Send>` is typically
5152//! easier to create.)
5253//!
53- //! PyArrow has the notion of chunked arrays and tables, but arrow-rs doesn't
54- //! have these same concepts. A chunked table is instead represented with
55- //! `Vec<RecordBatch>`. A `pyarrow.Table` can be imported to Rust by calling
56- //! [pyarrow.Table.to_reader()](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_reader)
57- //! and then importing the reader as a [ArrowArrayStreamReader].
//! (2) Although arrow-rs offers [Table], a convenience wrapper for [pyarrow.Table](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html)
55+ //! that internally holds `Vec<RecordBatch>`, it is meant primarily for use cases where you already
56+ //! have `Vec<RecordBatch>` on the Rust side and want to export that in bulk as a `pyarrow.Table`.
57+ //! In general, it is recommended to use streaming approaches instead of dealing with data in bulk.
//! For example, a `pyarrow.Table` (or any other object that implements the Arrow PyCapsule stream
59+ //! interface) can be imported to Rust through `PyArrowType<ArrowArrayStreamReader>` instead of
60+ //! forcing eager reading into `Vec<RecordBatch>`.
5861
5962use std:: convert:: { From , TryFrom } ;
6063use std:: ptr:: { addr_of, addr_of_mut} ;
@@ -68,13 +71,13 @@ use arrow_array::{
6871 make_array,
6972} ;
7073use arrow_data:: ArrayData ;
71- use arrow_schema:: { ArrowError , DataType , Field , Schema } ;
74+ use arrow_schema:: { ArrowError , DataType , Field , Schema , SchemaRef } ;
7275use pyo3:: exceptions:: { PyTypeError , PyValueError } ;
7376use pyo3:: ffi:: Py_uintptr_t ;
74- use pyo3:: import_exception;
7577use pyo3:: prelude:: * ;
7678use pyo3:: pybacked:: PyBackedStr ;
77- use pyo3:: types:: { PyCapsule , PyList , PyTuple } ;
79+ use pyo3:: types:: { PyCapsule , PyDict , PyList , PyTuple } ;
80+ use pyo3:: { import_exception, intern} ;
7881
// Generates a Rust binding for the Python exception class `pyarrow.ArrowException`,
// so it can be raised as (and downcast from) a pyo3 `PyErr`.
import_exception!(pyarrow, ArrowException);
/// Represents an exception raised by PyArrow.
@@ -484,6 +487,100 @@ impl IntoPyArrow for ArrowArrayStreamReader {
484487 }
485488}
486489
490+ /// This is a convenience wrapper around `Vec<RecordBatch>` that tries to simplify conversion from
491+ /// and to `pyarrow.Table`.
492+ ///
493+ /// This could be used in circumstances where you either want to consume a `pyarrow.Table` directly
494+ /// (although technically, since `pyarrow.Table` implements the ArrayStreamReader PyCapsule
495+ /// interface, one could also consume a `PyArrowType<ArrowArrayStreamReader>` instead) or, more
496+ /// importantly, where one wants to export a `pyarrow.Table` from a `Vec<RecordBatch>` from the Rust
497+ /// side.
498+ ///
499+ /// ```ignore
500+ /// #[pyfunction]
501+ /// fn return_table(...) -> PyResult<PyArrowType<Table>> {
502+ /// let batches: Vec<RecordBatch>;
503+ /// let schema: SchemaRef;
504+ /// PyArrowType(Table::try_new(batches, schema).map_err(|err| err.into_py_err(py))?)
505+ /// }
506+ /// ```
507+ #[ derive( Clone ) ]
508+ pub struct Table {
509+ record_batches : Vec < RecordBatch > ,
510+ schema : SchemaRef ,
511+ }
512+
513+ impl Table {
514+ pub fn try_new (
515+ record_batches : Vec < RecordBatch > ,
516+ schema : SchemaRef ,
517+ ) -> Result < Self , ArrowError > {
518+ for record_batch in & record_batches {
519+ if schema != record_batch. schema ( ) {
520+ return Err ( ArrowError :: SchemaError ( format ! (
521+ "All record batches must have the same schema. \
522+ Expected schema: {:?}, got schema: {:?}",
523+ schema,
524+ record_batch. schema( )
525+ ) ) ) ;
526+ }
527+ }
528+ Ok ( Self {
529+ record_batches,
530+ schema,
531+ } )
532+ }
533+
534+ pub fn record_batches ( & self ) -> & [ RecordBatch ] {
535+ & self . record_batches
536+ }
537+
538+ pub fn schema ( & self ) -> SchemaRef {
539+ self . schema . clone ( )
540+ }
541+
542+ pub fn into_inner ( self ) -> ( Vec < RecordBatch > , SchemaRef ) {
543+ ( self . record_batches , self . schema )
544+ }
545+ }
546+
547+ impl TryFrom < Box < dyn RecordBatchReader > > for Table {
548+ type Error = ArrowError ;
549+
550+ fn try_from ( value : Box < dyn RecordBatchReader > ) -> Result < Self , ArrowError > {
551+ let schema = value. schema ( ) ;
552+ let batches = value. collect :: < Result < Vec < _ > , _ > > ( ) ?;
553+ Self :: try_new ( batches, schema)
554+ }
555+ }
556+
557+ /// Convert a `pyarrow.Table` (or any other ArrowArrayStream compliant object) into [`Table`]
558+ impl FromPyArrow for Table {
559+ fn from_pyarrow_bound ( ob : & Bound < PyAny > ) -> PyResult < Self > {
560+ let reader: Box < dyn RecordBatchReader > =
561+ Box :: new ( ArrowArrayStreamReader :: from_pyarrow_bound ( ob) ?) ;
562+ Self :: try_from ( reader) . map_err ( |err| PyErr :: new :: < PyValueError , _ > ( err. to_string ( ) ) )
563+ }
564+ }
565+
566+ /// Convert a [`Table`] into `pyarrow.Table`.
567+ impl IntoPyArrow for Table {
568+ fn into_pyarrow ( self , py : Python ) -> PyResult < Bound < PyAny > > {
569+ let module = py. import ( intern ! ( py, "pyarrow" ) ) ?;
570+ let class = module. getattr ( intern ! ( py, "Table" ) ) ?;
571+
572+ let py_batches = PyList :: new ( py, self . record_batches . into_iter ( ) . map ( PyArrowType ) ) ?;
573+ let py_schema = PyArrowType ( Arc :: unwrap_or_clone ( self . schema ) ) ;
574+
575+ let kwargs = PyDict :: new ( py) ;
576+ kwargs. set_item ( "schema" , py_schema) ?;
577+
578+ let reader = class. call_method ( "from_batches" , ( py_batches, ) , Some ( & kwargs) ) ?;
579+
580+ Ok ( reader)
581+ }
582+ }
583+
487584/// A newtype wrapper for types implementing [`FromPyArrow`] or [`IntoPyArrow`].
488585///
489586/// When wrapped around a type `T: FromPyArrow`, it
0 commit comments