Skip to content

Commit 0adf780

Browse files
committed
Update documentation to clarify usage of boundary_timestamp and add tests for new behavior, including resetting to current load time.
1 parent 61814bc commit 0adf780

File tree

5 files changed

+102
-62
lines changed

5 files changed

+102
-62
lines changed

dlt/destinations/impl/sqlalchemy/merge_job.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
from typing import Sequence, Tuple, Optional, List, Union
1+
from typing import Sequence, Tuple, Optional, List, Union, cast
22
import operator
33
import sqlalchemy as sa
44

5+
from dlt.common.typing import TAnyDateTime
56
from dlt.common.utils import uniq_id
67
from dlt.common.destination import PreparedTableSchema, DestinationCapabilitiesContext
78
from dlt.common.schema.utils import (
@@ -374,14 +375,10 @@ def gen_scd2_sql(
374375
format_datetime_literal = (
375376
DestinationCapabilitiesContext.generic_capabilities().format_datetime_literal
376377
)
377-
378-
boundary_ts = ensure_pendulum_datetime_utc(
379-
current_load_package()["state"]["created_at"]
380-
or root_table.get( # type: ignore[arg-type]
381-
"x-boundary-timestamp",
382-
current_load_package()["state"]["created_at"],
383-
)
384-
)
378+
created_at = current_load_package()["state"]["created_at"]
379+
_boundary_ts = cast(Optional[TAnyDateTime], root_table.get("x-boundary-timestamp"))
380+
boundary_ts: TAnyDateTime = _boundary_ts if _boundary_ts is not None else created_at
381+
boundary_ts = ensure_pendulum_datetime_utc(boundary_ts)
385382

386383
boundary_literal = format_datetime_literal(boundary_ts, caps.timestamp_precision)
387384

dlt/destinations/sql_jobs.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dlt.common.time import ensure_pendulum_datetime_utc
55
from dlt.common.destination import PreparedTableSchema
66
from dlt.common.destination.utils import resolve_merge_strategy
7-
from dlt.common.typing import TypedDict
7+
from dlt.common.typing import TAnyDateTime, TypedDict
88

99
from dlt.common.schema.typing import (
1010
TSortOrder,
@@ -845,13 +845,11 @@ def gen_scd2_sql(
845845
DestinationCapabilitiesContext.generic_capabilities().format_datetime_literal
846846
)
847847

848-
boundary_ts = ensure_pendulum_datetime_utc(
849-
current_load_package()["state"]["created_at"]
850-
or root_table.get( # type: ignore[arg-type]
851-
"x-boundary-timestamp",
852-
current_load_package()["state"]["created_at"],
853-
)
854-
)
848+
created_at = current_load_package()["state"]["created_at"]
849+
_boundary_ts = cast(Optional[TAnyDateTime], root_table.get("x-boundary-timestamp"))
850+
boundary_ts: TAnyDateTime = _boundary_ts if _boundary_ts is not None else created_at
851+
boundary_ts = ensure_pendulum_datetime_utc(boundary_ts)
852+
855853
boundary_literal = format_datetime_literal(
856854
boundary_ts,
857855
caps.timestamp_precision,

dlt/extract/hints.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -823,22 +823,24 @@ def validate_write_disposition_hint(template: TResourceHints) -> None:
823823

824824
if wd.get("strategy") == "scd2":
825825
wd = cast(TScd2StrategyDict, wd)
826-
for ts in ("active_record_timestamp", "boundary_timestamp"):
827-
# if (
828-
# ts == "active_record_timestamp"
829-
# and wd.get("active_record_timestamp") is None
830-
# ):
831-
# continue # None is allowed for active_record_timestamp
832-
if ts in wd:
833-
if wd[ts] is None:
834-
continue
835-
else:
836-
try:
837-
ensure_pendulum_datetime_utc(wd[ts]) # type: ignore[literal-required]
838-
except Exception:
839-
raise ValueError(
840-
f"could not parse `{ts}` value `{wd[ts]}`" # type: ignore[literal-required]
841-
)
826+
827+
art = wd.get("active_record_timestamp")
828+
if art is not None:
829+
try:
830+
ensure_pendulum_datetime_utc(art)
831+
except (ValueError, TypeError) as exc:
832+
raise ValueError(
833+
f"could not parse `active_record_timestamp` value `{art}`"
834+
) from exc
835+
836+
bt = wd.get("boundary_timestamp")
837+
if bt is not None:
838+
try:
839+
ensure_pendulum_datetime_utc(bt)
840+
except (ValueError, TypeError) as exc:
841+
raise ValueError(
842+
f"could not parse `boundary_timestamp` value `{bt}`"
843+
) from exc
842844

843845
@staticmethod
844846
def validate_reference_hint(template: TResourceHints) -> None:

docs/website/docs/general-usage/merge-loading.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,35 @@ def dim_customer():
567567
...
568568
```
569569

570+
#### Reset boundary timestamp to the current load time
571+
To stop using a previously set `boundary_timestamp` and revert to the default (the current load package creation time), set `boundary_timestamp` to `None`. You can do this either at definition time or dynamically with `apply_hints` before a run.
572+
573+
Definition-time (always use current load time):
574+
```py
575+
@dlt.resource(
576+
write_disposition={
577+
"disposition": "merge",
578+
"strategy": "scd2",
579+
"boundary_timestamp": None, # reset to current load time
580+
}
581+
)
582+
def dim_customer():
583+
...
584+
```
585+
586+
Per-run reset (override just for this run):
587+
```py
588+
r.apply_hints(
589+
write_disposition={
590+
"disposition": "merge",
591+
"strategy": "scd2",
592+
"boundary_timestamp": None, # reset to current load time for this run
593+
}
594+
)
595+
pipeline.run(r(...))
596+
```
597+
When `boundary_timestamp` is `None` (or omitted), `dlt` uses the load package's creation timestamp as the boundary for both retiring existing versions and creating new versions.
598+
570599
### Example: Use your own row hash
571600
By default, `dlt` generates a row hash based on all columns provided by the resource and stores it in `_dlt_id`. You can use your own hash instead by specifying `row_version_column_name` in the `write_disposition` dictionary. You might already have a column present in your resource that can naturally serve as a row hash, in which case it's more efficient to use those pre-existing hash values than to generate new artificial ones. This option also allows you to use hashes based on a subset of columns, in case you want to ignore changes in some of the columns. When using your own hash, values for `_dlt_id` are randomly generated.
572601
```py

tests/load/pipeline/test_scd2.py

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,7 @@ def r():
634634

635635
@pytest.mark.parametrize(
636636
"destination_config",
637-
destinations_configs(default_sql_configs=True, subset=["sqlalchemy"]),
637+
destinations_configs(default_sql_configs=True, subset=["sqlalchemy", "duckdb"]),
638638
ids=lambda x: x.name,
639639
)
640640
def test_boundary_timestamp(
@@ -659,73 +659,87 @@ def test_boundary_timestamp(
659659
def r(data):
660660
yield data
661661

662+
# normalize timestamps once for assertions
663+
ts1_dt = strip_timezone(ts1)
664+
ts2_dt = strip_timezone(ts2)
665+
ts3_dt = strip_timezone(ts3)
666+
ts5_dt = strip_timezone(ts5)
667+
662668
# load 1 — initial load
663669
dim_snap = [
664670
l1_1 := {"nk": 1, "foo": "foo"},
665671
l1_2 := {"nk": 2, "foo": "foo"},
666672
]
667-
current_time = {"ts": None}
673+
current_time: dict[str, float | None] = {"ts": None}
668674
with mock.patch(
669675
"dlt.common.storages.load_package.precise_time",
670676
side_effect=lambda: current_time["ts"],
671677
):
672678
# load 1 — initial load
673-
current_time["ts"] = pendulum.parse(ts1).timestamp()
679+
current_time["ts"] = pendulum.datetime(2024, 8, 21, 12, 15, tz="UTC").timestamp()
680+
r.apply_hints(
681+
write_disposition={
682+
"disposition": "merge",
683+
"strategy": "scd2",
684+
"boundary_timestamp": ts1,
685+
}
686+
)
674687
info = p.run(r(dim_snap), **destination_config.run_kwargs)
675688
assert_load_info(info)
676689
assert load_table_counts(p, "dim_test")["dim_test"] == 2
677690
expected = [
678-
{**{FROM: strip_timezone(ts1), TO: None}, **l1_1},
679-
{**{FROM: strip_timezone(ts1), TO: None}, **l1_2},
691+
{**{FROM: ts1_dt, TO: None}, **l1_1},
692+
{**{FROM: ts1_dt, TO: None}, **l1_2},
680693
]
681694
assert get_table(p, "dim_test", "nk", ts_columns=[FROM, TO]) == expected
682695

683696
# load 2 — different source records, different boundary timestamp
684-
current_time["ts"] = pendulum.parse(ts2).timestamp()
697+
current_time["ts"] = pendulum.datetime(2024, 8, 22, tz="UTC").timestamp()
698+
dim_snap = [
699+
l2_1 := {"nk": 1, "foo": "bar"}, # natural key 1 updated
700+
# l1_2, # natural key 2 no longer present
701+
l2_3 := {"nk": 3, "foo": "foo"}, # new natural key
702+
]
685703
r.apply_hints(
686704
write_disposition={
687705
"disposition": "merge",
688706
"strategy": "scd2",
689707
"boundary_timestamp": ts2,
690708
}
691709
)
692-
dim_snap = [
693-
l2_1 := {"nk": 1, "foo": "bar"}, # natural key 1 updated
694-
# l1_2, # natural key 2 no longer present
695-
l2_3 := {"nk": 3, "foo": "foo"}, # new natural key
696-
]
697710
info = p.run(r(dim_snap), **destination_config.run_kwargs)
698711
assert_load_info(info)
699712
assert load_table_counts(p, "dim_test")["dim_test"] == 4
700713
expected = [
701-
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1}, # retired
702-
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2}, # retired
703-
{**{FROM: strip_timezone(ts2), TO: None}, **l2_1}, # new
704-
{**{FROM: strip_timezone(ts2), TO: None}, **l2_3}, # new
714+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_1}, # retired
715+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_2}, # retired
716+
{**{FROM: ts2_dt, TO: None}, **l2_1}, # new
717+
{**{FROM: ts2_dt, TO: None}, **l2_3}, # new
705718
]
706-
assert_records_as_set(get_table(p, "dim_test"), expected)
719+
assert_records_as_set(get_table(p, "dim_test", ts_columns=[FROM, TO]), expected)
707720

708721
# load 3 — earlier boundary timestamp
709722
# we naively apply any valid timestamp
710723
# may lead to "valid from" > "valid to", as in this test case
724+
current_time["ts"] = pendulum.datetime(2024, 8, 22, 0, 0, 1, tz="UTC").timestamp()
725+
dim_snap = [l2_1] # natural key 3 no longer present
711726
r.apply_hints(
712727
write_disposition={
713728
"disposition": "merge",
714729
"strategy": "scd2",
715730
"boundary_timestamp": ts3,
716731
}
717732
)
718-
dim_snap = [l2_1] # natural key 3 no longer present
719733
info = p.run(r(dim_snap), **destination_config.run_kwargs)
720734
assert_load_info(info)
721735
assert load_table_counts(p, "dim_test")["dim_test"] == 4
722736
expected = [
723-
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1}, # unchanged
724-
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2}, # unchanged
725-
{**{FROM: strip_timezone(ts2), TO: None}, **l2_1}, # unchanged
726-
{**{FROM: strip_timezone(ts2), TO: strip_timezone(ts3)}, **l2_3}, # retired
737+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_1}, # unchanged
738+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_2}, # unchanged
739+
{**{FROM: ts2_dt, TO: None}, **l2_1}, # unchanged
740+
{**{FROM: ts2_dt, TO: ts3_dt}, **l2_3}, # retired
727741
]
728-
assert_records_as_set(get_table(p, "dim_test"), expected)
742+
assert_records_as_set(get_table(p, "dim_test", ts_columns=[FROM, TO]), expected)
729743

730744
# invalid boundary timestamp should raise error
731745
with pytest.raises(ValueError):
@@ -738,7 +752,7 @@ def r(data):
738752
)
739753

740754
# run 4 — no boundary timestamp (use current precise_time)
741-
current_time["ts"] = pendulum.parse(ts5).timestamp()
755+
current_time["ts"] = ts5
742756
dim_snap = [
743757
l3_1 := {"nk": 1, "foo": "foobar"}, # updated
744758
]
@@ -753,19 +767,19 @@ def r(data):
753767
assert_load_info(info)
754768
assert load_table_counts(p, "dim_test")["dim_test"] == 5
755769
expected = [
756-
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1}, # unchanged
757-
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2}, # unchanged
770+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_1}, # unchanged
771+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_2}, # unchanged
758772
{
759-
**{FROM: strip_timezone(ts2), TO: strip_timezone(ts5)},
773+
**{FROM: ts2_dt, TO: ts5_dt},
760774
**l2_1,
761775
}, # retired in this run
762776
{
763-
**{FROM: strip_timezone(ts2), TO: strip_timezone(ts3)},
777+
**{FROM: ts2_dt, TO: ts3_dt},
764778
**l2_3,
765779
}, # unchanged (already retired in load 3)
766-
{**{FROM: strip_timezone(ts5), TO: None}, **l3_1}, # new current version
780+
{**{FROM: ts5_dt, TO: None}, **l3_1}, # new current version
767781
]
768-
assert_records_as_set(get_table(p, "dim_test"), expected)
782+
assert_records_as_set(get_table(p, "dim_test", ts_columns=[FROM, TO]), expected)
769783

770784

771785
@pytest.mark.essential

0 commit comments

Comments
 (0)