Skip to content

Commit 89f300d

Browse files
committed
- Added functionality to persist the current dev_mode in the local state.
- Implemented logic to reset the dataset name when switching from dev_mode to non-dev_mode.
- Added tests to cover scenarios involving transitions between dev_mode and non-dev_mode with explicit dataset names.
1 parent 4a5ffd8 commit 89f300d

File tree

3 files changed

+39
-2
lines changed

3 files changed

+39
-2
lines changed

dlt/common/pipeline.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,8 @@ class TPipelineLocalState(TypedDict, total=False):
479479
"""Run dir when pipeline was instantiated for a first time, defaults to cwd on OSS run context"""
480480
last_run_context: Optional[TLastRunContext]
481481
"""Context from the last successful pipeline run or sync"""
482+
_last_dev_mode: bool
483+
"""Indicates whether previous run used dev_mode; used to reset state on dev->non-dev toggle"""
482484

483485

484486
class TPipelineState(TVersionedState, total=False):

dlt/pipeline/pipeline.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
Any,
88
Callable,
99
ClassVar,
10+
Dict,
1011
List,
1112
Iterator,
1213
Optional,
@@ -849,6 +850,10 @@ def _sync_destination(
849850
)
850851
# write the state back
851852
self._props_to_state(state)
853+
# persist current dev_mode in local state so future inits can detect toggles
854+
if state.get("_local") is None:
855+
state.setdefault("_local", {})
856+
state["_local"]["_last_dev_mode"] = self.dev_mode
852857
# verify state
853858
if state_default_schema_name := state.get("default_schema_name"):
854859
# at least empty list is present
@@ -1582,6 +1587,11 @@ def _get_state(self) -> TPipelineState:
15821587
_local = migrated_state["_local"]
15831588
if "initial_cwd" not in _local:
15841589
_local["initial_cwd"] = os.path.abspath(dlt.current.run_context().local_dir)
1590+
# if previous run used dev_mode=True and current instance uses dev_mode=False,
1591+
# reset pipeline by returning an empty state
1592+
_last_dev_mode = _local.get("_last_dev_mode")
1593+
if _last_dev_mode and not self.dev_mode:
1594+
return default_pipeline_state()
15851595
return migrated_state
15861596
except FileNotFoundError:
15871597
# do not set the state hash, this will happen on first merge
@@ -1725,7 +1735,7 @@ def _props_to_state(self, state: TPipelineState) -> TPipelineState:
17251735
state[prop] = getattr(self, prop) # type: ignore
17261736
for prop in Pipeline.LOCAL_STATE_PROPS:
17271737
if not prop.startswith("_"):
1728-
state["_local"][prop] = getattr(self, prop) # type: ignore
1738+
state["_local"][prop] = getattr(self, prop)
17291739
if self._destination:
17301740
state["destination_type"] = self._destination.destination_type
17311741
state["destination_name"] = self._destination.configured_name
@@ -1771,7 +1781,12 @@ def _bump_version_and_extract_state(
17711781
17721782
Storage will be created on demand. In that case the extracted package will be immediately committed.
17731783
"""
1774-
_, hash_, _ = bump_pipeline_state_version_if_modified(self._props_to_state(state))
1784+
# Write pipeline props into state and persist the current dev_mode value in local state
1785+
self._props_to_state(state)
1786+
if state.get("_local") is None:
1787+
state.setdefault("_local", {})
1788+
state["_local"]["_last_dev_mode"] = self.dev_mode
1789+
_, hash_, _ = bump_pipeline_state_version_if_modified(state)
17751790
should_extract = hash_ != state["_local"].get("_last_extracted_hash")
17761791
if should_extract and extract_state:
17771792
extract_ = extract or Extract(self._schema_storage, self._normalize_storage_config())

tests/pipeline/test_pipeline.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,26 @@ def test_run_dev_mode_default_dataset() -> None:
297297
assert p.dataset_name and p.dataset_name.endswith(p._pipeline_instance_id)
298298

299299

300+
def test_dev_mode_then_non_dev_resets_dataset_name() -> None:
301+
p_dev = dlt.pipeline(dev_mode=True, destination="filesystem")
302+
assert p_dev.dataset_name.endswith(p_dev._pipeline_instance_id)
303+
p_non_dev = dlt.pipeline(destination="filesystem")
304+
assert not p_non_dev.dataset_name.endswith(p_dev._pipeline_instance_id)
305+
assert p_non_dev.dataset_name.endswith("_dataset")
306+
307+
308+
def test_dev_mode_then_non_dev_with_explicit_dataset_name() -> None:
309+
base_ds = "my_custom_dataset_name"
310+
# In dev mode, explicit dataset_name should be suffixed with instance id
311+
p_dev = dlt.pipeline(dev_mode=True, destination="filesystem", dataset_name=base_ds)
312+
assert p_dev.dataset_name.startswith(base_ds)
313+
assert p_dev.dataset_name != base_ds
314+
assert p_dev.dataset_name.endswith(p_dev._pipeline_instance_id)
315+
# In non-dev mode, explicit dataset_name should be used as-is (no suffix)
316+
p_non_dev = dlt.pipeline(destination="filesystem", dataset_name=base_ds)
317+
assert p_non_dev.dataset_name == base_ds
318+
319+
300320
def test_run_dev_mode_default_dataset_layout(environment) -> None:
301321
# Set dataset_name_layout to "bobby_%s"
302322
dataset_name_layout = "bobby_%s"

0 commit comments

Comments
 (0)