feature/#397 scenario duplication #2373

Open. Wants to merge 6 commits into develop. Changes from 5 commits.
35 changes: 28 additions & 7 deletions taipy/core/data/_data_manager.py
Member:
We should implement a can_duplicate method, returning reasons, just like we have a can_create method.
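As a rough illustration, such a method could mirror the existing reason-based checks. This is a sketch only: the EntityDoesNotExist reason class and the exact checks are assumptions, not the final API, and cls._exists is assumed to behave like the existing manager lookup.

@classmethod
def _can_duplicate(cls, dn: DataNode) -> ReasonCollection:
    # Sketch only: collect reasons instead of raising, like _can_create does.
    reasons = ReasonCollection()
    if not cls._exists(dn.id):
        reasons._add_reason(dn.id, EntityDoesNotExist(dn.id))  # hypothetical reason class
    if dn.edit_in_progress:
        reasons._add_reason(dn.id, DataNodeEditInProgress(dn.id))
    return reasons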

@@ -37,6 +37,17 @@ class _DataManager(_Manager[DataNode], _VersionMixin):
_EVENT_ENTITY_TYPE = EventEntityType.DATA_NODE
_repository: _DataFSRepository

@classmethod
def _get_owner_id(
cls, scope, cycle_id, scenario_id
) -> Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]:
if scope == Scope.SCENARIO:
return scenario_id
elif scope == Scope.CYCLE:
return cycle_id
else:
return None

@classmethod
def _bulk_get_or_create(
cls,
@@ -48,13 +59,7 @@ def _bulk_get_or_create(
dn_configs_and_owner_id = []
for dn_config in data_node_configs:
scope = dn_config.scope
- owner_id: Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]
- if scope == Scope.SCENARIO:
-     owner_id = scenario_id
- elif scope == Scope.CYCLE:
-     owner_id = cycle_id
- else:
-     owner_id = None
+ owner_id = cls._get_owner_id(scope, cycle_id, scenario_id)
dn_configs_and_owner_id.append((dn_config, owner_id))

data_nodes = cls._repository._get_by_configs_and_owner_ids(
@@ -174,3 +179,19 @@ def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None)
for fil in filters:
fil.update({"config_id": config_id})
return cls._repository._load_all(filters)

@classmethod
def _clone(
Member:
I would rename _clone to _duplicate

cls, dn: DataNode, cycle_id: Optional[CycleId] = None, scenario_id: Optional[ScenarioId] = None
) -> DataNode:
cloned_dn = cls._get(dn)

cloned_dn.id = cloned_dn._new_id(cloned_dn._config_id)
cloned_dn._owner_id = cls._get_owner_id(cloned_dn._scope, cycle_id, scenario_id)
cloned_dn._parent_ids = set()

cls._set(cloned_dn)

cloned_dn._clone_data()
Member:
If the data node has a cycle scope, and if the new scenario is from the same cycle, then we want to share the same data node between scenarios.
If the scope is global it always already exists, and we also want to share the existing one.


return cloned_dn
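A sketch of how the sharing rule suggested above could be folded into this method. The same-cycle check via _owner_id is an assumption about how cycle ownership is stored; the rest reuses the names already introduced in this PR.

@classmethod
def _clone(
    cls, dn: DataNode, cycle_id: Optional[CycleId] = None, scenario_id: Optional[ScenarioId] = None
) -> DataNode:
    # Share instead of copying when the data node is not scenario-scoped.
    if dn._scope == Scope.GLOBAL:
        return dn  # a global data node always already exists
    if dn._scope == Scope.CYCLE and cycle_id and dn._owner_id == cycle_id:
        return dn  # same cycle: reuse the existing cycle-scoped data node
    cloned_dn = cls._get(dn)
    cloned_dn.id = cloned_dn._new_id(cloned_dn._config_id)
    cloned_dn._owner_id = cls._get_owner_id(cloned_dn._scope, cycle_id, scenario_id)
    cloned_dn._parent_ids = set()
    cls._set(cloned_dn)
    cloned_dn._clone_data()
    return cloned_dn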
55 changes: 38 additions & 17 deletions taipy/core/data/_file_datanode_mixin.py
@@ -42,6 +42,7 @@ class _FileDataNodeMixin:
_PATH_KEY = "path"
_DEFAULT_PATH_KEY = "default_path"
_IS_GENERATED_KEY = "is_generated"
__TAIPY_CLONED_PREFIX = "TAIPY_CLONED"

__logger = _TaipyLogger._get_logger()

@@ -109,12 +110,14 @@ def _get_downloadable_path(self) -> str:

return ""

def _upload(self,
path: str,
upload_checker: Optional[Callable[[str, Any], bool]] = None,
editor_id: Optional[str] = None,
comment: Optional[str] = None,
**kwargs: Any) -> ReasonCollection:
def _upload(
self,
path: str,
upload_checker: Optional[Callable[[str, Any], bool]] = None,
editor_id: Optional[str] = None,
comment: Optional[str] = None,
**kwargs: Any,
) -> ReasonCollection:
"""Upload a file data to the data node.

Arguments:
@@ -136,20 +139,23 @@ def _upload(self,
from ._data_manager_factory import _DataManagerFactory

reasons = ReasonCollection()
if (editor_id
and self.edit_in_progress # type: ignore[attr-defined]
and self.editor_id != editor_id # type: ignore[attr-defined]
and (not self.editor_expiration_date # type: ignore[attr-defined]
or self.editor_expiration_date > datetime.now())): # type: ignore[attr-defined]
if (
editor_id
and self.edit_in_progress # type: ignore[attr-defined]
and self.editor_id != editor_id # type: ignore[attr-defined]
and (
not self.editor_expiration_date # type: ignore[attr-defined]
or self.editor_expiration_date > datetime.now()
)
): # type: ignore[attr-defined]
reasons._add_reason(self.id, DataNodeEditInProgress(self.id)) # type: ignore[attr-defined]
return reasons

up_path = pathlib.Path(path)
try:
upload_data = self._read_from_path(str(up_path))
except Exception as err:
self.__logger.error(f"Error uploading `{up_path.name}` to data "
f"node `{self.id}`:") # type: ignore[attr-defined]
self.__logger.error(f"Error uploading `{up_path.name}` to data " f"node `{self.id}`:") # type: ignore[attr-defined]
self.__logger.error(f"Error: {err}")
reasons._add_reason(self.id, UploadFileCanNotBeRead(up_path.name, self.id)) # type: ignore[attr-defined]
return reasons
@@ -161,7 +167,8 @@ def _upload(self,
self.__logger.error(
f"Error with the upload checker `{upload_checker.__name__}` "
f"while checking `{up_path.name}` file for upload to the data "
f"node `{self.id}`:") # type: ignore[attr-defined]
f"node `{self.id}`:"
) # type: ignore[attr-defined]
self.__logger.error(f"Error: {err}")
can_upload = False

@@ -171,9 +178,12 @@ def _upload(self,

shutil.copy(up_path, self.path)

self.track_edit(timestamp=datetime.now(), # type: ignore[attr-defined]
editor_id=editor_id,
comment=comment, **kwargs)
self.track_edit(
timestamp=datetime.now(), # type: ignore[attr-defined]
editor_id=editor_id,
comment=comment,
**kwargs,
)
self.unlock_edit() # type: ignore[attr-defined]

_DataManagerFactory._build_manager()._set(self) # type: ignore[arg-type]
@@ -212,3 +222,14 @@ def _migrate_path(self, storage_type, old_path) -> str:
if os.path.exists(old_path):
shutil.move(old_path, new_path)
return new_path

def _clone_data_file(self, id: str) -> Optional[str]:
if os.path.exists(self.path):
folder_path, base_name = os.path.split(self.path)
new_base_path = os.path.join(folder_path, f"TAIPY_CLONE_{id}_{base_name}")
if os.path.isdir(self.path):
shutil.copytree(self.path, new_base_path)
else:
shutil.copy(self.path, new_base_path)
return new_base_path
return ""
Member (on lines +227 to +235):
Do we want to differentiate the cases where the initial path is generated by Taipy or provided by the user?
I believe it would be better. If it is Taipy-generated, we can just replace the old id with the new one. Otherwise, adding a prefix or a suffix as you did makes sense.
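A sketch of that differentiation, assuming the existing is_generated property reliably marks Taipy-generated paths and that a generated file name contains the data node id:

def _clone_data_file(self, id: str) -> Optional[str]:
    if not os.path.exists(self.path):
        return ""
    folder_path, base_name = os.path.split(self.path)
    if self._properties.get(self._IS_GENERATED_KEY, False):
        # Taipy-generated name: just swap the old id for the new one.
        new_base_name = base_name.replace(self.id, id)
    else:
        # User-provided name: keep it recognizable and add a prefix.
        new_base_name = f"TAIPY_CLONE_{id}_{base_name}"
    new_base_path = os.path.join(folder_path, new_base_name)
    if os.path.isdir(self.path):
        shutil.copytree(self.path, new_base_path)
    else:
        shutil.copy(self.path, new_base_path)
    return new_base_path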

5 changes: 5 additions & 0 deletions taipy/core/data/csv.py
@@ -192,3 +192,8 @@ def _write(self, data: Any, columns: Optional[List[str]] = None):
encoding=properties[self.__ENCODING_KEY],
header=properties[self._HAS_HEADER_PROPERTY],
)

def _clone_data(self):
new_data_path = self._clone_data_file(self.id)
self._properties[self._PATH_KEY] = new_data_path
return new_data_path
Member:
I would move that outside the data node, to put it in the data_manager.
Just like we handle the parent_ids and the owner_id in the manager, I would set the path property in the manager as well (still retrieving the value from a _FileDataNodeMixin method).
This is debatable, though...
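For reference, the manager-side variant being suggested could look roughly like this, still relying on the _clone_data_file helper from _FileDataNodeMixin introduced in this PR (sketch only):

# Inside _DataManager._clone, after the new id is assigned:
if isinstance(cloned_dn, _FileDataNodeMixin):
    new_path = cloned_dn._clone_data_file(cloned_dn.id)
    cloned_dn._properties[cloned_dn._PATH_KEY] = new_path
cls._set(cloned_dn)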

Member Author:
Well, this is specific to file DNs only. If we put it in the data manager, we're grouping it with other types of DNs like SQL or Mongo; I don't think it's a good idea.

52 changes: 32 additions & 20 deletions taipy/core/data/data_node.py
@@ -433,22 +433,27 @@ def append(self, data, editor_id: Optional[str] = None, comment: Optional[str] =
corresponding to this write.
"""
from ._data_manager_factory import _DataManagerFactory
if (editor_id

if (
editor_id
and self.edit_in_progress
and self.editor_id != editor_id
and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())):
and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
):
raise DataNodeIsBeingEdited(self.id, self.editor_id)
self._append(data)
self.track_edit(editor_id=editor_id, comment=comment, **kwargs)
self.unlock_edit()
_DataManagerFactory._build_manager()._set(self)

def write(self,
data,
job_id: Optional[JobId] = None,
editor_id: Optional[str] = None,
comment: Optional[str] = None,
**kwargs: Any):
def write(
self,
data,
job_id: Optional[JobId] = None,
editor_id: Optional[str] = None,
comment: Optional[str] = None,
**kwargs: Any,
):
"""Write some data to this data node.

once the data is written, the data node is unlocked and the edit is tracked.
@@ -461,10 +466,12 @@ def write(self,
**kwargs (Any): Extra information to attach to the edit document
corresponding to this write.
"""
if (editor_id
if (
editor_id
and self.edit_in_progress
and self.editor_id != editor_id
and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())):
and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
):
raise DataNodeIsBeingEdited(self.id, self.editor_id)
self._write(data)
self.track_edit(job_id=job_id, editor_id=editor_id, comment=comment, **kwargs)
Expand All @@ -473,12 +480,14 @@ def write(self,

_DataManagerFactory._build_manager()._set(self)

def track_edit(self,
job_id: Optional[str] = None,
editor_id: Optional[str] = None,
timestamp: Optional[datetime] = None,
comment: Optional[str] = None,
**options: Any):
def track_edit(
self,
job_id: Optional[str] = None,
editor_id: Optional[str] = None,
timestamp: Optional[datetime] = None,
comment: Optional[str] = None,
**options: Any,
):
"""Creates and adds a new entry in the edits attribute without writing the data.

Arguments:
@@ -627,15 +636,15 @@ def _get_rank(self, scenario_config_id: str) -> int:
If the data node config is not part of the scenario config, 0xfffc is returned as an infinite rank.
"""
if not scenario_config_id:
return 0xfffb
return 0xFFFB
dn_config = Config.data_nodes.get(self._config_id, None)
if not dn_config:
self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` is not found.")
return 0xfffd
return 0xFFFD
if not dn_config._ranks:
self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` has no rank.")
return 0xfffe
return dn_config._ranks.get(scenario_config_id, 0xfffc)
return 0xFFFE
return dn_config._ranks.get(scenario_config_id, 0xFFFC)

@abstractmethod
def _read(self):
@@ -676,6 +685,9 @@ def _get_last_modified_datetime(cls, path: Optional[str] = None) -> Optional[dat

return last_modified_datetime

def _clone_data(self):
raise NotImplementedError

@staticmethod
def _class_map():
def all_subclasses(cls):
5 changes: 5 additions & 0 deletions taipy/core/data/excel.py
@@ -339,3 +339,8 @@ def _write(self, data: Any):
self._write_excel_with_single_sheet(
data.to_excel, self._path, index=False, header=properties[self._HAS_HEADER_PROPERTY] or None
)

def _clone_data(self):
new_data_path = self._clone_data_file(self.id)
self._properties[self._PATH_KEY] = new_data_path
return new_data_path
5 changes: 5 additions & 0 deletions taipy/core/data/json.py
@@ -158,6 +158,11 @@ def _write(self, data: Any):
with open(self._path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f: # type: ignore
json.dump(data, f, indent=4, cls=self._encoder)

def _clone_data(self):
new_data_path = self._clone_data_file(self.id)
self._properties[self._PATH_KEY] = new_data_path
return new_data_path


class _DefaultJSONEncoder(json.JSONEncoder):
def default(self, o):
5 changes: 5 additions & 0 deletions taipy/core/data/parquet.py
@@ -249,3 +249,8 @@ def _append(self, data: Any):

def _write(self, data: Any):
self._write_with_kwargs(data)

def _clone_data(self):
new_data_path = self._clone_data_file(self.id)
self._properties[self._PATH_KEY] = new_data_path
return new_data_path
6 changes: 6 additions & 0 deletions taipy/core/data/pickle.py
@@ -108,3 +108,9 @@ def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
def _write(self, data):
with open(self._path, "wb") as pf:
pickle.dump(data, pf)

def _clone_data(self):
new_data_path = self._clone_data_file(self.id)
del self._properties._entity_owner
self._properties[self._PATH_KEY] = new_data_path
return new_data_path
44 changes: 44 additions & 0 deletions taipy/core/scenario/_scenario_manager.py
Member:
We should implement a can_duplicate method, returning reasons, just like we have a can_create method.

@@ -521,3 +521,47 @@ def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None)
for fil in filters:
fil.update({"config_id": config_id})
return cls._repository._load_all(filters)

@classmethod
def _clone(cls, scenario: Scenario) -> Scenario:
Member:
I would rename the _clone method to _duplicate, as it does not return another instance of the same object. It returns a similar object with a few differences (ids, sub-entities' ids, paths, etc.).

Member:
We should accept an optional name and an optional date in the method signature. It does not change much for the name but it does for the creation date. This has an impact on the potential cycle as the new scenario might be on a different cycle.
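A sketch of the suggested signature; the parameter names are illustrative, and the cycle recomputation is only hinted at:

@classmethod
def _duplicate(
    cls,
    scenario: Scenario,
    name: Optional[str] = None,
    creation_date: Optional[datetime] = None,
) -> Scenario:
    duplicated_scenario = cls._get(scenario)
    duplicated_scenario.id = duplicated_scenario._new_id(duplicated_scenario.config_id)
    if name is not None:
        duplicated_scenario._properties["name"] = name
    if creation_date is not None:
        duplicated_scenario._creation_date = creation_date
        # The cycle would have to be re-derived from the new creation date here,
        # since the duplicate may fall into a different cycle.
    ...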

Member Author:
hm? I can understand the name, but the creation date?

Member:
Let's say I have a January scenario I am happy with. I want to start with a duplicate of this one to compute my February scenario. So, at the beginning of February, I duplicate my January scenario, passing the current date, so that the new scenario falls in the February cycle.

Does that make sense to you? And @FlorianJacta, any opinion on that?
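In code, that use case could look like the following, assuming the _duplicate signature sketched above (hypothetical API, not part of this PR):

# january_scenario is an existing Scenario the user is happy with.
february_scenario = _ScenarioManagerFactory._build_manager()._duplicate(
    january_scenario,
    creation_date=datetime(2025, 2, 1),  # places the duplicate in the February cycle
)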

Member:
I agree; duplication for me was not about the creation date. Your use case is a real use case from CFM.

Member:
According to the issue, we should also pass an optional list of data nodes or data node IDs. Without any list, we should copy all the data. If the list is provided, only the files of the data nodes in the list should be copied.

Member Author:
I'll leave it as the next step for now.
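When that step happens, the filter could look roughly like this; the parameter name and the fall-back to copying everything are assumptions taken from the issue description:

@classmethod
def _duplicate(
    cls,
    scenario: Scenario,
    data_to_duplicate: Optional[List[Union[DataNode, DataNodeId]]] = None,
) -> Scenario:
    ids_to_copy = None
    if data_to_duplicate is not None:
        ids_to_copy = {d.id if isinstance(d, DataNode) else d for d in data_to_duplicate}
    ...
    for dn in duplicated_data_nodes:
        if ids_to_copy is None or dn.id in ids_to_copy:
            dn._clone_data()  # copy the underlying file only for selected data nodes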

"""
Clone a scenario.

Arguments:
scenario (Scenario): The scenario to clone.

Returns:
Scenario: The cloned scenario.
"""
cloned_scenario = cls._get(scenario)
cloned_scenario_id = cloned_scenario._new_id(cloned_scenario.config_id)
cloned_scenario.id = cloned_scenario_id
Member:
Suggested change:
- cloned_scenario_id = cloned_scenario._new_id(cloned_scenario.config_id)
- cloned_scenario.id = cloned_scenario_id
+ cloned_scenario.id = cloned_scenario._new_id(cloned_scenario.config_id)

# TODO: update sequences

# Clone tasks and data nodes
_task_manager = _TaskManagerFactory._build_manager()
_data_manager = _DataManagerFactory._build_manager()

cloned_tasks = set()
for task in cloned_scenario.tasks.values():
cloned_tasks.add(_task_manager._clone(task, None, cloned_scenario_id))
cloned_scenario._tasks = cloned_tasks
Member:
Duplicating tasks should depend on the minimal scope of their input and output data nodes.
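A sketch of that check; _should_duplicate_task is a hypothetical helper, and sharing the existing task when nothing is scenario-scoped is the behavior the comment argues for:

def _should_duplicate_task(task: Task) -> bool:
    # Duplicate only when at least one input/output is scenario-scoped.
    data_nodes = list(task.input.values()) + list(task.output.values())
    return any(dn.scope == Scope.SCENARIO for dn in data_nodes)

cloned_tasks = set()
for task in cloned_scenario.tasks.values():
    if _should_duplicate_task(task):
        cloned_tasks.add(_task_manager._clone(task, None, cloned_scenario_id))
    else:
        cloned_tasks.add(task)  # share the existing task and its data nodes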


cloned_additional_data_nodes = set()
for data_node in cloned_scenario.additional_data_nodes.values():
cloned_additional_data_nodes.add(_data_manager._clone(data_node, None, cloned_scenario_id))
Member:
Duplicating a data node should depend on its scope.

cloned_scenario._additional_data_nodes = cloned_additional_data_nodes

for task in cloned_tasks:
if cloned_scenario_id not in task._parent_ids:
task._parent_ids.update([cloned_scenario_id])
_task_manager._set(task)

for dn in cloned_additional_data_nodes:
if cloned_scenario_id not in dn._parent_ids:
dn._parent_ids.update([cloned_scenario_id])
_data_manager._set(dn)

cls._set(cloned_scenario)

return cloned_scenario