Commit

no more data class

khsrali committed Feb 10, 2025
1 parent a8bac9a commit 5473613
Showing 15 changed files with 379 additions and 49 deletions.
4 changes: 2 additions & 2 deletions .docker/tests/test_aiida.py
@@ -6,7 +6,7 @@


def test_correct_python_version_installed(aiida_exec, python_version):
info = json.loads(aiida_exec('mamba list --json --full-name python', ignore_stderr=True).decode())[0]
info = json.loads(aiida_exec('mamba list --json --full-name python').decode())[0]
assert info['name'] == 'python'
assert parse(info['version']) == parse(python_version)

@@ -15,7 +15,7 @@ def test_correct_pgsql_version_installed(aiida_exec, pgsql_version, variant):
if variant == 'aiida-core-base':
pytest.skip('PostgreSQL is not installed in the base image')

info = json.loads(aiida_exec('mamba list --json --full-name postgresql', ignore_stderr=True).decode())[0]
info = json.loads(aiida_exec('mamba list --json --full-name postgresql').decode())[0]
assert info['name'] == 'postgresql'
assert parse(info['version']).major == parse(pgsql_version).major

8 changes: 7 additions & 1 deletion .github/system_tests/test_daemon.py
@@ -437,14 +437,20 @@ def launch_all():
run_multiply_add_workchain()

# Testing the stashing functionality
# To speed up, here we only check with StashMode.COPY.
# All stash modes are tested separately in test_execmanager.py
print('Testing the stashing functionality')
process, inputs, expected_result = create_calculation_process(code=code_doubler, inputval=1)
with tempfile.TemporaryDirectory() as tmpdir:
# Delete the temporary directory to test that the stashing functionality will create it if necessary
shutil.rmtree(tmpdir, ignore_errors=True)

source_list = ['output.txt', 'triple_value.*']
inputs['metadata']['options']['stash'] = {'target_base': tmpdir, 'source_list': source_list}
inputs['metadata']['options']['stash'] = {
'stash_mode': StashMode.COPY.value,
'target_base': tmpdir,
'source_list': source_list,
}
_, node = run.get_node(process, **inputs)
assert node.is_finished_ok
assert 'remote_stash' in node.outputs
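For reference, a compressed-mode variant of the same options dict would look roughly as follows; this sketch is not part of the test and only uses the keys validated in calcjob.py further down (stash_mode, target_base, source_list, dereference, file_name), with 'my_stash' as a made-up archive name.

# Hypothetical sketch: stash the same outputs as a tar.gz archive instead of plain copies.
# Assumes `inputs`, `tmpdir` and `source_list` are defined as in the test above.
from aiida.common.datastructures import StashMode

inputs['metadata']['options']['stash'] = {
    'stash_mode': StashMode.COMPRESS_TARGZ.value,  # 'tar.gz'
    'target_base': tmpdir,                         # absolute path on the target computer
    'source_list': source_list,                    # files/globs relative to the remote workdir
    'dereference': False,                          # do not follow symlinks while archiving
    'file_name': 'my_stash',                       # archive name; defaults to the calculation UUID
}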
1 change: 1 addition & 0 deletions pyproject.toml
@@ -114,6 +114,7 @@ requires-python = '>=3.9'
'core.orbital' = 'aiida.orm.nodes.data.orbital:OrbitalData'
'core.remote' = 'aiida.orm.nodes.data.remote.base:RemoteData'
'core.remote.stash' = 'aiida.orm.nodes.data.remote.stash.base:RemoteStashData'
'core.remote.stash.compress' = 'aiida.orm.nodes.data.remote.stash.compress:RemoteStashCompressedData'
'core.remote.stash.folder' = 'aiida.orm.nodes.data.remote.stash.folder:RemoteStashFolderData'
'core.singlefile' = 'aiida.orm.nodes.data.singlefile:SinglefileData'
'core.str' = 'aiida.orm.nodes.data.str:Str'
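With this entry point registered, the new class should also be loadable through the plugin system; a minimal sketch using the standard DataFactory:

from aiida.plugins import DataFactory

# Resolve the class via the entry point name registered above.
RemoteStashCompressedData = DataFactory('core.remote.stash.compress')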
4 changes: 4 additions & 0 deletions src/aiida/common/datastructures.py
@@ -22,6 +22,10 @@ class StashMode(Enum):
"""Mode to use when stashing files from the working directory of a completed calculation job for safekeeping."""

COPY = 'copy'
COMPRESS_TAR = 'tar'
COMPRESS_TARBZ2 = 'tar.bz2'
COMPRESS_TARGZ = 'tar.gz'
COMPRESS_TARXZ = 'tar.xz'


class CalcJobState(Enum):
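The new members carry the archive format as their value, so a plain string can be checked for membership or mapped back to the enum; a small sketch of the idioms the engine code below relies on:

from aiida.common.datastructures import StashMode

stash_mode = 'tar.gz'

# Membership check, as done in execmanager.py below.
if stash_mode not in [mode.value for mode in StashMode.__members__.values()]:
    raise ValueError(f'unsupported stash mode: {stash_mode}')

# Equivalent round-trip through the enum itself.
assert StashMode(stash_mode) is StashMode.COMPRESS_TARGZ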
120 changes: 85 additions & 35 deletions src/aiida/engine/daemon/execmanager.py
@@ -435,56 +435,106 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
:param transport: an already opened transport.
"""
from aiida.common.datastructures import StashMode
from aiida.orm import RemoteStashFolderData
from aiida.orm import RemoteStashCompressedData, RemoteStashFolderData

logger_extra = get_dblogger_extra(calculation)

stash_options = calculation.get_option('stash')
stash_mode = stash_options.get('mode', StashMode.COPY.value)
stash_mode = stash_options.get('stash_mode')
source_list = stash_options.get('source_list', [])
uuid = calculation.uuid
source_basepath = Path(calculation.get_remote_workdir())

if not source_list:
return

if stash_mode != StashMode.COPY.value:
EXEC_LOGGER.warning(f'stashing mode {stash_mode} is not implemented yet.')
if stash_mode not in [mode.value for mode in StashMode.__members__.values()]:
EXEC_LOGGER.warning(f'stashing mode {stash_mode} is not supported. Stashing skipped.')
return

cls = RemoteStashFolderData
EXEC_LOGGER.debug(
f'stashing files with mode {stash_mode} for calculation<{calculation.pk}>: {source_list}', extra=logger_extra
)

EXEC_LOGGER.debug(f'stashing files for calculation<{calculation.pk}>: {source_list}', extra=logger_extra)
if stash_mode == StashMode.COPY.value:
target_basepath = Path(stash_options['target_base']) / uuid[:2] / uuid[2:4] / uuid[4:]

uuid = calculation.uuid
source_basepath = Path(calculation.get_remote_workdir())
target_basepath = Path(stash_options['target_base']) / uuid[:2] / uuid[2:4] / uuid[4:]

for source_filename in source_list:
if transport.has_magic(source_filename):
copy_instructions = []
for globbed_filename in await transport.glob_async(source_basepath / source_filename):
target_filepath = target_basepath / Path(globbed_filename).relative_to(source_basepath)
copy_instructions.append((globbed_filename, target_filepath))
else:
copy_instructions = [(source_basepath / source_filename, target_basepath / source_filename)]
for source_filename in source_list:
if transport.has_magic(source_filename):
copy_instructions = []
for globbed_filename in await transport.glob_async(source_basepath / source_filename):
target_filepath = target_basepath / Path(globbed_filename).relative_to(source_basepath)
copy_instructions.append((globbed_filename, target_filepath))
else:
copy_instructions = [(source_basepath / source_filename, target_basepath / source_filename)]

for source_filepath, target_filepath in copy_instructions:
# If the source file is in a (nested) directory, create those directories first in the target directory
target_dirname = target_filepath.parent
await transport.makedirs_async(target_dirname, ignore_existing=True)

try:
await transport.copy_async(source_filepath, target_filepath)
except (OSError, ValueError) as exception:
EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
else:
EXEC_LOGGER.debug(f'stashed {source_filepath} to {target_filepath}')

remote_stash = RemoteStashFolderData(
computer=calculation.computer,
target_basepath=str(target_basepath),
stash_mode=StashMode(stash_mode),
source_list=source_list,
).store()

elif stash_mode in [
StashMode.COMPRESS_TAR.value,
StashMode.COMPRESS_TARBZ2.value,
StashMode.COMPRESS_TARGZ.value,
StashMode.COMPRESS_TARXZ.value,
]:
# stash_mode values are identical to the compression_format values in the transport plugin:
# 'tar', 'tar.gz', 'tar.bz2', or 'tar.xz'
compression_format = stash_mode
file_name = stash_options.get('file_name', uuid)
dereference = stash_options.get('dereference', False)
target_basepath = Path(stash_options['target_base'])
authinfo = calculation.get_authinfo()
aiida_remote_base = authinfo.get_workdir().format(username=transport.whoami())

target_destination = str(target_basepath / file_name) + '.' + compression_format

remote_stash = RemoteStashCompressedData(
computer=calculation.computer,
target_basepath=target_destination,
stash_mode=StashMode(stash_mode),
source_list=source_list,
compression_format=compression_format,
dereference=dereference,
)

for source_filepath, target_filepath in copy_instructions:
# If the source file is in a (nested) directory, create those directories first in the target directory
target_dirname = target_filepath.parent
await transport.makedirs_async(target_dirname, ignore_existing=True)
source_list_abs = [source_basepath / source for source in source_list]

try:
await transport.compress_async(
format=compression_format,
remotesources=source_list_abs,
remotedestination=target_destination,
rootdir=aiida_remote_base,
overwrite=False,
dereference=dereference,
)
except (OSError, ValueError) as exception:
EXEC_LOGGER.warning(f'failed to compress {source_list} to {target_destination}: {exception}')
return
# Note: raising here triggers the exponential backoff,
# while not raising makes the job appear successful in `verdi process list`: Finished [0].
# An issue should be opened to fix this.
# raise exceptions.RemoteOperationError(
#     f'failed to compress {source_list} to {target_destination}: {exception}')

remote_stash.store()

try:
await transport.copy_async(source_filepath, target_filepath)
except (OSError, ValueError) as exception:
EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
else:
EXEC_LOGGER.debug(f'stashed {source_filepath} to {target_filepath}')

remote_stash = cls(
computer=calculation.computer,
target_basepath=str(target_basepath),
stash_mode=StashMode(stash_mode),
source_list=source_list,
).store()
remote_stash.base.links.add_incoming(calculation, link_type=LinkType.CREATE, link_label='remote_stash')


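The two branches compose the stash target differently: StashMode.COPY shards the target directory by the calculation UUID, while the compress modes write a single archive named <file_name>.<format> directly under target_base. A minimal standard-library sketch of that path composition, with an illustrative UUID and base path:

from pathlib import Path

uuid = '0123abcd-ef01-2345-6789-abcdef012345'  # illustrative calculation UUID
target_base = Path('/scratch/stash')           # illustrative `target_base`

# StashMode.COPY: files are copied into a UUID-sharded directory.
copy_target = target_base / uuid[:2] / uuid[2:4] / uuid[4:]
# -> /scratch/stash/01/23/abcd-ef01-2345-6789-abcdef012345

# Compress modes: a single archive named after `file_name` (the UUID by default)
# with the compression format as suffix.
file_name, compression_format = uuid, 'tar.gz'
compress_target = str(target_base / file_name) + '.' + compression_format
# -> /scratch/stash/0123abcd-ef01-2345-6789-abcdef012345.tar.gz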
33 changes: 29 additions & 4 deletions src/aiida/engine/processes/calcjobs/calcjob.py
@@ -116,7 +116,7 @@ def validate_stash_options(stash_options: Any, _: Any) -> Optional[str]:

target_base = stash_options.get('target_base', None)
source_list = stash_options.get('source_list', None)
stash_mode = stash_options.get('mode', StashMode.COPY.value)
stash_mode = stash_options.get('stash_mode', None)

if not isinstance(target_base, str) or not os.path.isabs(target_base):
return f'`metadata.options.stash.target_base` should be an absolute filepath, got: {target_base}'
@@ -130,9 +130,23 @@ def validate_stash_options(stash_options: Any, _: Any) -> Optional[str]:
try:
StashMode(stash_mode)
except ValueError:
port = 'metadata.options.stash.mode'
port = 'metadata.options.stash.stash_mode'
return f'`{port}` should be a member of aiida.common.datastructures.StashMode, got: {stash_mode}'

if stash_mode in [
StashMode.COMPRESS_TAR.value,
StashMode.COMPRESS_TARBZ2.value,
StashMode.COMPRESS_TARGZ.value,
StashMode.COMPRESS_TARXZ.value,
]:
dereference = stash_options.get('dereference')
if not isinstance(dereference, bool):
return f'`metadata.options.stash.dereference` should be a boolean, got: {dereference}'

file_name = stash_options.get('file_name')
if not isinstance(file_name, str):
return f'`metadata.options.stash.file_name` should be a string, got: {file_name}'

return None
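Putting the new checks together, only the compress modes require dereference and file_name on top of target_base and source_list; a sketch of option dictionaries the validator above would accept or reject (paths and file names are illustrative):

from aiida.common.datastructures import StashMode

# Accepted: COPY needs only an absolute `target_base` and a `source_list`.
ok_copy = {
    'stash_mode': StashMode.COPY.value,
    'target_base': '/scratch/stash',
    'source_list': ['output.txt'],
}

# Accepted: compress modes additionally need `dereference` (bool) and `file_name` (str).
ok_compress = {
    'stash_mode': StashMode.COMPRESS_TARXZ.value,
    'target_base': '/scratch/stash',
    'source_list': ['output.txt'],
    'dereference': False,
    'file_name': 'results',
}

# Rejected: a compress mode without `dereference`/`file_name` yields an error message.
bad_compress = {
    'stash_mode': StashMode.COMPRESS_TAR.value,
    'target_base': '/scratch/stash',
    'source_list': ['output.txt'],
}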


@@ -415,7 +429,19 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override]
required=False,
help='Mode with which to perform the stashing, should be value of `aiida.common.datastructures.StashMode`.',
)

spec.input(
'metadata.options.stash.dereference',
valid_type=bool,
required=False,
help='Whether to follow symlinks while stashing or not, specific to StashMode.COMPRESS',
)
spec.input(
'metadata.options.stash.file_name',
valid_type=str,
required=False,
help='File name to be assigned to the compressed file. If not provided, '
'the UUID of the original calculation is chosen by default. Specific to StashMode.COMPRESS',
)
spec.output(
'remote_folder',
valid_type=orm.RemoteData,
Expand All @@ -434,7 +460,6 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override]
help='Files that are retrieved by the daemon will be stored in this node. By default the stdout and stderr '
'of the scheduler will be added, but one can add more by specifying them in `CalcInfo.retrieve_list`.',
)

spec.exit_code(
100,
'ERROR_NO_RETRIEVED_FOLDER',
1 change: 1 addition & 0 deletions src/aiida/orm/__init__.py
@@ -88,6 +88,7 @@
'QbFields',
'QueryBuilder',
'RemoteData',
'RemoteStashCompressedData',
'RemoteStashData',
'RemoteStashFolderData',
'SinglefileData',
1 change: 1 addition & 0 deletions src/aiida/orm/nodes/__init__.py
@@ -50,6 +50,7 @@
'ProcessNode',
'ProjectionData',
'RemoteData',
'RemoteStashCompressedData',
'RemoteStashData',
'RemoteStashFolderData',
'SinglefileData',
1 change: 1 addition & 0 deletions src/aiida/orm/nodes/data/__init__.py
@@ -58,6 +58,7 @@
'PortableCode',
'ProjectionData',
'RemoteData',
'RemoteStashCompressedData',
'RemoteStashData',
'RemoteStashFolderData',
'SinglefileData',
3 changes: 2 additions & 1 deletion src/aiida/orm/nodes/data/remote/__init__.py
@@ -9,8 +9,9 @@

__all__ = (
'RemoteData',
'RemoteStashCompressedData',
'RemoteStashData',
'RemoteStashFolderData',
'RemoteStashFolderData'
)

# fmt: on
4 changes: 3 additions & 1 deletion src/aiida/orm/nodes/data/remote/stash/__init__.py
@@ -5,11 +5,13 @@
# fmt: off

from .base import *
from .compress import *
from .folder import *

__all__ = (
'RemoteStashCompressedData',
'RemoteStashData',
'RemoteStashFolderData',
'RemoteStashFolderData'
)

# fmt: on
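With RemoteStashCompressedData exported from aiida.orm, stash nodes can be queried like any other node type; a minimal sketch assuming a configured profile and that the node exposes a target_basepath property mirroring the constructor argument used in execmanager.py:

from aiida import load_profile, orm

load_profile()  # assumes a configured AiiDA profile

# List all compressed stash nodes in the profile.
query = orm.QueryBuilder().append(orm.RemoteStashCompressedData)
for [node] in query.iterall():
    print(node.uuid, node.target_basepath)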