From 58120e0fb4389a892b4e27abf182644d8e150f7a Mon Sep 17 00:00:00 2001 From: ngr Date: Mon, 2 May 2022 15:09:50 -0600 Subject: [PATCH 1/4] Initialization of support for S3: - New File subtype - helpers_aws with lazy S3 client - check that S3 file exists from BaseSpec if field.type is S3File --- pydra/engine/helpers_aws.py | 25 +++++++++++++++++ pydra/engine/helpers_file.py | 6 ++++ pydra/engine/specs.py | 38 +++++++++++++++++++++++++- pydra/engine/task.py | 5 ++++ pydra/engine/tests/test_helpers_aws.py | 15 ++++++++++ setup.cfg | 2 ++ 6 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 pydra/engine/helpers_aws.py create mode 100644 pydra/engine/tests/test_helpers_aws.py diff --git a/pydra/engine/helpers_aws.py b/pydra/engine/helpers_aws.py new file mode 100644 index 0000000000..ebc1416da4 --- /dev/null +++ b/pydra/engine/helpers_aws.py @@ -0,0 +1,25 @@ +"""List of helper methods and clients.""" +# TODO It would be nice to rewrite this to some class (e.g. AwsManager). + +try: + import boto3 + from botocore.exceptions import ClientError +except ImportError: + pass + +import logging + +logger = logging.getLogger("pydra") + +s3_client: boto3.client = None + + +def get_s3_client(): + """Lazy getter for S3 client. """ + + global s3_client + + if not s3_client: + s3_client = boto3.client('s3') + + return s3_client diff --git a/pydra/engine/helpers_file.py b/pydra/engine/helpers_file.py index 1810eccd32..3640351c93 100644 --- a/pydra/engine/helpers_file.py +++ b/pydra/engine/helpers_file.py @@ -811,3 +811,9 @@ def is_existing_file(value): return Path(value).exists() except TypeError: return False + + +def is_s3_file(f): + from .specs import S3File + + return isinstance(f, S3File) diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index 063e0c197f..f560b1914c 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -25,6 +25,23 @@ class File: """An :obj:`os.pathlike` object, designating a file.""" +class S3File(File): + """Remote file in AWS S3 block storage.""" + + bucket_name: str = None + obj_key: str = None + + @classmethod + def converter(cls, value): + + # from helpers_aws import get_s3_client + # s3_client = get_s3_client() + + # Upload file + + raise NotImplementedError("S3File Converter") + + class Directory: """An :obj:`os.pathlike` object, designating a folder.""" @@ -183,7 +200,7 @@ def check_fields_input_spec(self): require_to_check[fld.name] = mdata["requires"] if ( - fld.type in [File, Directory] + fld.type in [File, Directory, S3File] or "pydra.engine.specs.File" in str(fld.type) or "pydra.engine.specs.Directory" in str(fld.type) ): @@ -196,6 +213,10 @@ def check_fields_input_spec(self): def _file_check(self, field): """checking if the file exists""" + + if field.type is S3File: + return self._s3_file_exists(field) + if isinstance(getattr(self, field.name), list): # if value is a list and type is a list of Files/Directory, checking all elements if field.type in [ty.List[File], ty.List[Directory]]: @@ -213,6 +234,21 @@ def _file_check(self, field): f"the file {file} from the {field.name} input does not exist" ) + def _s3_file_exists(self, field): + """Checks if the file exists in the bucket and is accessible.""" + + assert isinstance(field.type, S3File), f"Field {field} is not of type S3File." + from helpers_aws import get_s3_client, ClientError + + s3_client = get_s3_client() + + bucket, key = field.bucket, field.obj_key + try: + s3_client.head_object(Bucket=bucket, Key=key) + return True + except ClientError: + return False + def check_metadata(self): """Check contained metadata.""" diff --git a/pydra/engine/task.py b/pydra/engine/task.py index a62117bb21..ac9ad2491e 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -825,3 +825,8 @@ def container_args(self): cargs.extend(["--pwd", str(self.output_cpath)]) cargs.append(self.inputs.image) return cargs + + +class ServerlessTask(ShellCommandTask): + """Placeholder for a type of Tasks running using a FaaS (aka Serverless) platform.""" + pass \ No newline at end of file diff --git a/pydra/engine/tests/test_helpers_aws.py b/pydra/engine/tests/test_helpers_aws.py new file mode 100644 index 0000000000..5fc4a469fc --- /dev/null +++ b/pydra/engine/tests/test_helpers_aws.py @@ -0,0 +1,15 @@ +import boto3 +import botocore +import pytest + +from ..helpers_aws import ( + s3_client, + get_s3_client, +) + + +def test_get_s3_client(): + assert s3_client is None + r = get_s3_client() + + assert isinstance(r, botocore.client.BaseClient) diff --git a/setup.cfg b/setup.cfg index 89dc37dd10..28b97fd2e1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,7 @@ test_requires = tornado boutiques pympler + boto3 packages = find: include_package_data = True @@ -84,6 +85,7 @@ dev = %(test)s black==21.4b2 pre-commit + boto3 dask = %(test)s dask From 6dd7ebd616237776eef27119e478d0697c0377fb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 2 May 2022 21:22:22 +0000 Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pydra/engine/helpers_aws.py | 4 ++-- pydra/engine/task.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pydra/engine/helpers_aws.py b/pydra/engine/helpers_aws.py index ebc1416da4..692b287823 100644 --- a/pydra/engine/helpers_aws.py +++ b/pydra/engine/helpers_aws.py @@ -15,11 +15,11 @@ def get_s3_client(): - """Lazy getter for S3 client. """ + """Lazy getter for S3 client.""" global s3_client if not s3_client: - s3_client = boto3.client('s3') + s3_client = boto3.client("s3") return s3_client diff --git a/pydra/engine/task.py b/pydra/engine/task.py index ac9ad2491e..302fc21607 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -829,4 +829,5 @@ def container_args(self): class ServerlessTask(ShellCommandTask): """Placeholder for a type of Tasks running using a FaaS (aka Serverless) platform.""" - pass \ No newline at end of file + + pass From 1b73bb2cbcbe1c6ae3350c995854c2f82bf3c4ba Mon Sep 17 00:00:00 2001 From: ngr Date: Mon, 2 May 2022 15:25:58 -0600 Subject: [PATCH 3/4] rerun --- pydra/engine/tests/test_helpers_aws.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pydra/engine/tests/test_helpers_aws.py b/pydra/engine/tests/test_helpers_aws.py index 5fc4a469fc..e1aaf010aa 100644 --- a/pydra/engine/tests/test_helpers_aws.py +++ b/pydra/engine/tests/test_helpers_aws.py @@ -1,6 +1,4 @@ -import boto3 import botocore -import pytest from ..helpers_aws import ( s3_client, From 1a73e6aea48ec1906a0ca96b5448c8ea97870b0b Mon Sep 17 00:00:00 2001 From: ngr Date: Mon, 2 May 2022 15:32:34 -0600 Subject: [PATCH 4/4] rerun checks --- pydra/engine/helpers_aws.py | 1 - pydra/engine/specs.py | 7 ++++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pydra/engine/helpers_aws.py b/pydra/engine/helpers_aws.py index 692b287823..2e26320b1b 100644 --- a/pydra/engine/helpers_aws.py +++ b/pydra/engine/helpers_aws.py @@ -3,7 +3,6 @@ try: import boto3 - from botocore.exceptions import ClientError except ImportError: pass diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index f560b1914c..60725d35d7 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -238,7 +238,12 @@ def _s3_file_exists(self, field): """Checks if the file exists in the bucket and is accessible.""" assert isinstance(field.type, S3File), f"Field {field} is not of type S3File." - from helpers_aws import get_s3_client, ClientError + + try: + from botocore.exceptions import ClientError + from helpers_aws import get_s3_client + except ImportError: + pass s3_client = get_s3_client()