Skip to content

Commit

Permalink
Use 'try_stream!'?
Browse files Browse the repository at this point in the history
  • Loading branch information
rshkv committed Jan 7, 2025
1 parent 27a5068 commit c7aeced
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
27 changes: 25 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
async-std = { workspace = true, optional = true, features = ["attributes"] }
async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
bitvec = { workspace = true }
Expand Down
9 changes: 5 additions & 4 deletions crates/iceberg/src/inspect/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use async_stream::try_stream;
use futures::StreamExt;

use crate::scan::ArrowRecordBatchStream;
Expand Down Expand Up @@ -76,7 +77,7 @@ impl<'a> SnapshotsTable<'a> {
let arrow_schema = Arc::new(self.schema());
let table_metadata = self.table.metadata_ref();

Ok(futures::stream::once(async move {
Ok(try_stream! {
let mut committed_at =
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
Expand All @@ -98,15 +99,15 @@ impl<'a> SnapshotsTable<'a> {
summary.append(true)?;
}

Ok(RecordBatch::try_new(arrow_schema, vec![
yield RecordBatch::try_new(arrow_schema, vec![
Arc::new(committed_at.finish()),
Arc::new(snapshot_id.finish()),
Arc::new(parent_id.finish()),
Arc::new(operation.finish()),
Arc::new(manifest_list.finish()),
Arc::new(summary.finish()),
])?)
})
])?;
}
.boxed())
}
}
Expand Down

0 comments on commit c7aeced

Please sign in to comment.