Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,8 @@ class TPipelineLocalState(TypedDict, total=False):
"""Run dir when pipeline was instantiated for a first time, defaults to cwd on OSS run context"""
last_run_context: Optional[TLastRunContext]
"""Context from the last successful pipeline run or sync"""
_last_dev_mode: bool
"""Indicates whether previous run used dev_mode; used to reset state on dev->non-dev toggle"""


class TPipelineState(TVersionedState, total=False):
Expand Down
25 changes: 21 additions & 4 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Any,
Callable,
ClassVar,
Dict,
List,
Iterator,
Optional,
Expand Down Expand Up @@ -849,6 +850,10 @@ def _sync_destination(
)
# write the state back
self._props_to_state(state)
# persist current dev_mode in local state so future inits can detect toggles
if state.get("_local") is None:
state.setdefault("_local", {})
state["_local"]["_last_dev_mode"] = self.dev_mode
# verify state
if state_default_schema_name := state.get("default_schema_name"):
# at least empty list is present
Expand Down Expand Up @@ -1582,6 +1587,11 @@ def _get_state(self) -> TPipelineState:
_local = migrated_state["_local"]
if "initial_cwd" not in _local:
_local["initial_cwd"] = os.path.abspath(dlt.current.run_context().local_dir)
# if previous run used dev_mode=True and current instance uses dev_mode=False,
# reset pipeline by returning an empty state
_last_dev_mode = _local.get("_last_dev_mode")
if _last_dev_mode and not self.dev_mode:
return default_pipeline_state()
return migrated_state
except FileNotFoundError:
# do not set the state hash, this will happen on first merge
Expand Down Expand Up @@ -1689,9 +1699,11 @@ def _state_to_props(self, state: TPipelineState) -> None:
for prop in Pipeline.STATE_PROPS:
if prop in state and not prop.startswith("_"):
setattr(self, prop, state[prop]) # type: ignore
# access local state via a casted mapping to allow dynamic keys
local_state = cast("Dict[str, Any]", state["_local"])
for prop in Pipeline.LOCAL_STATE_PROPS:
if prop in state["_local"] and not prop.startswith("_"):
setattr(self, prop, state["_local"][prop]) # type: ignore
if prop in local_state and not prop.startswith("_"):
setattr(self, prop, local_state[prop]) # type: ignore
# staging and destination are taken from state only if not yet set in the pipeline
if not self._destination:
self._set_destinations(
Expand Down Expand Up @@ -1725,7 +1737,7 @@ def _props_to_state(self, state: TPipelineState) -> TPipelineState:
state[prop] = getattr(self, prop) # type: ignore
for prop in Pipeline.LOCAL_STATE_PROPS:
if not prop.startswith("_"):
state["_local"][prop] = getattr(self, prop) # type: ignore
state["_local"][prop] = getattr(self, prop)
if self._destination:
state["destination_type"] = self._destination.destination_type
state["destination_name"] = self._destination.configured_name
Expand Down Expand Up @@ -1771,7 +1783,12 @@ def _bump_version_and_extract_state(

Storage will be created on demand. In that case the extracted package will be immediately committed.
"""
_, hash_, _ = bump_pipeline_state_version_if_modified(self._props_to_state(state))
# Write pipeline props into state and persist the current dev_mode value in local state
self._props_to_state(state)
if state.get("_local") is None:
state.setdefault("_local", {})
state["_local"]["_last_dev_mode"] = self.dev_mode
_, hash_, _ = bump_pipeline_state_version_if_modified(state)
should_extract = hash_ != state["_local"].get("_last_extracted_hash")
if should_extract and extract_state:
extract_ = extract or Extract(self._schema_storage, self._normalize_storage_config())
Expand Down
20 changes: 20 additions & 0 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,26 @@ def test_run_dev_mode_default_dataset() -> None:
assert p.dataset_name and p.dataset_name.endswith(p._pipeline_instance_id)


def test_dev_mode_then_non_dev_resets_dataset_name() -> None:
    """A dev-mode pipeline gets an instance-id suffix on its dataset name;
    re-creating the same pipeline without dev_mode must drop that suffix and
    fall back to the plain default ``*_dataset`` name.
    """
    dev_pipeline = dlt.pipeline(dev_mode=True, destination="filesystem")
    instance_suffix = dev_pipeline._pipeline_instance_id
    assert dev_pipeline.dataset_name.endswith(instance_suffix)
    # second instantiation toggles dev_mode off: the suffixed name must not survive
    regular_pipeline = dlt.pipeline(destination="filesystem")
    assert not regular_pipeline.dataset_name.endswith(instance_suffix)
    assert regular_pipeline.dataset_name.endswith("_dataset")


def test_dev_mode_then_non_dev_with_explicit_dataset_name() -> None:
    """An explicitly provided dataset_name is suffixed with the instance id in
    dev mode, but used verbatim (no suffix) once dev_mode is turned off.
    """
    explicit_name = "my_custom_dataset_name"
    # dev mode: explicit name is kept as a prefix but extended with the instance id
    dev_pipeline = dlt.pipeline(
        dev_mode=True, destination="filesystem", dataset_name=explicit_name
    )
    dev_dataset = dev_pipeline.dataset_name
    assert dev_dataset.startswith(explicit_name)
    assert dev_dataset != explicit_name
    assert dev_dataset.endswith(dev_pipeline._pipeline_instance_id)
    # non-dev mode: the explicit name must come back untouched
    regular_pipeline = dlt.pipeline(destination="filesystem", dataset_name=explicit_name)
    assert regular_pipeline.dataset_name == explicit_name


def test_run_dev_mode_default_dataset_layout(environment) -> None:
# Set dataset_name_layout to "bobby_%s"
dataset_name_layout = "bobby_%s"
Expand Down
Loading