diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 162a8c2..ef20374 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -22,11 +22,14 @@ jobs:
       - name: run docker compose
         run: |
-          docker compose up -d --build
+          docker compose --profile pulsar up -d --build
 
-      - name: run docker compose dts
+      - name: install flask
+        run: pip install flask
+
+      - name: run upload_server.py
         run: |
-          docker compose -f docker/dts/docker-compose.yaml up -d
+          python tests/upload_server.py &
 
       - name: instal pytest
         run: pip install pytest
diff --git a/README.md b/README.md
index c471e59..a76f6d4 100644
--- a/README.md
+++ b/README.md
@@ -17,9 +17,23 @@ standard and distributing `TES` tasks execution to `Pulsar` applications.
 
 ### Deploy
 The most straightforward way to deploy the TESP is to use Docker Compose.
+
+#### API and DB services (default):
+```
+docker compose up -d
+```
+This expects an external Pulsar to be configured in `settings.toml` before the compose is run.
+So far, only REST Pulsar communication is supported.
+
+#### With pulsar_rest service:
 ```
-docker compose up -d --build
+docker compose --profile pulsar up -d
 ```
+
+
+
+
+
 Depending on you Docker and Docker Compose installation, you may need to use `docker-compose` (with hyphen) instead.
 You might encounter a timeout error in container runtime which can be solved by correct `mtu` configuration either in the `docker-compose.yaml`:
@@ -180,7 +194,11 @@ Service representing `TESP API` is configured to mount this project sources as
 a same command as is mentioned above. Therefore, any changes made to the sources in this repository will be
 immediately applied to the docker service as well, enabling live reloading which makes development within the
 `docker` environment very easy.
 ```shell
-docker-compose up -d
+docker compose up -d
+```
+Or
+```shell
+docker compose --profile pulsar up -d
 ```
 &nbsp;
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 4abe304..3d8803d 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -39,15 +39,13 @@ services:
       dockerfile: Dockerfile
       target: development
     image: pulsar_rest
+    profiles: ["pulsar"]
     container_name: pulsar-rest
     privileged: true
     expose:
       - "8913"
     volumes:
       - ./docker/pulsar_rest/app.yml:/opt/pysetup/app.yml
-# DIND - problem with resource limitation, missing cgroups inside
-#      - ./docker/pulsar_rest/data/:/opt/pysetup/files/staging/
-# DOOD
       - /opt/pysetup/files/staging/:/opt/pysetup/files/staging/
       - /var/run/docker.sock:/var/run/docker.sock
diff --git a/docker/tesp_api/Dockerfile b/docker/tesp_api/Dockerfile
index 1db86b7..59bf8c2 100644
--- a/docker/tesp_api/Dockerfile
+++ b/docker/tesp_api/Dockerfile
@@ -2,7 +2,7 @@
 # See https://docs.docker.com/develop/develop-images/multistage-build/
 
 # Environment variables and base image
-FROM python:3.10-slim-buster as python-base
+FROM python:3.10-slim-bullseye as python-base
 ENV PYTHONUNBUFFERED=1 \
     PYTHONDONTWRITEBYTECODE=1 \
     PIP_NO_CACHE_DIR=off \
diff --git a/tesp_api/api/endpoints/task_endpoints.py b/tesp_api/api/endpoints/task_endpoints.py
index 1f2655f..73bf39f 100644
--- a/tesp_api/api/endpoints/task_endpoints.py
+++ b/tesp_api/api/endpoints/task_endpoints.py
@@ -5,7 +5,7 @@ from pymonad.promise import Promise
 from fastapi.params import Depends
 from fastapi import APIRouter, Body
-from fastapi.responses import Response
+from fastapi.responses import Response, JSONResponse
 
 from tesp_api.api.error import api_handle_error
 from tesp_api.service.event_dispatcher import dispatch_event
@@ -111,7 +111,10 @@ async def cancel_task(
             maybe_of(token_subject), ObjectId(id)
         ))).then(lambda get_tasks_args: task_repository.cancel_task(*get_tasks_args))\
-        .map(lambda task_id: Response(status_code=200, media_type="application/json"))\
+        .map(lambda task_id_maybe:
+             # Use JSONResponse to ensure the body is "{}"
+             JSONResponse(content={}, status_code=200)
+        )\
         .catch(api_handle_error)
 
 
@@ -119,7 +122,7 @@ async def cancel_task(
              responses={200: {"description": "Ok"}},
              description=descriptions["service-info"],
              response_model=TesServiceInfo)
-async def get_service_info() -> TesServiceInfo:
+async def get_service_info() -> TesServiceInfo:  # FastAPI directly handles Pydantic model return
     return TesServiceInfo(
         id="fi.muni.cz.tesp",
         name="TESP",
diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py
index 7c235c5..e436e49 100644
--- a/tesp_api/service/event_actions.py
+++ b/tesp_api/service/event_actions.py
@@ -2,9 +2,12 @@ import datetime
 from typing import List
 from pathlib import Path
-from pymonad.maybe import Just
+import asyncio
+
+from pymonad.maybe import Just, Nothing
 from bson.objectid import ObjectId
-from pymonad.promise import Promise
+from pymonad.promise import Promise, _Promise
+
 from tesp_api.utils.container import stage_in_command, run_command, stage_out_command, map_volumes
 from tesp_api.service.pulsar_service import pulsar_service
 from tesp_api.service.event_dispatcher import dispatch_event
@@ -26,41 +29,47 @@
 CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "docker")
 
-
 @local_handler.register(event_name="queued_task")
 def handle_queued_task(event: Event) -> None:
+    """
+    Dispatches the task to a REST or AMQP specific handler based on Pulsar operations type.
+    """
     event_name, payload = event
-    print("Queued task:")
-    print(payload)
+    print(f"Queued task: {payload.get('task_id')}")
 
     match pulsar_service.get_operations():
         case PulsarRestOperations() as pulsar_rest_operations:
             dispatch_event('queued_task_rest', {**payload, 'pulsar_operations': pulsar_rest_operations})
         case PulsarAmpqOperations() as pulsar_ampq_operations:
             dispatch_event('queued_task_ampq', {**payload, 'pulsar_operations': pulsar_ampq_operations})
 
-
 @local_handler.register(event_name="queued_task_rest")
 async def handle_queued_task_rest(event: Event):
+    """
+    Sets up the job in Pulsar via REST operations and dispatches an 'initialize_task' event.
+    """
     event_name, payload = event
     task_id: ObjectId = payload['task_id']
     pulsar_operations: PulsarRestOperations = payload['pulsar_operations']
-    print("Queued task rest:")
-    print(payload)
+    print(f"Queued task rest: {task_id}")
 
     await Promise(lambda resolve, reject: resolve(None))\
         .then(lambda nothing: pulsar_operations.setup_job(task_id))\
         .map(lambda setup_job_result: dispatch_event('initialize_task', {**payload, 'task_config': setup_job_result}))\
-        .catch(lambda error: pulsar_event_handle_error(error, payload['task_id'], event_name, pulsar_operations))\
-        .then(lambda x: x)  # invokes promise returned by error handler, otherwise acts as identity function
-
+        .catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\
+        .then(lambda x: x)  # Invokes promise, potentially from error handler
 
 @local_handler.register(event_name="initialize_task")
 async def handle_initializing_task(event: Event) -> None:
+    """
+    Updates task state to INITIALIZING, prepares data/volumes/inputs/outputs,
+    and dispatches a 'run_task' event.
+    """
     event_name, payload = event
     task_id: ObjectId = payload['task_id']
     pulsar_operations: PulsarRestOperations = payload['pulsar_operations']
 
+    # Merged Logic: Using the feature-complete setup_data from the new version
     async def setup_data(job_id: ObjectId,
                          resources: TesTaskResources,
                          volumes: List[str],
@@ -76,12 +85,8 @@ async def setup_data(job_id: ObjectId,
                 'ram_gb': resources.ram_gb if resources else None
             })
 
-        print("Volumes:")
-        print(volumes)
         output_confs, volume_confs = map_volumes(str(job_id), volumes, outputs)
 
-        print(inputs)
-
         for i, input_item in enumerate(inputs):
             if input_item.type == TesTaskIOType.DIRECTORY:
                 pulsar_path = payload['task_config']['inputs_directory'] + f'/input_dir_{i}'
@@ -103,6 +108,7 @@ async def setup_data(job_id: ObjectId,
 
         return resource_conf, volume_confs, input_confs, output_confs
 
+    print(f"Initializing task: {task_id}")
     await Promise(lambda resolve, reject: resolve(None))\
         .then(lambda nothing: task_repository.update_task_state(
             task_id,
@@ -112,7 +118,7 @@ async def setup_data(job_id: ObjectId,
             updated_task, TaskNotFoundError(task_id, Just(TesTaskState.QUEUED))
         )).then(lambda updated_task: setup_data(
             task_id,
-            maybe_of(updated_task.resources).maybe([], lambda x: x),
+            maybe_of(updated_task.resources).maybe(None, lambda x: x),
             maybe_of(updated_task.volumes).maybe([], lambda x: x),
             maybe_of(updated_task.inputs).maybe([], lambda x: x),
             maybe_of(updated_task.outputs).maybe([], lambda x: x)
@@ -125,26 +131,41 @@ async def setup_data(job_id: ObjectId,
         })).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\
         .then(lambda x: x)
 
-
 @local_handler.register(event_name="run_task")
 async def handle_run_task(event: Event) -> None:
+    """
+    Updates task state to RUNNING, prepares and executes job commands in Pulsar,
+    waits for completion, logs results, and updates task state accordingly (COMPLETE, EXECUTOR_ERROR).
+    Handles cancellations and other exceptions during its lifecycle.
+    """
     event_name, payload = event
     task_id: ObjectId = payload['task_id']
     author: str = payload['author']
     resource_conf: dict = payload['resource_conf']
-    volume_confs: List[Path] = payload['volume_confs']
+    volume_confs: List[dict] = payload['volume_confs']
     input_confs: List[dict] = payload['input_confs']
     output_confs: List[dict] = payload['output_confs']
     pulsar_operations: PulsarRestOperations = payload['pulsar_operations']
+
+    run_command_str = None
+    command_start_time = datetime.datetime.now(datetime.timezone.utc)
 
-    # init task
-    task_monad = await task_repository.update_task_state(
-        task_id,
-        TesTaskState.INITIALIZING,
-        TesTaskState.RUNNING
-    )
     try:
-        task = get_else_throw(task_monad, TaskNotFoundError(task_id, Just(TesTaskState.INITIALIZING)))
+        print(f"Running task: {task_id}")
+        # Set task state to RUNNING
+        task_monad_init = await task_repository.update_task_state(
+            task_id,
+            TesTaskState.INITIALIZING,
+            TesTaskState.RUNNING
+        )
+        task = get_else_throw(task_monad_init, TaskNotFoundError(task_id, Just(TesTaskState.INITIALIZING)))
+
+        # Early check for cancellation
+        current_task_after_init_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
+        current_task_after_init = get_else_throw(current_task_after_init_monad, TaskNotFoundError(task_id))
+        if current_task_after_init.state == TesTaskState.CANCELED:
+            print(f"Task {task_id} found CANCELED shortly after RUNNING state update. Aborting handler.")
+            return
 
         await update_last_task_log_time(
             task_id,
@@ -153,100 +174,110 @@ async def handle_run_task(event: Event) -> None:
             start_time=Just(datetime.datetime.now(datetime.timezone.utc))
         )
 
-        container_cmds = list()
-        stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest",
-                                     command=[],
-                                     workdir=Path("/downloads"))
-        # stage-in
+        stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", command=[], workdir=Path("/downloads"))
+
+        # Stage-in command
         stage_in_cmd = ""
+        stage_in_mount = ""
         if input_confs:
             stage_in_mount = payload['task_config']['inputs_directory']
-            stage_in_cmd = stage_in_command(
-                stage_exec,
-                resource_conf,
-                stage_in_mount,
-                input_confs,
-                CONTAINER_TYPE
-            )
+            stage_in_cmd = stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs, CONTAINER_TYPE)
 
-        # main execution
+        # Main execution commands
         container_cmds = []
         for i, executor in enumerate(task.executors):
             run_cmd = run_command(
-                executor=executor,
-                job_id=task_id,
-                resource_conf=resource_conf,
-                volume_confs=volume_confs,
-                input_confs=input_confs,
-                output_confs=output_confs,
-                inputs_directory=stage_in_mount if input_confs else "",
-                container_type=CONTAINER_TYPE,
-                job_directory=payload['task_config']['job_directory'] if CONTAINER_TYPE == "singularity" else None,
+                executor=executor, job_id=str(task_id), resource_conf=resource_conf,
+                volume_confs=volume_confs, input_confs=input_confs, output_confs=output_confs,
+                inputs_directory=stage_in_mount, container_type=CONTAINER_TYPE,
+                job_directory=payload['task_config'].get('job_directory') if CONTAINER_TYPE == "singularity" else None,
                 executor_index=i
             )
             container_cmds.append(run_cmd)
 
-        # stage-out
+        # Stage-out command
         stage_out_cmd = ""
         if output_confs:
             stage_out_cmd = stage_out_command(
-                stage_exec,
-                resource_conf,
-                output_confs,
-                volume_confs,
+                stage_exec, resource_conf, output_confs, volume_confs,
                 container_type=CONTAINER_TYPE,
-                bind_mount=payload['task_config']['inputs_directory'] if CONTAINER_TYPE == "singularity" else None,
-                job_directory=payload['task_config']['job_directory'] if CONTAINER_TYPE == "singularity" else None
+                bind_mount=payload['task_config'].get('inputs_directory') if CONTAINER_TYPE == "singularity" else None,
+                job_directory=payload['task_config'].get('job_directory') if CONTAINER_TYPE == "singularity" else None
             )
 
-        # Combine commands
-        run_commands = " && ".join(container_cmds)
-        parts = ["set -xe", stage_in_cmd, run_commands, stage_out_cmd]
+        # Combine all commands into a single string for Pulsar
+        executors_commands_joined_str = " && ".join(filter(None, container_cmds))
+        parts = ["set -xe", stage_in_cmd, executors_commands_joined_str, stage_out_cmd]
         non_empty_parts = [p.strip() for p in parts if p and p.strip()]
-        full_command = " && ".join(non_empty_parts)
-        print(full_command)
+        run_command_str = " && ".join(non_empty_parts) if non_empty_parts else None
 
+        # Resume with the polished version's logic for execution and state management
         command_start_time = datetime.datetime.now(datetime.timezone.utc)
+        command_status: dict
 
-        # start the task (docker container/s) in the pulsar
-        await pulsar_operations.run_job(task_id, full_command)
-
-        # wait for the task
-        command_status = await pulsar_operations.job_status_complete(str(task_id))
+        if run_command_str is None:
+            print(f"Task {task_id} has no commands to run. Treating as successful no-op.")
+            command_status = {'stdout': '', 'stderr': 'No commands to run.', 'returncode': 0}
+        else:
+            print(f"Submitting job to Pulsar for task {task_id}: {run_command_str}")
+            await pulsar_operations.run_job(task_id, run_command_str)
+            command_status = await pulsar_operations.job_status_complete(str(task_id))
 
         command_end_time = datetime.datetime.now(datetime.timezone.utc)
 
         await append_task_executor_logs(
-            task_id,
-            author,
-            TesTaskState.RUNNING,
-            command_start_time,
-            command_end_time,
-            command_status['stdout'],
-            command_status['stderr'],
-            command_status['returncode']
+            task_id, author, TesTaskState.RUNNING,
+            command_start_time, command_end_time,
+            command_status.get('stdout', ''), command_status.get('stderr', ''),
+            command_status.get('returncode', -1)
         )
 
-        if command_status['returncode'] != 0:
-            task = await task_repository.update_task_state(
-                task_id,
-                TesTaskState.RUNNING,
-                TesTaskState.EXECUTOR_ERROR
-            )
-            raise TaskExecutorError()
-
-    except Exception as error:
-        pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)
-
-    # dispatch_event('finalize_task', payload)
-    await Promise(lambda resolve, reject: resolve(None)) \
-        .then(lambda ignored: task_repository.update_task_state(
-            task_id,
-            TesTaskState.RUNNING,
-            TesTaskState.COMPLETE
-        )) \
-        .map(lambda task: get_else_throw(
-            task, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING))
-        )) \
-        .then(lambda ignored: pulsar_operations.erase_job(task_id)) \
-        .catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
-        .then(lambda x: x)  # invokes promise returned by error handler, otherwise acts as identity function
+        current_task_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
+        current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id))
+
+        if current_task_obj.state == TesTaskState.CANCELED:
+            print(f"Task {task_id} found CANCELED after job completion polling. Aborting state changes.")
+            return
+
+        if command_status.get('returncode', -1) != 0:
+            print(f"Task {task_id} executor error (return code: {command_status.get('returncode', -1)}). Setting state to EXECUTOR_ERROR.")
+            await task_repository.update_task_state(task_id, TesTaskState.RUNNING, TesTaskState.EXECUTOR_ERROR)
+            await pulsar_operations.erase_job(task_id)
+            return
+
+        print(f"Task {task_id} completed successfully. Setting state to COMPLETE.")
+        await Promise(lambda resolve, reject: resolve(None)) \
+            .then(lambda ignored: task_repository.update_task_state(
+                task_id, TesTaskState.RUNNING, TesTaskState.COMPLETE
+            )) \
+            .map(lambda task_after_complete_update: get_else_throw(
+                task_after_complete_update, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING))
+            )) \
+            .then(lambda ignored: pulsar_operations.erase_job(task_id)) \
+            .catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
+            .then(lambda x: x)
+
+    except asyncio.CancelledError:
+        print(f"handle_run_task for task {task_id} was explicitly cancelled (asyncio.CancelledError).")
+        await task_repository.update_task_state(task_id, None, TesTaskState.CANCELED)
+        await pulsar_operations.kill_job(task_id)
+        await pulsar_operations.erase_job(task_id)
+        print(f"Task {task_id} Pulsar job cleanup attempted after asyncio cancellation.")
+
+    except Exception as error:
+        print(f"Exception in handle_run_task for task {task_id}: {type(error).__name__} - {error}")
+
+        task_state_after_error_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
+        if task_state_after_error_monad.is_just() and task_state_after_error_monad.value.state == TesTaskState.CANCELED:
+            print(f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.")
+            return
+
+        print(f"Task {task_id} not CANCELED; proceeding with pulsar_event_handle_error for '{type(error).__name__}'.")
+        error_handler_result = pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)
+        if asyncio.iscoroutine(error_handler_result) or isinstance(error_handler_result, _Promise):
+            await error_handler_result
+
+        try:
+            print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling in run_task.")
+            await pulsar_operations.erase_job(task_id)
+        except Exception as final_erase_error:
+            print(f"Error during final Pulsar erase attempt for task {task_id} after general error: {final_erase_error}")
diff --git a/tests/smoke_tests.py b/tests/smoke_tests.py
index 4177273..2d2c907 100644
--- a/tests/smoke_tests.py
+++ b/tests/smoke_tests.py
@@ -4,6 +4,10 @@
 import json
 import os
+import pytest
+import shutil
+import subprocess
+
 import sys
 
 sys.path.append('/app')
@@ -156,6 +160,10 @@ def test_get_task_list():
 def test_inputs():
     assert _test_simple("inputs.json", 120)
 
+# Test directory as an input.
+def test_dir_input():
+    assert _test_simple("dir-io.json", 120)
+
 #def test_outputs():
 # Tests S3 and FTP upload and download.
 #jsons = ["outputs-prepare", "outputs-prepare-check", "outputs-test", "outputs-test-check"]
@@ -170,13 +178,13 @@ def test_envs():
     assert _test_simple("envs.json", 100)
 
     expected_files = {
-        f"{script_directory}/test_data/env_test_1": "first upload successful\n",
-        f"{script_directory}/test_data/env_test_2": "second upload successful\n"
+        f"{script_directory}/uploaded_data/env_test_1": "first upload successful\n",
+        f"{script_directory}/uploaded_data/env_test_2": "second upload successful\n"
     }
 
     # Verify that each file was created with the expected content
     for path, expected_content in expected_files.items():
-        # Open the file and read its content (if accessible in the current environment)
+        # Open the file and read its content
         with open(path, 'r') as f:
             content = f.read()
             assert content == expected_content, f"File {path} does not contain the expected content."
@@ -207,20 +215,42 @@ def test_task_cancel():
 # Checks whether given task exceeds available resources.
 def test_resource_check_with_limits():
-    # Load the JSON file
     json_data = _open_json('resource_check.json')
 
-    # Extract CPU cores and RAM values
     cpu_cores = _gnv(json_data, "resources.cpu_cores")
     ram_gb = _gnv(json_data, "resources.ram_gb")
 
-    # Define the maximum limits
     max_cpu_cores = 4
     max_ram_gb = 3.8
 
-    # Perform the checks with limits
     assert cpu_cores <= max_cpu_cores, f"CPU cores exceed the limit: {cpu_cores} > {max_cpu_cores}"
     assert ram_gb <= max_ram_gb, f"RAM exceeds the limit: {ram_gb} > {max_ram_gb}"
 
     print(f"Requested resources: CPU cores = {cpu_cores}, RAM = {ram_gb} GB (Limits: {max_cpu_cores} cores, {max_ram_gb} GB RAM)")
 
+# Remove uploaded_data directory
+@pytest.fixture(scope="session", autouse=True)
+def cleanup_uploaded_data():
+    yield  # Wait until all tests are done
+
+    uploaded_path = os.path.join(os.path.dirname(__file__), "uploaded_data")
+    if os.path.isdir(uploaded_path):
+        try:
+            shutil.rmtree(uploaded_path)
+            print(f"Removed uploaded data: {uploaded_path}")
+        except Exception as e:
+            print(f"Failed to remove {uploaded_path}: {e}")
+
+    # container clean up
+    try:
+        output = subprocess.check_output([
+            "docker", "ps", "-q", "--filter", "ancestor=ubuntu", "--filter", "status=running"
+        ]).decode().strip().splitlines()
+
+        for container_id in output:
+            subprocess.run(["docker", "rm", "-f", container_id])
+            print(f"Removed container: {container_id}")
+    except Exception as e:
+        print(f"Failed to remove sleep containers: {e}")
+
+
diff --git a/tests/test_data/input_dir/dir1/dir_test1.txt b/tests/test_data/input_dir/dir1/dir_test1.txt
new file mode 100644
index 0000000..a805158
--- /dev/null
+++ b/tests/test_data/input_dir/dir1/dir_test1.txt
@@ -0,0 +1 @@
+This is a directory input test n1.
diff --git a/tests/test_data/input_dir/dir2/dir_test2.txt b/tests/test_data/input_dir/dir2/dir_test2.txt
new file mode 100644
index 0000000..158fffb
--- /dev/null
+++ b/tests/test_data/input_dir/dir2/dir_test2.txt
@@ -0,0 +1 @@
+This is a directory input test n2.
diff --git a/tests/test_jsons/dir-io.json b/tests/test_jsons/dir-io.json
new file mode 100644
index 0000000..ba8b029
--- /dev/null
+++ b/tests/test_jsons/dir-io.json
@@ -0,0 +1,44 @@
+{
+  "name": "test-local-file-dir-io",
+  "description": "Test local file and directory input/output with curl and wget",
+  "inputs": [
+    {
+      "name": "simple_input",
+      "type": "FILE",
+      "url": "http://172.17.0.1:5000/test_data/input_file.txt",
+      "path": "/data/input_file.txt"
+    },
+    {
+      "name": "input_directory",
+      "type": "DIRECTORY",
+      "url": "http://172.17.0.1:5000/test_data/input_dir/",
+      "path": "/data/input_dir"
+    }
+  ],
+  "outputs": [
+    {
+      "name": "simple_output",
+      "type": "FILE",
+      "url": "http://172.17.0.1:5000/upload",
+      "path": "/data/output_file.txt"
+    },
+    {
+      "name": "output_directory",
+      "type": "DIRECTORY",
+      "url": "http://172.17.0.1:5000/upload",
+      "path": "/data/results"
+    }
+  ],
+  "executors": [
+    {
+      "image": "alpine",
+      "workdir": "/data",
+      "command": [
+        "sh",
+        "-c",
+        "mkdir -p /data/results && cp /data/input_file.txt /data/output_file.txt && cp -r /data/input_dir/* /data/results/"
+      ]
+    }
+  ]
+}
+
diff --git a/tests/test_jsons/envs.json b/tests/test_jsons/envs.json
index f1e64a3..93b8ed2 100644
--- a/tests/test_jsons/envs.json
+++ b/tests/test_jsons/envs.json
@@ -1,12 +1,12 @@
 {
   "outputs": [
     {
-      "path": "/tesp-api/tests/test_data/env_test_1",
+      "path": "/tesp-api/tests/uploaded_data/env_test_1",
       "url": "http://172.17.0.1:5000/upload",
       "type": "FILE"
     },
     {
-      "path": "/tesp-api/tests/test_data/env_test_2",
+      "path": "/tesp-api/tests/uploaded_data/env_test_2",
       "url": "http://172.17.0.1:5000/upload",
       "type": "FILE"
     }
   ]
@@ -17,7 +17,7 @@
       "command": [
         "sh",
         "-c",
-        "echo \"$TEST_FILE\" > /tesp-api/tests/test_data/env_test_1 && echo \"$TEST_FILE_2\" > /tesp-api/tests/test_data/env_test_2"
+        "echo \"$TEST_FILE\" > /tesp-api/tests/uploaded_data/env_test_1 && echo \"$TEST_FILE_2\" > /tesp-api/tests/uploaded_data/env_test_2"
       ],
       "env": {
         "TEST_FILE": "first upload successful",
diff --git a/tests/upload_server.py b/tests/upload_server.py
new file mode 100644
index 0000000..e14d4b9
--- /dev/null
+++ b/tests/upload_server.py
@@ -0,0 +1,59 @@
+import os
+import sys
+from flask import Flask, request, jsonify, send_from_directory, abort
+from werkzeug.utils import secure_filename
+
+HOME_DIR = sys.path[0]  # directory containing this script (tests/)
+UPLOAD_ROOT = os.path.join(HOME_DIR, "uploaded_data")
+SERVE_ROOT = HOME_DIR
+
+app = Flask(__name__)
+
+# Handle file upload
+@app.route('/upload', methods=['POST'])
+def upload():
+    if 'file' not in request.files:
+        return jsonify({"error": "No file part"}), 400
+
+    uploaded_file = request.files['file']
+    raw_filename = uploaded_file.filename  # may include subdir e.g. "subdir/file.txt"
+
+    # Secure each path part
+    secure_parts = [secure_filename(part) for part in raw_filename.split('/') if part]
+    safe_path = os.path.join(UPLOAD_ROOT, *secure_parts)
+
+    # Ensure parent dir exists
+    os.makedirs(os.path.dirname(safe_path), exist_ok=True)
+    uploaded_file.save(safe_path)
+
+    return jsonify({"status": "ok", "saved_as": safe_path})
+
+@app.route('/<path:subpath>', methods=['GET'])
+@app.route('/<path:subpath>/', methods=['GET'])
+def browse(subpath):
+    full_path = os.path.join(SERVE_ROOT, subpath)
+
+    if os.path.isdir(full_path):
+        # Directory listing
+        items = os.listdir(full_path)
+        links = []
+        for item in items:
+            item_path = os.path.join(subpath, item)
+            if os.path.isdir(os.path.join(full_path, item)):
+                item_path += '/'
+            links.append(f"<a href='/{item_path}'>{item}</a>")
+        return "<html><body>" + "<br>\n".join(links) + "</body></html>"
+
+    elif os.path.isfile(full_path):
+        # Serve file
+        return send_from_directory(os.path.dirname(full_path), os.path.basename(full_path))
+
+    else:
+        abort(404, description=f"{subpath} not found")
+
+if __name__ == '__main__':
+    os.makedirs(UPLOAD_ROOT, exist_ok=True)
+    print(f"Uploading to: {UPLOAD_ROOT}")
+    print(f"Serving from: {SERVE_ROOT}")
+    app.run(host='0.0.0.0', port=5000, debug=True)
+