Skip to content

Commit 3e6dc17

Browse files
committed
Updated the DltResourceHints class to set a default boundary timestamp using the current load package's creation time when none is provided. Modified corresponding test to ensure correct behavior in the pipeline.
1 parent 4d25a6c commit 3e6dc17

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

dlt/extract/hints.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
from dlt.extract.items_transform import ValidateItem
5757
from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint
5858
from dlt.extract.validation import create_item_validator
59+
from dlt.common.time import ensure_pendulum_datetime_utc
60+
from dlt.common.storages.load_package import load_package_state as current_load_package
5961

6062
import sqlglot
6163

@@ -722,8 +724,17 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
722724

723725
if merge_strategy == "scd2":
724726
md_dict = cast(TScd2StrategyDict, md_dict)
725-
if "boundary_timestamp" in md_dict:
727+
boundary = md_dict.get("boundary_timestamp")
728+
if boundary is not None:
726729
dict_["x-boundary-timestamp"] = md_dict["boundary_timestamp"]
730+
else:
731+
try:
732+
dict_["x-boundary-timestamp"] = ensure_pendulum_datetime_utc(
733+
current_load_package()["state"]["created_at"]
734+
)
735+
except:
736+
dict_.pop("x-boundary-timestamp", None)
737+
727738
if md_dict.get("validity_column_names") is None:
728739
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
729740
else:

tests/load/pipeline/test_scd2.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# timezone is removed from all datetime objects in these tests to simplify comparison
22

3+
from unittest import mock
34
import pytest
45
from typing import List, Dict, Any, Optional
56
from datetime import date, datetime, timezone # noqa: I251
@@ -645,6 +646,7 @@ def test_boundary_timestamp(
645646
ts2 = "2024-08-22"
646647
ts3 = date(2024, 8, 20) # earlier than ts1 and ts2
647648
ts4 = "i_am_not_a_timestamp"
649+
ts5 = pendulum.datetime(2025, 8, 21, 12, 15, tz="UTC").timestamp()
648650

649651
@dlt.resource(
650652
table_name="dim_test",
@@ -726,6 +728,37 @@ def r(data):
726728
"boundary_timestamp": ts4,
727729
}
728730
)
731+
with mock.patch(
732+
"dlt.common.storages.load_package.precise_time",
733+
return_value=pendulum.datetime(2025, 8, 21, 12, 15, tz="UTC").timestamp(),
734+
):
735+
# run the resource without setting boundary timestamp
736+
dim_snap = [
737+
l3_1 := {"nk": 1, "foo": "foobar"}, # updated
738+
]
739+
r.apply_hints(
740+
write_disposition={
741+
"disposition": "merge",
742+
"strategy": "scd2",
743+
}
744+
)
745+
info = p.run(r(dim_snap), **destination_config.run_kwargs)
746+
assert_load_info(info)
747+
assert load_table_counts(p, "dim_test")["dim_test"] == 5
748+
expected = [
749+
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1}, # unchanged
750+
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2}, # unchanged
751+
{
752+
**{FROM: strip_timezone(ts2), TO: strip_timezone(ts5)},
753+
**l2_1,
754+
}, # retired in this run
755+
{
756+
**{FROM: strip_timezone(ts2), TO: strip_timezone(ts3)},
757+
**l2_3,
758+
}, # unchanged (already retired in load 3)
759+
{**{FROM: strip_timezone(ts5), TO: None}, **l3_1}, # new current version
760+
]
761+
assert_records_as_set(get_table(p, "dim_test"), expected)
729762

730763

731764
@pytest.mark.essential

0 commit comments

Comments
 (0)