From 68e5b8611c8162e8fde62142369b5e985feaba77 Mon Sep 17 00:00:00 2001 From: BorisYourich Date: Mon, 5 May 2025 10:49:27 +0000 Subject: [PATCH 01/22] Added condition in docker.py for stage in and out. Added additional logic/rewrote code for those conditions in event_actions.py: initialization, conditional command building, command joining, empty command check, singularity placeholders, error handling/logging --- tesp_api/service/event_actions.py | 137 ++++++++++++++++++++---------- tesp_api/utils/docker.py | 7 +- 2 files changed, 98 insertions(+), 46 deletions(-) diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index 09c837e..4aa4066 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -156,12 +156,16 @@ async def handle_run_task(event: Event) -> None: start_time=Just(datetime.datetime.now(datetime.timezone.utc)) ) - # prepare docker commands + # prepare container commands, initialize to None container_cmds = list() - # stage-in + stage_in_command = None + stage_out_command = None + + # stage-in preparation print("Payload:") print(payload) stage_in_mount = payload['task_config']['inputs_directory'] + # Define a generic executor for staging operations stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", command=[], workdir=Path("/downloads")) @@ -169,81 +173,124 @@ async def handle_run_task(event: Event) -> None: if CONTAINER_TYPE == "docker": stage_in_command = docker_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) elif CONTAINER_TYPE == "singularity": - stage_exec.image = "docker://" + stage_exec.image - stage_in_command = singularity_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) + # Adapt image name if needed for singularity + singularity_stage_exec = TesTaskExecutor(image="docker://" + stage_exec.image, command=[], workdir=stage_exec.workdir) + stage_in_command = singularity_stage_in_command(singularity_stage_exec, 
resource_conf, stage_in_mount, input_confs) + # Note: Assumes singularity_stage_in_command was also modified to return None if input_confs is empty. If not, this part needs adjustment. - # container_cmds.append(stage_in_command) + # Main execution commands for i, executor in enumerate(task.executors): + run_command = "" + script_content = "" if CONTAINER_TYPE == "docker": - run_command, script_content = docker_run_command(executor, task_id, resource_conf, volume_confs, + run_command, script_content = docker_run_command(executor, str(task_id), resource_conf, volume_confs, input_confs, output_confs, stage_in_mount, i) elif CONTAINER_TYPE == "singularity": mount_job_dir = payload['task_config']['job_directory'] - run_command, script_content = singularity_run_command(executor, task_id, resource_conf, volume_confs, + run_command, script_content = singularity_run_command(executor, str(task_id), resource_conf, volume_confs, input_confs, output_confs, stage_in_mount, mount_job_dir, i) + if run_command and script_content: # Ensure command generation was successful + await pulsar_operations.upload( + payload['task_id'], DataType.INPUT, + file_content=Just(script_content), + file_path=f'run_script_{i}.sh') + container_cmds.append(run_command) - await pulsar_operations.upload( - payload['task_id'], DataType.INPUT, - file_content=Just(script_content), - file_path=f'run_script_{i}.sh') - container_cmds.append(run_command) - + # Stage-out preparation if CONTAINER_TYPE == "docker": - stage_out_command = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs) + stage_out_command = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs) elif CONTAINER_TYPE == "singularity": mount_job_dir = payload['task_config']['job_directory'] - bind_mount = payload['task_config']['inputs_directory'] - stage_out_command = singularity_stage_out_command(stage_exec, resource_conf, bind_mount, + bind_mount = payload['task_config']['inputs_directory'] # 
This might need adjustment based on actual stage-out needs + # Adapt image name if needed for singularity + singularity_stage_exec = TesTaskExecutor(image="docker://" + stage_exec.image, command=[], workdir=stage_exec.workdir) + stage_out_command = singularity_stage_out_command(singularity_stage_exec, resource_conf, bind_mount, output_confs, volume_confs, mount_job_dir) + # Note: Assumes singularity_stage_out_command was also modified to return None if output_confs is empty. If not, this part needs adjustment. - # Join all commands with " && " - run_commands = " && ".join(container_cmds) - run_command = (f"""set -xe && {stage_in_command} && {run_commands} && {stage_out_command}""") + # Build the final command string conditionally + final_command_parts = ['set -xe'] # Start with execution options - command_start_time = datetime.datetime.now(datetime.timezone.utc) + if stage_in_command: + final_command_parts.append(stage_in_command) - # start the task (docker container/s) in the pulsar - await pulsar_operations.run_job(task_id, run_command) + if container_cmds: # Only add run commands if there are any + final_command_parts.append(" && ".join(container_cmds)) - # wait for the task - command_status = await pulsar_operations.job_status_complete(str(task_id)) + if stage_out_command: + final_command_parts.append(stage_out_command) - command_end_time = datetime.datetime.now(datetime.timezone.utc) - await append_task_executor_logs( - task_id, - author, - TesTaskState.RUNNING, - command_start_time, - command_end_time, - command_status['stdout'], - command_status['stderr'], - command_status['returncode'] - ) - if command_status['returncode'] != 0: - task = await task_repository.update_task_state( + # Join the parts that exist with ' && ' + # Filter out potential empty strings just in case, though the logic above should prevent them + final_run_command = " && ".join(filter(None, final_command_parts)) + + + # Ensure there's actually something to run + if 
len(final_command_parts) <= 1 : # Only 'set -xe' is present + print(f"Task {task_id}: No commands to execute (no stage-in, no main executors, no stage-out). Skipping Pulsar job run.") + # Decide how to proceed - mark as complete? Error? For now, let's proceed to completion logic. + # We might skip pulsar interaction entirely here. + + else: + print(f"Task {task_id}: Final command to run in Pulsar:") + print(final_run_command) + command_start_time = datetime.datetime.now(datetime.timezone.utc) + + # start the task (docker container/s) in the pulsar + await pulsar_operations.run_job(task_id, final_run_command) + + # wait for the task + command_status = await pulsar_operations.job_status_complete(str(task_id)) + + command_end_time = datetime.datetime.now(datetime.timezone.utc) + await append_task_executor_logs( task_id, + author, TesTaskState.RUNNING, - TesTaskState.EXECUTOR_ERROR + command_start_time, + command_end_time, + command_status['stdout'], + command_status['stderr'], + command_status['returncode'] ) + if command_status['returncode'] != 0: + await task_repository.update_task_state( # Await the update + task_id, + TesTaskState.RUNNING, + TesTaskState.EXECUTOR_ERROR + ) + # Log the error details before raising + print(f"Task {task_id} executor error. 
Pulsar job return code: {command_status['returncode']}") + print(f"Stdout:\n{command_status['stdout']}") + print(f"Stderr:\n{command_status['stderr']}") + raise TaskExecutorError(f"Pulsar job failed with return code {command_status['returncode']}") - raise TaskExecutorError() except Exception as error: - pulsar_event_handle_error(error, task_id, event_name, pulsar_operations) - - # dispatch_event('finalize_task', payload) + print(f"Error during task {task_id} execution in event {event_name}: {type(error).__name__} - {error}") + import traceback + traceback.print_exc() + # Call the existing error handler + await pulsar_event_handle_error(error, task_id, event_name, pulsar_operations) + # Rethrow or handle cleanup if pulsar_event_handle_error doesn't terminate execution flow + return # Stop further processing in this handler after error + + # Finalize task - This part runs only if no exceptions occurred or were caught and handled above + # If an error occurred and was handled by pulsar_event_handle_error which updated state, + # this final update to COMPLETE might be incorrect. Consider state check if needed. 
+ print(f"Task {task_id}: Execution seemingly successful, proceeding to finalize.") await Promise(lambda resolve, reject: resolve(None)) \ .then(lambda ignored: task_repository.update_task_state( task_id, - TesTaskState.RUNNING, + TesTaskState.RUNNING, # Original state before completion TesTaskState.COMPLETE )) \ .map(lambda task: get_else_throw( - task, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING)) + task, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING)) # Expect RUNNING if successful )) \ .then(lambda ignored: pulsar_operations.erase_job(task_id)) \ - .catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \ + .catch(lambda error: pulsar_event_handle_error(error, task_id, "finalize_task", pulsar_operations)) \ .then(lambda x: x) # invokes promise returned by error handler, otherwise acts as identity function diff --git a/tesp_api/utils/docker.py b/tesp_api/utils/docker.py index 1e9e942..ed6a044 100644 --- a/tesp_api/utils/docker.py +++ b/tesp_api/utils/docker.py @@ -142,6 +142,9 @@ def docker_run_command(executor: TesTaskExecutor, job_id: str, resource_conf: di def docker_stage_in_command(executor: TesTaskExecutor, resource_conf: dict, bind_mount: str, input_confs: List[dict]) -> str: + if not input_confs: + return None + command_builder = DockerRunCommandBuilder() \ .with_image(executor.image) \ .with_workdir(executor.workdir) \ @@ -165,6 +168,9 @@ def docker_stage_in_command(executor: TesTaskExecutor, resource_conf: dict, def docker_stage_out_command(executor: TesTaskExecutor, resource_conf: dict, output_confs: List[dict], volume_confs: List[dict]) -> str: + if not output_confs: + return None + command_builder = DockerRunCommandBuilder() \ .with_image(executor.image) \ .with_workdir(executor.workdir) \ @@ -220,4 +226,3 @@ def map_volumes(job_id: str, volumes: List[str], outputs: List[TesTaskOutput]): }) return output_confs, volume_confs - From ad62fea8ea5df27a15b3ac1253c65b8425531848 Mon Sep 17 00:00:00 2001 
From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Mon, 12 May 2025 14:42:06 +0000 Subject: [PATCH 02/22] commit for issue #36 --- docker-compose.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker-compose.yaml b/docker-compose.yaml index 4abe304..bb67730 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -39,6 +39,8 @@ services: dockerfile: Dockerfile target: development image: pulsar_rest + profiles: + - pulsar container_name: pulsar-rest privileged: true expose: From 7383e66d7b18854a8e5e984c1ee3f73aea288f37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Mon, 12 May 2025 17:45:10 +0200 Subject: [PATCH 03/22] Update README.md for issue #36 --- README.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c471e59..2302a8b 100644 --- a/README.md +++ b/README.md @@ -17,9 +17,19 @@ standard and distributing `TES` tasks execution to `Pulsar` applications. ### Deploy The most straightforward way to deploy the TESP is to use Docker Compose. +All services (default): ``` -docker compose up -d --build +docker compose --profile "*" up -d ``` + +Without pulsar_rest service: +``` +docker compose up -d +``` +
+
+
+ Depending on you Docker and Docker Compose installation, you may need to use `docker-compose` (with hyphen) instead. You might encounter a timeout error in container runtime which can be solved by correct `mtu` configuration either in the `docker-compose.yaml`: From 9ac548fe229bf2dfc8a48e4866e71cadfa96aef1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Tue, 13 May 2025 10:08:24 +0000 Subject: [PATCH 04/22] update for issue #36, docker profiles function as desired --- docker-compose.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index bb67730..5cbe79f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -5,6 +5,7 @@ services: dockerfile: docker/tesp_api/Dockerfile target: development image: tesp-api + profiles: ["api", "all"] environment: - CONTAINER_TYPE=docker # Set to "docker", "singularity", or "both" container_name: tesp-api @@ -21,6 +22,7 @@ services: tesp-db: image: mongo:latest + profiles: ["api", "all"] container_name: tesp-db volumes: - ./docker/mongodb/data:/data/db @@ -39,8 +41,7 @@ services: dockerfile: Dockerfile target: development image: pulsar_rest - profiles: - - pulsar + profiles: ["pulsar", "all"] container_name: pulsar-rest privileged: true expose: From 4d43342e14cf3cb64a7e91aa3073f419adc1e609 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Tue, 13 May 2025 12:26:20 +0200 Subject: [PATCH 05/22] Update README.md issue #36 --- README.md | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 2302a8b..cc42f16 100644 --- a/README.md +++ b/README.md @@ -17,14 +17,20 @@ standard and distributing `TES` tasks execution to `Pulsar` applications. ### Deploy The most straightforward way to deploy the TESP is to use Docker Compose. 
-All services (default): + +#### All services (default): +``` +docker compose --profile all up -d +``` + +#### Without pulsar_rest service: ``` -docker compose --profile "*" up -d +docker compose --profile api up -d ``` -Without pulsar_rest service: +#### Only pulsar_rest service: ``` -docker compose up -d +docker compose --profile pulsar up -d ```

From ce1fca3a6fae451b93603a5c538b59b6ff76fda5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Wed, 14 May 2025 14:16:48 +0000 Subject: [PATCH 06/22] fixed stuck loop caused by cancelation/deletion of a job with galaxy client, CESNET/usegalaxy#153 --- tesp_api/api/endpoints/task_endpoints.py | 19 +- tesp_api/service/event_actions.py | 288 ++++++++++++----------- 2 files changed, 171 insertions(+), 136 deletions(-) diff --git a/tesp_api/api/endpoints/task_endpoints.py b/tesp_api/api/endpoints/task_endpoints.py index 1f2655f..2db4138 100644 --- a/tesp_api/api/endpoints/task_endpoints.py +++ b/tesp_api/api/endpoints/task_endpoints.py @@ -5,7 +5,8 @@ from pymonad.promise import Promise from fastapi.params import Depends from fastapi import APIRouter, Body -from fastapi.responses import Response +# MODIFIED: Import JSONResponse +from fastapi.responses import Response, JSONResponse from tesp_api.api.error import api_handle_error from tesp_api.service.event_dispatcher import dispatch_event @@ -101,17 +102,25 @@ async def get_tasks( @router.post("/tasks/{id}:cancel", - responses={200: {"description": "Ok"}}, + responses={200: {"description": "Ok"}}, # Consider adding a response_model if GA4GH TES spec defines one for cancel description=descriptions["tasks-delete"],) async def cancel_task( id: str, token_subject: str = Depends(parse_verify_token), - ) -> Response: + ) -> Response: # Return type is still Response, JSONResponse is a subclass return await Promise(lambda resolve, reject: resolve(( maybe_of(token_subject), ObjectId(id) ))).then(lambda get_tasks_args: task_repository.cancel_task(*get_tasks_args))\ - .map(lambda task_id: Response(status_code=200, media_type="application/json"))\ + .map(lambda task_id_maybe: + # MODIFIED: Use JSONResponse to ensure the body is "{}" + # task_id_maybe here is likely a Maybe[ObjectId] or similar from cancel_task + # We just need to return an empty JSON object on success. 
+ # If cancel_task itself can fail and we want to return a different status, + # that logic would need to be built into how cancel_task's result is handled. + # Assuming cancel_task raises an exception handled by .catch(api_handle_error) on failure. + JSONResponse(content={}, status_code=200) + )\ .catch(api_handle_error) @@ -119,7 +128,7 @@ async def cancel_task( responses={200: {"description": "Ok"}}, description=descriptions["service-info"], response_model=TesServiceInfo) -async def get_service_info() -> TesServiceInfo: +async def get_service_info() -> TesServiceInfo: # FastAPI directly handles Pydantic model return return TesServiceInfo( id="fi.muni.cz.tesp", name="TESP", diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index 4aa4066..dfc65e9 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -2,10 +2,11 @@ import datetime from typing import List from pathlib import Path +import asyncio -from pymonad.maybe import Just +from pymonad.maybe import Just, Nothing from bson.objectid import ObjectId -from pymonad.promise import Promise +from pymonad.promise import Promise, _Promise from tesp_api.utils.docker import ( docker_run_command, @@ -20,12 +21,13 @@ ) from tesp_api.service.pulsar_service import pulsar_service from tesp_api.service.event_dispatcher import dispatch_event -from tesp_api.utils.functional import get_else_throw, maybe_of +from tesp_api.utils.functional import get_else_throw, maybe_of from tesp_api.service.event_handler import Event, local_handler from tesp_api.repository.task_repository import task_repository from tesp_api.service.file_transfer_service import file_transfer_service from tesp_api.service.error import pulsar_event_handle_error, TaskNotFoundError, TaskExecutorError from tesp_api.service.pulsar_operations import PulsarRestOperations, PulsarAmpqOperations, DataType +from tesp_api.repository.task_repository_utils import append_task_executor_logs, 
update_last_task_log_time from tesp_api.repository.model.task import ( TesTaskState, TesTaskExecutor, @@ -33,10 +35,7 @@ TesTaskInput, TesTaskOutput ) -from tesp_api.repository.task_repository_utils import append_task_executor_logs, update_last_task_log_time - -CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "both") - +CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "both") # change both to docker only? @local_handler.register(event_name="queued_task") def handle_queued_task(event: Event) -> None: @@ -49,7 +48,6 @@ def handle_queued_task(event: Event) -> None: case PulsarAmpqOperations() as pulsar_ampq_operations: dispatch_event('queued_task_ampq', {**payload, 'pulsar_operations': pulsar_ampq_operations}) - @local_handler.register(event_name="queued_task_rest") async def handle_queued_task_rest(event: Event): event_name, payload = event @@ -63,7 +61,7 @@ async def handle_queued_task_rest(event: Event): .then(lambda nothing: pulsar_operations.setup_job(task_id))\ .map(lambda setup_job_result: dispatch_event('initialize_task', {**payload, 'task_config': setup_job_result}))\ .catch(lambda error: pulsar_event_handle_error(error, payload['task_id'], event_name, pulsar_operations))\ - .then(lambda x: x) # invokes promise returned by error handler, otherwise acts as identity function + .then(lambda x: x) @local_handler.register(event_name="initialize_task") @@ -89,7 +87,10 @@ async def setup_data(job_id: ObjectId, print("Volumes:") print(volumes) - output_confs, volume_confs = map_volumes(str(job_id), volumes, outputs) + output_confs_mapped, volume_confs_mapped = map_volumes(str(job_id), volumes, outputs) + output_confs.extend(output_confs_mapped) + volume_confs.extend(volume_confs_mapped) + print(inputs) @@ -97,7 +98,6 @@ async def setup_data(job_id: ObjectId, content = inputs[i].content pulsar_path = payload['task_config']['inputs_directory'] + f'/input_file_{i}' if content is not None and inputs[i].url is None: - #content = await 
file_transfer_service.download_file(inputs[i].url) pulsar_path = await pulsar_operations.upload( job_id, DataType.INPUT, file_content=Just(content), @@ -115,7 +115,7 @@ async def setup_data(job_id: ObjectId, updated_task, TaskNotFoundError(task_id, Just(TesTaskState.QUEUED)) )).then(lambda updated_task: setup_data( task_id, - maybe_of(updated_task.resources).maybe([], lambda x: x), + maybe_of(updated_task.resources).maybe(None, lambda x: x), maybe_of(updated_task.volumes).maybe([], lambda x: x), maybe_of(updated_task.inputs).maybe([], lambda x: x), maybe_of(updated_task.outputs).maybe([], lambda x: x) @@ -126,7 +126,7 @@ async def setup_data(job_id: ObjectId, 'input_confs': res_input_output_confs[2], 'output_confs': res_input_output_confs[3] })).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\ - .then(lambda x: x) # invokes promise returned by error handler, otherwise acts as identity function + .then(lambda x: x) @local_handler.register(event_name="run_task") @@ -135,19 +135,35 @@ async def handle_run_task(event: Event) -> None: task_id: ObjectId = payload['task_id'] author: str = payload['author'] resource_conf: dict = payload['resource_conf'] - volume_confs: List[Path] = payload['volume_confs'] + volume_confs: List[dict] = payload['volume_confs'] input_confs: List[dict] = payload['input_confs'] output_confs: List[dict] = payload['output_confs'] pulsar_operations: PulsarRestOperations = payload['pulsar_operations'] + + run_command_str = None + command_start_time = datetime.datetime.now(datetime.timezone.utc) + # Flag to indicate if the main job logic completed without an exception during pulsar interaction + job_logic_completed_normally = False + - # init task - task_monad = await task_repository.update_task_state( - task_id, - TesTaskState.INITIALIZING, - TesTaskState.RUNNING - ) try: - task = get_else_throw(task_monad, TaskNotFoundError(task_id, Just(TesTaskState.INITIALIZING))) + # Initial state update to RUNNING + 
task_monad_init = await task_repository.update_task_state( + task_id, + TesTaskState.INITIALIZING, + TesTaskState.RUNNING + ) + task = get_else_throw(task_monad_init, TaskNotFoundError(task_id, Just(TesTaskState.INITIALIZING))) + + # Check if task was cancelled between INITIALIZING and this point (quick cancel) + # This check is early, before significant Pulsar interaction. + current_task_after_init_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id}) + current_task_after_init = get_else_throw(current_task_after_init_monad, TaskNotFoundError(task_id)) + if current_task_after_init.state == TesTaskState.CANCELED: + print(f"Task {task_id} found CANCELED shortly after being set to RUNNING. Aborting handle_run_task early.") + # The API cancel path should handle Pulsar cleanup. + return + await update_last_task_log_time( task_id, @@ -156,141 +172,151 @@ async def handle_run_task(event: Event) -> None: start_time=Just(datetime.datetime.now(datetime.timezone.utc)) ) - # prepare container commands, initialize to None container_cmds = list() - stage_in_command = None - stage_out_command = None - - # stage-in preparation - print("Payload:") - print(payload) stage_in_mount = payload['task_config']['inputs_directory'] - # Define a generic executor for staging operations stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", - command=[], - workdir=Path("/downloads")) - + command=[], + workdir=Path("/downloads")) + + stage_in_command_str_val = None if CONTAINER_TYPE == "docker": - stage_in_command = docker_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) + stage_in_command_str_val = docker_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) elif CONTAINER_TYPE == "singularity": - # Adapt image name if needed for singularity - singularity_stage_exec = TesTaskExecutor(image="docker://" + stage_exec.image, command=[], workdir=stage_exec.workdir) - stage_in_command = 
singularity_stage_in_command(singularity_stage_exec, resource_conf, stage_in_mount, input_confs) - # Note: Assumes singularity_stage_in_command was also modified to return None if input_confs is empty. If not, this part needs adjustment. + stage_exec.image = "docker://" + stage_exec.image + stage_in_command_str_val = singularity_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) - - # Main execution commands for i, executor in enumerate(task.executors): - run_command = "" - script_content = "" + run_script_cmd_str, script_content = "", "" if CONTAINER_TYPE == "docker": - run_command, script_content = docker_run_command(executor, str(task_id), resource_conf, volume_confs, + run_script_cmd_str, script_content = docker_run_command(executor, task_id, resource_conf, volume_confs, input_confs, output_confs, stage_in_mount, i) elif CONTAINER_TYPE == "singularity": mount_job_dir = payload['task_config']['job_directory'] - run_command, script_content = singularity_run_command(executor, str(task_id), resource_conf, volume_confs, + run_script_cmd_str, script_content = singularity_run_command(executor, task_id, resource_conf, volume_confs, input_confs, output_confs, stage_in_mount, mount_job_dir, i) - if run_command and script_content: # Ensure command generation was successful - await pulsar_operations.upload( - payload['task_id'], DataType.INPUT, - file_content=Just(script_content), - file_path=f'run_script_{i}.sh') - container_cmds.append(run_command) - - # Stage-out preparation + await pulsar_operations.upload( + payload['task_id'], DataType.INPUT, + file_content=Just(script_content), + file_path=f'run_script_{i}.sh') + container_cmds.append(run_script_cmd_str) + + stage_out_command_str_val = None if CONTAINER_TYPE == "docker": - stage_out_command = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs) + stage_out_command_str_val = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs) elif 
CONTAINER_TYPE == "singularity": mount_job_dir = payload['task_config']['job_directory'] - bind_mount = payload['task_config']['inputs_directory'] # This might need adjustment based on actual stage-out needs - # Adapt image name if needed for singularity - singularity_stage_exec = TesTaskExecutor(image="docker://" + stage_exec.image, command=[], workdir=stage_exec.workdir) - stage_out_command = singularity_stage_out_command(singularity_stage_exec, resource_conf, bind_mount, + bind_mount = payload['task_config']['inputs_directory'] + stage_out_command_str_val = singularity_stage_out_command(stage_exec, resource_conf, bind_mount, output_confs, volume_confs, mount_job_dir) - # Note: Assumes singularity_stage_out_command was also modified to return None if output_confs is empty. If not, this part needs adjustment. - - - # Build the final command string conditionally - final_command_parts = ['set -xe'] # Start with execution options - if stage_in_command: - final_command_parts.append(stage_in_command) - - if container_cmds: # Only add run commands if there are any - final_command_parts.append(" && ".join(container_cmds)) + executors_commands_joined_str = " && ".join(filter(None, container_cmds)) + + command_list_for_join = [cmd for cmd in [stage_in_command_str_val, executors_commands_joined_str, stage_out_command_str_val] if cmd and cmd.strip()] + if command_list_for_join: + run_command_str = f"set -xe && {' && '.join(command_list_for_join)}" + else: + run_command_str = None - if stage_out_command: - final_command_parts.append(stage_out_command) + command_start_time = datetime.datetime.now(datetime.timezone.utc) - # Join the parts that exist with ' && ' - # Filter out potential empty strings just in case, though the logic above should prevent them - final_run_command = " && ".join(filter(None, final_command_parts)) + command_status: dict + if run_command_str is None: + print(f"Task {task_id} has no commands to run. 
Treating as successful no-op.") + command_status = {'stdout': '', 'stderr': 'No commands to run.', 'returncode': 0} + else: + print(f"Running command for task {task_id}: {run_command_str}") + await pulsar_operations.run_job(task_id, run_command_str) + # This is the polling call + command_status = await pulsar_operations.job_status_complete(str(task_id)) + # If we reach here, job_status_complete returned without an exception during its polling. + job_logic_completed_normally = True - # Ensure there's actually something to run - if len(final_command_parts) <= 1 : # Only 'set -xe' is present - print(f"Task {task_id}: No commands to execute (no stage-in, no main executors, no stage-out). Skipping Pulsar job run.") - # Decide how to proceed - mark as complete? Error? For now, let's proceed to completion logic. - # We might skip pulsar interaction entirely here. + command_end_time = datetime.datetime.now(datetime.timezone.utc) + await append_task_executor_logs( + task_id, + author, + TesTaskState.RUNNING, # Log is associated with this phase + command_start_time, + command_end_time, + command_status.get('stdout', ''), + command_status.get('stderr', ''), + command_status.get('returncode', -1) + ) - else: - print(f"Task {task_id}: Final command to run in Pulsar:") - print(final_run_command) - command_start_time = datetime.datetime.now(datetime.timezone.utc) + # Check current task state from DB. It might have been changed to CANCELED + # by an external request while job_status_complete was polling or after it returned. + current_task_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id}) + current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id)) - # start the task (docker container/s) in the pulsar - await pulsar_operations.run_job(task_id, final_run_command) + if current_task_obj.state == TesTaskState.CANCELED: + print(f"Task {task_id} is in CANCELED state after job execution/polling. 
Aborting further state changes by handle_run_task.") + # The API cancel path should have handled Pulsar erase. + return - # wait for the task - command_status = await pulsar_operations.job_status_complete(str(task_id)) - - command_end_time = datetime.datetime.now(datetime.timezone.utc) - await append_task_executor_logs( + if command_status.get('returncode', -1) != 0: + # Job failed, and it wasn't due to user cancellation (CANCELED state check passed) + await task_repository.update_task_state( task_id, - author, - TesTaskState.RUNNING, - command_start_time, - command_end_time, - command_status['stdout'], - command_status['stderr'], - command_status['returncode'] + TesTaskState.RUNNING, # Expected from state + TesTaskState.EXECUTOR_ERROR ) - if command_status['returncode'] != 0: - await task_repository.update_task_state( # Await the update - task_id, - TesTaskState.RUNNING, - TesTaskState.EXECUTOR_ERROR - ) - # Log the error details before raising - print(f"Task {task_id} executor error. Pulsar job return code: {command_status['returncode']}") - print(f"Stdout:\n{command_status['stdout']}") - print(f"Stderr:\n{command_status['stderr']}") - raise TaskExecutorError(f"Pulsar job failed with return code {command_status['returncode']}") - - - except Exception as error: - print(f"Error during task {task_id} execution in event {event_name}: {type(error).__name__} - {error}") - import traceback - traceback.print_exc() - # Call the existing error handler - await pulsar_event_handle_error(error, task_id, event_name, pulsar_operations) - # Rethrow or handle cleanup if pulsar_event_handle_error doesn't terminate execution flow - return # Stop further processing in this handler after error - - # Finalize task - This part runs only if no exceptions occurred or were caught and handled above - # If an error occurred and was handled by pulsar_event_handle_error which updated state, - # this final update to COMPLETE might be incorrect. Consider state check if needed. 
- print(f"Task {task_id}: Execution seemingly successful, proceeding to finalize.") - await Promise(lambda resolve, reject: resolve(None)) \ - .then(lambda ignored: task_repository.update_task_state( - task_id, - TesTaskState.RUNNING, # Original state before completion - TesTaskState.COMPLETE - )) \ - .map(lambda task: get_else_throw( - task, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING)) # Expect RUNNING if successful + # No need to raise TaskExecutorError here if we want the promise chain below to handle cleanup + # pulsar_event_handle_error will be called by the .catch of the promise chain if this update fails + # or if we want to centralize error handling. + # For now, let the promise chain handle it for consistency. + # However, to ensure Pulsar job erasure on executor error: + print(f"Task {task_id} executor returned non-zero: {command_status.get('returncode', -1)}. Setting state to EXECUTOR_ERROR.") + await pulsar_operations.erase_job(task_id) # Erase on executor error + return # Stop further processing + + # If job completed successfully and was not cancelled: + # Proceed to set state to COMPLETE and cleanup. 
+ await Promise(lambda resolve, reject: resolve(None)) \ + .then(lambda ignored: task_repository.update_task_state( + task_id, + TesTaskState.RUNNING, + TesTaskState.COMPLETE )) \ - .then(lambda ignored: pulsar_operations.erase_job(task_id)) \ - .catch(lambda error: pulsar_event_handle_error(error, task_id, "finalize_task", pulsar_operations)) \ - .then(lambda x: x) # invokes promise returned by error handler, otherwise acts as identity function + .map(lambda task_after_complete_update: get_else_throw( + task_after_complete_update, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING)) + )) \ + .then(lambda ignored: pulsar_operations.erase_job(task_id)) \ + .catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \ + .then(lambda x: x) + + except asyncio.CancelledError: + print(f"handle_run_task for task {task_id} was cancelled via asyncio.CancelledError.") + await task_repository.update_task_state(task_id, None, TesTaskState.CANCELED) + # Ensure Pulsar job is cleaned up if this task is cancelled mid-flight + await pulsar_operations.kill_job(task_id) + await pulsar_operations.erase_job(task_id) + print(f"Task {task_id} cleaned up after asyncio cancellation.") + # Not re-raising, as this handler is managing the lifecycle. + except Exception as error: + # This block is now primarily for errors from job_status_complete or other unexpected issues + print(f"Exception caught in handle_run_task for task {task_id}: {type(error).__name__} - {error}") + + # Check if the error is due to a prior cancellation + # (e.g., LookupError because Pulsar job was erased by the cancel API) + task_state_after_error_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id}) + if task_state_after_error_monad.is_just() and task_state_after_error_monad.value.state == TesTaskState.CANCELED: + print(f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to cancellation. 
No further error processing by handle_run_task.") + # The cancel_task API path should have handled Pulsar job erasure. + # No need to call pulsar_event_handle_error or erase_job again here. + return # Exit gracefully + + # If not due to a prior cancellation, then it's an actual error during run. + print(f"Task {task_id} not CANCELED, proceeding with generic error handling for '{type(error).__name__}'.") + error_handler_result = pulsar_event_handle_error(error, task_id, event_name, pulsar_operations) + if asyncio.iscoroutine(error_handler_result) or isinstance(error_handler_result, _Promise): + await error_handler_result + + # Fallback: Ensure Pulsar job is erased if pulsar_event_handle_error didn't. + # This is a safeguard, as pulsar_event_handle_error should ideally manage this. + try: + print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling.") + await pulsar_operations.erase_job(task_id) + except Exception as final_erase_error: + print(f"Error during final erase attempt for task {task_id} after general error: {final_erase_error}") From 967c77a84656c7e2e09fd4ebb290a611efbd28ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Wed, 14 May 2025 16:22:54 +0200 Subject: [PATCH 07/22] Update ci.yaml --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 162a8c2..2d80d3c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -22,7 +22,7 @@ jobs: - name: run docker compose run: | - docker compose up -d --build + docker compose --profile all up -d --build - name: run docker compose dts run: | From 26c7a64bf8da0927fc5e1854faec0a8dc55e5658 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Mon, 19 May 2025 12:13:21 +0200 Subject: [PATCH 08/22] Update README.md --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 
cc42f16..3e03f0c 100644 --- a/README.md +++ b/README.md @@ -196,7 +196,8 @@ Service representing `TESP API` is configured to mount this project sources as a same command as is mentioned above. Therefore, any changes made to the sources in this repository will be immediately applied to the docker service as well, enabling live reloading which makes development within the `docker` environment very easy. ```shell -docker-compose up -d +docker compose --profile all up -d + ```   From 08598ebca3dac5ac9d37e2d6a2e9e2cea51a896b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Tue, 27 May 2025 12:22:05 +0000 Subject: [PATCH 09/22] cleaned up comments, docstrings --- tesp_api/api/endpoints/task_endpoints.py | 10 +- tesp_api/service/event_actions.py | 198 ++++++++++------------- 2 files changed, 87 insertions(+), 121 deletions(-) diff --git a/tesp_api/api/endpoints/task_endpoints.py b/tesp_api/api/endpoints/task_endpoints.py index 2db4138..78205da 100644 --- a/tesp_api/api/endpoints/task_endpoints.py +++ b/tesp_api/api/endpoints/task_endpoints.py @@ -5,7 +5,6 @@ from pymonad.promise import Promise from fastapi.params import Depends from fastapi import APIRouter, Body -# MODIFIED: Import JSONResponse from fastapi.responses import Response, JSONResponse from tesp_api.api.error import api_handle_error @@ -102,7 +101,7 @@ async def get_tasks( @router.post("/tasks/{id}:cancel", - responses={200: {"description": "Ok"}}, # Consider adding a response_model if GA4GH TES spec defines one for cancel + responses={200: {"description": "Ok"}}, description=descriptions["tasks-delete"],) async def cancel_task( id: str, @@ -113,12 +112,7 @@ async def cancel_task( ObjectId(id) ))).then(lambda get_tasks_args: task_repository.cancel_task(*get_tasks_args))\ .map(lambda task_id_maybe: - # MODIFIED: Use JSONResponse to ensure the body is "{}" - # task_id_maybe here is likely a Maybe[ObjectId] or similar from cancel_task - # We just need to return an empty JSON object on 
success. - # If cancel_task itself can fail and we want to return a different status, - # that logic would need to be built into how cancel_task's result is handled. - # Assuming cancel_task raises an exception handled by .catch(api_handle_error) on failure. + # Use JSONResponse to ensure the body is "{}" JSONResponse(content={}, status_code=200) )\ .catch(api_handle_error) diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index dfc65e9..1d54811 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -4,7 +4,7 @@ from pathlib import Path import asyncio -from pymonad.maybe import Just, Nothing +from pymonad.maybe import Just, Nothing from bson.objectid import ObjectId from pymonad.promise import Promise, _Promise @@ -21,13 +21,12 @@ ) from tesp_api.service.pulsar_service import pulsar_service from tesp_api.service.event_dispatcher import dispatch_event -from tesp_api.utils.functional import get_else_throw, maybe_of +from tesp_api.utils.functional import get_else_throw, maybe_of from tesp_api.service.event_handler import Event, local_handler from tesp_api.repository.task_repository import task_repository from tesp_api.service.file_transfer_service import file_transfer_service from tesp_api.service.error import pulsar_event_handle_error, TaskNotFoundError, TaskExecutorError from tesp_api.service.pulsar_operations import PulsarRestOperations, PulsarAmpqOperations, DataType -from tesp_api.repository.task_repository_utils import append_task_executor_logs, update_last_task_log_time from tesp_api.repository.model.task import ( TesTaskState, TesTaskExecutor, @@ -35,13 +34,17 @@ TesTaskInput, TesTaskOutput ) -CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "both") # change both to docker only? 
+from tesp_api.repository.task_repository_utils import append_task_executor_logs, update_last_task_log_time + +CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "both") @local_handler.register(event_name="queued_task") def handle_queued_task(event: Event) -> None: + """ + Dispatches the task to a REST or AMQP specific handler based on Pulsar operations type. + """ event_name, payload = event - print("Queued task:") - print(payload) + print(f"Queued task: {payload.get('task_id')}") match pulsar_service.get_operations(): case PulsarRestOperations() as pulsar_rest_operations: dispatch_event('queued_task_rest', {**payload, 'pulsar_operations': pulsar_rest_operations}) @@ -50,31 +53,37 @@ def handle_queued_task(event: Event) -> None: @local_handler.register(event_name="queued_task_rest") async def handle_queued_task_rest(event: Event): + """ + Sets up the job in Pulsar via REST operations and dispatches an 'initialize_task' event. + """ event_name, payload = event task_id: ObjectId = payload['task_id'] pulsar_operations: PulsarRestOperations = payload['pulsar_operations'] - print("Queued task rest:") - print(payload) + print(f"Queued task rest: {task_id}") await Promise(lambda resolve, reject: resolve(None))\ .then(lambda nothing: pulsar_operations.setup_job(task_id))\ .map(lambda setup_job_result: dispatch_event('initialize_task', {**payload, 'task_config': setup_job_result}))\ - .catch(lambda error: pulsar_event_handle_error(error, payload['task_id'], event_name, pulsar_operations))\ - .then(lambda x: x) - + .catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\ + .then(lambda x: x) # Invokes promise, potentially from error handler @local_handler.register(event_name="initialize_task") async def handle_initializing_task(event: Event) -> None: + """ + Updates task state to INITIALIZING, prepares data/volumes/inputs/outputs, + and dispatches a 'run_task' event. 
+ """ event_name, payload = event task_id: ObjectId = payload['task_id'] pulsar_operations: PulsarRestOperations = payload['pulsar_operations'] async def setup_data(job_id: ObjectId, - resources: TesTaskResources, - volumes: List[str], - inputs: List[TesTaskInput], - outputs: List[TesTaskOutput]): + resources: TesTaskResources, + volumes: List[str], + inputs: List[TesTaskInput], + outputs: List[TesTaskOutput]): + """Helper to configure resources, volumes, inputs, and outputs for Pulsar.""" resource_conf: dict volume_confs: List[dict] = [] input_confs: List[dict] = [] @@ -85,27 +94,23 @@ async def setup_data(job_id: ObjectId, 'ram_gb': resources.ram_gb if resources else None }) - print("Volumes:") - print(volumes) - output_confs_mapped, volume_confs_mapped = map_volumes(str(job_id), volumes, outputs) - output_confs.extend(output_confs_mapped) - volume_confs.extend(volume_confs_mapped) - + mapped_outputs, mapped_volumes = map_volumes(str(job_id), volumes, outputs) + output_confs.extend(mapped_outputs) + volume_confs.extend(mapped_volumes) - print(inputs) - - for i in range(0, len(inputs)): - content = inputs[i].content - pulsar_path = payload['task_config']['inputs_directory'] + f'/input_file_{i}' - if content is not None and inputs[i].url is None: - pulsar_path = await pulsar_operations.upload( + for i, tes_input in enumerate(inputs): + content = tes_input.content + pulsar_path_val = payload['task_config']['inputs_directory'] + f'/input_file_{i}' + if content is not None and tes_input.url is None: + pulsar_path_val = await pulsar_operations.upload( job_id, DataType.INPUT, file_content=Just(content), file_path=f'input_file_{i}') - input_confs.append({'container_path': inputs[i].path, 'pulsar_path': pulsar_path, 'url':inputs[i].url}) + input_confs.append({'container_path': tes_input.path, 'pulsar_path': pulsar_path_val, 'url': tes_input.url}) return resource_conf, volume_confs, input_confs, output_confs + print(f"Initializing task: {task_id}") await Promise(lambda 
resolve, reject: resolve(None))\ .then(lambda nothing: task_repository.update_task_state( task_id, @@ -113,12 +118,12 @@ async def setup_data(job_id: ObjectId, TesTaskState.INITIALIZING )).map(lambda updated_task: get_else_throw( updated_task, TaskNotFoundError(task_id, Just(TesTaskState.QUEUED)) - )).then(lambda updated_task: setup_data( + )).then(lambda updated_task_val: setup_data( task_id, - maybe_of(updated_task.resources).maybe(None, lambda x: x), - maybe_of(updated_task.volumes).maybe([], lambda x: x), - maybe_of(updated_task.inputs).maybe([], lambda x: x), - maybe_of(updated_task.outputs).maybe([], lambda x: x) + maybe_of(updated_task_val.resources).maybe(None, lambda x: x), + maybe_of(updated_task_val.volumes).maybe([], lambda x: x), + maybe_of(updated_task_val.inputs).maybe([], lambda x: x), + maybe_of(updated_task_val.outputs).maybe([], lambda x: x) )).map(lambda res_input_output_confs: dispatch_event('run_task', { **payload, 'resource_conf': res_input_output_confs[0], @@ -128,9 +133,13 @@ async def setup_data(job_id: ObjectId, })).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\ .then(lambda x: x) - @local_handler.register(event_name="run_task") async def handle_run_task(event: Event) -> None: + """ + Updates task state to RUNNING, prepares and executes job commands in Pulsar, + waits for completion, logs results, and updates task state accordingly (COMPLETE, EXECUTOR_ERROR). + Handles cancellations and other exceptions during its lifecycle. 
+ """ event_name, payload = event task_id: ObjectId = payload['task_id'] author: str = payload['author'] @@ -142,12 +151,10 @@ async def handle_run_task(event: Event) -> None: run_command_str = None command_start_time = datetime.datetime.now(datetime.timezone.utc) - # Flag to indicate if the main job logic completed without an exception during pulsar interaction - job_logic_completed_normally = False - try: - # Initial state update to RUNNING + print(f"Running task: {task_id}") + # Set task state to RUNNING task_monad_init = await task_repository.update_task_state( task_id, TesTaskState.INITIALIZING, @@ -155,15 +162,12 @@ async def handle_run_task(event: Event) -> None: ) task = get_else_throw(task_monad_init, TaskNotFoundError(task_id, Just(TesTaskState.INITIALIZING))) - # Check if task was cancelled between INITIALIZING and this point (quick cancel) - # This check is early, before significant Pulsar interaction. + # Early check: If task was cancelled very quickly after being set to RUNNING current_task_after_init_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id}) current_task_after_init = get_else_throw(current_task_after_init_monad, TaskNotFoundError(task_id)) if current_task_after_init.state == TesTaskState.CANCELED: - print(f"Task {task_id} found CANCELED shortly after being set to RUNNING. Aborting handle_run_task early.") - # The API cancel path should handle Pulsar cleanup. - return - + print(f"Task {task_id} found CANCELED shortly after RUNNING state update. 
Aborting handler.") + return # API cancel path handles Pulsar cleanup await update_last_task_log_time( task_id, @@ -172,17 +176,16 @@ async def handle_run_task(event: Event) -> None: start_time=Just(datetime.datetime.now(datetime.timezone.utc)) ) + # Prepare Pulsar commands container_cmds = list() stage_in_mount = payload['task_config']['inputs_directory'] - stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", - command=[], - workdir=Path("/downloads")) + stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", command=[], workdir=Path("/downloads")) stage_in_command_str_val = None if CONTAINER_TYPE == "docker": stage_in_command_str_val = docker_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) elif CONTAINER_TYPE == "singularity": - stage_exec.image = "docker://" + stage_exec.image + stage_exec.image = "docker://" + stage_exec.image # Singularity needs "docker://" prefix stage_in_command_str_val = singularity_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) for i, executor in enumerate(task.executors): @@ -196,7 +199,7 @@ async def handle_run_task(event: Event) -> None: input_confs, output_confs, stage_in_mount, mount_job_dir, i) await pulsar_operations.upload( - payload['task_id'], DataType.INPUT, + task_id, DataType.INPUT, # Use task_id from payload, not payload['task_id'] file_content=Just(script_content), file_path=f'run_script_{i}.sh') container_cmds.append(run_script_cmd_str) @@ -206,78 +209,54 @@ async def handle_run_task(event: Event) -> None: stage_out_command_str_val = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs) elif CONTAINER_TYPE == "singularity": mount_job_dir = payload['task_config']['job_directory'] - bind_mount = payload['task_config']['inputs_directory'] + bind_mount = payload['task_config']['inputs_directory'] # This might be stage_in_mount too stage_out_command_str_val = singularity_stage_out_command(stage_exec, resource_conf, 
bind_mount, output_confs, volume_confs, mount_job_dir) executors_commands_joined_str = " && ".join(filter(None, container_cmds)) + # Construct the final command string for Pulsar command_list_for_join = [cmd for cmd in [stage_in_command_str_val, executors_commands_joined_str, stage_out_command_str_val] if cmd and cmd.strip()] - if command_list_for_join: - run_command_str = f"set -xe && {' && '.join(command_list_for_join)}" - else: - run_command_str = None + run_command_str = f"set -xe && {' && '.join(command_list_for_join)}" if command_list_for_join else None command_start_time = datetime.datetime.now(datetime.timezone.utc) - command_status: dict + if run_command_str is None: print(f"Task {task_id} has no commands to run. Treating as successful no-op.") command_status = {'stdout': '', 'stderr': 'No commands to run.', 'returncode': 0} else: - print(f"Running command for task {task_id}: {run_command_str}") + print(f"Submitting job to Pulsar for task {task_id}: {run_command_str}") await pulsar_operations.run_job(task_id, run_command_str) - # This is the polling call - command_status = await pulsar_operations.job_status_complete(str(task_id)) - - # If we reach here, job_status_complete returned without an exception during its polling. - job_logic_completed_normally = True + command_status = await pulsar_operations.job_status_complete(str(task_id)) # Polls Pulsar for job completion command_end_time = datetime.datetime.now(datetime.timezone.utc) await append_task_executor_logs( - task_id, - author, - TesTaskState.RUNNING, # Log is associated with this phase - command_start_time, - command_end_time, - command_status.get('stdout', ''), - command_status.get('stderr', ''), + task_id, author, TesTaskState.RUNNING, + command_start_time, command_end_time, + command_status.get('stdout', ''), command_status.get('stderr', ''), command_status.get('returncode', -1) ) - # Check current task state from DB. 
It might have been changed to CANCELED - # by an external request while job_status_complete was polling or after it returned. + # Re-fetch task state to check for external cancellation during job execution current_task_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id}) current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id)) if current_task_obj.state == TesTaskState.CANCELED: - print(f"Task {task_id} is in CANCELED state after job execution/polling. Aborting further state changes by handle_run_task.") - # The API cancel path should have handled Pulsar erase. + print(f"Task {task_id} found CANCELED after job completion polling. Aborting state changes.") return if command_status.get('returncode', -1) != 0: - # Job failed, and it wasn't due to user cancellation (CANCELED state check passed) - await task_repository.update_task_state( - task_id, - TesTaskState.RUNNING, # Expected from state - TesTaskState.EXECUTOR_ERROR - ) - # No need to raise TaskExecutorError here if we want the promise chain below to handle cleanup - # pulsar_event_handle_error will be called by the .catch of the promise chain if this update fails - # or if we want to centralize error handling. - # For now, let the promise chain handle it for consistency. - # However, to ensure Pulsar job erasure on executor error: - print(f"Task {task_id} executor returned non-zero: {command_status.get('returncode', -1)}. Setting state to EXECUTOR_ERROR.") - await pulsar_operations.erase_job(task_id) # Erase on executor error - return # Stop further processing - - # If job completed successfully and was not cancelled: - # Proceed to set state to COMPLETE and cleanup. + print(f"Task {task_id} executor error (return code: {command_status.get('returncode', -1)}). 
Setting state to EXECUTOR_ERROR.") + await task_repository.update_task_state(task_id, TesTaskState.RUNNING, TesTaskState.EXECUTOR_ERROR) + await pulsar_operations.erase_job(task_id) + return + + # Job successful and not cancelled, set to COMPLETE + print(f"Task {task_id} completed successfully. Setting state to COMPLETE.") await Promise(lambda resolve, reject: resolve(None)) \ .then(lambda ignored: task_repository.update_task_state( - task_id, - TesTaskState.RUNNING, - TesTaskState.COMPLETE + task_id, TesTaskState.RUNNING, TesTaskState.COMPLETE )) \ .map(lambda task_after_complete_update: get_else_throw( task_after_complete_update, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING)) @@ -287,36 +266,29 @@ async def handle_run_task(event: Event) -> None: .then(lambda x: x) except asyncio.CancelledError: - print(f"handle_run_task for task {task_id} was cancelled via asyncio.CancelledError.") + # This asyncio.Task (handle_run_task) was cancelled externally + print(f"handle_run_task for task {task_id} was explicitly cancelled (asyncio.CancelledError).") await task_repository.update_task_state(task_id, None, TesTaskState.CANCELED) - # Ensure Pulsar job is cleaned up if this task is cancelled mid-flight - await pulsar_operations.kill_job(task_id) - await pulsar_operations.erase_job(task_id) - print(f"Task {task_id} cleaned up after asyncio cancellation.") - # Not re-raising, as this handler is managing the lifecycle. 
+ await pulsar_operations.kill_job(task_id) + await pulsar_operations.erase_job(task_id) + print(f"Task {task_id} Pulsar job cleanup attempted after asyncio cancellation.") + except Exception as error: - # This block is now primarily for errors from job_status_complete or other unexpected issues - print(f"Exception caught in handle_run_task for task {task_id}: {type(error).__name__} - {error}") + print(f"Exception in handle_run_task for task {task_id}: {type(error).__name__} - {error}") - # Check if the error is due to a prior cancellation - # (e.g., LookupError because Pulsar job was erased by the cancel API) task_state_after_error_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id}) if task_state_after_error_monad.is_just() and task_state_after_error_monad.value.state == TesTaskState.CANCELED: - print(f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to cancellation. No further error processing by handle_run_task.") - # The cancel_task API path should have handled Pulsar job erasure. - # No need to call pulsar_event_handle_error or erase_job again here. - return # Exit gracefully + print(f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.") + return - # If not due to a prior cancellation, then it's an actual error during run. - print(f"Task {task_id} not CANCELED, proceeding with generic error handling for '{type(error).__name__}'.") + # If not already CANCELED, proceed with standard error handling + print(f"Task {task_id} not CANCELED; proceeding with pulsar_event_handle_error for '{type(error).__name__}'.") error_handler_result = pulsar_event_handle_error(error, task_id, event_name, pulsar_operations) if asyncio.iscoroutine(error_handler_result) or isinstance(error_handler_result, _Promise): await error_handler_result - # Fallback: Ensure Pulsar job is erased if pulsar_event_handle_error didn't. 
- # This is a safeguard, as pulsar_event_handle_error should ideally manage this. try: - print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling.") + print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling in run_task.") await pulsar_operations.erase_job(task_id) except Exception as final_erase_error: - print(f"Error during final erase attempt for task {task_id} after general error: {final_erase_error}") + print(f"Error during final Pulsar erase attempt for task {task_id} after general error: {final_erase_error}") From 3957c866d585f91eabbe0398980e3cf718c16ab9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Tue, 10 Jun 2025 16:14:42 +0000 Subject: [PATCH 10/22] reverted changes: temporary variables back to direct assignement and lambda parameter name back to updated_task --- tesp_api/service/event_actions.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index 1d54811..3441d9d 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -79,11 +79,10 @@ async def handle_initializing_task(event: Event) -> None: pulsar_operations: PulsarRestOperations = payload['pulsar_operations'] async def setup_data(job_id: ObjectId, - resources: TesTaskResources, - volumes: List[str], - inputs: List[TesTaskInput], - outputs: List[TesTaskOutput]): - """Helper to configure resources, volumes, inputs, and outputs for Pulsar.""" + resources: TesTaskResources, + volumes: List[str], + inputs: List[TesTaskInput], + outputs: List[TesTaskOutput]): resource_conf: dict volume_confs: List[dict] = [] input_confs: List[dict] = [] @@ -94,9 +93,7 @@ async def setup_data(job_id: ObjectId, 'ram_gb': resources.ram_gb if resources else None }) - mapped_outputs, mapped_volumes = map_volumes(str(job_id), volumes, outputs) - output_confs.extend(mapped_outputs) - 
volume_confs.extend(mapped_volumes) + output_confs, volume_confs = map_volumes(str(job_id), volumes, outputs) for i, tes_input in enumerate(inputs): content = tes_input.content @@ -118,12 +115,12 @@ async def setup_data(job_id: ObjectId, TesTaskState.INITIALIZING )).map(lambda updated_task: get_else_throw( updated_task, TaskNotFoundError(task_id, Just(TesTaskState.QUEUED)) - )).then(lambda updated_task_val: setup_data( + )).then(lambda updated_task: setup_data( task_id, - maybe_of(updated_task_val.resources).maybe(None, lambda x: x), - maybe_of(updated_task_val.volumes).maybe([], lambda x: x), - maybe_of(updated_task_val.inputs).maybe([], lambda x: x), - maybe_of(updated_task_val.outputs).maybe([], lambda x: x) + maybe_of(updated_task.resources).maybe(None, lambda x: x), + maybe_of(updated_task.volumes).maybe([], lambda x: x), + maybe_of(updated_task.inputs).maybe([], lambda x: x), + maybe_of(updated_task.outputs).maybe([], lambda x: x) )).map(lambda res_input_output_confs: dispatch_event('run_task', { **payload, 'resource_conf': res_input_output_confs[0], From 418cf28991d47afb7a5a0935bef36b5a9e4ddb6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Mon, 7 Jul 2025 10:36:18 +0000 Subject: [PATCH 11/22] comment delete --- tesp_api/api/endpoints/task_endpoints.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tesp_api/api/endpoints/task_endpoints.py b/tesp_api/api/endpoints/task_endpoints.py index 78205da..73bf39f 100644 --- a/tesp_api/api/endpoints/task_endpoints.py +++ b/tesp_api/api/endpoints/task_endpoints.py @@ -106,7 +106,7 @@ async def get_tasks( async def cancel_task( id: str, token_subject: str = Depends(parse_verify_token), - ) -> Response: # Return type is still Response, JSONResponse is a subclass + ) -> Response: return await Promise(lambda resolve, reject: resolve(( maybe_of(token_subject), ObjectId(id) From 1da5daba0cc0b0837d71ebd63a8a701a102280d6 Mon Sep 17 00:00:00 2001 From: micoleaoo Date: Fri, 11 Jul 2025 
12:28:10 +0000 Subject: [PATCH 12/22] added docker profile for pulsar service --- docker-compose.yaml | 7 +------ tests/test_data/env_test_1 | 1 + tests/test_data/env_test_2 | 1 + 3 files changed, 3 insertions(+), 6 deletions(-) create mode 100644 tests/test_data/env_test_1 create mode 100644 tests/test_data/env_test_2 diff --git a/docker-compose.yaml b/docker-compose.yaml index 5cbe79f..3d8803d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -5,7 +5,6 @@ services: dockerfile: docker/tesp_api/Dockerfile target: development image: tesp-api - profiles: ["api", "all"] environment: - CONTAINER_TYPE=docker # Set to "docker", "singularity", or "both" container_name: tesp-api @@ -22,7 +21,6 @@ services: tesp-db: image: mongo:latest - profiles: ["api", "all"] container_name: tesp-db volumes: - ./docker/mongodb/data:/data/db @@ -41,16 +39,13 @@ services: dockerfile: Dockerfile target: development image: pulsar_rest - profiles: ["pulsar", "all"] + profiles: ["pulsar"] container_name: pulsar-rest privileged: true expose: - "8913" volumes: - ./docker/pulsar_rest/app.yml:/opt/pysetup/app.yml -# DIND - problem with resource limitation, missing cgroups inside -# - ./docker/pulsar_rest/data/:/opt/pysetup/files/staging/ -# DOOD - /opt/pysetup/files/staging/:/opt/pysetup/files/staging/ - /var/run/docker.sock:/var/run/docker.sock diff --git a/tests/test_data/env_test_1 b/tests/test_data/env_test_1 new file mode 100644 index 0000000..b2c9cad --- /dev/null +++ b/tests/test_data/env_test_1 @@ -0,0 +1 @@ +first upload successful diff --git a/tests/test_data/env_test_2 b/tests/test_data/env_test_2 new file mode 100644 index 0000000..b89622e --- /dev/null +++ b/tests/test_data/env_test_2 @@ -0,0 +1 @@ +second upload successful From ddcc337afc4ff5caf30952cb3878a9b966515e16 Mon Sep 17 00:00:00 2001 From: micoleaoo Date: Fri, 11 Jul 2025 12:30:20 +0000 Subject: [PATCH 13/22] added docker profile for pulsar service --- tests/test_data/env_test_1 | 1 - tests/test_data/env_test_2 | 
1 - 2 files changed, 2 deletions(-) delete mode 100644 tests/test_data/env_test_1 delete mode 100644 tests/test_data/env_test_2 diff --git a/tests/test_data/env_test_1 b/tests/test_data/env_test_1 deleted file mode 100644 index b2c9cad..0000000 --- a/tests/test_data/env_test_1 +++ /dev/null @@ -1 +0,0 @@ -first upload successful diff --git a/tests/test_data/env_test_2 b/tests/test_data/env_test_2 deleted file mode 100644 index b89622e..0000000 --- a/tests/test_data/env_test_2 +++ /dev/null @@ -1 +0,0 @@ -second upload successful From 135c1a728ffdad46943dd3b7e43a6067aaa23f36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Mon, 14 Jul 2025 12:35:22 +0200 Subject: [PATCH 14/22] Update ci.yaml --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 2d80d3c..b0d76a5 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -22,7 +22,7 @@ jobs: - name: run docker compose run: | - docker compose --profile all up -d --build + docker compose --profile pulsar up -d - name: run docker compose dts run: | From 224a484a3d39433dae297f31787085c44ebf6a4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Mon, 14 Jul 2025 12:40:16 +0200 Subject: [PATCH 15/22] Update ci.yaml --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b0d76a5..2cd3f7c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -22,7 +22,7 @@ jobs: - name: run docker compose run: | - docker compose --profile pulsar up -d + docker compose --profile pulsar up -d --build - name: run docker compose dts run: | From 35babf65efc26f4e0db71ab0e341e6473d6e4150 Mon Sep 17 00:00:00 2001 From: micoleaoo Date: Mon, 14 Jul 2025 11:46:44 +0000 Subject: [PATCH 16/22] updated dockerfile image --- docker/tesp_api/Dockerfile | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/docker/tesp_api/Dockerfile b/docker/tesp_api/Dockerfile index 1db86b7..59bf8c2 100644 --- a/docker/tesp_api/Dockerfile +++ b/docker/tesp_api/Dockerfile @@ -2,7 +2,7 @@ # See https://docs.docker.com/develop/develop-images/multistage-build/ # Environment variables and base image -FROM python:3.10-slim-buster as python-base +FROM python:3.10-slim-bullseye as python-base ENV PYTHONUNBUFFERED=1 \ PYTHONDONTWRITEBYTECODE=1 \ PIP_NO_CACHE_DIR=off \ From 2ad51617ae4f90ce2cf65959fb553f02a96b7314 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Mon, 14 Jul 2025 14:51:25 +0200 Subject: [PATCH 17/22] Update README.md --- README.md | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 3e03f0c..a76f6d4 100644 --- a/README.md +++ b/README.md @@ -18,20 +18,18 @@ standard and distributing `TES` tasks execution to `Pulsar` applications. ### Deploy The most straightforward way to deploy the TESP is to use Docker Compose. -#### All services (default): +#### API and DB services (default): ``` -docker compose --profile all up -d +docker compose up -d ``` +Expecting external Pulsar configured in `settings.toml` before the compose is run. +So far only REST Pulsar communication is supported. -#### Without pulsar_rest service: -``` -docker compose --profile api up -d -``` - -#### Only pulsar_rest service: +#### With pulsar_rest service: ``` docker compose --profile pulsar up -d ``` +


@@ -196,8 +194,11 @@ Service representing `TESP API` is configured to mount this project sources as a same command as is mentioned above. Therefore, any changes made to the sources in this repository will be immediately applied to the docker service as well, enabling live reloading which makes development within the `docker` environment very easy. ```shell -docker compose --profile all up -d - +docker compose up -d +``` +Or +```shell +docker compose --profile pulsar up -d ```   From 3abf4a6e0699543be48330e150cc3187003e3ab6 Mon Sep 17 00:00:00 2001 From: micoleaoo Date: Wed, 16 Jul 2025 13:44:37 +0000 Subject: [PATCH 18/22] Added directory IO test which functions with upload_server.py instead of DTS services through docker compose, potentially replacing them in the near future (now works with HTTP only). Added output files and container clean up after tests. --- tests/smoke_tests.py | 44 ++++++++++++--- tests/test_data/input_dir/dir1/dir_test1.txt | 1 + tests/test_data/input_dir/dir2/dir_test2.txt | 1 + tests/test_jsons/dir-io.json | 44 +++++++++++++++ tests/test_jsons/envs.json | 6 +- tests/upload_server.py | 59 ++++++++++++++++++++ 6 files changed, 145 insertions(+), 10 deletions(-) create mode 100644 tests/test_data/input_dir/dir1/dir_test1.txt create mode 100644 tests/test_data/input_dir/dir2/dir_test2.txt create mode 100644 tests/test_jsons/dir-io.json create mode 100644 tests/upload_server.py diff --git a/tests/smoke_tests.py b/tests/smoke_tests.py index 4177273..2d2c907 100644 --- a/tests/smoke_tests.py +++ b/tests/smoke_tests.py @@ -4,6 +4,10 @@ import json import os +import pytest +import shutil +import subprocess + import sys sys.path.append('/app') @@ -156,6 +160,10 @@ def test_get_task_list(): def test_inputs(): assert _test_simple("inputs.json", 120) +# Test directory as an input. +def test_dir_input(): + assert _test_simple("dir-io.json", 120) + #def test_outputs(): # Tests S3 and FTP upload and download. 
#jsons = ["outputs-prepare", "outputs-prepare-check", "outputs-test", "outputs-test-check"] @@ -170,13 +178,13 @@ def test_envs(): assert _test_simple("envs.json", 100) expected_files = { - f"{script_directory}/test_data/env_test_1": "first upload successful\n", - f"{script_directory}/test_data/env_test_2": "second upload successful\n" + f"{script_directory}/uploaded_data/env_test_1": "first upload successful\n", + f"{script_directory}/uploaded_data/env_test_2": "second upload successful\n" } # Verify that each file was created with the expected content for path, expected_content in expected_files.items(): - # Open the file and read its content (if accessible in the current environment) + # Open the file and read its content with open(path, 'r') as f: content = f.read() assert content == expected_content, f"File {path} does not contain the expected content." @@ -207,20 +215,42 @@ def test_task_cancel(): # Checks whether given task exceeds available resources. def test_resource_check_with_limits(): - # Load the JSON file json_data = _open_json('resource_check.json') - # Extract CPU cores and RAM values cpu_cores = _gnv(json_data, "resources.cpu_cores") ram_gb = _gnv(json_data, "resources.ram_gb") - # Define the maximum limits max_cpu_cores = 4 max_ram_gb = 3.8 - # Perform the checks with limits assert cpu_cores <= max_cpu_cores, f"CPU cores exceed the limit: {cpu_cores} > {max_cpu_cores}" assert ram_gb <= max_ram_gb, f"RAM exceeds the limit: {ram_gb} > {max_ram_gb}" print(f"Requested resources: CPU cores = {cpu_cores}, RAM = {ram_gb} GB (Limits: {max_cpu_cores} cores, {max_ram_gb} GB RAM)") +# Remove uploaded_data directory +@pytest.fixture(scope="session", autouse=True) +def cleanup_uploaded_data(): + yield # Wait until all tests are done + + uploaded_path = os.path.join(os.path.dirname(__file__), "uploaded_data") + if os.path.isdir(uploaded_path): + try: + shutil.rmtree(uploaded_path) + print(f"Removed uploaded data: {uploaded_path}") + except Exception as e: + 
print(f"Failed to remove {uploaded_path}: {e}") + + # container clean up + try: + output = subprocess.check_output([ + "docker", "ps", "-q", "--filter", "ancestor=ubuntu", "--filter", "status=running" + ]).decode().strip().splitlines() + + for container_id in output: + subprocess.run(["docker", "rm", "-f", container_id]) + print(f"Removed container: {container_id}") + except Exception as e: + print(f"Failed to remove sleep containers: {e}") + + diff --git a/tests/test_data/input_dir/dir1/dir_test1.txt b/tests/test_data/input_dir/dir1/dir_test1.txt new file mode 100644 index 0000000..a805158 --- /dev/null +++ b/tests/test_data/input_dir/dir1/dir_test1.txt @@ -0,0 +1 @@ +This is a directory input test n1. diff --git a/tests/test_data/input_dir/dir2/dir_test2.txt b/tests/test_data/input_dir/dir2/dir_test2.txt new file mode 100644 index 0000000..158fffb --- /dev/null +++ b/tests/test_data/input_dir/dir2/dir_test2.txt @@ -0,0 +1 @@ +This is a directory input test n2. diff --git a/tests/test_jsons/dir-io.json b/tests/test_jsons/dir-io.json new file mode 100644 index 0000000..ba8b029 --- /dev/null +++ b/tests/test_jsons/dir-io.json @@ -0,0 +1,44 @@ +{ + "name": "test-local-file-dir-io", + "description": "Test local file and directory input/output with curl and wget", + "inputs": [ + { + "name": "simple_input", + "type": "FILE", + "url": "http://172.17.0.1:5000/test_data/input_file.txt", + "path": "/data/input_file.txt" + }, + { + "name": "input_directory", + "type": "DIRECTORY", + "url": "http://172.17.0.1:5000/test_data/input_dir/", + "path": "/data/input_dir" + } + ], + "outputs": [ + { + "name": "simple_output", + "type": "FILE", + "url": "http://172.17.0.1:5000/upload", + "path": "/data/output_file.txt" + }, + { + "name": "output_directory", + "type": "DIRECTORY", + "url": "http://172.17.0.1:5000/upload", + "path": "/data/results" + } + ], + "executors": [ + { + "image": "alpine", + "workdir": "/data", + "command": [ + "sh", + "-c", + "mkdir -p /data/results && cp 
/data/input_file.txt /data/output_file.txt && cp -r /data/input_dir/* /data/results/" + ] + } + ] +} + diff --git a/tests/test_jsons/envs.json b/tests/test_jsons/envs.json index f1e64a3..93b8ed2 100644 --- a/tests/test_jsons/envs.json +++ b/tests/test_jsons/envs.json @@ -1,12 +1,12 @@ { "outputs": [ { - "path": "/tesp-api/tests/test_data/env_test_1", + "path": "/tesp-api/tests/uploaded_data/env_test_1", "url": "http://172.17.0.1:5000/upload", "type": "FILE" }, { - "path": "/tesp-api/tests/test_data/env_test_2", + "path": "/tesp-api/tests/uploaded_data/env_test_2", "url": "http://172.17.0.1:5000/upload", "type": "FILE" } @@ -17,7 +17,7 @@ "command": [ "sh", "-c", - "echo \"$TEST_FILE\" > /tesp-api/tests/test_data/env_test_1 && echo \"$TEST_FILE_2\" > /tesp-api/tests/test_data/env_test_2" + "echo \"$TEST_FILE\" > /tesp-api/tests/uploaded_data/env_test_1 && echo \"$TEST_FILE_2\" > /tesp-api/tests/uploaded_data/env_test_2" ], "env": { "TEST_FILE": "first upload successful", diff --git a/tests/upload_server.py b/tests/upload_server.py new file mode 100644 index 0000000..e14d4b9 --- /dev/null +++ b/tests/upload_server.py @@ -0,0 +1,59 @@ +import os +import sys +from flask import Flask, request, jsonify, send_from_directory, abort +from werkzeug.utils import secure_filename + +HOME_DIR = sys.path[0] # resolves to /home/debian +UPLOAD_ROOT = os.path.join(HOME_DIR, "uploaded_data") +SERVE_ROOT = HOME_DIR + +app = Flask(__name__) + +# Handle file upload +@app.route('/upload', methods=['POST']) +def upload(): + if 'file' not in request.files: + return jsonify({"error": "No file part"}), 400 + + uploaded_file = request.files['file'] + raw_filename = uploaded_file.filename # may include subdir e.g. 
"subdir/file.txt" + + # Secure each path part + secure_parts = [secure_filename(part) for part in raw_filename.split('/') if part] + safe_path = os.path.join(UPLOAD_ROOT, *secure_parts) + + # Ensure parent dir exists + os.makedirs(os.path.dirname(safe_path), exist_ok=True) + uploaded_file.save(safe_path) + + return jsonify({"status": "ok", "saved_as": safe_path}) + +@app.route('/', methods=['GET']) +@app.route('//', methods=['GET']) +def browse(subpath): + full_path = os.path.join(SERVE_ROOT, subpath) + + if os.path.isdir(full_path): + # Directory listing + items = os.listdir(full_path) + links = [] + for item in items: + item_path = os.path.join(subpath, item) + if os.path.isdir(os.path.join(full_path, item)): + item_path += '/' + links.append(f"{item}
") + return "" + "\n".join(links) + "" + + elif os.path.isfile(full_path): + # Serve file + return send_from_directory(os.path.dirname(full_path), os.path.basename(full_path)) + + else: + abort(404, description=f"{subpath} not found") + +if __name__ == '__main__': + os.makedirs(UPLOAD_ROOT, exist_ok=True) + print(f"Uploading to: {UPLOAD_ROOT}") + print(f"Serving from: {SERVE_ROOT}") + app.run(host='0.0.0.0', port=5000, debug=True) + From 4033daf1eafbe2b6e1d2d4c33aae7c58dbd87730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Wed, 16 Jul 2025 16:53:24 +0200 Subject: [PATCH 19/22] Update ci.yaml --- .github/workflows/ci.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 2cd3f7c..66713fc 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -24,9 +24,9 @@ jobs: run: | docker compose --profile pulsar up -d --build - - name: run docker compose dts + - name: run upload_server.py run: | - docker compose -f docker/dts/docker-compose.yaml up -d + python upload_server.py - name: instal pytest run: pip install pytest From 958d58578fd4abb9decd91b4ef2bfb8361ae8e0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Thu, 17 Jul 2025 14:20:23 +0200 Subject: [PATCH 20/22] Update ci.yaml --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 66713fc..3242a77 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -26,7 +26,7 @@ jobs: - name: run upload_server.py run: | - python upload_server.py + python tests/upload_server.py - name: instal pytest run: pip install pytest From 83203d296f5299532d2beff94ae0ec7f474cadea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Thu, 17 Jul 2025 14:26:51 +0200 Subject: [PATCH 21/22] Update ci.yaml --- .github/workflows/ci.yaml | 3 +++ 1 file changed, 3 
insertions(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 3242a77..f1886e9 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -24,6 +24,9 @@ jobs: run: | docker compose --profile pulsar up -d --build + - name: install flask + run: pip install flask + - name: run upload_server.py run: | python tests/upload_server.py From 169f6ca6d829043327655ac9c177090abefd71d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Valkovsk=C3=BD?= Date: Thu, 17 Jul 2025 14:42:06 +0200 Subject: [PATCH 22/22] Update ci.yaml --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f1886e9..ef20374 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -29,7 +29,7 @@ jobs: - name: run upload_server.py run: | - python tests/upload_server.py + python tests/upload_server.py & - name: instal pytest run: pip install pytest