Skip to content

Commit 9beabb7

Browse files
committed
remove if empty merge type
1 parent 0554747 commit 9beabb7

File tree

3 files changed

+66
-15
lines changed

3 files changed

+66
-15
lines changed

dlt/common/schema/typing.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,17 @@
100100
"""Known hints of a column"""
101101
COLUMN_HINTS: Set[TColumnHint] = set(get_args(TColumnHint))
102102

103+
TColumnPropMergeType = Literal[
104+
"replace",
105+
"remove_if_empty",
106+
]
107+
103108

104109
class TColumnPropInfo(NamedTuple):
105110
name: Union[TColumnProp, str]
106111
defaults: Tuple[Any, ...] = (None,)
107112
is_hint: bool = False
113+
merge_type: TColumnPropMergeType = "replace"
108114

109115

110116
_ColumnPropInfos = [
@@ -117,10 +123,10 @@ class TColumnPropInfo(NamedTuple):
117123
TColumnPropInfo("variant", (False, None)),
118124
TColumnPropInfo("partition", (False, None)),
119125
TColumnPropInfo("cluster", (False, None)),
120-
TColumnPropInfo("primary_key", (False, None)),
126+
TColumnPropInfo("primary_key", (False, None), False, "remove_if_empty"),
121127
TColumnPropInfo("sort", (False, None)),
122128
TColumnPropInfo("unique", (False, None)),
123-
TColumnPropInfo("merge_key", (False, None)),
129+
TColumnPropInfo("merge_key", (False, None), False, "remove_if_empty"),
124130
TColumnPropInfo("row_key", (False, None)),
125131
TColumnPropInfo("parent_key", (False, None)),
126132
TColumnPropInfo("root_key", (False, None)),
@@ -149,6 +155,10 @@ class TColumnPropInfo(NamedTuple):
149155
]
150156
TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
151157

158+
RemoveIfEmptyPropInfos = {
159+
info.name: info for info in _ColumnPropInfos if info.merge_type == "remove_if_empty"
160+
}
161+
152162

153163
class TColumnType(TypedDict, total=False):
154164
data_type: Optional[TDataType]

dlt/common/schema/utils.py

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
VERSION_TABLE_NAME,
3232
PIPELINE_STATE_TABLE_NAME,
3333
ColumnPropInfos,
34+
TColumnPropMergeType,
3435
TColumnName,
3536
TFileFormat,
3637
TPartialTableSchema,
@@ -154,6 +155,22 @@ def has_default_column_prop_value(prop: str, value: Any) -> bool:
154155
return value in (None, False)
155156

156157

158+
def has_merge_type(prop: str, merge_type: TColumnPropMergeType = "remove_if_empty") -> bool:
159+
if prop in ColumnPropInfos:
160+
return ColumnPropInfos[prop].merge_type == merge_type
161+
return False
162+
163+
164+
def remove_column_props_with_merge_type(
165+
column_schema: TColumnSchema, merge_type: TColumnPropMergeType = "remove_if_empty"
166+
) -> TColumnSchema:
167+
"""Removes properties that have merge type remove if empty"""
168+
for prop in list(column_schema.keys()):
169+
if has_merge_type(prop, merge_type):
170+
column_schema.pop(prop) # type: ignore
171+
return column_schema
172+
173+
157174
def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema:
158175
"""Removes default values from `column_schema` in place, returns the input for chaining"""
159176
# remove hints with default values
@@ -420,15 +437,28 @@ def diff_table_references(
420437

421438

422439
def merge_column(
423-
col_a: TColumnSchema, col_b: TColumnSchema, merge_defaults: bool = True
440+
col_a: TColumnSchema,
441+
col_b: TColumnSchema,
442+
merge_defaults: bool = True,
443+
respect_merge_type: bool = False,
424444
) -> TColumnSchema:
425-
"""Merges `col_b` into `col_a`. if `merge_defaults` is True, only hints from `col_b` that are not default in `col_a` will be set.
445+
"""Merges col_b into col_a in place. Returns col_a.
426446
427-
Modifies col_a in place and returns it
447+
merge_defaults: If False, only merge non-default values from col_b
448+
respect_merge_type: If True, apply "remove_if_empty" merge rules to col_a properties
428449
"""
429-
col_b_clean = col_b if merge_defaults else remove_column_defaults(copy(col_b))
430-
for n, v in col_b_clean.items():
431-
col_a[n] = v # type: ignore
450+
451+
col_b_clean = copy(col_b) if merge_defaults else remove_column_defaults(copy(col_b))
452+
453+
for prop in list(col_a.keys()):
454+
if prop in col_b_clean:
455+
col_a[prop] = col_b_clean.pop(prop) # type: ignore
456+
else:
457+
if respect_merge_type and has_merge_type(prop, "remove_if_empty"):
458+
col_a.pop(prop) # type: ignore
459+
460+
for prop, value in col_b_clean.items():
461+
col_a[prop] = value # type: ignore
432462

433463
return col_a
434464

@@ -438,6 +468,7 @@ def merge_columns(
438468
columns_b: TTableSchemaColumns,
439469
merge_columns: bool = False,
440470
columns_partial: bool = True,
471+
respect_merge_type: bool = False,
441472
) -> TTableSchemaColumns:
442473
"""Merges `columns_a` with `columns_b`. `columns_a` is modified in place.
443474
@@ -458,14 +489,19 @@ def merge_columns(
458489
if column_a and not is_complete_column(column_a):
459490
columns_a.pop(col_name)
460491
if column_a and merge_columns:
461-
column_b = merge_column(column_a, column_b)
492+
column_b = merge_column(
493+
column_a, column_b, merge_defaults=True, respect_merge_type=respect_merge_type
494+
)
462495
# set new or updated column
463496
columns_a[col_name] = column_b
464497
return columns_a
465498

466499

467500
def diff_table(
468-
schema_name: str, tab_a: TTableSchema, tab_b: TPartialTableSchema
501+
schema_name: str,
502+
tab_a: TTableSchema,
503+
tab_b: TPartialTableSchema,
504+
respect_merge_type: bool = False,
469505
) -> TPartialTableSchema:
470506
"""Creates a partial table that contains properties found in `tab_b` that are not present or different in `tab_a`.
471507
The name is always present in returned partial.
@@ -480,18 +516,22 @@ def diff_table(
480516
ensure_compatible_tables(schema_name, tab_a, tab_b, ensure_columns=False)
481517

482518
# get new columns, changes in the column data type or other properties are not allowed
483-
tab_a_columns = tab_a["columns"]
519+
tab_a_columns = copy(tab_a["columns"])
484520
new_columns: List[TColumnSchema] = []
485521
for col_b_name, col_b in tab_b["columns"].items():
486522
if col_b_name in tab_a_columns:
487-
col_a = tab_a_columns[col_b_name]
523+
col_a = tab_a_columns.pop(col_b_name)
488524
# all other properties can change
489-
merged_column = merge_column(copy(col_a), col_b)
525+
merged_column = merge_column(copy(col_a), col_b, respect_merge_type=respect_merge_type)
490526
if merged_column != col_a:
491527
new_columns.append(merged_column)
492528
else:
493529
new_columns.append(col_b)
494530

531+
if respect_merge_type:
532+
for col_a in tab_a_columns.values():
533+
remove_column_props_with_merge_type(col_a, "remove_if_empty")
534+
495535
# return partial table containing only name and properties that differ (column, filters etc.)
496536
table_name = tab_a["name"]
497537

dlt/extract/extractors.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,9 @@ def _compute_and_update_tables(
273273
computed_table["x-normalizer"] = {"evolve-columns-once": True}
274274
existing_table = self.schema.tables.get(table_name, None)
275275
if existing_table:
276-
# TODO: revise this. computed table should overwrite certain hints (ie. primary and merge keys) completely
277-
diff_table = utils.diff_table(self.schema.name, existing_table, computed_table)
276+
diff_table = utils.diff_table(
277+
self.schema.name, existing_table, computed_table, respect_merge_type=True
278+
)
278279
else:
279280
diff_table = computed_table
280281

0 commit comments

Comments
 (0)