Skip to content

Commit 33b6931

Browse files
authored
Ignore last test, fix cargo clippy, format and pass integration tests (#10)
* Fix tests * Ignore last test, fix clippy, fmt and enable integration * more clippy fix
1 parent f9504e7 commit 33b6931

File tree

22 files changed

+78
-71
lines changed

22 files changed

+78
-71
lines changed

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,5 @@ members = [
3131
exclude = ["python"]
3232

3333
[patch.crates-io]
34-
arrow2 = { path = "/home/houqp/Documents/code/arrow/arrow2" }
35-
arrow-flight = { path = "/home/houqp/Documents/code/arrow/arrow2/arrow-flight" }
36-
parquet2 = { path = "/home/houqp/Documents/code/arrow/parquet2" }
34+
arrow2 = { git = "https://github.com/houqp/arrow2.git", branch = "qp_ord" }
35+
arrow-flight = { git = "https://github.com/houqp/arrow2.git", branch = "qp_ord" }

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ mod tests {
507507
fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef> {
508508
self.fields()
509509
.iter()
510-
.position(|c| c.name() == &column_name)
510+
.position(|c| c.name() == column_name)
511511
.map(|pos| self.values()[pos].borrow())
512512
}
513513
}

benchmarks/src/bin/tpch.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ use datafusion::physical_plan::{collect, displayable};
3737
use datafusion::prelude::*;
3838

3939
use arrow::io::parquet::write::{Compression, Version, WriteOptions};
40+
use ballista::prelude::{
41+
BallistaConfig, BallistaContext, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
42+
};
4043
use structopt::StructOpt;
4144

4245
#[cfg(feature = "snmalloc")]
@@ -179,7 +182,7 @@ async fn main() -> Result<()> {
179182
env_logger::init();
180183
match TpchOpt::from_args() {
181184
TpchOpt::Benchmark(BallistaBenchmark(opt)) => {
182-
todo!() //benchmark_ballista(opt).await.map(|_| ())
185+
benchmark_ballista(opt).await.map(|_| ())
183186
}
184187
TpchOpt::Benchmark(DataFusionBenchmark(opt)) => {
185188
benchmark_datafusion(opt).await.map(|_| ())
@@ -239,7 +242,6 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
239242
Ok(result)
240243
}
241244

242-
/*
243245
async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
244246
println!("Running benchmarks with the following options: {:?}", opt);
245247

@@ -316,7 +318,6 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
316318

317319
Ok(())
318320
}
319-
*/
320321

321322
fn get_query_sql(query: usize) -> Result<String> {
322323
if query > 0 && query < 23 {

datafusion-examples/examples/simple_udf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ use datafusion::arrow::{
2121
record_batch::RecordBatch,
2222
};
2323

24+
use arrow::array::Array;
2425
use datafusion::prelude::*;
2526
use datafusion::{error::Result, physical_plan::functions::make_scalar_function};
2627
use std::sync::Arc;
27-
use arrow::array::Array;
2828

2929
// create local execution context with an in-memory table
3030
fn create_context() -> Result<ExecutionContext> {

datafusion/benches/data_utils/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ fn create_record_batch(
122122
vec![
123123
Arc::new(Utf8Array::<i32>::from_slice(keys)),
124124
Arc::new(Float32Array::from_slice(vec![i as f32; batch_size])),
125-
Arc::new(Float64Array::from_slice(values)),
126-
Arc::new(UInt64Array::from_slice(integer_values_wide)),
125+
Arc::new(Float64Array::from(values)),
126+
Arc::new(UInt64Array::from(integer_values_wide)),
127127
Arc::new(UInt64Array::from_slice(integer_values_narrow)),
128128
],
129129
)

datafusion/benches/physical_plan.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ use criterion::{BatchSize, Criterion};
2121
extern crate arrow;
2222
extern crate datafusion;
2323

24-
use std::{iter::FromIterator, sync::Arc};
24+
use std::sync::Arc;
2525

2626
use arrow::{
27-
array::{ArrayRef, Int64Array, StringArray},
27+
array::{ArrayRef, Int64Array, Utf8Array},
2828
record_batch::RecordBatch,
2929
};
3030
use tokio::runtime::Runtime;
@@ -39,7 +39,7 @@ use datafusion::physical_plan::{
3939
// Initialise the operator using the provided record batches and the sort key
4040
// as inputs. All record batches must have the same schema.
4141
fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
42-
let schema = batches[0].schema();
42+
let schema = batches[0].schema().clone();
4343

4444
let sort = sort
4545
.iter()
@@ -51,7 +51,7 @@ fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
5151

5252
let exec = MemoryExec::try_new(
5353
&batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
54-
schema.clone(),
54+
schema,
5555
None,
5656
)
5757
.unwrap();
@@ -104,9 +104,9 @@ fn batches(
104104
col_b.sort();
105105
col_c.sort();
106106

107-
let col_a: ArrayRef = Arc::new(StringArray::from_iter(col_a));
108-
let col_b: ArrayRef = Arc::new(StringArray::from_iter(col_b));
109-
let col_c: ArrayRef = Arc::new(StringArray::from_iter(col_c));
107+
let col_a: ArrayRef = Arc::new(Utf8Array::<i32>::from(col_a));
108+
let col_b: ArrayRef = Arc::new(Utf8Array::<i32>::from(col_b));
109+
let col_c: ArrayRef = Arc::new(Utf8Array::<i32>::from(col_c));
110110
let col_d: ArrayRef = Arc::new(Int64Array::from(col_d));
111111

112112
let rb = RecordBatch::try_from_iter(vec![

datafusion/src/arrow_temporal_util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ mod tests {
211211
// Note: Use chrono APIs that are different than
212212
// naive_datetime_to_timestamp to compute the utc offset to
213213
// try and double check the logic
214-
let utc_offset_secs = match Local.offset_from_local_datetime(&naive_datetime) {
214+
let utc_offset_secs = match Local.offset_from_local_datetime(naive_datetime) {
215215
LocalResult::Single(local_offset) => {
216216
local_offset.fix().local_minus_utc() as i64
217217
}

datafusion/src/execution/dataframe_impl.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,13 +160,15 @@ impl DataFrame for DataFrameImpl {
160160
/// Print results.
161161
async fn show(&self) -> Result<()> {
162162
let results = self.collect().await?;
163-
Ok(print::print(&results))
163+
print::print(&results);
164+
Ok(())
164165
}
165166

166167
/// Print results and limit rows.
167168
async fn show_limit(&self, num: usize) -> Result<()> {
168169
let results = self.limit(num)?.collect().await?;
169-
Ok(print::print(&results))
170+
print::print(&results);
171+
Ok(())
170172
}
171173

172174
/// Convert the logical plan represented by this DataFrame into a physical plan and

datafusion/src/physical_plan/array_expressions.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
2020
use crate::error::{DataFusionError, Result};
2121
use arrow::array::*;
22-
use arrow::compute::concat;
2322
use arrow::datatypes::DataType;
24-
use std::sync::Arc;
2523

2624
use super::ColumnarValue;
2725

@@ -35,7 +33,10 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
3533

3634
macro_rules! array {
3735
($PRIMITIVE: ty, $ARRAY: ty, $DATA_TYPE: path) => {{
38-
let array = MutablePrimitiveArray::<$PRIMITIVE>::with_capacity_from(first.len() * size, $DATA_TYPE);
36+
let array = MutablePrimitiveArray::<$PRIMITIVE>::with_capacity_from(
37+
first.len() * size,
38+
$DATA_TYPE,
39+
);
3940
let mut array = MutableFixedSizeListArray::new(array, size);
4041
// for each entry in the array
4142
for index in 0..first.len() {
@@ -73,7 +74,6 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
7374
}};
7475
}
7576

76-
7777
match first.data_type() {
7878
DataType::Boolean => {
7979
let array = MutableBooleanArray::with_capacity(first.len() * size);
@@ -91,7 +91,7 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
9191
}
9292
}
9393
Ok(array.as_arc())
94-
},
94+
}
9595
DataType::UInt8 => array!(u8, PrimitiveArray<u8>, DataType::UInt8),
9696
DataType::UInt16 => array!(u16, PrimitiveArray<u16>, DataType::UInt16),
9797
DataType::UInt32 => array!(u32, PrimitiveArray<u32>, DataType::UInt32),
@@ -109,7 +109,6 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
109109
data_type
110110
))),
111111
}
112-
113112
}
114113

115114
/// put values in an array.

datafusion/src/physical_plan/csv.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,17 +308,18 @@ impl CsvExec {
308308
filenames: &[String],
309309
options: &CsvReadOptions,
310310
) -> Result<Schema> {
311-
Ok(infer_schema_from_files(
311+
infer_schema_from_files(
312312
filenames,
313313
options.delimiter,
314314
Some(options.schema_infer_max_records),
315315
options.has_header,
316-
)?)
316+
)
317317
}
318318
}
319319

320320
type Payload = ArrowResult<RecordBatch>;
321321

322+
#[allow(clippy::too_many_arguments)]
322323
fn producer_task<R: Read>(
323324
reader: R,
324325
response_tx: Sender<Payload>,

datafusion/src/physical_plan/expressions/binary.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,8 @@ fn evaluate_scalar(
259259
Ok(None)
260260
}
261261
}
262-
} else if matches!(op, Or) {
263-
// TODO: optimize scalar Or
264-
Ok(None)
265-
} else if matches!(op, And) {
266-
// TODO: optimize scalar And
262+
} else if matches!(op, Or | And) {
263+
// TODO: optimize scalar Or | And
267264
Ok(None)
268265
} else {
269266
match (lhs.data_type(), op) {

datafusion/src/physical_plan/filter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,16 @@ use crate::physical_plan::{
3030
DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
3131
};
3232

33-
use arrow::array::{BooleanArray, Array};
33+
use arrow::array::{Array, BooleanArray};
3434
use arrow::compute::filter::filter_record_batch;
3535
use arrow::datatypes::{DataType, SchemaRef};
3636
use arrow::error::Result as ArrowResult;
3737
use arrow::record_batch::RecordBatch;
3838

3939
use async_trait::async_trait;
4040

41-
use futures::stream::{Stream, StreamExt};
4241
use arrow::compute::boolean::{and, is_not_null};
42+
use futures::stream::{Stream, StreamExt};
4343

4444
/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
4545
/// include in its output batches.

datafusion/src/physical_plan/functions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,7 @@ pub fn create_physical_fun(
587587
))),
588588
}),
589589
BuiltinScalarFunction::BitLength => Arc::new(|args| match &args[0] {
590-
ColumnarValue::Array(v) => todo!(),
590+
ColumnarValue::Array(_v) => todo!(),
591591
ColumnarValue::Scalar(v) => match v {
592592
ScalarValue::Utf8(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int32(
593593
v.as_ref().map(|x| (x.len() * 8) as i32),

datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,7 @@ impl RecordBatchStream for HashAggregateStream {
880880

881881
/// Given Vec<Vec<ArrayRef>>, concatenates the inners `Vec<ArrayRef>` into `ArrayRef`, returning `Vec<ArrayRef>`
882882
/// This assumes that `arrays` is not empty.
883+
#[allow(dead_code)]
883884
fn concatenate(arrays: Vec<Vec<ArrayRef>>) -> ArrowResult<Vec<ArrayRef>> {
884885
(0..arrays[0].len())
885886
.map(|column| {
@@ -968,7 +969,7 @@ fn create_batch_from_map(
968969
.zip(output_schema.fields().iter())
969970
.map(|(col, desired_field)| {
970971
arrow::compute::cast::cast(col.as_ref(), desired_field.data_type())
971-
.map(|v| Arc::from(v))
972+
.map(Arc::from)
972973
})
973974
.collect::<ArrowResult<Vec<_>>>()?;
974975

datafusion/src/physical_plan/parquet.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! Execution plan for reading Parquet files
1919
20+
/// FIXME: https://github.com/apache/arrow-datafusion/issues/1058
2021
use fmt::Debug;
2122
use std::fmt;
2223
use std::fs::File;
@@ -47,7 +48,7 @@ use log::debug;
4748
use parquet::statistics::{
4849
BinaryStatistics as ParquetBinaryStatistics,
4950
BooleanStatistics as ParquetBooleanStatistics,
50-
PrimitiveStatistics as ParquetPrimitiveStatistics, Statistics as ParquetStatistics,
51+
PrimitiveStatistics as ParquetPrimitiveStatistics,
5152
};
5253

5354
use tokio::{
@@ -294,6 +295,7 @@ impl ParquetFileMetrics {
294295

295296
type Payload = ArrowResult<RecordBatch>;
296297

298+
#[allow(dead_code)]
297299
fn producer_task(
298300
path: &str,
299301
response_tx: Sender<Payload>,
@@ -416,6 +418,7 @@ impl ExecutionPlan for ParquetExec {
416418
}
417419
}
418420

421+
#[allow(dead_code)]
419422
fn send_result(
420423
response_tx: &Sender<ArrowResult<RecordBatch>>,
421424
result: ArrowResult<RecordBatch>,
@@ -520,7 +523,7 @@ macro_rules! get_min_max_values {
520523
.collect();
521524

522525
// ignore errors converting to arrays (e.g. different types)
523-
ScalarValue::iter_to_array(scalar_values).ok().map(|v| Arc::from(v))
526+
ScalarValue::iter_to_array(scalar_values).ok().map(Arc::from)
524527
}}
525528
}
526529

@@ -575,7 +578,7 @@ fn read_partition(
575578
metrics: ExecutionPlanMetricsSet,
576579
projection: &[usize],
577580
predicate_builder: &Option<PruningPredicate>,
578-
batch_size: usize,
581+
_batch_size: usize,
579582
response_tx: Sender<ArrowResult<RecordBatch>>,
580583
limit: Option<usize>,
581584
) -> Result<()> {
@@ -593,7 +596,7 @@ fn read_partition(
593596
)?;
594597

595598
if let Some(predicate_builder) = predicate_builder {
596-
let file_metadata = reader.metadata();
599+
let _file_metadata = reader.metadata();
597600
reader.set_groups_filter(Arc::new(build_row_group_predicate(
598601
predicate_builder,
599602
file_metrics,

datafusion/src/physical_plan/repartition.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::physical_plan::hash_utils::create_hashes;
2828
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
2929
use arrow::record_batch::RecordBatch;
3030
use arrow::{
31-
array::{Array, ArrayRef, UInt32Array, UInt64Array, Utf8Array},
31+
array::{Array, UInt64Array},
3232
error::Result as ArrowResult,
3333
};
3434
use arrow::{compute::take, datatypes::SchemaRef};
@@ -462,6 +462,7 @@ mod tests {
462462
physical_plan::{expressions::col, memory::MemoryExec},
463463
test::exec::{BarrierExec, ErrorExec, MockExec},
464464
};
465+
use arrow::array::{ArrayRef, UInt32Array, Utf8Array};
465466
use arrow::datatypes::{DataType, Field, Schema};
466467
use arrow::error::ArrowError;
467468
use arrow::record_batch::RecordBatch;

datafusion/src/physical_plan/sort_preserving_merge.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -898,11 +898,11 @@ mod tests {
898898
let schema = partitions[0][0].schema();
899899
let sort = vec![
900900
PhysicalSortExpr {
901-
expr: col("b", &schema).unwrap(),
901+
expr: col("b", schema).unwrap(),
902902
options: Default::default(),
903903
},
904904
PhysicalSortExpr {
905-
expr: col("c", &schema).unwrap(),
905+
expr: col("c", schema).unwrap(),
906906
options: Default::default(),
907907
},
908908
];

datafusion/src/physical_plan/windows/aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl AggregateWindowExpr {
9595
.collect::<Vec<ArrayRef>>();
9696
let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
9797
concat::concatenate(&results)
98-
.map(|x| ArrayRef::from(x))
98+
.map(ArrayRef::from)
9999
.map_err(DataFusionError::ArrowError)
100100
}
101101

datafusion/src/physical_plan/windows/built_in.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl WindowExpr for BuiltInWindowExpr {
9999
};
100100
let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
101101
concat::concatenate(&results)
102-
.map(|x| ArrayRef::from(x))
102+
.map(ArrayRef::from)
103103
.map_err(DataFusionError::ArrowError)
104104
}
105105
}

0 commit comments

Comments
 (0)