From b3a3f716fdfc366385ec346a1d17c24bdd09dff6 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Tue, 16 Nov 2021 17:25:13 -0800 Subject: [PATCH 1/6] Stage directories. --- cwltool/docker.py | 3 +-- cwltool/job.py | 10 ++++++++++ cwltool/pathmapper.py | 30 ++++++++++++++++++++++++------ 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/cwltool/docker.py b/cwltool/docker.py index b1c1b7bbf..ff91ea1ac 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -247,8 +247,7 @@ def append_volume( options.append("readonly") output = StringIO() csv.writer(output).writerow(options) - mount_arg = output.getvalue().strip() - runtime.append(f"--mount={mount_arg}") + runtime.append(f"--mount={output.getvalue().strip()}") # Unlike "--volume", "--mount" will fail if the volume doesn't already exist. if not os.path.exists(source): os.makedirs(source) diff --git a/cwltool/job.py b/cwltool/job.py index e617fcd96..8e1b1c60e 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -693,6 +693,7 @@ def add_volumes( any_path_okay: bool = False, ) -> None: """Append volume mappings to the runtime option list.""" + stage_source_dir = os.environ.get('STAGE_SRC_DIR', os.path.join(tempfile.gettempdir(), 'cwl-stg-src-dir')) container_outdir = self.builder.outdir for key, vol in (itm for itm in pathmapper.items() if itm[1].staged): host_outdir_tgt = None # type: Optional[str] @@ -700,6 +701,8 @@ def add_volumes( host_outdir_tgt = os.path.join( self.outdir, vol.target[len(container_outdir) + 1 :] ) + if stage_source_dir and vol.resolved.startswith(stage_source_dir): + continue # path is already staged; only mount the host directory if not host_outdir_tgt and not any_path_okay: raise WorkflowException( "No mandatory DockerRequirement, yet path is outside " @@ -707,20 +710,27 @@ def add_volumes( "$(runtime.outdir): {}".format(vol) ) if vol.type in ("File", "Directory"): + logging.critical(f'file/dir: {vol.target}') self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt) elif vol.type == "WritableFile": + logging.critical(f'wfile: {vol.target}') self.add_writable_file_volume( runtime, vol, host_outdir_tgt, tmpdir_prefix ) elif vol.type == "WritableDirectory": + logging.critical(f'wdir: {vol.target}') self.add_writable_directory_volume( runtime, vol, host_outdir_tgt, tmpdir_prefix ) elif vol.type in ["CreateFile", "CreateWritableFile"]: + logging.critical(f'cf: {vol.target}') new_path = self.create_file_and_add_volume( runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix ) pathmapper.update(key, new_path, vol.target, vol.type, vol.staged) + # mount a single host directory for all staged source files + if stage_source_dir and pathmapper.stagedir != container_outdir: + self.append_volume(runtime, stage_source_dir, pathmapper.stagedir, writable=True) def run( self, diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index c89fac5f4..f998d61fd 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -1,6 +1,7 @@ import collections import logging import os +import tempfile import stat import urllib import uuid @@ -65,6 +66,7 @@ def __init__( ) -> None: """Initialize the PathMapper.""" self._pathmap = {} # type: Dict[str, MapperEnt] + self.staged_src_files = set() self.stagedir = stagedir self.separateDirs = separateDirs self.setup(dedup(referenced_files), basedir) @@ -93,7 +95,10 @@ def visit( basedir: str, copy: bool = False, staged: bool = False, + stage_source_dir: Optional[str] = None, ) -> None: + if stage_source_dir: + os.makedirs(stage_source_dir, exist_ok=True) stagedir = cast(Optional[str], obj.get("dirname")) or stagedir tgt = os.path.join( stagedir, @@ -150,10 +155,16 @@ def visit( else os.path.join(os.path.dirname(deref), rl) ) st = os.lstat(deref) - - self._pathmap[path] = MapperEnt( - deref, tgt, "WritableFile" if copy else "File", staged - ) + if stage_source_dir: + staged_source_file = os.path.join(stage_source_dir, os.path.basename(deref)) + os.link(deref, staged_source_file) + self._pathmap[path] = MapperEnt( + staged_source_file, tgt, "WritableFile" if copy else "File", staged + ) + else: + self._pathmap[path] = MapperEnt( + deref, tgt, "WritableFile" if copy else "File", staged + ) self.visitlisting( cast(List[CWLObjectType], obj.get("secondaryFiles", [])), stagedir, @@ -163,19 +174,26 @@ def visit( ) def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None: - # Go through each file and set the target to its own directory along # with any secondary files. stagedir = self.stagedir + stage_source_dir = os.environ.get('STAGE_SRC_DIR', os.path.join(tempfile.gettempdir(), 'cwl-stg-src-dir')) + if stage_source_dir: + os.makedirs(stage_source_dir, exist_ok=True) for fob in referenced_files: + staging_uuid = str(uuid.uuid4()) if self.separateDirs: - stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) + # this is what the path will be inside of the container environment + stagedir = os.path.join(self.stagedir, "stg%s" % staging_uuid) + # if STAGE_SRC_DIR is set, this is where input paths will be linked/staged at + stagesourcedir = None if not stage_source_dir else os.path.join(stage_source_dir, "stg%s" % staging_uuid) self.visit( fob, stagedir, basedir, copy=cast(bool, fob.get("writable", False)), staged=True, + stage_source_dir=stagesourcedir, ) def mapper(self, src: str) -> MapperEnt: From 6767c098f48b10f8565c4b0f8f6b5594ef2af412 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Tue, 16 Nov 2021 17:34:53 -0800 Subject: [PATCH 2/6] Clean up. --- cwltool/docker.py | 3 ++- cwltool/job.py | 6 +++--- cwltool/pathmapper.py | 16 ++++++---------- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/cwltool/docker.py b/cwltool/docker.py index ff91ea1ac..b1c1b7bbf 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -247,7 +247,8 @@ def append_volume( options.append("readonly") output = StringIO() csv.writer(output).writerow(options) - runtime.append(f"--mount={output.getvalue().strip()}") + mount_arg = output.getvalue().strip() + runtime.append(f"--mount={mount_arg}") # Unlike "--volume", "--mount" will fail if the volume doesn't already exist. if not os.path.exists(source): os.makedirs(source) diff --git a/cwltool/job.py b/cwltool/job.py index 8e1b1c60e..bdc51118e 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -701,14 +701,14 @@ def add_volumes( host_outdir_tgt = os.path.join( self.outdir, vol.target[len(container_outdir) + 1 :] ) - if stage_source_dir and vol.resolved.startswith(stage_source_dir): - continue # path is already staged; only mount the host directory if not host_outdir_tgt and not any_path_okay: raise WorkflowException( "No mandatory DockerRequirement, yet path is outside " "the designated output directory, also know as " "$(runtime.outdir): {}".format(vol) ) + if stage_source_dir and vol.resolved.startswith(stage_source_dir): + continue # path is already staged; only mount the host directory (at the end of this function) if vol.type in ("File", "Directory"): logging.critical(f'file/dir: {vol.target}') self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt) @@ -728,8 +728,8 @@ def add_volumes( runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix ) pathmapper.update(key, new_path, vol.target, vol.type, vol.staged) - # mount a single host directory for all staged source files if stage_source_dir and pathmapper.stagedir != container_outdir: + # mount a single host directory for all staged input files self.append_volume(runtime, stage_source_dir, pathmapper.stagedir, writable=True) def run( diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index f998d61fd..dcb739829 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -66,7 +66,6 @@ def __init__( ) -> None: """Initialize the PathMapper.""" self._pathmap = {} # type: Dict[str, MapperEnt] - self.staged_src_files = set() self.stagedir = stagedir self.separateDirs = separateDirs self.setup(dedup(referenced_files), basedir) @@ -158,13 +157,10 @@ def visit( if stage_source_dir: staged_source_file = os.path.join(stage_source_dir, os.path.basename(deref)) os.link(deref, staged_source_file) - self._pathmap[path] = MapperEnt( - staged_source_file, tgt, "WritableFile" if copy else "File", staged - ) - else: - self._pathmap[path] = MapperEnt( - deref, tgt, "WritableFile" if copy else "File", staged - ) + deref = staged_source_file + self._pathmap[path] = MapperEnt( + deref, tgt, "WritableFile" if copy else "File", staged + ) self.visitlisting( cast(List[CWLObjectType], obj.get("secondaryFiles", [])), stagedir, @@ -186,14 +182,14 @@ def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None: # this is what the path will be inside of the container environment stagedir = os.path.join(self.stagedir, "stg%s" % staging_uuid) # if STAGE_SRC_DIR is set, this is where input paths will be linked/staged at - stagesourcedir = None if not stage_source_dir else os.path.join(stage_source_dir, "stg%s" % staging_uuid) + unique_stage_source_dir = None if not stage_source_dir else os.path.join(stage_source_dir, "stg%s" % staging_uuid) self.visit( fob, stagedir, basedir, copy=cast(bool, fob.get("writable", False)), staged=True, - stage_source_dir=stagesourcedir, + stage_source_dir=unique_stage_source_dir, ) def mapper(self, src: str) -> MapperEnt: From aa31f1e612acf888fac2d9e2e43d0249d3e785e4 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Tue, 16 Nov 2021 17:35:43 -0800 Subject: [PATCH 3/6] Clean up. --- cwltool/job.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index bdc51118e..2a1096dfb 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -710,20 +710,16 @@ def add_volumes( if stage_source_dir and vol.resolved.startswith(stage_source_dir): continue # path is already staged; only mount the host directory (at the end of this function) if vol.type in ("File", "Directory"): - logging.critical(f'file/dir: {vol.target}') self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt) elif vol.type == "WritableFile": - logging.critical(f'wfile: {vol.target}') self.add_writable_file_volume( runtime, vol, host_outdir_tgt, tmpdir_prefix ) elif vol.type == "WritableDirectory": - logging.critical(f'wdir: {vol.target}') self.add_writable_directory_volume( runtime, vol, host_outdir_tgt, tmpdir_prefix ) elif vol.type in ["CreateFile", "CreateWritableFile"]: - logging.critical(f'cf: {vol.target}') new_path = self.create_file_and_add_volume( runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix ) From fbf845ce5a24a279755d1ef92a9fcd2ab92e8ce3 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Tue, 16 Nov 2021 17:37:59 -0800 Subject: [PATCH 4/6] Clean up. --- cwltool/pathmapper.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index dcb739829..87e86b257 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -174,15 +174,16 @@ def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None: # with any secondary files. stagedir = self.stagedir stage_source_dir = os.environ.get('STAGE_SRC_DIR', os.path.join(tempfile.gettempdir(), 'cwl-stg-src-dir')) - if stage_source_dir: - os.makedirs(stage_source_dir, exist_ok=True) for fob in referenced_files: staging_uuid = str(uuid.uuid4()) if self.separateDirs: # this is what the path will be inside of the container environment stagedir = os.path.join(self.stagedir, "stg%s" % staging_uuid) # if STAGE_SRC_DIR is set, this is where input paths will be linked/staged at - unique_stage_source_dir = None if not stage_source_dir else os.path.join(stage_source_dir, "stg%s" % staging_uuid) + unique_stage_source_dir = None + if stage_source_dir: + unique_stage_source_dir = os.path.join(stage_source_dir, "stg%s" % staging_uuid) + os.makedirs(stage_source_dir, exist_ok=True) self.visit( fob, stagedir, From b86e464be63b878c4f86a9978464a9a4fc984522 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Tue, 16 Nov 2021 17:41:57 -0800 Subject: [PATCH 5/6] Copy if we hit an error. --- cwltool/pathmapper.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index 87e86b257..7b7f70451 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -2,6 +2,7 @@ import logging import os import tempfile +import shutil import stat import urllib import uuid @@ -156,7 +157,10 @@ def visit( st = os.lstat(deref) if stage_source_dir: staged_source_file = os.path.join(stage_source_dir, os.path.basename(deref)) - os.link(deref, staged_source_file) + try: + os.link(deref, staged_source_file) + except OSError: + shutil.copyfile(deref, staged_source_file) deref = staged_source_file self._pathmap[path] = MapperEnt( deref, tgt, "WritableFile" if copy else "File", staged From ee119e150238d792ebaed4fbce3272116189fdb1 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Wed, 17 Nov 2021 12:53:06 -0800 Subject: [PATCH 6/6] Linting. --- cwltool/job.py | 8 ++++++-- cwltool/pathmapper.py | 12 +++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index 2a1096dfb..210c79bd3 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -693,7 +693,9 @@ def add_volumes( any_path_okay: bool = False, ) -> None: """Append volume mappings to the runtime option list.""" - stage_source_dir = os.environ.get('STAGE_SRC_DIR', os.path.join(tempfile.gettempdir(), 'cwl-stg-src-dir')) + stage_source_dir = os.environ.get( + "STAGE_SRC_DIR", os.path.join(tempfile.gettempdir(), "cwl-stg-src-dir") + ) container_outdir = self.builder.outdir for key, vol in (itm for itm in pathmapper.items() if itm[1].staged): host_outdir_tgt = None # type: Optional[str] @@ -726,7 +728,9 @@ def add_volumes( pathmapper.update(key, new_path, vol.target, vol.type, vol.staged) if stage_source_dir and pathmapper.stagedir != container_outdir: # mount a single host directory for all staged input files - self.append_volume(runtime, stage_source_dir, pathmapper.stagedir, writable=True) + self.append_volume( + runtime, stage_source_dir, pathmapper.stagedir, writable=True + ) def run( self, diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index 7b7f70451..faa8aa517 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -156,7 +156,9 @@ def visit( ) st = os.lstat(deref) if stage_source_dir: - staged_source_file = os.path.join(stage_source_dir, os.path.basename(deref)) + staged_source_file = os.path.join( + stage_source_dir, os.path.basename(deref) + ) try: os.link(deref, staged_source_file) except OSError: @@ -177,7 +179,9 @@ def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None: # Go through each file and set the target to its own directory along # with any secondary files. stagedir = self.stagedir - stage_source_dir = os.environ.get('STAGE_SRC_DIR', os.path.join(tempfile.gettempdir(), 'cwl-stg-src-dir')) + stage_source_dir = os.environ.get( + "STAGE_SRC_DIR", os.path.join(tempfile.gettempdir(), "cwl-stg-src-dir") + ) for fob in referenced_files: staging_uuid = str(uuid.uuid4()) if self.separateDirs: @@ -186,7 +190,9 @@ def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None: # if STAGE_SRC_DIR is set, this is where input paths will be linked/staged at unique_stage_source_dir = None if stage_source_dir: - unique_stage_source_dir = os.path.join(stage_source_dir, "stg%s" % staging_uuid) + unique_stage_source_dir = os.path.join( + stage_source_dir, "stg%s" % staging_uuid + ) os.makedirs(stage_source_dir, exist_ok=True) self.visit( fob,