Skip to content

Commit a3e799e

Browse files
committed
amend ListingTableConfig
If a file_schema is provided, the schema_policy is automatically set to SchemaPolicy::Fixed with that schema.
1 parent 3456bdf commit a3e799e

File tree

1 file changed

+111
-21
lines changed
  • datafusion/core/src/datasource/listing

1 file changed

+111
-21
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 111 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,15 @@ impl ListingTableConfig {
150150
///
151151
/// If the schema is provided, it must contain only the fields in the file
152152
/// without the table partitioning columns.
153+
///
154+
/// Note: Providing a schema automatically sets the schema policy to `SchemaPolicy::Fixed`.
153155
pub fn with_schema(self, schema: SchemaRef) -> Self {
154156
Self {
155157
table_paths: self.table_paths,
156-
file_schema: Some(schema),
158+
file_schema: Some(schema.clone()),
157159
options: self.options,
158160
schema_adapter_factory: self.schema_adapter_factory,
159-
schema_policy: Some(SchemaPolicy::Fixed(schema.clone())),
161+
schema_policy: Some(SchemaPolicy::Fixed(schema)),
160162
}
161163
}
162164

@@ -165,37 +167,99 @@ impl ListingTableConfig {
165167
/// If not provided, format and other options are inferred via
166168
/// [`Self::infer_options`].
167169
pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
168-
Self {
170+
let config = Self {
169171
table_paths: self.table_paths,
170172
file_schema: self.file_schema,
171173
options: Some(listing_options),
172174
schema_adapter_factory: self.schema_adapter_factory,
173175
schema_policy: self.schema_policy,
174-
}
176+
};
177+
// Ensure schema policy is consistent with schema
178+
config.ensure_schema_policy()
175179
}
176180

177181
/// Add `schema_adapter_factory` to [`ListingTableConfig`]
178182
pub fn with_schema_adapter_factory(
179183
self,
180184
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
181185
) -> Self {
182-
Self {
186+
let config = Self {
183187
table_paths: self.table_paths,
184188
file_schema: self.file_schema,
185189
options: self.options,
186190
schema_adapter_factory: Some(schema_adapter_factory),
187191
schema_policy: self.schema_policy,
188-
}
192+
};
193+
// Ensure schema policy is consistent with schema
194+
config.ensure_schema_policy()
189195
}
190196

191197
/// Set the schema policy for the [`ListingTable`]
192198
pub fn with_schema_policy(self, schema_policy: SchemaPolicy) -> Self {
193-
Self {
194-
table_paths: self.table_paths,
195-
file_schema: self.file_schema,
196-
options: self.options,
197-
schema_adapter_factory: self.schema_adapter_factory,
198-
schema_policy: Some(schema_policy),
199+
// If we have a file_schema, enforce Fixed policy with that schema
200+
if let Some(schema) = &self.file_schema {
201+
match schema_policy {
202+
SchemaPolicy::Fixed(_) => {
203+
// Use provided Fixed policy with our schema
204+
Self {
205+
table_paths: self.table_paths,
206+
file_schema: self.file_schema,
207+
options: self.options,
208+
schema_adapter_factory: self.schema_adapter_factory,
209+
schema_policy: Some(SchemaPolicy::Fixed(schema.clone())),
210+
}
211+
}
212+
_ => {
213+
// When file_schema is present, always use Fixed policy regardless of requested policy
214+
Self {
215+
table_paths: self.table_paths,
216+
file_schema: self.file_schema,
217+
options: self.options,
218+
schema_adapter_factory: self.schema_adapter_factory,
219+
schema_policy: Some(SchemaPolicy::Fixed(schema.clone())),
220+
}
221+
}
222+
}
223+
} else {
224+
// No file_schema, so accept the requested policy
225+
Self {
226+
table_paths: self.table_paths,
227+
file_schema: self.file_schema,
228+
options: self.options,
229+
schema_adapter_factory: self.schema_adapter_factory,
230+
schema_policy: Some(schema_policy),
231+
}
232+
}
233+
}
234+
235+
/// Ensures that if a file schema is provided, the schema policy is set to Fixed
236+
fn ensure_schema_policy(self) -> Self {
237+
if let Some(schema) = &self.file_schema {
238+
match self.schema_policy {
239+
Some(SchemaPolicy::Fixed(_)) => {
240+
// Already have a Fixed policy, but make sure it's using our schema
241+
Self {
242+
table_paths: self.table_paths,
243+
file_schema: self.file_schema,
244+
options: self.options,
245+
schema_adapter_factory: self.schema_adapter_factory,
246+
schema_policy: Some(SchemaPolicy::Fixed(schema.clone())),
247+
}
248+
}
249+
_ => {
250+
// If schema is present but policy isn't Fixed, make it Fixed
251+
Self {
252+
table_paths: self.table_paths,
253+
file_schema: self.file_schema,
254+
options: self.options,
255+
schema_adapter_factory: self.schema_adapter_factory,
256+
schema_policy: Some(SchemaPolicy::Fixed(schema.clone())),
257+
}
258+
}
259+
}
260+
} else {
261+
// No schema, no change needed
262+
self
199263
}
200264
}
201265

@@ -270,13 +334,16 @@ impl ListingTableConfig {
270334
.with_file_extension(listing_file_extension)
271335
.with_target_partitions(state.config().target_partitions());
272336

273-
Ok(Self {
337+
let config = Self {
274338
table_paths: self.table_paths,
275339
file_schema: self.file_schema,
276340
options: Some(listing_options),
277341
schema_adapter_factory: self.schema_adapter_factory,
278342
schema_policy: self.schema_policy,
279-
})
343+
};
344+
345+
// Ensure schema policy is consistent with schema
346+
Ok(config.ensure_schema_policy())
280347
}
281348

282349
/// Infer the [`SchemaRef`] based on `table_path`s.
@@ -289,6 +356,17 @@ impl ListingTableConfig {
289356
pub async fn infer_schema(self, state: &dyn Session) -> Result<Self> {
290357
match self.options {
291358
Some(options) => {
359+
// If a file schema is already provided, use it and ensure policy is Fixed
360+
if let Some(schema) = self.file_schema {
361+
return Ok(Self {
362+
table_paths: self.table_paths,
363+
file_schema: Some(schema.clone()),
364+
options: Some(options),
365+
schema_adapter_factory: self.schema_adapter_factory,
366+
schema_policy: Some(SchemaPolicy::Fixed(schema)),
367+
});
368+
}
369+
292370
let schema = match self.schema_policy.unwrap_or(SchemaPolicy::First) {
293371
// For Fixed policy, we already have a schema
294372
SchemaPolicy::Fixed(schema) => schema,
@@ -312,6 +390,7 @@ impl ListingTableConfig {
312390
}
313391
};
314392

393+
// When we infer a schema, keep the current policy
315394
Ok(Self {
316395
table_paths: self.table_paths,
317396
file_schema: Some(schema),
@@ -354,13 +433,16 @@ impl ListingTableConfig {
354433
})
355434
.collect::<Vec<_>>();
356435
let options = options.with_table_partition_cols(partitions);
357-
Ok(Self {
436+
let config = Self {
358437
table_paths: self.table_paths,
359438
file_schema: self.file_schema,
360439
options: Some(options),
361440
schema_adapter_factory: self.schema_adapter_factory,
362441
schema_policy: self.schema_policy,
363-
})
442+
};
443+
444+
// Ensure schema policy is consistent with schema
445+
Ok(config.ensure_schema_policy())
364446
}
365447
None => config_err!("No `ListingOptions` set for inferring schema"),
366448
}
@@ -855,11 +937,19 @@ impl ListingTable {
855937
///
856938
/// See documentation and example on [`ListingTable`] and [`ListingTableConfig`]
857939
pub fn try_new(config: ListingTableConfig) -> Result<Self> {
858-
let file_schema = match config.schema_policy {
859-
Some(SchemaPolicy::Fixed(schema)) => schema,
860-
_ => config
861-
.file_schema
862-
.ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?,
940+
// Resolve the file schema: an explicit Fixed policy's schema takes precedence, then file_schema; error if neither is set
941+
let file_schema = match (config.file_schema, &config.schema_policy) {
942+
// If we have an explicit Fixed policy, use it
943+
(_, Some(SchemaPolicy::Fixed(schema))) => schema.clone(),
944+
945+
// If we have a file_schema but no fixed policy, use the file_schema
946+
// Note: We don't change schema_policy here because that's done during config creation
947+
(Some(schema), _) => schema.clone(),
948+
949+
// No schema provided - error condition
950+
(None, _) => {
951+
return Err(DataFusionError::Internal("No schema provided.".into()));
952+
}
863953
};
864954

865955
let options = config.options.ok_or_else(|| {

0 commit comments

Comments
 (0)