Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,17 @@
COLUMN_HINTS: Set[TColumnHint] = set(get_args(TColumnHint))


TColumnPropMergeType = Literal[
"replacable",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just mark certain hints as compound

"removable",
]


class TColumnPropInfo(NamedTuple):
name: Union[TColumnProp, str]
defaults: Tuple[Any, ...] = (None,)
is_hint: bool = False
merge_type: TColumnPropMergeType = "replacable"


_ColumnPropInfos = [
Expand All @@ -117,10 +124,10 @@ class TColumnPropInfo(NamedTuple):
TColumnPropInfo("variant", (False, None)),
TColumnPropInfo("partition", (False, None)),
TColumnPropInfo("cluster", (False, None)),
TColumnPropInfo("primary_key", (False, None)),
TColumnPropInfo("primary_key", (False, None), False, "removable"),
TColumnPropInfo("sort", (False, None)),
TColumnPropInfo("unique", (False, None)),
TColumnPropInfo("merge_key", (False, None)),
TColumnPropInfo("merge_key", (False, None), False, "removable"),
TColumnPropInfo("row_key", (False, None)),
TColumnPropInfo("parent_key", (False, None)),
TColumnPropInfo("root_key", (False, None)),
Expand Down Expand Up @@ -149,6 +156,10 @@ class TColumnPropInfo(NamedTuple):
]
TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]

RemovablePropInfos = {
info.name: info for info in _ColumnPropInfos if info.merge_type == "removable"
}


class TColumnType(TypedDict, total=False):
data_type: Optional[TDataType]
Expand Down
50 changes: 49 additions & 1 deletion dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import yaml
from argparse import Namespace
from copy import deepcopy, copy
from typing import Dict, List, Sequence, Tuple, Type, Any, cast, Iterable, Optional, Union
from typing import Dict, List, Sequence, Tuple, Type, Any, cast, Iterable, Optional, Union, Set

from dlt.common.pendulum import pendulum
from dlt.common.time import ensure_pendulum_datetime_utc
Expand Down Expand Up @@ -55,6 +55,7 @@
TSchemaContract,
TSortOrder,
TTableReference,
RemovablePropInfos,
)
from dlt.common.schema.exceptions import (
CannotCoerceColumnException,
Expand Down Expand Up @@ -121,6 +122,43 @@ def apply_defaults(stored_schema: TStoredSchema) -> TStoredSchema:
return stored_schema


def remove_props_with_empty_hint(
column_schema: TColumnSchema, empty_prop_hints: Dict[str, Any]
) -> TColumnSchema:
"""Removes properties that have non-None empty hint values from the provided column schema in place.

Used when a hint is explicitly cleared/unset with an empty value.

Args:
column_schema (TColumnSchema): The column schema to modify (modified in place)
empty_prop_hints (Dict[str, Any]): Dict of property names and their non-None empty values to verify

Returns:
TColumnSchema: The modified column schema
"""
for prop, value in empty_prop_hints.items():
if prop in RemovablePropInfos:
if value is False or value:
# This should not happen
raise ValueError(
f"Cannot remove property '{prop}' from column '{column_schema['name']}' because"
f" it has a non-empty value: {value}. This property's value should be replaced"
" instead."
)
else:
if prop in ("primary_key", "merge_key") and not is_nullable_column(column_schema):
logger.warning(
f"Removing '{prop}' from column '{column_schema['name']}', "
"but 'unique' constraint remains set to True."
)
column_schema.pop(prop, None) # type: ignore
else:
raise ValueError(
f"""'{prop}' cannot be removed from column '{column_schema["name"]}' because it is not a removable property."""
)
return column_schema


def remove_defaults(stored_schema: TStoredSchema) -> TStoredSchema:
"""Removes default values from `stored_schema` in place, returns the input for chaining

Expand Down Expand Up @@ -472,6 +510,10 @@ def diff_table(
It returns new columns (not present in tab_a) and merges columns from tab_b into tab_a (overriding non-default hint values).
If any columns are returned they contain full data (not diffs of columns)

Additionally, processes `x-extractor.empty_prop_hints` metadata from `tab_b` to remove
properties that have been explicitly unset with a non-None empty value. These removals are applied to
both new columns and existing columns in tab_a.

Raises SchemaException if tables cannot be merged
* when columns with the same name have different data types
* when table links to different parent tables
Expand All @@ -492,6 +534,12 @@ def diff_table(
else:
new_columns.append(col_b)

# remove column properties that were unset
empty_prop_hints = tab_b.get("x-extractor", {}).get("empty_prop_hints", {})
if empty_prop_hints:
for col in new_columns + list(tab_a_columns.values()):
remove_props_with_empty_hint(col, empty_prop_hints)

# return partial table containing only name and properties that differ (column, filters etc.)
table_name = tab_a["name"]

Expand Down
13 changes: 5 additions & 8 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,15 +540,9 @@ def apply_hints(
# set to empty columns
t["columns"] = ensure_table_schema_columns(columns)
if primary_key is not None:
if primary_key:
t["primary_key"] = primary_key
else:
t.pop("primary_key", None)
t["primary_key"] = primary_key
if merge_key is not None:
if merge_key:
t["merge_key"] = merge_key
else:
t.pop("merge_key", None)
t["merge_key"] = merge_key
if schema_contract is not None:
if schema_contract:
t["schema_contract"] = schema_contract
Expand Down Expand Up @@ -679,6 +673,9 @@ def _resolve_hint(item: TDataItem, hint: TTableHintTemplate[TAny]) -> TAny:

@staticmethod
def _merge_key(hint: TColumnProp, keys: TColumnNames, partial: TPartialTableSchema) -> None:
if keys is not None and not keys:
partial.setdefault("x-extractor", {}).setdefault("empty_prop_hints", {})[hint] = keys
return
if isinstance(keys, str):
keys = [keys]
for key in keys:
Expand Down
23 changes: 23 additions & 0 deletions tests/common/schema/test_merges.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,3 +554,26 @@ def test_merge_tables_references() -> None:
# },
# **column,
# }


def test_remove_props_from_column() -> None:
"""Test removing properties from column schema with empty value remove hints"""
# non-removable property not allowed
col = utils.new_column("test_col", data_type="text")
with pytest.raises(ValueError) as exc_info:
utils.remove_props_with_empty_hint(col, {"data_type": None})
assert "not a removable property" in str(exc_info.value)

# non empty remove hint not allowed
col = utils.new_column("test_col", data_type="text", nullable=True)
col["primary_key"] = False
with pytest.raises(ValueError) as exc_info:
utils.remove_props_with_empty_hint(col, {"primary_key": False})

# multiple empty value remove hints allowed
col = utils.new_column("test_col", data_type="text", nullable=True)
col["primary_key"] = False
col["merge_key"] = True
result = utils.remove_props_with_empty_hint(col, {"primary_key": "", "merge_key": []})
assert "primary_key" not in result
assert "merge_key" not in result
7 changes: 6 additions & 1 deletion tests/extract/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -1767,12 +1767,17 @@ def empty_gen():
"validator": None,
"write_disposition": "append",
"original_columns": {},
"merge_key": "",
"primary_key": [],
}
table = empty_r.compute_table_schema()
assert table["name"] == "empty_gen"
assert "parent" not in table
assert table["columns"] == {}
assert empty_r.compute_table_schema() == empty_table_schema
assert empty_r.compute_table_schema() == {
**empty_table_schema,
"x-extractor": {"empty_prop_hints": {"merge_key": "", "primary_key": []}},
}

# combine columns with primary key
empty_r = empty()
Expand Down
99 changes: 98 additions & 1 deletion tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
import random
import shutil
import threading
import yaml
from time import sleep
from typing import Any, List, Tuple, cast
from typing import Any, List, Tuple, cast, Union
from tenacity import retry_if_exception, Retrying, stop_after_attempt
from unittest.mock import patch
import pytest
Expand Down Expand Up @@ -1837,6 +1838,102 @@ def infer():
# print(pipeline.default_schema.to_pretty_yaml())


@pytest.mark.parametrize(
"empty_value",
["", []],
ids=["empty_string", "empty_list"],
)
def test_apply_hints_with_empty_values(empty_value: Union[str, List[Any]]) -> None:
"""Tests that empty value primary_key hint is respected"""

@dlt.resource
def some_data():
yield {"id": 1, "val": "some_data"}

s = some_data()
pipeline = dlt.pipeline(pipeline_name="empty_value_hints", destination=DUMMY_COMPLETE)

# check initial schema
pipeline.run(s)
table = pipeline.default_schema.get_table("some_data")
assert table["columns"]["id"] == {
"name": "id",
"data_type": "bigint",
"nullable": True,
}

# check schema after setting primary key
s.apply_hints(primary_key=["id"])
pipeline.run(s)
table = pipeline.default_schema.get_table("some_data")
assert table["columns"]["id"] == {
"name": "id",
"data_type": "bigint",
"nullable": False,
"primary_key": True,
}
assert table["columns"]["val"] == {
"name": "val",
"data_type": "text",
"nullable": True,
}

# check schema after passing an empty value as primary key hint, which should remove primary key,
# empty value hint in primary_key argument overrides column level hints
s.apply_hints(primary_key=empty_value, columns={"val": {"primary_key": True}})
pipeline.run(s)
table = pipeline.default_schema.get_table("some_data")
assert table["columns"]["id"] == {
"name": "id",
"data_type": "bigint",
"nullable": False, # Nullable should be unset separately
}
assert table["columns"]["val"] == {
"name": "val",
"data_type": "text",
"nullable": True,
}


@pytest.mark.parametrize(
"empty_value",
["", []],
ids=["empty_string", "empty_list"],
)
def test_apply_hints_with_empty_values_with_schema(empty_value: Union[str, List[Any]]) -> None:
"""Tests that empty value primary_key hint is respected with an explicit schema"""
pipeline = dlt.pipeline(
pipeline_name="empty_value_hints_with_schema", destination=DUMMY_COMPLETE
)

with open("tests/common/cases/schemas/eth/ethereum_schema_v11.yml", "r", encoding="utf-8") as f:
schema = dlt.Schema.from_dict(yaml.safe_load(f))

@dlt.source(schema=schema)
def ethereum():
@dlt.resource
def blocks():
with open(
"tests/normalize/cases/ethereum.blocks.9c1d9b504ea240a482b007788d5cd61c_2.json",
"r",
encoding="utf-8",
) as f:
yield json.load(f)

return blocks()

source = ethereum()
pipeline.run(source)
table = pipeline.default_schema.get_table("blocks")
assert table["columns"]["number"].get("primary_key") is True

# check schema after passing an empty value as primary key hint, which should remove primary
source.blocks.apply_hints(primary_key=empty_value)
pipeline.run(source)
table = pipeline.default_schema.get_table("blocks")
assert table["columns"]["number"].get("primary_key") is None


def test_invalid_data_edge_cases() -> None:
# pass lambda directly to run, allowed now because functions can be extracted too
pipeline = dlt.pipeline(pipeline_name="invalid", destination=DUMMY_COMPLETE)
Expand Down
Loading