Skip to content

Commit b0aafc2

Browse files
committed
Initial commit
1 parent c292641 commit b0aafc2

File tree

3 files changed

+86
-4
lines changed

3 files changed

+86
-4
lines changed

dlt/common/libs/sqlglot.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Optional, Union, Set, Any, Iterable, Literal
1+
from typing import Optional, Union, Set, Any, Iterable, Literal, cast
22

33
from dlt.common.utils import without_none
44
from dlt.common.exceptions import TerminalValueError
@@ -632,3 +632,14 @@ def _literal(v: Any) -> sge.Expression:
632632
return sge.Tuple(expressions=[_literal(v) for v in value])
633633
else:
634634
return _literal(value)
635+
636+
637+
def wrap_identifiers_in_columns(expression: sge.Query) -> sge.Query:
    """Wrap bare Identifier nodes inside Alias with Column nodes"""

    def _wrap(node: sge.Expression) -> sge.Expression:
        # Only rewrite `<identifier> AS <alias>` shapes; leave everything else untouched.
        if not isinstance(node, sge.Alias):
            return node
        inner = node.this
        if not isinstance(inner, sge.Identifier):
            return node
        # Re-wrap the identifier as a Column so downstream type annotation resolves it.
        return sge.Alias(this=sge.Column(this=inner.copy()), alias=node.alias)

    return cast(sge.Query, expression.transform(_wrap))

dlt/dataset/lineage.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from typing import Optional, Tuple, cast
33

44
import sqlglot.expressions as sge
5-
65
from sqlglot.errors import OptimizeError
76
from sqlglot.schema import Schema as SQLGlotSchema, ensure_schema
87
from sqlglot.optimizer.annotate_types import annotate_types
@@ -16,6 +15,7 @@
1615
set_metadata,
1716
get_metadata,
1817
TSqlGlotDialect,
18+
wrap_identifiers_in_columns,
1919
)
2020
from dlt.common.schema.typing import (
2121
TTableSchemaColumns,
@@ -111,6 +111,8 @@ def compute_columns_schema(
111111
else:
112112
select_expression = expression
113113

114+
select_expression = wrap_identifiers_in_columns(select_expression)
115+
114116
# prevent normalization
115117
select_expression.meta["case_sensitive"] = True
116118

@@ -132,7 +134,7 @@ def compute_columns_schema(
132134
f"Failed to resolve SQL query against the schema received: {e}"
133135
) from e
134136

135-
expression = annotate_types(expression, schema=sqlglot_schema)
137+
select_expression = annotate_types(select_expression, schema=sqlglot_schema)
136138

137139
# NOTE: this has to be fixed
138140
if allow_anonymous_columns is False:
@@ -173,4 +175,4 @@ def compute_columns_schema(
173175
if propagated_name and col.output_name != propagated_name:
174176
dlt_table_schema[col.output_name]["x-original-name"] = propagated_name # type: ignore[typeddict-unknown-key]
175177

176-
return dlt_table_schema, expression
178+
return dlt_table_schema, select_expression

tests/hub/test_transformations.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import dlt
2+
from dlt.sources.rest_api import rest_api_resources
23

34

45
def test_transformation_decorator() -> None:
@@ -12,3 +13,71 @@ def get_even_rows(dataset: dlt.Dataset):
1213
# get instance without license
1314
transformation = get_even_rows(dlt.dataset("duckdb", "mock_dataset"))
1415
assert transformation.name == "get_even_rows"
16+
17+
18+
def test_missing_columns_bug() -> None:
    """Regression test for dlt.dataset.lineage.compute_columns_schema: bare
    Identifier nodes were not properly type annotated, so success_count and
    success_rate ended up with UNKNOWN type and were excluded as incomplete
    columns by dlt.
    """
    import dlthub.data_quality as dq

    @dlt.source
    def jaffleshop():
        # REST source over the public jaffle-shop demo API.
        config = {
            "client": {
                "base_url": "https://jaffle-shop.dlthub.com/api/v1",
                "paginator": {"type": "header_link"},
            },
            "resources": [
                "customers",
                "products",
                "orders",
            ],
            "resource_defaults": {
                "endpoint": {
                    "params": {
                        "start_date": "2017-01-01",
                        "end_date": "2017-01-15",
                    },
                },
            },
        }
        return rest_api_resources(config)

    @dlt.hub.transformation
    def jaffle_checks(dataset: dlt.Dataset) -> dlt.Relation:
        checks = {"orders": [dq.checks.is_unique("id"), dq.checks.case("subtotal > 0")]}
        return dq.prepare_checks(dataset, checks=checks)  # type: ignore

    pipeline = dlt.pipeline("test_missing_columns", destination="duckdb")
    pipeline.run([jaffleshop()])
    pipeline.run(jaffle_checks(pipeline.dataset()))

    expected_column_names = [
        "table_name",
        "check_qualified_name",
        "row_count",
        "success_count",  # was missing due to unqualified UNION columns
        "success_rate",  # was missing due to unqualified UNION columns
    ]

    # Direct query execution returns the raw select output (no dlt system columns).
    query = dq.prepare_checks(
        pipeline.dataset(),
        checks={
            "orders": [dq.checks.is_unique("id"), dq.checks.case("subtotal > 0")],  # type: ignore
        },
    )
    assert query.arrow().column_names == expected_column_names

    # The materialized table additionally carries _dlt_load_id added by the pipeline.
    with pipeline.sql_client() as client:
        with client.execute_query(
            f"SELECT * FROM {pipeline.pipeline_name}.{pipeline.dataset_name}.jaffle_checks"
        ) as cursor:
            frame = cursor.df()
            assert list(frame.columns) == expected_column_names + ["_dlt_load_id"]

0 commit comments

Comments
 (0)