Skip to content

Commit 1e352c3

Browse files
committed
fix binary array print formatting
1 parent 60e869e commit 1e352c3

File tree

16 files changed

+196
-38
lines changed

16 files changed

+196
-38
lines changed

datafusion-cli/src/print_format.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,10 @@ mod tests {
202202
fn test_print_batches_to_json_empty() -> Result<()> {
203203
let batches = vec![];
204204
let r = print_batches_to_json::<JsonArray>(&batches)?;
205-
assert_eq!("", r);
205+
assert_eq!("{}", r);
206206

207207
let r = print_batches_to_json::<LineDelimited>(&batches)?;
208-
assert_eq!("", r);
208+
assert_eq!("{}", r);
209209

210210
let schema = Arc::new(Schema::new(vec![
211211
Field::new("a", DataType::Int32, false),

datafusion-examples/examples/flight_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::sync::Arc;
2020
use arrow::io::flight::deserialize_schemas;
2121
use arrow_format::flight::data::{flight_descriptor, FlightDescriptor, Ticket};
2222
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
23-
use datafusion::arrow::io::print;
23+
use datafusion::arrow_print;
2424
use std::collections::HashMap;
2525

2626
/// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for
@@ -74,7 +74,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7474
}
7575

7676
// print the results
77-
print::print(&results);
77+
println!("{}", arrow_print::write(&results));
7878

7979
Ok(())
8080
}

datafusion/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,13 @@ rand = "0.8"
7979
num-traits = { version = "0.2", optional = true }
8080
pyo3 = { version = "0.14", optional = true }
8181
avro-schema = { version = "0.2", optional = true }
82+
# used to print arrow arrays in a nice columnar format
83+
comfy-table = { version = "5.0", default-features = false }
8284

8385
[dependencies.arrow]
8486
package = "arrow2"
8587
version="0.8"
86-
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "io_print", "ahash", "compute"]
88+
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"]
8789

8890
[dev-dependencies]
8991
criterion = "0.3"

datafusion/src/arrow_print.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Fork of arrow::io::print to implement custom Binary Array formatting logic.
19+
20+
// adapted from https://github.com/jorgecarleitao/arrow2/blob/ef7937dfe56033c2cc491482c67587b52cd91554/src/array/display.rs
21+
// see: https://github.com/jorgecarleitao/arrow2/issues/771
22+
23+
use arrow::{array::*, record_batch::RecordBatch};
24+
25+
use comfy_table::{Cell, Table};
26+
27+
macro_rules! dyn_display {
28+
($array:expr, $ty:ty, $expr:expr) => {{
29+
let a = $array.as_any().downcast_ref::<$ty>().unwrap();
30+
Box::new(move |row: usize| format!("{}", $expr(a.value(row))))
31+
}};
32+
}
33+
34+
fn df_get_array_value_display<'a>(
35+
array: &'a dyn Array,
36+
) -> Box<dyn Fn(usize) -> String + 'a> {
37+
use arrow::datatypes::DataType::*;
38+
match array.data_type() {
39+
Binary => dyn_display!(array, BinaryArray<i32>, |x: &[u8]| {
40+
x.iter().fold("".to_string(), |mut acc, x| {
41+
acc.push_str(&format!("{:02x}", x));
42+
acc
43+
})
44+
}),
45+
LargeBinary => dyn_display!(array, BinaryArray<i64>, |x: &[u8]| {
46+
x.iter().fold("".to_string(), |mut acc, x| {
47+
acc.push_str(&format!("{:02x}", x));
48+
acc
49+
})
50+
}),
51+
List(_) => {
52+
let f = |x: Box<dyn Array>| {
53+
let display = df_get_array_value_display(x.as_ref());
54+
let string_values = (0..x.len()).map(display).collect::<Vec<String>>();
55+
format!("[{}]", string_values.join(", "))
56+
};
57+
dyn_display!(array, ListArray<i32>, f)
58+
}
59+
FixedSizeList(_, _) => {
60+
let f = |x: Box<dyn Array>| {
61+
let display = df_get_array_value_display(x.as_ref());
62+
let string_values = (0..x.len()).map(display).collect::<Vec<String>>();
63+
format!("[{}]", string_values.join(", "))
64+
};
65+
dyn_display!(array, FixedSizeListArray, f)
66+
}
67+
LargeList(_) => {
68+
let f = |x: Box<dyn Array>| {
69+
let display = df_get_array_value_display(x.as_ref());
70+
let string_values = (0..x.len()).map(display).collect::<Vec<String>>();
71+
format!("[{}]", string_values.join(", "))
72+
};
73+
dyn_display!(array, ListArray<i64>, f)
74+
}
75+
Struct(_) => {
76+
let a = array.as_any().downcast_ref::<StructArray>().unwrap();
77+
let displays = a
78+
.values()
79+
.iter()
80+
.map(|x| df_get_array_value_display(x.as_ref()))
81+
.collect::<Vec<_>>();
82+
Box::new(move |row: usize| {
83+
let mut string = displays
84+
.iter()
85+
.zip(a.fields().iter().map(|f| f.name()))
86+
.map(|(f, name)| (f(row), name))
87+
.fold("{".to_string(), |mut acc, (v, name)| {
88+
acc.push_str(&format!("{}: {}, ", name, v));
89+
acc
90+
});
91+
if string.len() > 1 {
92+
// remove last ", "
93+
string.pop();
94+
string.pop();
95+
}
96+
string.push('}');
97+
string
98+
})
99+
}
100+
_ => get_display(array),
101+
}
102+
}
103+
104+
/// Returns a function of index returning the string representation of the item of `array`.
105+
/// This outputs an empty string on nulls.
106+
pub fn df_get_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> String + 'a> {
107+
let value_display = df_get_array_value_display(array);
108+
Box::new(move |row| {
109+
if array.is_null(row) {
110+
"".to_string()
111+
} else {
112+
value_display(row)
113+
}
114+
})
115+
}
116+
117+
/// Convert a series of record batches into a String
118+
pub fn write(results: &[RecordBatch]) -> String {
119+
let mut table = Table::new();
120+
table.load_preset("||--+-++| ++++++");
121+
122+
if results.is_empty() {
123+
return table.to_string();
124+
}
125+
126+
let schema = results[0].schema();
127+
128+
let mut header = Vec::new();
129+
for field in schema.fields() {
130+
header.push(Cell::new(field.name()));
131+
}
132+
table.set_header(header);
133+
134+
for batch in results {
135+
let displayes = batch
136+
.columns()
137+
.iter()
138+
.map(|array| df_get_display(array.as_ref()))
139+
.collect::<Vec<_>>();
140+
141+
for row in 0..batch.num_rows() {
142+
let mut cells = Vec::new();
143+
(0..batch.num_columns()).for_each(|col| {
144+
let string = displayes[col](row);
145+
cells.push(Cell::new(&string));
146+
});
147+
table.add_row(cells);
148+
}
149+
}
150+
table.to_string()
151+
}

datafusion/src/execution/dataframe_impl.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use crate::{
2929
dataframe::*,
3030
physical_plan::{collect, collect_partitioned},
3131
};
32-
use arrow::io::print;
3332
use arrow::record_batch::RecordBatch;
3433

3534
use crate::physical_plan::{
@@ -168,14 +167,14 @@ impl DataFrame for DataFrameImpl {
168167
/// Print results.
169168
async fn show(&self) -> Result<()> {
170169
let results = self.collect().await?;
171-
print::print(&results);
170+
print!("{}", crate::arrow_print::write(&results));
172171
Ok(())
173172
}
174173

175174
/// Print results and limit rows.
176175
async fn show_limit(&self, num: usize) -> Result<()> {
177176
let results = self.limit(num)?.collect().await?;
178-
print::print(&results);
177+
print!("{}", crate::arrow_print::write(&results));
179178
Ok(())
180179
}
181180

datafusion/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
//! let results: Vec<RecordBatch> = df.collect().await?;
5858
//!
5959
//! // format the results
60-
//! let pretty_results = datafusion::arrow::io::print::write(&results);
60+
//! let pretty_results = datafusion::arrow_print::write(&results);
6161
//!
6262
//! let expected = vec![
6363
//! "+---+--------------------------+",
@@ -92,7 +92,7 @@
9292
//! let results: Vec<RecordBatch> = df.collect().await?;
9393
//!
9494
//! // format the results
95-
//! let pretty_results = datafusion::arrow::io::print::write(&results);
95+
//! let pretty_results = datafusion::arrow_print::write(&results);
9696
//!
9797
//! let expected = vec![
9898
//! "+---+----------------+",
@@ -229,6 +229,7 @@ pub mod variable;
229229
pub use arrow;
230230
pub use parquet;
231231

232+
pub mod arrow_print;
232233
mod arrow_temporal_util;
233234

234235
pub mod field_util;

datafusion/src/physical_plan/file_format/csv.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ impl ExecutionPlan for CsvExec {
250250
mod tests {
251251
use super::*;
252252
use crate::{
253+
assert_batches_eq,
253254
datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
254255
scalar::ScalarValue,
255256
test_util::aggr_test_schema,
@@ -298,7 +299,7 @@ mod tests {
298299
"+----+-----+------------+",
299300
];
300301

301-
crate::assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]);
302+
assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]);
302303
Ok(())
303304
}
304305

@@ -343,7 +344,7 @@ mod tests {
343344
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+",
344345
];
345346

346-
crate::assert_batches_eq!(expected, &[batch]);
347+
assert_batches_eq!(expected, &[batch]);
347348

348349
Ok(())
349350
}
@@ -396,7 +397,7 @@ mod tests {
396397
"| b | 2021-10-26 |",
397398
"+----+------------+",
398399
];
399-
crate::assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]);
400+
assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]);
400401
Ok(())
401402
}
402403

datafusion/src/physical_plan/file_format/file_stream.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ mod tests {
192192

193193
use super::*;
194194
use crate::{
195+
assert_batches_eq,
195196
error::Result,
196197
test::{make_partition, object_store::TestObjectStore},
197198
};
@@ -230,7 +231,7 @@ mod tests {
230231
let batches = create_and_collect(None).await;
231232

232233
#[rustfmt::skip]
233-
crate::assert_batches_eq!(&[
234+
assert_batches_eq!(&[
234235
"+---+",
235236
"| i |",
236237
"+---+",
@@ -254,7 +255,7 @@ mod tests {
254255
async fn with_limit_between_files() -> Result<()> {
255256
let batches = create_and_collect(Some(5)).await;
256257
#[rustfmt::skip]
257-
crate::assert_batches_eq!(&[
258+
assert_batches_eq!(&[
258259
"+---+",
259260
"| i |",
260261
"+---+",
@@ -273,7 +274,7 @@ mod tests {
273274
async fn with_limit_at_middle_of_batch() -> Result<()> {
274275
let batches = create_and_collect(Some(6)).await;
275276
#[rustfmt::skip]
276-
crate::assert_batches_eq!(&[
277+
assert_batches_eq!(&[
277278
"+---+",
278279
"| i |",
279280
"+---+",

datafusion/src/physical_plan/file_format/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ fn create_dict_array(
269269
#[cfg(test)]
270270
mod tests {
271271
use crate::{
272+
assert_batches_eq,
272273
test::{build_table_i32, columns, object_store::TestObjectStore},
273274
test_util::aggr_test_schema,
274275
};
@@ -399,7 +400,7 @@ mod tests {
399400
"| 2 | 0 | 12 | 2021 | 26 |",
400401
"+---+----+----+------+-----+",
401402
];
402-
crate::assert_batches_eq!(expected, &[projected_batch]);
403+
assert_batches_eq!(expected, &[projected_batch]);
403404

404405
// project another batch that is larger than the previous one
405406
let file_batch = build_table_i32(
@@ -429,7 +430,7 @@ mod tests {
429430
"| 9 | -6 | 16 | 2021 | 27 |",
430431
"+---+-----+----+------+-----+",
431432
];
432-
crate::assert_batches_eq!(expected, &[projected_batch]);
433+
assert_batches_eq!(expected, &[projected_batch]);
433434

434435
// project another batch that is smaller than the previous one
435436
let file_batch = build_table_i32(
@@ -457,7 +458,7 @@ mod tests {
457458
"| 3 | 4 | 6 | 2021 | 28 |",
458459
"+---+---+---+------+-----+",
459460
];
460-
crate::assert_batches_eq!(expected, &[projected_batch]);
461+
assert_batches_eq!(expected, &[projected_batch]);
461462
}
462463

463464
// sets default for configs that play no role in projections

datafusion/src/physical_plan/file_format/parquet.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,7 @@ fn read_partition(
458458

459459
#[cfg(test)]
460460
mod tests {
461+
use crate::assert_batches_eq;
461462
use crate::datasource::{
462463
file_format::{parquet::ParquetFormat, FileFormat},
463464
object_store::local::{
@@ -566,7 +567,7 @@ mod tests {
566567
"| 1 | false | 1 | 10 |",
567568
"+----+----------+-------------+-------+",
568569
];
569-
crate::assert_batches_eq!(expected, &[batch]);
570+
assert_batches_eq!(expected, &[batch]);
570571

571572
let batch = results.next().await;
572573
assert!(batch.is_none());

datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1023,10 +1023,11 @@ mod tests {
10231023
use futures::FutureExt;
10241024

10251025
use super::*;
1026+
use crate::assert_batches_sorted_eq;
1027+
use crate::physical_plan::common;
10261028
use crate::physical_plan::expressions::{col, Avg};
10271029
use crate::test::assert_is_pending;
10281030
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
1029-
use crate::{assert_batches_sorted_eq, physical_plan::common};
10301031

10311032
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
10321033

0 commit comments

Comments
 (0)