Skip to content

Commit

Permalink
RDISCROWD-6853: Deprecate hdfs (#901)
Browse files Browse the repository at this point in the history
* handle missing swagger config

* task create api fails for hdfs

* disable hdfs endpoint

* disable hdfs tests
  • Loading branch information
dchhabda authored Mar 6, 2024
1 parent 4302b11 commit dc563e2
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 265 deletions.
4 changes: 4 additions & 0 deletions pybossa/api/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def _update_attribute(self, new, old):
def _preprocess_post_data(self, data):
project_id = data["project_id"]
info = data["info"]
if isinstance(info, dict):
hdfs_task = any([val.startswith("/fileproxy/hdfs/") for val in info.values() if isinstance(val, str)])
if hdfs_task:
raise BadRequest("Invalid task payload. HDFS is not supported")
duplicate = task_repo.find_duplicate(project_id=project_id, info=info)
if duplicate:
message = {
Expand Down
6 changes: 5 additions & 1 deletion pybossa/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,8 +1014,12 @@ def setup_http_signer(app):


def setup_swagger(app):
swagger_path = app.config.get('SWAGGER_HEADER_PATH')
if swagger_path is None:
return

try:
with open(app.config.get('SWAGGER_HEADER_PATH'), 'r') as file:
with open(swagger_path, 'r') as file:
html_as_string = file.read()
app.config.get('SWAGGER_TEMPLATE')['head_text'] = html_as_string
except (FileNotFoundError, TypeError):
Expand Down
118 changes: 60 additions & 58 deletions pybossa/view/fileproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from pybossa.contributions_guard import ContributionsGuard
from pybossa.core import task_repo, signer
from pybossa.encryption import AESWithGCM
from pybossa.pybhdfs.client import HDFSKerberos
# from pybossa.pybhdfs.client import HDFSKerberos
from pybossa.sched import has_lock
from pybossa.cloud_store_api.s3 import get_content_and_key_from_s3

Expand All @@ -50,18 +50,6 @@ def decorated(*args, **kwargs):
return decorated


def is_valid_hdfs_url(attempt_path, attempt_args):
    """Build a validator for a signed HDFS file-proxy request.

    :param attempt_path: URL path the incoming request is allowed to use.
    :param attempt_args: query parameters of the incoming request as a
        mapping of name -> list of values (the shape produced by
        ``request.args.to_dict(flat=False)``).
    :return: predicate ``is_valid_url(v)`` that is True only when *v* is a
        string URL whose path equals ``attempt_path`` and whose ``offset``
        and ``length`` query parameters match ``attempt_args``.
    """
    def is_valid_url(v):
        # Anything that is not a string (None, numbers, dicts) cannot be a
        # valid URL.  ``str`` replaces the former ``six.string_types`` check:
        # on Python 3, six.string_types == (str,), so behavior is unchanged
        # while dropping the Python-2 compat dependency.
        if not isinstance(v, str):
            return False
        parsed = urlparse(v)
        parsed_args = parse_qs(parsed.query)
        # parse_qs yields list values on both sides; a parameter absent from
        # both compares as None == None and is accepted.
        return (parsed.path == attempt_path
                and parsed_args.get('offset') == attempt_args.get('offset')
                and parsed_args.get('length') == attempt_args.get('length'))
    return is_valid_url


def check_allowed(user_id, task_id, project, is_valid_url):
task = task_repo.get_task(task_id)

Expand Down Expand Up @@ -197,55 +185,69 @@ def encrypt_task_response_data(task_id, project_id, data):
return content


# def is_valid_hdfs_url(attempt_path, attempt_args):
# def is_valid_url(v):
# if not isinstance(v, six.string_types):
# return False
# parsed = urlparse(v)
# parsed_args = parse_qs(parsed.query)
# return (parsed.path == attempt_path
# and parsed_args.get('offset') == attempt_args.get('offset')
# and parsed_args.get('length') == attempt_args.get('length'))
# return is_valid_url



# NOTE(review): this span is diff residue from the "Deprecate hdfs" commit —
# the removed implementation and the new one-line body are fused together by
# the page extraction, and indentation was stripped.  The effective endpoint
# after the commit is: decorators + def + the unconditional
# BadRequest("Invalid task. HDFS is not supported") raise; everything else
# is the retired implementation (kept below as commented-out code).
@blueprint.route('/hdfs/<string:cluster>/<int:project_id>/<path:path>')
@no_cache
@login_required
def hdfs_file(project_id, cluster, path):
if not current_app.config.get('HDFS_CONFIG'):
raise NotFound('Not Found')
signature = request.args.get('task-signature')
if not signature:
raise Forbidden('No signature')
size_signature = len(signature)
if size_signature > TASK_SIGNATURE_MAX_SIZE:
current_app.logger.exception(
'Project id {}, cluster {} path {} invalid task signature. Signature length {} exceeds max allowed length {}.' \
.format(project_id, cluster, path, size_signature, TASK_SIGNATURE_MAX_SIZE))
raise Forbidden('Invalid signature')

project = get_project_data(project_id)
timeout = project['info'].get('timeout', ContributionsGuard.STAMP_TTL)
payload = signer.loads(signature, max_age=timeout)
task_id = payload['task_id']

try:
check_allowed(current_user.id, task_id, project,
is_valid_hdfs_url(request.path, request.args.to_dict(flat=False)))
except Exception:
current_app.logger.exception('Project id %s not allowed to get file %s %s', project_id, path,
str(request.args))
raise

current_app.logger.info("Project id %s, task id %s. Accessing hdfs cluster %s, path %s", project_id, task_id, cluster, path)
client = HDFSKerberos(**current_app.config['HDFS_CONFIG'][cluster])
offset = request.args.get('offset')
length = request.args.get('length')

try:
offset = int(offset) if offset else None
length = int(length) if length else None
content = client.get('/{}'.format(path), offset=offset, length=length)
project_encryption = get_project_encryption(project)
if project_encryption and all(project_encryption.values()):
secret = get_secret_from_vault(project_encryption)
cipher = AESWithGCM(secret)
content = cipher.decrypt(content)
except Exception:
current_app.logger.exception("Project id %s, task id %s, cluster %s, get task file %s, %s",
project_id, task_id, cluster, path, str(request.args))
raise InternalServerError('An Error Occurred')

return Response(content)
# HDFS support is deprecated: the route now rejects every request outright.
raise BadRequest("Invalid task. HDFS is not supported")
# if not current_app.config.get('HDFS_CONFIG'):
# raise NotFound('Not Found')
# signature = request.args.get('task-signature')
# if not signature:
# raise Forbidden('No signature')
# size_signature = len(signature)
# if size_signature > TASK_SIGNATURE_MAX_SIZE:
# current_app.logger.exception(
# 'Project id {}, cluster {} path {} invalid task signature. Signature length {} exceeds max allowed length {}.' \
# .format(project_id, cluster, path, size_signature, TASK_SIGNATURE_MAX_SIZE))
# raise Forbidden('Invalid signature')

# project = get_project_data(project_id)
# timeout = project['info'].get('timeout', ContributionsGuard.STAMP_TTL)
# payload = signer.loads(signature, max_age=timeout)
# task_id = payload['task_id']

# try:
# check_allowed(current_user.id, task_id, project,
# is_valid_hdfs_url(request.path, request.args.to_dict(flat=False)))
# except Exception:
# current_app.logger.exception('Project id %s not allowed to get file %s %s', project_id, path,
# str(request.args))
# raise

# current_app.logger.info("Project id %s, task id %s. Accessing hdfs cluster %s, path %s", project_id, task_id, cluster, path)
# client = HDFSKerberos(**current_app.config['HDFS_CONFIG'][cluster])
# offset = request.args.get('offset')
# length = request.args.get('length')

# try:
# offset = int(offset) if offset else None
# length = int(length) if length else None
# content = client.get('/{}'.format(path), offset=offset, length=length)
# project_encryption = get_project_encryption(project)
# if project_encryption and all(project_encryption.values()):
# secret = get_secret_from_vault(project_encryption)
# cipher = AESWithGCM(secret)
# content = cipher.decrypt(content)
# except Exception:
# current_app.logger.exception("Project id %s, task id %s, cluster %s, get task file %s, %s",
# project_id, task_id, cluster, path, str(request.args))
# raise InternalServerError('An Error Occurred')

# return Response(content)


def validate_task(project, task_id, user_id):
Expand Down
27 changes: 27 additions & 0 deletions test/test_api/test_task_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1237,3 +1237,30 @@ def upload_gold_data(self, mock, mock2):

url = tasks.upload_gold_data(task, 1, {'ans1': 'test'})
assert url == 'testURL', url


@with_context
def test_create_task_with_hdfs_payload(self):
    """Task creation rejects any info value that points at the HDFS file proxy."""
    admin, subadminowner, subadmin, reguser = UserFactory.create_batch(4)
    make_admin(admin)
    make_subadmin(subadminowner)
    make_subadmin(subadmin)

    project = ProjectFactory.create(owner=subadminowner)
    headers = {'Authorization': admin.api_key}
    task_info = {'field_1': 'one', 'field_2': '/fileproxy/hdfs/my_hdfs_file.txt'}

    # A payload referencing an HDFS file-proxy path must be refused with 400.
    payload = {'project_id': project.id, 'info': task_info, 'n_answers': 2}
    res = self.app.post('/api/task', data=json.dumps(payload), headers=headers)
    assert res.status_code == 400
    body = json.loads(res.data)
    assert body["exception_msg"] == "Invalid task payload. HDFS is not supported"

    # With the HDFS reference replaced, the same payload is accepted.
    task_info["field_2"] = "xyz"
    payload = {'project_id': project.id, 'info': task_info, 'n_answers': 2}
    res = self.app.post('/api/task', data=json.dumps(payload), headers=headers)
    created = json.loads(res.data)
    assert res.status_code == 200
    assert created["info"] == {"field_1": "one", "field_2": "xyz"}
Loading

0 comments on commit dc563e2

Please sign in to comment.