File tree 3 files changed +44
-24
lines changed
3 files changed +44
-24
lines changed Original file line number Diff line number Diff line change 18
18
//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
19
19
20
20
use futures:: StreamExt ;
21
+ use log:: debug;
21
22
use std:: any:: Any ;
22
23
use std:: fmt:: { self , Debug , Display } ;
23
24
use std:: sync:: Arc ;
@@ -55,23 +56,26 @@ pub struct MemTable {
55
56
impl MemTable {
56
57
/// Create a new in-memory table from the provided schema and record batches
57
58
pub fn try_new ( schema : SchemaRef , partitions : Vec < Vec < RecordBatch > > ) -> Result < Self > {
58
- if partitions
59
- . iter ( )
60
- . flatten ( )
61
- . all ( |batches| schema. contains ( & batches. schema ( ) ) )
62
- {
63
- Ok ( Self {
64
- schema,
65
- batches : partitions
66
- . into_iter ( )
67
- . map ( |e| Arc :: new ( RwLock :: new ( e) ) )
68
- . collect :: < Vec < _ > > ( ) ,
69
- } )
70
- } else {
71
- Err ( DataFusionError :: Plan (
72
- "Mismatch between schema and batches" . to_string ( ) ,
73
- ) )
59
+ for batches in partitions. iter ( ) . flatten ( ) {
60
+ let batches_schema = batches. schema ( ) ;
61
+ if !schema. contains ( & batches_schema) {
62
+ debug ! (
63
+ "mem table schema does not contain batches schema. \
64
+ Target_schema: {schema:?}. Batches Schema: {batches_schema:?}"
65
+ ) ;
66
+ return Err ( DataFusionError :: Plan (
67
+ "Mismatch between schema and batches" . to_string ( ) ,
68
+ ) ) ;
69
+ }
74
70
}
71
+
72
+ Ok ( Self {
73
+ schema,
74
+ batches : partitions
75
+ . into_iter ( )
76
+ . map ( |e| Arc :: new ( RwLock :: new ( e) ) )
77
+ . collect :: < Vec < _ > > ( ) ,
78
+ } )
75
79
}
76
80
77
81
/// Create a mem table by reading from another data source
Original file line number Diff line number Diff line change @@ -25,6 +25,7 @@ use async_trait::async_trait;
25
25
26
26
use datafusion_common:: { DataFusionError , Result } ;
27
27
use datafusion_expr:: { Expr , TableType } ;
28
+ use log:: debug;
28
29
29
30
use crate :: datasource:: TableProvider ;
30
31
use crate :: execution:: context:: { SessionState , TaskContext } ;
@@ -53,10 +54,17 @@ impl StreamingTable {
53
54
schema : SchemaRef ,
54
55
partitions : Vec < Arc < dyn PartitionStream > > ,
55
56
) -> Result < Self > {
56
- if !partitions. iter ( ) . all ( |x| schema. contains ( x. schema ( ) ) ) {
57
- return Err ( DataFusionError :: Plan (
58
- "Mismatch between schema and batches" . to_string ( ) ,
59
- ) ) ;
57
+ for x in partitions. iter ( ) {
58
+ let partition_schema = x. schema ( ) ;
59
+ if !schema. contains ( partition_schema) {
60
+ debug ! (
61
+ "target schema does not contain partition schema. \
62
+ Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}"
63
+ ) ;
64
+ return Err ( DataFusionError :: Plan (
65
+ "Mismatch between schema and batches" . to_string ( ) ,
66
+ ) ) ;
67
+ }
60
68
}
61
69
62
70
Ok ( Self {
Original file line number Diff line number Diff line change @@ -26,6 +26,7 @@ use futures::stream::StreamExt;
26
26
27
27
use datafusion_common:: { DataFusionError , Result , Statistics } ;
28
28
use datafusion_physical_expr:: PhysicalSortExpr ;
29
+ use log:: debug;
29
30
30
31
use crate :: datasource:: streaming:: PartitionStream ;
31
32
use crate :: physical_plan:: stream:: RecordBatchStreamAdapter ;
@@ -48,10 +49,17 @@ impl StreamingTableExec {
48
49
projection : Option < & Vec < usize > > ,
49
50
infinite : bool ,
50
51
) -> Result < Self > {
51
- if !partitions. iter ( ) . all ( |x| schema. contains ( x. schema ( ) ) ) {
52
- return Err ( DataFusionError :: Plan (
53
- "Mismatch between schema and batches" . to_string ( ) ,
54
- ) ) ;
52
+ for x in partitions. iter ( ) {
53
+ let partition_schema = x. schema ( ) ;
54
+ if !schema. contains ( partition_schema) {
55
+ debug ! (
56
+ "target schema does not contain partition schema. \
57
+ Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}"
58
+ ) ;
59
+ return Err ( DataFusionError :: Plan (
60
+ "Mismatch between schema and batches" . to_string ( ) ,
61
+ ) ) ;
62
+ }
55
63
}
56
64
57
65
let projected_schema = match projection {
You can’t perform that action at this time.
0 commit comments