diff --git a/pydra/engine/helpers_aws.py b/pydra/engine/helpers_aws.py
new file mode 100644
index 0000000000..2e26320b1b
--- /dev/null
+++ b/pydra/engine/helpers_aws.py
@@ -0,0 +1,34 @@
+"""Helpers for AWS access: a lazily created, module-wide S3 client."""
+# TODO It would be nice to rewrite this to some class (e.g. AwsManager).
+
+import logging
+
+try:
+    import boto3
+except ImportError:
+    # boto3 is optional; get_s3_client() reports its absence when called.
+    boto3 = None
+
+logger = logging.getLogger("pydra")
+
+# Client cached for the whole process; filled in by get_s3_client().
+s3_client = None
+
+
+def get_s3_client():
+    """Return the shared S3 client, creating it on the first call.
+
+    Raises
+    ------
+    ImportError
+        If boto3 is not installed.
+    """
+    global s3_client
+
+    if boto3 is None:
+        raise ImportError("boto3 is required to create an S3 client")
+
+    if s3_client is None:
+        s3_client = boto3.client("s3")
+
+    return s3_client
diff --git a/pydra/engine/helpers_file.py b/pydra/engine/helpers_file.py
index 1810eccd32..3640351c93 100644
--- a/pydra/engine/helpers_file.py
+++ b/pydra/engine/helpers_file.py
@@ -811,3 +811,11 @@ def is_existing_file(value):
         return Path(value).exists()
     except TypeError:
         return False
+
+
+def is_s3_file(f):
+    """Return True if ``f`` is an instance of S3File."""
+    # Imported lazily; presumably avoids a circular import with .specs -- verify.
+    from .specs import S3File
+
+    return isinstance(f, S3File)
diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py
index 063e0c197f..60725d35d7 100644
--- a/pydra/engine/specs.py
+++ b/pydra/engine/specs.py
@@ -25,6 +25,23 @@ class File:
     """An :obj:`os.pathlike` object, designating a file."""
 
 
+class S3File(File):
+    """Remote file stored as an object in an AWS S3 bucket."""
+
+    # Name of the bucket that holds the object.
+    bucket_name: ty.Optional[str] = None
+    # Key of the object inside the bucket.
+    obj_key: ty.Optional[str] = None
+
+    @classmethod
+    def converter(cls, value):
+        """Convert ``value`` into an S3File; not implemented yet."""
+        # TODO: upload the file with the client returned by
+        # helpers_aws.get_s3_client.
+
+        raise NotImplementedError("S3File Converter")
+
+
 class Directory:
     """An :obj:`os.pathlike` object, designating a folder."""
 
@@ -183,7 +200,7 @@ def check_fields_input_spec(self):
                     require_to_check[fld.name] = mdata["requires"]
 
             if (
-                fld.type in [File, Directory]
+                fld.type in [File, Directory, S3File]
                 or "pydra.engine.specs.File" in str(fld.type)
                 or "pydra.engine.specs.Directory" in str(fld.type)
             ):
@@ -196,6 +213,10 @@ def check_fields_input_spec(self):
 
     def _file_check(self, field):
         """checking if the file exists"""
+
+        if field.type is S3File:
+            return self._s3_file_exists(field)
+
         if isinstance(getattr(self, field.name), list):
             # if value is a list and type is a list of Files/Directory, checking all elements
             if field.type in [ty.List[File], ty.List[Directory]]:
@@ -213,6 +234,38 @@ def _file_check(self, field):
                         f"the file {file} from the {field.name} input does not exist"
                     )
 
+    def _s3_file_exists(self, field):
+        """Check that the S3 object given for ``field`` exists and is accessible.
+
+        Parameters
+        ----------
+        field
+            Field of the input spec whose declared type is S3File.
+
+        Returns
+        -------
+        bool
+            True if a HEAD request for the object succeeds, False if the
+            client raises ``ClientError``.
+        """
+        assert field.type is S3File, f"Field {field} is not of type S3File."
+
+        # Let a missing boto3/botocore fail here with ImportError instead of
+        # surfacing later as a NameError on the imported names.
+        from botocore.exceptions import ClientError
+        from .helpers_aws import get_s3_client
+
+        # The bucket and key live on the field's value, not on the field itself;
+        # assumes that value is an S3File instance -- TODO confirm.
+        s3_file = getattr(self, field.name)
+        bucket, key = s3_file.bucket_name, s3_file.obj_key
+
+        try:
+            get_s3_client().head_object(Bucket=bucket, Key=key)
+        except ClientError:
+            return False
+        return True
+
     def check_metadata(self):
         """Check contained metadata."""
 
diff --git a/pydra/engine/task.py b/pydra/engine/task.py
index a62117bb21..302fc21607 100644
--- a/pydra/engine/task.py
+++ b/pydra/engine/task.py
@@ -825,3 +825,7 @@ def container_args(self):
         cargs.extend(["--pwd", str(self.output_cpath)])
         cargs.append(self.inputs.image)
         return cargs
+
+
+class ServerlessTask(ShellCommandTask):
+    """Placeholder for tasks that run on a FaaS (aka serverless) platform."""
diff --git a/pydra/engine/tests/test_helpers_aws.py b/pydra/engine/tests/test_helpers_aws.py
new file mode 100644
index 0000000000..e1aaf010aa
--- /dev/null
+++ b/pydra/engine/tests/test_helpers_aws.py
@@ -0,0 +1,14 @@
+import botocore.client
+
+from .. import helpers_aws
+
+
+def test_get_s3_client():
+    """get_s3_client builds a botocore client once and then reuses it."""
+    client = helpers_aws.get_s3_client()
+
+    assert isinstance(client, botocore.client.BaseClient)
+    # Read the cache through the module: a name imported with
+    # ``from ..helpers_aws import s3_client`` would keep its import-time value.
+    assert helpers_aws.s3_client is client
+    assert helpers_aws.get_s3_client() is client
diff --git a/setup.cfg b/setup.cfg
index 89dc37dd10..28b97fd2e1 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -43,6 +43,7 @@ test_requires =
     tornado
     boutiques
     pympler
+    boto3
 packages = find:
 include_package_data = True
 
@@ -84,6 +85,7 @@ dev =
     %(test)s
     black==21.4b2
     pre-commit
+    boto3
 dask =
     %(test)s
     dask