Skip to content

Commit 44f4be2

Browse files
alambberkaysynnada
andauthored
Minor: Add doc example to RecordBatchStreamAdapter (#13725)
* Minor: Add doc example to RecordBatchStreamAdapter * Update datafusion/physical-plan/src/stream.rs Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com> --------- Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>
1 parent de9e16b commit 44f4be2

1 file changed

Lines changed: 25 additions & 2 deletions

File tree

datafusion/physical-plan/src/stream.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,9 @@ impl RecordBatchReceiverStream {
337337

338338
pin_project! {
339339
/// Combines a [`Stream`] with a [`SchemaRef`] implementing
340-
/// [`RecordBatchStream`] for the combination
340+
/// [`SendableRecordBatchStream`] for the combination
341+
///
342+
/// See [`Self::new`] for an example
341343
pub struct RecordBatchStreamAdapter<S> {
342344
schema: SchemaRef,
343345

@@ -347,7 +349,28 @@ pin_project! {
347349
}
348350

349351
impl<S> RecordBatchStreamAdapter<S> {
350-
/// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream
352+
/// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream.
353+
///
354+
/// Note to create a [`SendableRecordBatchStream`] you pin the result
355+
///
356+
/// # Example
357+
/// ```
358+
/// # use arrow::array::record_batch;
359+
/// # use datafusion_execution::SendableRecordBatchStream;
360+
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
361+
/// // Create stream of Result<RecordBatch>
362+
/// let batch = record_batch!(
363+
/// ("a", Int32, [1, 2, 3]),
364+
/// ("b", Float64, [Some(4.0), None, Some(5.0)])
365+
/// ).expect("created batch");
366+
/// let schema = batch.schema();
367+
/// let stream = futures::stream::iter(vec![Ok(batch)]);
368+
/// // Convert the stream to a SendableRecordBatchStream
369+
/// let adapter = RecordBatchStreamAdapter::new(schema, stream);
370+
/// // Now you can use the adapter as a SendableRecordBatchStream
371+
/// let batch_stream: SendableRecordBatchStream = Box::pin(adapter);
372+
/// // ...
373+
/// ```
351374
pub fn new(schema: SchemaRef, stream: S) -> Self {
352375
Self { schema, stream }
353376
}

0 commit comments

Comments
 (0)