Skip to content

Commit a371610

Browse files
committed
[dagster-dlt] resolve table names for dynamic tables that use functions
1 parent 34ab0a7 commit a371610

2 files changed

Lines changed: 151 additions & 18 deletions

File tree

python_modules/libraries/dagster-dlt/dagster_dlt/resource.py

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,44 @@ def _extract_table_schema_metadata(self, table_name: str, schema: Schema) -> Tab
8888
table_schema = TableSchema(columns=[])
8989
return table_schema
9090

91+
def _resolve_root_table_names(self, resource: DltResource, schema: Schema) -> list[str]:
    """Return the names of the root tables that ``resource`` loads into.

    Tables reported by ``schema.data_tables()`` whose ``"resource"`` entry
    equals ``resource.name`` are used when any exist; this covers resources
    whose ``table_name`` is a function that routes items to several tables.
    Otherwise a single name is derived from the resource itself.

    Args:
        resource: The dlt resource whose destination tables are wanted.
        schema: The schema to search; also supplies the naming convention
            used to normalize the fallback name.

    Returns:
        A non-empty list of table names. Names taken from ``schema`` are
        returned as stored; the fallback name is passed through
        ``schema.naming.normalize_table_identifier``.
    """
    root_table_names = [
        table_name
        for table in schema.data_tables()
        if table.get("resource") == resource.name
        and (table_name := table.get("name")) is not None
    ]
    if root_table_names:
        return root_table_names

    table_name = resource.table_name
    # A callable hint cannot be resolved without an item, and a missing hint
    # must not be stringified into the literal name "None"; in both cases
    # fall back to the resource's own name.
    if table_name is None or callable(table_name):
        table_name = resource.name

    return [schema.naming.normalize_table_identifier(str(table_name))]
105+
106+
def _resolve_child_table_names(self, root_table_names: list[str], schema: Schema) -> list[str]:
    """Return every table nested, directly or transitively, under the given roots.

    Parent/child links are read from the ``"parent"`` and ``"name"`` entries
    of the tables in ``schema.data_tables()``. The hierarchy is walked
    breadth-first, so children of earlier roots and shallower levels come
    first in the result.

    Args:
        root_table_names: Names of the tables to start from. They are never
            included in the result.
        schema: The schema whose data tables describe the hierarchy.

    Returns:
        Descendant table names, each at most once, in breadth-first order.
        Empty when no table names one of the roots (or their descendants)
        as its parent.
    """
    # Function-scope import keeps this method self-contained.
    from collections import deque

    child_names_by_parent: dict[str, list[str]] = {}
    for table in schema.data_tables():
        parent_name = table.get("parent")
        table_name = table.get("name")
        if parent_name and table_name:
            child_names_by_parent.setdefault(parent_name, []).append(table_name)

    child_table_names: list[str] = []
    # Seeding ``visited`` with the roots keeps them out of the result and
    # guards against cyclic parent links.
    visited = set(root_table_names)
    # deque gives O(1) pops from the left; list.pop(0) shifts every element.
    pending_parents = deque(root_table_names)

    while pending_parents:
        parent_name = pending_parents.popleft()
        for child_name in child_names_by_parent.get(parent_name, ()):
            if child_name in visited:
                continue
            visited.add(child_name)
            child_table_names.append(child_name)
            pending_parents.append(child_name)

    return child_table_names
128+
91129
def extract_resource_metadata(
92130
self,
93131
context: OpExecutionContext | AssetExecutionContext,
@@ -121,49 +159,53 @@ def extract_resource_metadata(
121159
# shared metadata that is displayed for all assets
122160
base_metadata = {k: v for k, v in load_info_dict.items() if k in dlt_base_metadata_types}
123161
default_schema = dlt_pipeline.default_schema
124-
normalized_table_name = default_schema.naming.normalize_table_identifier(
125-
str(resource.table_name)
126-
)
162+
root_table_names = self._resolve_root_table_names(resource, default_schema)
163+
primary_table_name = root_table_names[0] if len(root_table_names) == 1 else None
127164
# job metadata for the resolved root tables (`root_table_names`)
128165
base_metadata["jobs"] = [
129166
job
130167
for load_package in load_info_dict.get("load_packages", [])
131168
for job in load_package.get("jobs", [])
132-
if job.get("table_name") == normalized_table_name
169+
if job.get("table_name") in root_table_names
133170
]
134-
rows_loaded = dlt_pipeline.last_trace.last_normalize_info.row_counts.get(
135-
normalized_table_name
171+
rows_loaded = sum(
172+
dlt_pipeline.last_trace.last_normalize_info.row_counts.get(root_table_name, 0)
173+
for root_table_name in root_table_names
136174
)
137175
if rows_loaded:
138176
base_metadata["rows_loaded"] = MetadataValue.int(rows_loaded)
139177

140178
schema: str | None = None
141179
for load_package in load_info_dict.get("load_packages", []):
142180
for table in load_package.get("tables", []):
143-
if table.get("name") == normalized_table_name:
181+
if table.get("name") == primary_table_name:
144182
schema = table.get("schema_name")
145183
break
146184
if schema:
147185
break
148186

149187
destination_name: str | None = base_metadata.get("destination_name")
150188
table_name = None
151-
if destination_name and schema:
152-
table_name = ".".join([destination_name, schema, normalized_table_name])
189+
if destination_name and schema and primary_table_name:
190+
table_name = ".".join([destination_name, schema, primary_table_name])
153191

154-
child_table_names = [
155-
name
156-
for name in default_schema.data_table_names()
157-
if name.startswith(f"{normalized_table_name}__")
158-
]
159-
child_table_schemas = {
192+
child_table_names = self._resolve_child_table_names(root_table_names, default_schema)
193+
supplemental_table_names = child_table_names
194+
if len(root_table_names) > 1:
195+
supplemental_table_names = [*root_table_names, *child_table_names]
196+
197+
additional_table_schemas = {
160198
table_name: self._extract_table_schema_metadata(table_name, default_schema)
161-
for table_name in child_table_names
199+
for table_name in supplemental_table_names
162200
}
163-
table_schema = self._extract_table_schema_metadata(normalized_table_name, default_schema)
201+
table_schema = (
202+
self._extract_table_schema_metadata(primary_table_name, default_schema)
203+
if primary_table_name
204+
else None
205+
)
164206

165207
base_metadata = {
166-
**child_table_schemas,
208+
**additional_table_schemas,
167209
**base_metadata,
168210
**TableMetadataSet(
169211
column_schema=table_schema,

python_modules/libraries/dagster-dlt/dagster_dlt_tests/test_asset_decorator.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,97 @@ def example_pipeline_assets(
200200
)
201201

202202

203+
def test_dynamic_table_name_metadata(dlt_pipeline: Pipeline) -> None:
    """Materialize a resource whose ``table_name`` is a function and check its metadata.

    The ``events`` resource routes each item to the table named by its
    ``category`` field. The single materialization is expected to report two
    loaded rows, to carry no ``dagster/table_name`` or
    ``dagster/column_schema`` entry, and to expose the ``alpha`` table and
    its nested ``alpha__values`` table as separate schema metadata entries.
    """

    def dynamic_table_name(item: dict[str, Any]) -> str:
        return item["category"]

    @dlt.source
    def dynamic_source():
        @dlt.resource(table_name=dynamic_table_name)
        def events():
            yield {"id": 1, "category": "alpha", "values": [1, 2]}
            yield {"id": 2, "category": "beta", "values": [3]}

        return events

    @dlt_assets(dlt_source=dynamic_source(), dlt_pipeline=dlt_pipeline)
    def dynamic_pipeline_assets(
        context: AssetExecutionContext, dlt_pipeline_resource: DagsterDltResource
    ):
        yield from dlt_pipeline_resource.run(context=context)

    # Constraint combinations shared by the expected columns below.
    nullable = TableColumnConstraints(nullable=True, unique=False)
    required = TableColumnConstraints(nullable=False, unique=False)
    row_key = TableColumnConstraints(nullable=False, unique=True, other=["row_key"])
    parent_key = TableColumnConstraints(nullable=False, unique=False, other=["parent_key"])

    # The ``_dlt_id`` column is expected with the same shape in both tables.
    dlt_id_column = TableColumn(name="_dlt_id", type="text", constraints=row_key)

    expected_alpha = TableSchemaMetadataValue(
        schema=TableSchema(
            columns=[
                TableColumn(name="id", type="bigint", constraints=nullable),
                TableColumn(name="category", type="text", constraints=nullable),
                TableColumn(name="_dlt_load_id", type="text", constraints=required),
                dlt_id_column,
            ]
        ),
    )
    expected_alpha_values = TableSchemaMetadataValue(
        schema=TableSchema(
            columns=[
                TableColumn(name="value", type="bigint", constraints=nullable),
                TableColumn(name="_dlt_parent_id", type="text", constraints=parent_key),
                TableColumn(name="_dlt_list_idx", type="bigint", constraints=required),
                dlt_id_column,
            ]
        ),
    )

    result = materialize(
        [dynamic_pipeline_assets],
        resources={"dlt_pipeline_resource": DagsterDltResource()},
    )

    assert result.success

    # Unpacking fails unless exactly one materialization event was emitted.
    (emitted,) = [event.materialization for event in result.get_asset_materialization_events()]
    metadata = emitted.metadata

    assert emitted.asset_key == AssetKey("dlt_dynamic_source_events")
    assert metadata["rows_loaded"] == IntMetadataValue(2)
    assert "dagster/table_name" not in metadata
    assert "dagster/column_schema" not in metadata
    assert metadata["alpha"] == expected_alpha
    assert metadata["alpha__values"] == expected_alpha_values
292+
293+
203294
def test_multi_asset_names_do_not_conflict(dlt_pipeline: Pipeline) -> None:
204295
class CustomDagsterDltTranslator(DagsterDltTranslator):
205296
def get_asset_spec(self, data: DltResourceTranslatorData) -> AssetSpec:

0 commit comments

Comments
 (0)