Skip to content

Commit 6ad23bc

Browse files
refactor: convert to postgres values directly from arrow (#7131)
* refactor: convert to pg values directly from arrow Signed-off-by: luofucong <[email protected]> * resolve PR comments Signed-off-by: luofucong <[email protected]> --------- Signed-off-by: luofucong <[email protected]>
1 parent 03a29c6 commit 6ad23bc

File tree

2 files changed

+714
-446
lines changed

2 files changed

+714
-446
lines changed

src/servers/src/postgres/handler.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use futures::{Sink, SinkExt, Stream, StreamExt, future, stream};
2828
use pgwire::api::portal::{Format, Portal};
2929
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
3030
use pgwire::api::results::{
31-
DataRowEncoder, DescribePortalResponse, DescribeStatementResponse, QueryResponse, Response, Tag,
31+
DescribePortalResponse, DescribeStatementResponse, QueryResponse, Response, Tag,
3232
};
3333
use pgwire::api::stmt::{QueryParser, StoredStatement};
3434
use pgwire::api::{ClientInfo, ErrorHandler, Type};
@@ -160,25 +160,16 @@ where
160160
let pg_schema = Arc::new(schema_to_pg(schema.as_ref(), field_format).map_err(convert_err)?);
161161
let pg_schema_ref = pg_schema.clone();
162162
let data_row_stream = recordbatches_stream
163-
.map(|record_batch_result| match record_batch_result {
164-
Ok(rb) => stream::iter(
165-
// collect rows from a single recordbatch into vector to avoid
166-
// borrowing it
167-
rb.rows().map(Ok).collect::<Vec<_>>(),
168-
)
163+
.map(move |result| match result {
164+
Ok(record_batch) => stream::iter(RecordBatchRowIterator::new(
165+
query_ctx.clone(),
166+
pg_schema_ref.clone(),
167+
record_batch,
168+
))
169169
.boxed(),
170170
Err(e) => stream::once(future::err(convert_err(e))).boxed(),
171171
})
172-
.flatten() // flatten into stream<result<row>>
173-
.map(move |row| {
174-
row.and_then(|row| {
175-
let mut encoder = DataRowEncoder::new(pg_schema_ref.clone());
176-
for (value, column) in row.into_iter().zip(schema.column_schemas()) {
177-
encode_value(&query_ctx, value, &mut encoder, &column.data_type)?;
178-
}
179-
encoder.finish()
180-
})
181-
});
172+
.flatten();
182173

183174
Ok(Response::Query(QueryResponse::new(
184175
pg_schema,

0 commit comments

Comments
 (0)