@@ -558,7 +558,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         let batch_size = self
             .batch_size
             .min(self.metadata.file_metadata().num_rows() as usize);
-        let reader = ReaderFactory {
+        let reader_factory = ReaderFactory {
             input: self.input.0,
             filter: self.filter,
             metadata: self.metadata.clone(),
@@ -569,7 +569,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
 
         // Ensure schema of ParquetRecordBatchStream respects projection, and does
         // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
-        let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
+        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
             Some(DataType::Struct(fields)) => {
                 fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
             }
@@ -585,7 +585,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             projection: self.projection,
             selection: self.selection,
             schema,
-            reader: Some(reader),
+            reader_factory: Some(reader_factory),
             state: StreamState::Init,
         })
     }
@@ -765,7 +765,7 @@ pub struct ParquetRecordBatchStream<T> {
     selection: Option<RowSelection>,
 
     /// This is an option so it can be moved into a future
-    reader: Option<ReaderFactory<T>>,
+    reader_factory: Option<ReaderFactory<T>>,
 
     state: StreamState<T>,
 }
@@ -827,7 +827,7 @@ where
 
                     let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
 
-                    let reader_factory = self.reader.take().expect("lost reader");
+                    let reader_factory = self.reader_factory.take().expect("lost reader factory");
 
                     let (reader_factory, maybe_reader) = reader_factory
                         .read_row_group(
@@ -841,7 +841,7 @@ where
                             self.state = StreamState::Error;
                             err
                         })?;
-                    self.reader = Some(reader_factory);
+                    self.reader_factory = Some(reader_factory);
 
                     if let Some(reader) = maybe_reader {
                         return Ok(Some(reader));
@@ -881,7 +881,7 @@ where
                         None => return Poll::Ready(None),
                     };
 
-                    let reader = self.reader.take().expect("lost reader");
+                    let reader = self.reader_factory.take().expect("lost reader factory");
 
                     let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;
 
@@ -900,7 +900,7 @@ where
                 }
                 StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
                     Ok((reader_factory, maybe_reader)) => {
-                        self.reader = Some(reader_factory);
+                        self.reader_factory = Some(reader_factory);
                         match maybe_reader {
                             // Read records from [`ParquetRecordBatchReader`]
                             Some(reader) => self.state = StreamState::Decoding(reader),
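
For context, a minimal usage sketch of the public API whose internals are renamed above; this is not part of the diff and assumes the `parquet` crate's `async` feature, Tokio, and `futures`, with a placeholder file path and batch size.

// Hypothetical example, not part of this change.
use futures::TryStreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The builder owns the async input that ends up in ReaderFactory::input.
    let file = tokio::fs::File::open("data.parquet").await?;

    // build() assembles the ReaderFactory (now held in the stream's
    // `reader_factory` field) and returns a ParquetRecordBatchStream.
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(8192)
        .build()?;

    // Each poll takes the factory out of `reader_factory`, reads one row
    // group, and puts the factory back before decoding record batches.
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}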