Commit 855eb54

refactor: convert to mysql values directly from arrow (#7096)
Signed-off-by: luofucong <[email protected]>
1 parent 3119464 commit 855eb54
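
The source file that implements the conversion itself is not rendered in this excerpt; only the manifests and a new benchmark appear below. As a rough, hypothetical sketch of the idea in the commit title (not the commit's code): instead of materializing each row as intermediate Value/ScalarValue objects, a MySQL text-protocol cell can be rendered straight from the Arrow column. The helper name mysql_text_value and the narrow type coverage are illustrative assumptions.

// Hypothetical sketch, not part of this commit: render one cell directly from
// the Arrow array as a MySQL text-protocol value, with None standing for SQL NULL.
// Only the column types exercised by the benchmark below are handled.
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::arrow::array::ArrayRef;
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{Int32Type, TimestampMillisecondType};

fn mysql_text_value(column: &ArrayRef, row: usize) -> Option<String> {
    if column.is_null(row) {
        return None;
    }
    match column.data_type() {
        DataType::Int32 => Some(column.as_primitive::<Int32Type>().value(row).to_string()),
        DataType::Utf8 => Some(column.as_string::<i32>().value(row).to_string()),
        // A real server would format timestamps as datetime strings; the raw
        // epoch-millisecond value keeps the sketch short.
        DataType::Timestamp(TimeUnit::Millisecond, _) => Some(
            column
                .as_primitive::<TimestampMillisecondType>()
                .value(row)
                .to_string(),
        ),
        _ => unimplemented!("remaining Arrow types are converted analogously"),
    }
}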

File tree

5 files changed: +418 -59 lines changed


Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

src/common/recordbatch/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -27,4 +27,9 @@ snafu.workspace = true
 tokio.workspace = true
 
 [dev-dependencies]
+criterion = "0.7.0"
 tokio.workspace = true
+
+[[bench]]
+name = "iter_record_batch_rows"
+harness = false
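
With harness = false, Cargo skips the default libtest harness and Criterion supplies the benchmark entry point through criterion_main!. Assuming the package at src/common/recordbatch is named common-recordbatch (as the common_recordbatch import in the new bench suggests), the group should be runnable with:

cargo bench -p common-recordbatch --bench iter_record_batch_rows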
src/common/recordbatch/benches/iter_record_batch_rows.rs

Lines changed: 179 additions & 0 deletions
@@ -0,0 +1,179 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hint::black_box;
use std::sync::Arc;

use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
use datafusion_common::arrow::array::{ArrayRef, RecordBatch, StringArray};
use datafusion_common::arrow::datatypes::Schema;
use datafusion_common::{ScalarValue, utils};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{
    Int32Type, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
    TimestampSecondType,
};
use datatypes::schema::SchemaRef;

fn prepare_record_batch(rows: usize) -> RecordBatch {
    let schema = Schema::new(vec![
        Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("i", DataType::Int32, true),
        Field::new("s", DataType::Utf8, true),
    ]);

    let columns: Vec<ArrayRef> = vec![
        Arc::new(TimestampMillisecondArray::from_iter_values(
            (0..rows).map(|x| (1760313600000 + x) as i64),
        )),
        Arc::new(Int32Array::from_iter_values((0..rows).map(|x| x as i32))),
        Arc::new(StringArray::from_iter((0..rows).map(|x| {
            if x % 2 == 0 {
                Some(format!("s_{x}"))
            } else {
                None
            }
        }))),
    ];

    RecordBatch::try_new(Arc::new(schema), columns).unwrap()
}

fn iter_by_greptimedb_values(schema: SchemaRef, record_batch: RecordBatch) {
    let record_batch =
        common_recordbatch::RecordBatch::try_from_df_record_batch(schema, record_batch).unwrap();
    for row in record_batch.rows() {
        black_box(row);
    }
}

fn iter_by_loop_rows_and_columns(record_batch: RecordBatch) {
    for i in 0..record_batch.num_rows() {
        for column in record_batch.columns() {
            match column.data_type() {
                DataType::Timestamp(time_unit, _) => {
                    let v = match time_unit {
                        TimeUnit::Second => {
                            let array = column.as_primitive::<TimestampSecondType>();
                            array.value(i)
                        }
                        TimeUnit::Millisecond => {
                            let array = column.as_primitive::<TimestampMillisecondType>();
                            array.value(i)
                        }
                        TimeUnit::Microsecond => {
                            let array = column.as_primitive::<TimestampMicrosecondType>();
                            array.value(i)
                        }
                        TimeUnit::Nanosecond => {
                            let array = column.as_primitive::<TimestampNanosecondType>();
                            array.value(i)
                        }
                    };
                    black_box(v);
                }
                DataType::Int32 => {
                    let array = column.as_primitive::<Int32Type>();
                    let v = array.value(i);
                    black_box(v);
                }
                DataType::Utf8 => {
                    let array = column.as_string::<i32>();
                    let v = array.value(i);
                    black_box(v);
                }
                _ => unreachable!(),
            }
        }
    }
}

fn iter_by_datafusion_scalar_values(record_batch: RecordBatch) {
    let columns = record_batch.columns();
    for i in 0..record_batch.num_rows() {
        let row = utils::get_row_at_idx(columns, i).unwrap();
        black_box(row);
    }
}

fn iter_by_datafusion_scalar_values_with_buf(record_batch: RecordBatch) {
    let columns = record_batch.columns();
    let mut buf = vec![ScalarValue::Null; columns.len()];
    for i in 0..record_batch.num_rows() {
        utils::extract_row_at_idx_to_buf(columns, i, &mut buf).unwrap();
    }
}

pub fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("iter_record_batch");

    for rows in [1usize, 10, 100, 1_000, 10_000] {
        group.bench_with_input(
            BenchmarkId::new("by_greptimedb_values", rows),
            &rows,
            |b, rows| {
                let record_batch = prepare_record_batch(*rows);
                let schema =
                    Arc::new(datatypes::schema::Schema::try_from(record_batch.schema()).unwrap());
                b.iter(|| {
                    iter_by_greptimedb_values(schema.clone(), record_batch.clone());
                })
            },
        );

        group.bench_with_input(
            BenchmarkId::new("by_loop_rows_and_columns", rows),
            &rows,
            |b, rows| {
                let record_batch = prepare_record_batch(*rows);
                b.iter(|| {
                    iter_by_loop_rows_and_columns(record_batch.clone());
                })
            },
        );

        group.bench_with_input(
            BenchmarkId::new("by_datafusion_scalar_values", rows),
            &rows,
            |b, rows| {
                let record_batch = prepare_record_batch(*rows);
                b.iter(|| {
                    iter_by_datafusion_scalar_values(record_batch.clone());
                })
            },
        );

        group.bench_with_input(
            BenchmarkId::new("by_datafusion_scalar_values_with_buf", rows),
            &rows,
            |b, rows| {
                let record_batch = prepare_record_batch(*rows);
                b.iter(|| {
                    iter_by_datafusion_scalar_values_with_buf(record_batch.clone());
                })
            },
        );
    }

    group.finish();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
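
For reference, this benchmark compares four ways of walking the same RecordBatch row by row: converting into GreptimeDB values through common_recordbatch::RecordBatch::rows(), downcasting each Arrow column and reading raw values in a per-row loop, building a Vec of ScalarValue per row with DataFusion's utils::get_row_at_idx, and the buffer-reusing variant utils::extract_row_at_idx_to_buf. The direct per-column read is presumably the access pattern the commit title refers to, which would explain adding the benchmark alongside the refactor.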

src/servers/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ catalog.workspace = true
 chrono.workspace = true
 common-base.workspace = true
 common-catalog.workspace = true
+common-decimal.workspace = true
 common-error.workspace = true
 common-frontend.workspace = true
 common-grpc.workspace = true
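
The only change to src/servers shown here is the new common-decimal dependency, presumably needed so that the direct Arrow-to-MySQL path can render Arrow decimal columns; the server-side source change itself is not rendered in this excerpt.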
