2 changes: 1 addition & 1 deletion pybossa/cloud_store_api/base_conn.py
@@ -195,7 +195,7 @@ def copy_key(self, bucket, source_key, target_key, **kwargs):
"%s: %s, key %s. http status %d",
self.__class__.__name__,
str(e),
err_resp.get("Key", path),
err_resp.get("Key", target_key),
http_status,
)
raise
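Reviewer note: the one-line fix above makes the log fall back to `target_key`, the destination key named in `copy_key`'s own signature, rather than `path`, which does not appear among its parameters. A minimal sketch of the surrounding handler for context; everything outside the visible hunk (the copy_object call, err_resp, http_status, and the logger attribute) is an assumption for illustration only:

from botocore.exceptions import ClientError

def copy_key(self, bucket, source_key, target_key, **kwargs):
    try:
        # Assumed body: copy source_key to target_key within the bucket.
        self.client.copy_object(
            Bucket=bucket,
            CopySource={"Bucket": bucket, "Key": source_key},
            Key=target_key,
            **kwargs,
        )
    except ClientError as e:
        err_resp = e.response.get("Error", {})
        http_status = e.response.get("ResponseMetadata", {}).get("HTTPStatusCode", 0)
        # Fall back to the destination key when the error body carries no "Key".
        self.logger.error(
            "%s: %s, key %s. http status %d",
            self.__class__.__name__,
            str(e),
            err_resp.get("Key", target_key),
            http_status,
        )
        raise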
158 changes: 158 additions & 0 deletions pybossa/cloud_store_api/base_s3_client.py
@@ -0,0 +1,158 @@
from urllib.parse import urlsplit, urlunsplit
from botocore.exceptions import ClientError
from botocore.config import Config
import boto3
from pybossa.cloud_store_api.base_conn import BaseConnection


class BaseS3Client(BaseConnection):
    """
    Base class for S3 clients that provides common boto3 initialization
    and request modification patterns.

    This class extends BaseConnection to maintain compatibility with existing
    code while providing shared functionality for S3 client implementations.
    """

    def __init__(
        self,
        aws_access_key_id=None,
        aws_secret_access_key=None,
        aws_session_token=None,
        profile_name=None,
        endpoint_url=None,
        region_name=None,
        s3_ssl_no_verify=False,
        host_suffix="",
        **kwargs
    ):
        self.host_suffix = host_suffix or ""
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key

        # Initialize http_connection_kwargs for compatibility with legacy tests
        self.http_connection_kwargs = {}
        if s3_ssl_no_verify:
            import ssl
            self.http_connection_kwargs['context'] = ssl._create_unverified_context()

        # Create boto3 session
        session = (
            boto3.session.Session(profile_name=profile_name)
            if profile_name
            else boto3.session.Session()
        )

        # Configure path-style addressing (emulates OrdinaryCallingFormat)
        config = Config(
            region_name=region_name,
            s3={"addressing_style": "path"},
        )

        # Handle SSL verification
        verify = False if s3_ssl_no_verify else None  # None = default verify behavior

        self.client = session.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            endpoint_url=endpoint_url,
            config=config,
            verify=verify,
        )

        # Register hooks if needed - subclasses can override this logic
        if self._should_register_hooks():
            self.client.meta.events.register(
                "before-sign.s3",
                self._before_sign_hook,
            )

    def _should_register_hooks(self):
        """
        Determine when hooks should be registered.
        Subclasses can override this to customize hook registration logic.
        """
        return bool(self.host_suffix)

    def _before_sign_hook(self, request, **kwargs):
        """
        Base hook that handles host_suffix path modification.
        Subclasses can override or extend this method for additional functionality.
        """
        if self.host_suffix:
            self._apply_host_suffix(request)

    def _apply_host_suffix(self, request):
        """Apply host_suffix to the request URL path."""
        parts = urlsplit(request.url)
        # Collapse duplicate slashes when joining the suffix and the path
        new_path = (self.host_suffix.rstrip("/") + "/" +
                    parts.path.lstrip("/")).replace("//", "/")
        request.url = urlunsplit(
            (parts.scheme, parts.netloc, new_path, parts.query, parts.fragment))

    def get_path(self, path):
        """
        Return the path with host_suffix prepended, for compatibility with legacy tests.
        This emulates the behavior that was expected from the old boto2 implementation.
        """
        if not self.host_suffix:
            return path

        # Normalize the path to ensure proper formatting
        if not path.startswith('/'):
            path = '/' + path

        # Combine host_suffix and path, avoiding double slashes
        combined = (self.host_suffix.rstrip("/") + "/" + path.lstrip("/")).replace("//", "/")

        # Ensure trailing slash if the original path was just '/'
        if path == '/' and not combined.endswith('/'):
            combined += '/'

        return combined

    # Override BaseConnection's delete_key to provide tolerant delete behavior
    def delete_key(self, bucket, path, **kwargs):
        """
        Delete an object, treating 200 and 204 as success.
        This overrides BaseConnection's delete_key to provide more tolerant behavior.
        """
        try:
            resp = self.client.delete_object(Bucket=bucket, Key=path, **kwargs)
            status = resp.get("ResponseMetadata", {}).get("HTTPStatusCode", 0)
            if status not in (200, 204):
                raise ClientError(
                    {
                        "Error": {"Code": str(status), "Message": "Unexpected status"},
                        "ResponseMetadata": {"HTTPStatusCode": status},
                    },
                    operation_name="DeleteObject",
                )
            return True
        except ClientError:
            # Propagate errors from delete_object (and the one raised above) unchanged
            raise

    # Additional convenience methods for boto3 compatibility
    def get_object(self, bucket, key, **kwargs):
        """Get object using boto3 client interface."""
        return self.client.get_object(Bucket=bucket, Key=key, **kwargs)

    def put_object(self, bucket, key, body, **kwargs):
        """Put object using boto3 client interface."""
        return self.client.put_object(Bucket=bucket, Key=key, Body=body, **kwargs)

    def list_objects(self, bucket, prefix="", **kwargs):
        """List objects using boto3 client interface."""
        return self.client.list_objects_v2(Bucket=bucket, Prefix=prefix, **kwargs)

    def upload_file(self, filename, bucket, key, **kwargs):
        """Upload file using boto3 client interface."""
        return self.client.upload_file(filename, bucket, key, ExtraArgs=kwargs or {})

    def raw(self):
        """Access the underlying boto3 client for advanced operations."""
        return self.client
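To make the new client's behavior concrete, a minimal usage sketch follows; the endpoint, credentials, bucket, and host_suffix values are illustrative placeholders, not part of the PR, and only methods defined above are used:

# All values below are placeholders for illustration.
client = BaseS3Client(
    aws_access_key_id="AKIA...",
    aws_secret_access_key="...",
    endpoint_url="https://s3.internal.example.com:443",
    region_name="us-east-1",
    host_suffix="/objstore",  # rewritten into every request path by the before-sign hook
)

# get_path mirrors the path the hook produces, for the legacy tests:
assert client.get_path("bucket/key") == "/objstore/bucket/key"

# Convenience wrappers delegate straight to the underlying boto3 client:
client.put_object("my-bucket", "reports/2024.csv", b"col1,col2\n")
body = client.get_object("my-bucket", "reports/2024.csv")["Body"].read()

# delete_key treats both 200 and 204 responses as success:
client.delete_key("my-bucket", "reports/2024.csv")

A note on the design: registering _before_sign_hook on the "before-sign.s3" event rewrites the URL before the signature is computed, so the signed request matches the proxied path the far end actually sees; a later event such as before-send would break request signing.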
175 changes: 25 additions & 150 deletions pybossa/cloud_store_api/connection.py
@@ -1,24 +1,15 @@
 from copy import deepcopy
 import ssl
 import sys
 import time
 
 from flask import current_app
-from boto.auth_handler import AuthHandler
-import boto.auth
-
-from boto.exception import S3ResponseError
-from boto.s3.key import Key
-from boto.s3.bucket import Bucket
-from boto.s3.connection import S3Connection, OrdinaryCallingFormat
-from boto.provider import Provider
 import jwt
+from botocore.config import Config
+import boto3
 from werkzeug.exceptions import BadRequest
-from boto3.session import Session
-from botocore.client import Config
 from pybossa.cloud_store_api.base_conn import BaseConnection
 from os import environ
 
+from pybossa.cloud_store_api.proxied_s3_client import ProxiedS3Client
+from pybossa.cloud_store_api.s3_client_wrapper import S3ClientWrapper
 
 
 def check_store(store):
     if not store:
@@ -59,60 +50,30 @@ def create_connection(**kwargs):
         cert=kwargs.get("cert", False),
         proxy_url=kwargs.get("proxy_url")
     )
+    # Map legacy boto2 parameters to boto3 parameters
+    if 'host' in kwargs and 'endpoint_url' not in kwargs:
+        host = kwargs.pop('host')
+        port = kwargs.pop('port', None)
+        if port:
+            kwargs['endpoint_url'] = f"https://{host}:{port}"
+        else:
+            # Default to port 443 for HTTPS to maintain compatibility with old behavior
+            kwargs['endpoint_url'] = f"https://{host}:443"
+
     if 'object_service' in kwargs:
-        current_app.logger.info("Calling ProxiedConnection")
-        conn = ProxiedConnection(**kwargs)
+        current_app.logger.info("Calling ProxiedS3Client")
+        # Map auth_headers to extra_headers for ProxiedS3Client compatibility
+        if 'auth_headers' in kwargs:
+            auth_headers = kwargs.pop('auth_headers')
+            # Convert auth_headers list of tuples to dict for extra_headers
+            if auth_headers:
+                kwargs['extra_headers'] = dict(auth_headers)
+        conn = ProxiedS3Client(**kwargs)
     else:
-        current_app.logger.info("Calling CustomConnection")
-        conn = CustomConnection(**kwargs)
+        current_app.logger.info("Calling S3ClientWrapper")
+        conn = S3ClientWrapper(**kwargs)
     return conn


-class CustomProvider(Provider):
-    """Extend Provider to carry information about the end service provider, in
-    case the service is being proxied.
-    """
-
-    def __init__(self, name, access_key=None, secret_key=None,
-                 security_token=None, profile_name=None, object_service=None,
-                 auth_headers=None):
-        self.object_service = object_service or name
-        self.auth_headers = auth_headers
-        super(CustomProvider, self).__init__(name, access_key, secret_key,
-                                             security_token, profile_name)
-
-
-class CustomConnection(S3Connection):
-
-    def __init__(self, *args, **kwargs):
-        if not kwargs.get('calling_format'):
-            kwargs['calling_format'] = OrdinaryCallingFormat()
-
-        kwargs['provider'] = CustomProvider('aws',
-                                            kwargs.get('aws_access_key_id'),
-                                            kwargs.get('aws_secret_access_key'),
-                                            kwargs.get('security_token'),
-                                            kwargs.get('profile_name'),
-                                            kwargs.pop('object_service', None),
-                                            kwargs.pop('auth_headers', None))
-
-        kwargs['bucket_class'] = CustomBucket
-
-        ssl_no_verify = kwargs.pop('s3_ssl_no_verify', False)
-        self.host_suffix = kwargs.pop('host_suffix', '')
-
-        super(CustomConnection, self).__init__(*args, **kwargs)
-
-        if kwargs.get('is_secure', True) and ssl_no_verify:
-            self.https_validate_certificates = False
-            context = ssl._create_unverified_context()
-            self.http_connection_kwargs['context'] = context
-
-    def get_path(self, path='/', *args, **kwargs):
-        ret = super(CustomConnection, self).get_path(path, *args, **kwargs)
-        return self.host_suffix + ret


 class CustomConnectionV2(BaseConnection):
     def __init__(
         self,
@@ -133,89 +94,3 @@ def __init__(
proxies={"https": proxy_url, "http": proxy_url},
),
)


-class CustomBucket(Bucket):
-    """Handle both 200 and 204 as response code"""
-
-    def delete_key(self, *args, **kwargs):
-        try:
-            super(CustomBucket, self).delete_key(*args, **kwargs)
-        except S3ResponseError as e:
-            if e.status != 200:
-                raise
-
-
-class ProxiedKey(Key):
-
-    def should_retry(self, response, chunked_transfer=False):
-        if 200 <= response.status <= 299:
-            return True
-        return super(ProxiedKey, self).should_retry(response, chunked_transfer)
-
-
-class ProxiedBucket(CustomBucket):
-
-    def __init__(self, *args, **kwargs):
-        super(ProxiedBucket, self).__init__(*args, **kwargs)
-        self.set_key_class(ProxiedKey)
-
-
-class ProxiedConnection(CustomConnection):
-    """Object Store connection through proxy API. Sets the proper headers and
-    creates the jwt; use the appropriate Bucket and Key classes.
-    """
-
-    def __init__(self, client_id, client_secret, object_service, *args, **kwargs):
-        self.client_id = client_id
-        self.client_secret = client_secret
-        kwargs['object_service'] = object_service
-        super(ProxiedConnection, self).__init__(*args, **kwargs)
-        self.set_bucket_class(ProxiedBucket)
-
-    def make_request(self, method, bucket='', key='', headers=None, data='',
-                     query_args=None, sender=None, override_num_retries=None,
-                     retry_handler=None):
-        headers = headers or {}
-        headers['jwt'] = self.create_jwt(method, self.host, bucket, key)
-        headers['x-objectservice-id'] = self.provider.object_service.upper()
-        current_app.logger.info("Calling ProxiedConnection.make_request. headers %s", str(headers))
-        return super(ProxiedConnection, self).make_request(method, bucket, key,
-                headers, data, query_args, sender, override_num_retries,
-                retry_handler)
-
-    def create_jwt(self, method, host, bucket, key):
-        now = int(time.time())
-        path = self.get_path(self.calling_format.build_path_base(bucket, key))
-        current_app.logger.info("create_jwt called. method %s, host %s, bucket %s, key %s, path %s", method, host, str(bucket), str(key), str(path))
-        payload = {
-            'iat': now,
-            'nbf': now,
-            'exp': now + 300,
-            'method': method,
-            'iss': self.client_id,
-            'host': host,
-            'path': path,
-            'region': 'ny'
-        }
-        return jwt.encode(payload, self.client_secret, algorithm='HS256')
-
-
-class CustomAuthHandler(AuthHandler):
-    """Implements sending of custom auth headers"""
-
-    capability = ['s3']
-
-    def __init__(self, host, config, provider):
-        if not provider.auth_headers:
-            raise boto.auth_handler.NotReadyToAuthenticate()
-        self._provider = provider
-        super(CustomAuthHandler, self).__init__(host, config, provider)
-
-    def add_auth(self, http_request, **kwargs):
-        headers = http_request.headers
-        for header, attr in self._provider.auth_headers:
-            headers[header] = getattr(self._provider, attr)
-
-    def sign_string(self, *args, **kwargs):
-        return ''
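For quick reference, here is a hedged sketch of how the new create_connection consumes legacy boto2-style kwargs; all values are illustrative placeholders:

# Illustrative legacy-style kwargs; the values are placeholders.
legacy_kwargs = {
    "aws_access_key_id": "AKIA...",
    "aws_secret_access_key": "...",
    "host": "s3.internal.example.com",
    "port": 9000,
    "object_service": "objstore",
    "auth_headers": [("x-api-key", "secret")],
}

# Per the diff above, create_connection will:
#   1. fold host/port into endpoint_url, here "https://s3.internal.example.com:9000"
#      (":443" is appended when no port is given, matching the old default);
#   2. turn auth_headers, a list of tuples, into the extra_headers dict
#      {"x-api-key": "secret"} that ProxiedS3Client expects;
#   3. choose ProxiedS3Client because object_service is present; without it,
#      an S3ClientWrapper is returned instead.
conn = create_connection(**legacy_kwargs)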