Skip to content

Commit 7864b83

Browse files
committed
Push SessionState into FileFormat (apache#4349)
1 parent 975ff15 commit 7864b83

File tree

12 files changed

+234
-175
lines changed

12 files changed

+234
-175
lines changed

datafusion/core/src/datasource/file_format/avro.rs

Lines changed: 37 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ use object_store::{GetResult, ObjectMeta, ObjectStore};
2828
use super::FileFormat;
2929
use crate::avro_to_arrow::read_avro_schema_from_reader;
3030
use crate::error::Result;
31+
use crate::execution::context::SessionState;
3132
use crate::logical_expr::Expr;
3233
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
3334
use crate::physical_plan::ExecutionPlan;
@@ -47,6 +48,7 @@ impl FileFormat for AvroFormat {
4748

4849
async fn infer_schema(
4950
&self,
51+
_ctx: &SessionState,
5052
store: &Arc<dyn ObjectStore>,
5153
objects: &[ObjectMeta],
5254
) -> Result<SchemaRef> {
@@ -68,6 +70,7 @@ impl FileFormat for AvroFormat {
6870

6971
async fn infer_stats(
7072
&self,
73+
_ctx: &SessionState,
7174
_store: &Arc<dyn ObjectStore>,
7275
_table_schema: SchemaRef,
7376
_object: &ObjectMeta,
@@ -77,6 +80,7 @@ impl FileFormat for AvroFormat {
7780

7881
async fn create_physical_plan(
7982
&self,
83+
_ctx: &SessionState,
8084
conf: FileScanConfig,
8185
_filters: &[Expr],
8286
) -> Result<Arc<dyn ExecutionPlan>> {
@@ -101,10 +105,11 @@ mod tests {
101105
#[tokio::test]
102106
async fn read_small_batches() -> Result<()> {
103107
let config = SessionConfig::new().with_batch_size(2);
104-
let ctx = SessionContext::with_config(config);
108+
let session_ctx = SessionContext::with_config(config);
109+
let ctx = session_ctx.state();
105110
let task_ctx = ctx.task_ctx();
106111
let projection = None;
107-
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
112+
let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?;
108113
let stream = exec.execute(0, task_ctx)?;
109114

110115
let tt_batches = stream
@@ -124,9 +129,10 @@ mod tests {
124129
#[tokio::test]
125130
async fn read_limit() -> Result<()> {
126131
let session_ctx = SessionContext::new();
127-
let task_ctx = session_ctx.task_ctx();
132+
let ctx = session_ctx.state();
133+
let task_ctx = ctx.task_ctx();
128134
let projection = None;
129-
let exec = get_exec("alltypes_plain.avro", projection, Some(1)).await?;
135+
let exec = get_exec(&ctx, "alltypes_plain.avro", projection, Some(1)).await?;
130136
let batches = collect(exec, task_ctx).await?;
131137
assert_eq!(1, batches.len());
132138
assert_eq!(11, batches[0].num_columns());
@@ -138,9 +144,10 @@ mod tests {
138144
#[tokio::test]
139145
async fn read_alltypes_plain_avro() -> Result<()> {
140146
let session_ctx = SessionContext::new();
141-
let task_ctx = session_ctx.task_ctx();
147+
let ctx = session_ctx.state();
148+
let task_ctx = ctx.task_ctx();
142149
let projection = None;
143-
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
150+
let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?;
144151

145152
let x: Vec<String> = exec
146153
.schema()
@@ -190,9 +197,10 @@ mod tests {
190197
#[tokio::test]
191198
async fn read_bool_alltypes_plain_avro() -> Result<()> {
192199
let session_ctx = SessionContext::new();
193-
let task_ctx = session_ctx.task_ctx();
200+
let ctx = session_ctx.state();
201+
let task_ctx = ctx.task_ctx();
194202
let projection = Some(vec![1]);
195-
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
203+
let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?;
196204

197205
let batches = collect(exec, task_ctx).await?;
198206
assert_eq!(batches.len(), 1);
@@ -216,9 +224,10 @@ mod tests {
216224
#[tokio::test]
217225
async fn read_i32_alltypes_plain_avro() -> Result<()> {
218226
let session_ctx = SessionContext::new();
219-
let task_ctx = session_ctx.task_ctx();
227+
let ctx = session_ctx.state();
228+
let task_ctx = ctx.task_ctx();
220229
let projection = Some(vec![0]);
221-
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
230+
let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?;
222231

223232
let batches = collect(exec, task_ctx).await?;
224233
assert_eq!(batches.len(), 1);
@@ -239,9 +248,10 @@ mod tests {
239248
#[tokio::test]
240249
async fn read_i96_alltypes_plain_avro() -> Result<()> {
241250
let session_ctx = SessionContext::new();
242-
let task_ctx = session_ctx.task_ctx();
251+
let ctx = session_ctx.state();
252+
let task_ctx = ctx.task_ctx();
243253
let projection = Some(vec![10]);
244-
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
254+
let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?;
245255

246256
let batches = collect(exec, task_ctx).await?;
247257
assert_eq!(batches.len(), 1);
@@ -262,9 +272,10 @@ mod tests {
262272
#[tokio::test]
263273
async fn read_f32_alltypes_plain_avro() -> Result<()> {
264274
let session_ctx = SessionContext::new();
265-
let task_ctx = session_ctx.task_ctx();
275+
let ctx = session_ctx.state();
276+
let task_ctx = ctx.task_ctx();
266277
let projection = Some(vec![6]);
267-
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
278+
let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?;
268279

269280
let batches = collect(exec, task_ctx).await?;
270281
assert_eq!(batches.len(), 1);
@@ -288,9 +299,10 @@ mod tests {
288299
#[tokio::test]
289300
async fn read_f64_alltypes_plain_avro() -> Result<()> {
290301
let session_ctx = SessionContext::new();
291-
let task_ctx = session_ctx.task_ctx();
302+
let ctx = session_ctx.state();
303+
let task_ctx = ctx.task_ctx();
292304
let projection = Some(vec![7]);
293-
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
305+
let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?;
294306

295307
let batches = collect(exec, task_ctx).await?;
296308
assert_eq!(batches.len(), 1);
@@ -314,9 +326,10 @@ mod tests {
314326
#[tokio::test]
315327
async fn read_binary_alltypes_plain_avro() -> Result<()> {
316328
let session_ctx = SessionContext::new();
317-
let task_ctx = session_ctx.task_ctx();
329+
let ctx = session_ctx.state();
330+
let task_ctx = ctx.task_ctx();
318331
let projection = Some(vec![9]);
319-
let exec = get_exec("alltypes_plain.avro", projection, None).await?;
332+
let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?;
320333

321334
let batches = collect(exec, task_ctx).await?;
322335
assert_eq!(batches.len(), 1);
@@ -338,14 +351,15 @@ mod tests {
338351
}
339352

340353
async fn get_exec(
354+
ctx: &SessionState,
341355
file_name: &str,
342356
projection: Option<Vec<usize>>,
343357
limit: Option<usize>,
344358
) -> Result<Arc<dyn ExecutionPlan>> {
345359
let testdata = crate::test_util::arrow_test_data();
346360
let store_root = format!("{}/avro", testdata);
347361
let format = AvroFormat {};
348-
scan_format(&format, &store_root, file_name, projection, limit).await
362+
scan_format(ctx, &format, &store_root, file_name, projection, limit).await
349363
}
350364
}
351365

@@ -356,13 +370,16 @@ mod tests {
356370

357371
use super::super::test_util::scan_format;
358372
use crate::error::DataFusionError;
373+
use crate::prelude::SessionContext;
359374

360375
#[tokio::test]
361376
async fn test() -> Result<()> {
377+
let session_ctx = SessionContext::new();
378+
let ctx = session_ctx.state();
362379
let format = AvroFormat {};
363380
let testdata = crate::test_util::arrow_test_data();
364381
let filename = "avro/alltypes_plain.avro";
365-
let result = scan_format(&format, &testdata, filename, None, None).await;
382+
let result = scan_format(&ctx, &format, &testdata, filename, None, None).await;
366383
assert!(matches!(
367384
result,
368385
Err(DataFusionError::NotImplemented(msg))

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 18 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ use super::FileFormat;
3535
use crate::datasource::file_format::file_type::FileCompressionType;
3636
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
3737
use crate::error::Result;
38+
use crate::execution::context::SessionState;
3839
use crate::logical_expr::Expr;
3940
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
4041
use crate::physical_plan::ExecutionPlan;
@@ -113,6 +114,7 @@ impl FileFormat for CsvFormat {
113114

114115
async fn infer_schema(
115116
&self,
117+
_ctx: &SessionState,
116118
store: &Arc<dyn ObjectStore>,
117119
objects: &[ObjectMeta],
118120
) -> Result<SchemaRef> {
@@ -150,6 +152,7 @@ impl FileFormat for CsvFormat {
150152

151153
async fn infer_stats(
152154
&self,
155+
_ctx: &SessionState,
153156
_store: &Arc<dyn ObjectStore>,
154157
_table_schema: SchemaRef,
155158
_object: &ObjectMeta,
@@ -159,6 +162,7 @@ impl FileFormat for CsvFormat {
159162

160163
async fn create_physical_plan(
161164
&self,
165+
_ctx: &SessionState,
162166
conf: FileScanConfig,
163167
_filters: &[Expr],
164168
) -> Result<Arc<dyn ExecutionPlan>> {
@@ -184,11 +188,12 @@ mod tests {
184188
#[tokio::test]
185189
async fn read_small_batches() -> Result<()> {
186190
let config = SessionConfig::new().with_batch_size(2);
187-
let ctx = SessionContext::with_config(config);
191+
let session_ctx = SessionContext::with_config(config);
192+
let ctx = session_ctx.state();
193+
let task_ctx = ctx.task_ctx();
188194
// skip column 9 that overflows the automatically discovered column type of i64 (u64 would work)
189195
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
190-
let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
191-
let task_ctx = ctx.task_ctx();
196+
let exec = get_exec(&ctx, "aggregate_test_100.csv", projection, None).await?;
192197
let stream = exec.execute(0, task_ctx)?;
193198

194199
let tt_batches: i32 = stream
@@ -212,9 +217,10 @@ mod tests {
212217
#[tokio::test]
213218
async fn read_limit() -> Result<()> {
214219
let session_ctx = SessionContext::new();
220+
let ctx = session_ctx.state();
215221
let task_ctx = session_ctx.task_ctx();
216222
let projection = Some(vec![0, 1, 2, 3]);
217-
let exec = get_exec("aggregate_test_100.csv", projection, Some(1)).await?;
223+
let exec = get_exec(&ctx, "aggregate_test_100.csv", projection, Some(1)).await?;
218224
let batches = collect(exec, task_ctx).await?;
219225
assert_eq!(1, batches.len());
220226
assert_eq!(4, batches[0].num_columns());
@@ -225,8 +231,11 @@ mod tests {
225231

226232
#[tokio::test]
227233
async fn infer_schema() -> Result<()> {
234+
let session_ctx = SessionContext::new();
235+
let ctx = session_ctx.state();
236+
228237
let projection = None;
229-
let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
238+
let exec = get_exec(&ctx, "aggregate_test_100.csv", projection, None).await?;
230239

231240
let x: Vec<String> = exec
232241
.schema()
@@ -259,9 +268,10 @@ mod tests {
259268
#[tokio::test]
260269
async fn read_char_column() -> Result<()> {
261270
let session_ctx = SessionContext::new();
271+
let ctx = session_ctx.state();
262272
let task_ctx = session_ctx.task_ctx();
263273
let projection = Some(vec![0]);
264-
let exec = get_exec("aggregate_test_100.csv", projection, None).await?;
274+
let exec = get_exec(&ctx, "aggregate_test_100.csv", projection, None).await?;
265275

266276
let batches = collect(exec, task_ctx).await.expect("Collect batches");
267277

@@ -281,12 +291,13 @@ mod tests {
281291
}
282292

283293
async fn get_exec(
294+
ctx: &SessionState,
284295
file_name: &str,
285296
projection: Option<Vec<usize>>,
286297
limit: Option<usize>,
287298
) -> Result<Arc<dyn ExecutionPlan>> {
288299
let root = format!("{}/csv", crate::test_util::arrow_test_data());
289300
let format = CsvFormat::default();
290-
scan_format(&format, &root, file_name, projection, limit).await
301+
scan_format(ctx, &format, &root, file_name, projection, limit).await
291302
}
292303
}

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 22 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@ use super::FileScanConfig;
3636
use crate::datasource::file_format::file_type::FileCompressionType;
3737
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
3838
use crate::error::Result;
39+
use crate::execution::context::SessionState;
3940
use crate::logical_expr::Expr;
4041
use crate::physical_plan::file_format::NdJsonExec;
4142
use crate::physical_plan::ExecutionPlan;
@@ -86,6 +87,7 @@ impl FileFormat for JsonFormat {
8687

8788
async fn infer_schema(
8889
&self,
90+
_ctx: &SessionState,
8991
store: &Arc<dyn ObjectStore>,
9092
objects: &[ObjectMeta],
9193
) -> Result<SchemaRef> {
@@ -129,6 +131,7 @@ impl FileFormat for JsonFormat {
129131

130132
async fn infer_stats(
131133
&self,
134+
_ctx: &SessionState,
132135
_store: &Arc<dyn ObjectStore>,
133136
_table_schema: SchemaRef,
134137
_object: &ObjectMeta,
@@ -138,6 +141,7 @@ impl FileFormat for JsonFormat {
138141

139142
async fn create_physical_plan(
140143
&self,
144+
_ctx: &SessionState,
141145
conf: FileScanConfig,
142146
_filters: &[Expr],
143147
) -> Result<Arc<dyn ExecutionPlan>> {
@@ -161,10 +165,11 @@ mod tests {
161165
#[tokio::test]
162166
async fn read_small_batches() -> Result<()> {
163167
let config = SessionConfig::new().with_batch_size(2);
164-
let ctx = SessionContext::with_config(config);
165-
let projection = None;
166-
let exec = get_exec(projection, None).await?;
168+
let session_ctx = SessionContext::with_config(config);
169+
let ctx = session_ctx.state();
167170
let task_ctx = ctx.task_ctx();
171+
let projection = None;
172+
let exec = get_exec(&ctx, projection, None).await?;
168173
let stream = exec.execute(0, task_ctx)?;
169174

170175
let tt_batches: i32 = stream
@@ -188,9 +193,10 @@ mod tests {
188193
#[tokio::test]
189194
async fn read_limit() -> Result<()> {
190195
let session_ctx = SessionContext::new();
191-
let task_ctx = session_ctx.task_ctx();
196+
let ctx = session_ctx.state();
197+
let task_ctx = ctx.task_ctx();
192198
let projection = None;
193-
let exec = get_exec(projection, Some(1)).await?;
199+
let exec = get_exec(&ctx, projection, Some(1)).await?;
194200
let batches = collect(exec, task_ctx).await?;
195201
assert_eq!(1, batches.len());
196202
assert_eq!(4, batches[0].num_columns());
@@ -202,7 +208,9 @@ mod tests {
202208
#[tokio::test]
203209
async fn infer_schema() -> Result<()> {
204210
let projection = None;
205-
let exec = get_exec(projection, None).await?;
211+
let session_ctx = SessionContext::new();
212+
let ctx = session_ctx.state();
213+
let exec = get_exec(&ctx, projection, None).await?;
206214

207215
let x: Vec<String> = exec
208216
.schema()
@@ -218,9 +226,10 @@ mod tests {
218226
#[tokio::test]
219227
async fn read_int_column() -> Result<()> {
220228
let session_ctx = SessionContext::new();
221-
let task_ctx = session_ctx.task_ctx();
229+
let ctx = session_ctx.state();
230+
let task_ctx = ctx.task_ctx();
222231
let projection = Some(vec![0]);
223-
let exec = get_exec(projection, None).await?;
232+
let exec = get_exec(&ctx, projection, None).await?;
224233

225234
let batches = collect(exec, task_ctx).await.expect("Collect batches");
226235

@@ -243,22 +252,25 @@ mod tests {
243252
}
244253

245254
async fn get_exec(
255+
ctx: &SessionState,
246256
projection: Option<Vec<usize>>,
247257
limit: Option<usize>,
248258
) -> Result<Arc<dyn ExecutionPlan>> {
249259
let filename = "tests/jsons/2.json";
250260
let format = JsonFormat::default();
251-
scan_format(&format, ".", filename, projection, limit).await
261+
scan_format(ctx, &format, ".", filename, projection, limit).await
252262
}
253263

254264
#[tokio::test]
255265
async fn infer_schema_with_limit() {
266+
let session = SessionContext::new();
267+
let ctx = session.state();
256268
let store = Arc::new(LocalFileSystem::new()) as _;
257269
let filename = "tests/jsons/schema_infer_limit.json";
258270
let format = JsonFormat::default().with_schema_infer_max_rec(Some(3));
259271

260272
let file_schema = format
261-
.infer_schema(&store, &[local_unpartitioned_file(filename)])
273+
.infer_schema(&ctx, &store, &[local_unpartitioned_file(filename)])
262274
.await
263275
.expect("Schema inference");
264276

0 commit comments

Comments (0)