Skip to content

Commit b6c9e66

Browse files
author
Devdutt Shenoi
committed
refactor: schema mismatch check
#1218 (comment)
1 parent 1afa318 commit b6c9e66

File tree

1 file changed

+10
-25
lines changed

1 file changed

+10
-25
lines changed

src/event/format/mod.rs

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,18 @@ pub trait EventFormat: Sized {
171171
)),
172172
);
173173

174-
// prepare the record batch and new fields to be added
175-
let mut new_schema = Arc::new(Schema::new(schema));
176-
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
174+
if static_schema_flag
175+
&& schema.iter().any(|field| {
176+
storage_schema
177+
.get(field.name())
178+
.is_none_or(|storage_field| storage_field != field)
179+
})
180+
{
177181
return Err(anyhow!("Schema mismatch"));
178182
}
183+
184+
// prepare the record batch and new fields to be added
185+
let mut new_schema = Arc::new(Schema::new(schema));
179186
new_schema =
180187
update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
181188

@@ -190,28 +197,6 @@ pub trait EventFormat: Sized {
190197
Ok((rb, is_first))
191198
}
192199

193-
fn is_schema_matching(
194-
new_schema: Arc<Schema>,
195-
storage_schema: &HashMap<String, Arc<Field>>,
196-
static_schema_flag: bool,
197-
) -> bool {
198-
if !static_schema_flag {
199-
return true;
200-
}
201-
for field in new_schema.fields() {
202-
let Some(storage_field) = storage_schema.get(field.name()) else {
203-
return false;
204-
};
205-
if field.name() != storage_field.name() {
206-
return false;
207-
}
208-
if field.data_type() != storage_field.data_type() {
209-
return false;
210-
}
211-
}
212-
true
213-
}
214-
215200
#[allow(clippy::too_many_arguments)]
216201
fn into_event(
217202
self,

0 commit comments

Comments
 (0)