diff --git a/Dockerfile b/Dockerfile index cb9b69a..9ba280e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,11 +36,10 @@ RUN echo "Server" > /app/README.md \ && chmod +x /app/entrypoint.sh # RUN conda update -n base -c conda-forge conda \ - && conda env create -f /app/environment.yml -y \ - && conda run -n server poetry self update \ - && echo "conda activate server" >> /.bashrc \ - && conda run -n server pip install --upgrade pip - # && conda run -n server pip install --no-cache-dir git+https://github.com/vivarium-collective/process-bigraph.git@main + && conda env create -f /app/env.yml -y \ + && conda update -n compose-server numpy -y \ + && pip install -e . + ENTRYPOINT ["/bin/bash"] diff --git a/README.md b/README.md index 7a2690e..07e4c3c 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,22 @@ ![Worker Deployment](https://github.com/biosimulators/compose-server/actions/workflows/deploy-worker.yml/badge.svg) # BioCompose Server + +## *FOR DEVS*: + +### _Generate gRPC service stubs:_ +```bash +python shared/parse_proto.py +``` + +### _Generate OpenAPI spec yml:_ +```bash +python client/openapi_spec.py +``` + +### _Apply overlay changes_: +```bash +cd kustomize \ + && kubectl kustomize overlays/compose | kubectl apply -f - \ + && cd .. +``` \ No newline at end of file diff --git a/app/DataService.js b/app/DataService.js new file mode 100644 index 0000000..a4860ff --- /dev/null +++ b/app/DataService.js @@ -0,0 +1,55 @@ +class VivariumService { + constructor(root= 'http://127.0.0.1:3001') { + this.root = root; + this.testId = 'test'; + } + + formatUrl(root, duration, id) { + return `${root}/run-vivarium?duration=${duration}&vivarium_id=${id}`; + } + + async fetchStream(url) { + const response = await fetch(url); + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + + let jsonData = ""; + let isFirstChunk = true; + + while (true) { + const {done, value} = await reader.read(); + if (done) { + console.log("Stream finished."); + break; + } + + jsonData += decoder.decode(value); + + try { + if (isFirstChunk) { + jsonData = jsonData.replace('{"updates": [', "["); // ✅ Handle initial chunk + isFirstChunk = false; + } + + if (jsonData.endsWith("]}")) { + const parsedData = JSON.parse(jsonData); + console.log("Final JSON:", parsedData); + break; + } else { + console.log("Streaming JSON chunk received:", jsonData); + } + } catch (e) { + console.log("Waiting for more JSON data..."); + } + } + } + + submitRequest(duration, id) { + const url = this.formatUrl(this.root, duration, id); + return this.fetchStream(url); + } + + sendTestRequest(duration) { + return this.submitRequest(duration, this.testId); + } +} \ No newline at end of file diff --git a/assets/deployment/compose.envoy.yml b/assets/deployment/compose.envoy.yml new file mode 100644 index 0000000..7c3a179 --- /dev/null +++ b/assets/deployment/compose.envoy.yml @@ -0,0 +1,12 @@ +version: "3" +services: + envoy: + image: envoyproxy/envoy:v1.21.0 + ports: + - "9901:9901" + volumes: + - ./envoy.yaml:/etc/envoy/envoy.yaml + grpc_server: + image: my_grpc_service + ports: + - "50051:50051" diff --git a/assets/deployment/envoy.yml b/assets/deployment/envoy.yml new file mode 100644 index 0000000..3ce552d --- /dev/null +++ b/assets/deployment/envoy.yml @@ -0,0 +1,60 @@ +static_resources: + listeners: + - name: grpc_listener + address: + socket_address: + address: 0.0.0.0 + port_value: 9901 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": 
type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + codec_type: AUTO + stat_prefix: grpc_proxy + route_config: + name: local_route + virtual_hosts: + - name: grpc_services + domains: ["*"] + routes: + - match: { prefix: "/" } + route: + cluster: grpc_service + timeout: 0s + http_filters: + - name: envoy.filters.http.grpc_json_transcoder + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_json_transcoder.v3.GrpcJsonTranscoder + proto_descriptor: "/path/to/proto.pb" + services: + - my.grpc.Service + print_options: + add_whitespace: true + always_print_primitive_fields: true + always_print_enums_as_ints: false + - name: envoy.filters.http.router + + + + clusters: + - name: grpc_service + type: STRICT_DNS + connect_timeout: 5s + lb_policy: ROUND_ROBIN + http2_protocol_options: { } + load_assignment: + cluster_name: grpc_service + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: grpc_server1 + port_value: 50051 + - endpoint: + address: + socket_address: + address: grpc_server2 + port_value: 50052 + diff --git a/assets/docker/Dockerfile-base b/assets/docker/Dockerfile-base new file mode 100644 index 0000000..e7a3d45 --- /dev/null +++ b/assets/docker/Dockerfile-base @@ -0,0 +1,87 @@ +# FROM condaforge/miniforge3:24.9.2-0 +FROM condaforge/miniforge3:latest + +LABEL org.opencontainers.image.title="bio-compose-server-worker" \ + org.opencontainers.image.description="Base Docker image for BioCompose REST API management, job processing, and datastorage with MongoDB, ensuring scalable and robust performance." \ + org.opencontainers.image.url="https://compose.biosimulators.org/" \ + org.opencontainers.image.source="https://github.com/biosimulators/bio-compose-server" \ + org.opencontainers.image.authors="Alexander Patrie , BioSimulators Team " \ + org.opencontainers.image.vendor="BioSimulators Team" + +ENV DEBIAN_FRONTEND=noninteractive + +ARG INSTALL_WORKER + +# RUN apt-get update \ +# && apt-get install -y \ +# meson \ +# g++ \ +# gfortran \ +# libblas-dev \ +# liblapack-dev \ +# libgfortran5 \ +# libhdf5-dev \ +# libhdf5-serial-dev \ +# libatlas-base-dev \ +# cmake \ +# make \ +# git \ +# build-essential \ +# python3-dev \ +# swig \ +# libc6-dev \ +# libx11-dev \ +# libc6 \ +# libgl1-mesa-dev \ +# pkg-config \ +# curl \ +# tar \ +# libgl1-mesa-glx \ +# libice6 \ +# libsm6 \ +# gnupg \ +# libstdc++6 \ +# && apt-get clean + +# copy assets +COPY assets/docker/config/.biosimulations.json /.google/.bio-compose.json +COPY assets/docker/config/.pys_usercfg.ini /Pysces/.pys_usercfg.ini +COPY assets/docker/config/.pys_usercfg.ini /root/Pysces/.pys_usercfg.ini +COPY tests/test_fixtures /test_fixtures + +WORKDIR /app + +# copy env configs +COPY ./environment.yml /app/environment.yml +COPY ./pyproject.toml /app/pyproject.toml +COPY ./gateway /app/gateway +COPY ./shared /app/shared +COPY ./worker /app/worker +COPY ./common /app/common + +RUN echo "Server" > /app/README.md \ + && mkdir /app/config + +# create base env +RUN conda install cmake ninja pybind11 scikit-build-core -y +RUN conda run pip install -e . \ + && conda env export --from-history > /app/env-export.yml + +# && conda install --file=/app/environment.yml \ +# RUN mamba install conda-forge::readdy -y +# RUN mamba create -n server python=3.11 cmake ninja pybind11 scikit-build-core -y +# RUN mamba run -n server pip install -e . 
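+# (note: `conda env export --from-history` only records the packages that were
+#  explicitly requested, so the generated /app/env-export.yml stays minimal)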
\ +# && mamba env export -n server --from-history > /app/env-export.yml + +# handle worker(simulators) +# RUN if [ "$INSTALL_WORKER" == "1" ]; then \ +# mamba install conda-forge::pymem3dg -y conda-forge::readdy \ +# fi + +# && conda install -n bsp conda-forge::readdy -y + + +# build alone with: +# docker build -f ./assets/docker/Dockerfile-base -t BASE . + + diff --git a/client/.VERSION b/client/.VERSION new file mode 100644 index 0000000..8acdd82 --- /dev/null +++ b/client/.VERSION @@ -0,0 +1 @@ +0.0.1 diff --git a/client/Dockerfile-client b/client/Dockerfile-client new file mode 100644 index 0000000..2782a5c --- /dev/null +++ b/client/Dockerfile-client @@ -0,0 +1,41 @@ +FROM condaforge/miniforge3:24.9.2-0 + +LABEL org.opencontainers.image.title="bio-compose-server-gateway" \ + org.opencontainers.image.description="Base Docker image for BioCompose REST API management, job processing, and datastorage with MongoDB, ensuring scalable and robust performance." \ + org.opencontainers.image.url="https://compose.biosimulators.org/" \ + org.opencontainers.image.source="https://github.com/biosimulators/bio-compose-server" \ + org.opencontainers.image.authors="Alexander Patrie , BioSimulators Team " \ + org.opencontainers.image.vendor="BioSimulators Team" + +ENV DEBIAN_FRONTEND=noninteractive + +# copy assets +COPY assets/docker/config/.biosimulations.json /.google/.bio-compose.json +COPY assets/docker/config/.pys_usercfg.ini /Pysces/.pys_usercfg.ini +COPY assets/docker/config/.pys_usercfg.ini /root/Pysces/.pys_usercfg.ini +COPY tests/test_fixtures /test_fixtures + +WORKDIR /app + +# copy env configs +COPY ./environment.yml /app/environment.yml +COPY ./pyproject.toml /app/pyproject.toml +COPY ./gateway /app/gateway +COPY ./shared /app/shared +COPY ./worker /app/worker +COPY ./common /app/common + +RUN echo "Server" > /app/README.md \ + && mkdir /app/config +# +RUN conda update -n base -c conda-forge conda \ + && conda env create -f /app/environment.yml -y \ + && conda run -n compose-server pip install -e . \ + && conda run -n compose-server python /app/gateway/openapi_spec.py + +# CMD ["conda", "run", "-n", "compose-server", "uvicorn", "gateway.main:app", "--host", "0.0.0.0", "--port", "3001", "--reload"] + +# EXPOSE 3001 + +CMD ["conda", "run", "-n", "compose-server", "uvicorn", "gateway.main:app", "--host", "0.0.0.0", "--port", "3001", "--reload"] + diff --git a/client/__init__.py b/client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/client/_main.py b/client/_main.py new file mode 100644 index 0000000..36df946 --- /dev/null +++ b/client/_main.py @@ -0,0 +1,1042 @@ +""" +Server gateway implementation. 
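+FastAPI endpoints defined here record submitted jobs in MongoDB and stream
+simulation results back from the gRPC worker service where one is involved.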
+ +Author: Alexander Patrie <@AlexPatrie> +""" + +import json +import os +import uuid +import sys +from tempfile import mkdtemp +from typing import * +import asyncio +from functools import partial +# import websockets + + +import dotenv +import grpc +import uvicorn +from fastapi import FastAPI, File, UploadFile, HTTPException, Query, APIRouter, Body, WebSocket +from process_bigraph import Process, pp, Composite +from starlette.middleware.cors import CORSMiddleware +from starlette.responses import FileResponse +from pydantic import BeforeValidator +from google.protobuf.struct_pb2 import Struct +from google.protobuf.any_pb2 import Any + + +from client.submit_runs import submit_utc_run, submit_pymem3dg_run +from common.proto import simulation_pb2_grpc, simulation_pb2 +from shared.connect import MongoConnector +from yaml import compose + +from shared.io import write_uploaded_file, download_file_from_bucket, write_local_file +from shared.log_config import setup_logging +from shared.utils import get_project_version, new_job_id, handle_exception, serialize_numpy, clean_temp_files +from shared.environment import ( + ENV_PATH, + DEFAULT_DB_NAME, + DEFAULT_JOB_COLLECTION_NAME, + DEFAULT_BUCKET_NAME, LOCAL_GRPC_MAPPING +) +from shared.data_model import ( + BigraphRegistryAddresses, + CompositionNode, + CompositionSpec, + CompositionRun, + OutputData, + ValidatedComposition, + SmoldynRun, + SimulariumAgentParameters, + ReaddySpeciesConfig, + ReaddyReactionConfig, + ReaddyParticleConfig, + ReaddyRun, + AmiciRun, + CobraRun, + CopasiRun, + TelluriumRun, + IncompleteFileJob, + APP_SERVERS, + HealthCheckResponse, + ProcessMetadata, + Mem3dgRun, + BigraphSchemaType, + StateData, + FileUpload +) + +from client.health import check_client +from shared.vivarium import CORE, check_composition, convert_process + +logger = setup_logging(__file__) + +# NOTE: create an env config at this filepath if dev +dotenv.load_dotenv(ENV_PATH) + +STANDALONE_GATEWAY = bool(os.getenv("STANDALONE_GATEWAY")) +MONGO_URI = os.getenv("MONGO_URI") if not STANDALONE_GATEWAY else os.getenv("STANDALONE_MONGO_URI") +GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") + + +# -- app constraints and components -- # + +APP_VERSION = get_project_version() +APP_TITLE = "compose-server" +APP_ORIGINS = [ + 'http://127.0.0.1:8000', + 'http://127.0.0.1:4200', + 'http://127.0.0.1:4201', + 'http://127.0.0.1:4202', + 'http://localhost:4200', + 'http://localhost:4201', + 'http://localhost:4202', + 'http://localhost:8000', + 'http://localhost:3001', + 'https://biosimulators.org', + 'https://www.biosimulators.org', + 'https://biosimulators.dev', + 'https://www.biosimulators.dev', + 'https://run.biosimulations.dev', + 'https://run.biosimulations.org', + 'https://biosimulations.dev', + 'https://biosimulations.org', + 'https://bio.libretexts.org', + 'https://compose.biosimulations.org' +] + +db_conn_gateway = MongoConnector(connection_uri=MONGO_URI, database_id=DEFAULT_DB_NAME) +router = APIRouter() +app = FastAPI(title=APP_TITLE, version=APP_VERSION, servers=APP_SERVERS) +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"] +) +app.mongo_client = db_conn_gateway.client + +PyObjectId = Annotated[str, BeforeValidator(str)] +clients: List[WebSocket] = [] + +# SERVER_WS_URL = "ws://localhost:8001/ws" +# BASE_WS_URL = "ws://{function_name}:8000/ws" + +# manager = SocketConnectionManager() + +# @app.websocket("/ws") +# async def websocket_endpoint(websocket: 
WebSocket): +# """Handles incoming WebSocket connections from the server.""" +# await manager.connect(websocket) +# try: +# while True: +# data = await websocket.receive_text() +# print(f"Received from server: {data}") +# except WebSocketDisconnect: +# manager.disconnect(websocket) +# +# +# @app.post("/send-deltas/") +# async def send_deltas(request: StateData): +# """Receives HTTP data and forwards it to the WebSocket server. +# Used for twin representation. +# """ +# try: +# async with websockets.connect(SERVER_WS_URL) as websocket: +# packet: str = json.dumps( +# request.serialize() +# ) +# await websocket.send(packet) +# +# response: str | bytes = await websocket.recv() +# return json.loads(response) +# except Exception as e: +# raise HTTPException(status_code=500, detail=f"WebSocket error: {str(e)}") + + +# -- Composition: submit composition jobs -- + + +def submit_simulation( + job_id: str, + last_updated: str, + simulators: list[str], + duration: int, + spec: dict +) -> list[dict]: + """Sends a single request-response message to the gRPC server.""" + with grpc.insecure_channel(LOCAL_GRPC_MAPPING) as channel: + state = spec.get("state").copy() + print(f'Got spec: {state.keys()}') + del state['global_time'] + + stub = simulation_pb2_grpc.SimulationServiceStub(channel) + try: + # convert `spec` dictionary into a `map` + grpc_spec = {} + for key, value in state.items(): + # TODO: handle this more comprehensively + if isinstance(value, dict) and "address" in value.keys(): + grpc_spec[key] = convert_process(value) + + request = simulation_pb2.SimulationRequest( + job_id=job_id, + last_updated=last_updated, + simulators=simulators, + duration=duration, + spec=grpc_spec + ) + + response_iterator = stub.StreamSimulation(request) + + results = [] + for update in response_iterator: + result = { + "job_id": update.job_id, + "last_updated": update.last_updated, + "results": update.results + } + results.append(result) + print(f'Got result: {result}') + return results + except grpc.RpcError as e: + raise HTTPException(status_code=500, detail=f"gRPC error: {e}") + + + +@app.post( + "/get-process-metadata", + operation_id="get-process-metadata", + response_model=ProcessMetadata, + tags=["Composition"], + summary="Get the input and output ports, as well as state data from a given process and corresponding parameter configuration." +) +async def get_process_metadata( + process_id: str = Query(..., title="Process ID", example="simple-membrane-process"), + config: UploadFile = File(..., title="Process Config"), + model_files: List[UploadFile] = File(..., title="Files"), + return_composite_state: bool = Query(default=True, title="Return Composite State"), + # local_registry: bool = Query(default=True, title="Whether to use the local registry. Please note that this must be true by default for now."), +) -> ProcessMetadata: + if not config.filename.endswith('.json') and config.content_type != 'application/json': + raise HTTPException(status_code=400, detail="Invalid file type. 
Only JSON files are supported.") + try: + from bsp import app_registrar + + # TODO: implement this as a method param once new registries are established + local_registry = True + registry = app_registrar.core.process_registry + process_constructor = registry.access(process_id) + job_id = f'get-process-metadata-{process_id}-' + str(uuid.uuid4()) + + # read uploaded config + contents = await config.read() + config_data: Dict = json.loads(contents) + temp_files = [] + + # parse config for model specification TODO: generalize this + for uploaded_file in model_files: + # case: has a SedModel config spec (ode, fba, smoldyn) + if "model" in config_data.keys(): + specified_model = config_data["model"]["model_source"] + if uploaded_file.filename == specified_model.split("/")[-1]: + temp_dir = mkdtemp() + temp_file = os.path.join(temp_dir, uploaded_file.filename) + with open(temp_file, "wb") as f: + uploaded = await uploaded_file.read() + f.write(uploaded) + config_data["model"]["model_source"] = temp_file + temp_files.append(temp_file) + # case: has a mesh file config (membrane) + elif "mesh_file" in config_data.keys(): + temp_dir = mkdtemp() + temp_file = os.path.join(temp_dir, uploaded_file.filename) + with open(temp_file, "wb") as f: + uploaded = await uploaded_file.read() + f.write(uploaded) + config_data["mesh_file"] = temp_file + temp_files.append(temp_file) + + # instantiate the process for verification + process = process_constructor(config_data, app_registrar.core) + inputs: dict = process.inputs() + outputs: dict = process.outputs() + initial_state: dict = serialize_numpy(process.initial_state()) + + # define composition spec from process instance and verify with composite instance + doc = { + "_type": "process", + "address": f"local:{process_id}", + "config": config_data, + "inputs": dict(zip( + inputs.keys(), + [f"{name}_store" for name in inputs.keys()] + )), + "outputs": dict(zip( + outputs.keys(), + [[f"{name}_store"] for name in outputs.keys()] + )) + } + composite = Composite(config={'state': doc}, core=app_registrar.core) + state = {} + if return_composite_state: + state: dict = composite.state + state.pop("instance", None) + + # remove temp files + clean_temp_files(temp_files) + + return ProcessMetadata( + process_address=f"local:{process_id}" if local_registry else process_id, + input_schema=inputs, + output_schema=outputs, + initial_state=initial_state, + state=state, + ) + except Exception as e: + message = handle_exception("process-metadata") + f'-{str(e)}' + logger.error(message) + raise HTTPException(status_code=400, detail=message) + + +@app.get( + "/get-process-bigraph-addresses", + operation_id="get-process-bigraph-addresses", + response_model=BigraphRegistryAddresses, + tags=["Composition"], + summary="Get process bigraph implementation addresses for composition specifications.") +async def get_process_bigraph_addresses() -> BigraphRegistryAddresses: + # TODO: adjust this. 
Currently, if the optional simulator dep is not included, the process implementations will not show up + addresses: list[str] = list(CORE.process_registry.registry.keys()) + version = "bsp:latest" + return BigraphRegistryAddresses(registered_addresses=addresses, version=version) + + +@app.get( + "/get-bigraph-schema-types", + operation_id="get-bigraph-schema-types", + response_model=list[BigraphSchemaType], + tags=["Composition"], + summary="Get process bigraph implementation addresses for composition specifications.") +async def get_bigraph_schema_types() -> list[BigraphSchemaType]: + # TODO: adjust this. Currently, if the optional simulator dep is not included, the process implementations will not show up + return [ + BigraphSchemaType(type_id=type_name, default_value=type_spec.get("_default", {}), description=type_spec.get("_description")) + for type_name, type_spec in CORE.types().items() + ] + + +# TODO: make this more specific in checking +# @app.post( +# "/validate-composition", +# # response_model=ValidatedComposition, +# tags=["Composition"], +# operation_id="validate-composition", +# summary="Validate Simulation Experiment Design specification file.", +# ) +# async def validate_composition( +# spec_file: UploadFile = File(..., description="Composition JSON File"), +# ) -> ValidatedComposition: +# # validate filetype +# if not spec_file.filename.endswith('.json') and spec_file.content_type != 'application/json': +# raise HTTPException(status_code=400, detail="Invalid file type. Only JSON files are supported.") +# +# # multifold IO verification: +# try: +# contents = await spec_file.read() +# document_data: Dict = json.loads(contents) +# return check_composition(document_data) +# except json.JSONDecodeError as e: +# message = handle_exception("validate-composition-json-decode-error") + f'-{str(e)}' +# logger.error(message) +# raise HTTPException(status_code=400, detail=message) +# except Exception as e: +# message = handle_exception("validate-composition") + f'-{str(e)}' +# logger.error(message) +# raise HTTPException(status_code=400, detail=message) + + +@app.post( + "/upload-file", + tags=["Files"], + operation_id="upload-file", + summary="Upload a file (ie: model file) to the composition api bucket" +) +async def upload_file( + file: UploadFile = File(..., description="Uploaded File"), + job_id: Optional[str] = Query(default=new_job_id("upload-file"), description="Optional Job ID associated with this upload.") +) -> FileUpload: + """TODO: make auth required for this endpoint.""" + try: + file_ext = os.path.splitext(file.filename)[-1] + uploaded_file_location: str = await write_uploaded_file( + job_id=job_id, + uploaded_file=file, + bucket_name=DEFAULT_BUCKET_NAME, + extension=file_ext + ) + return FileUpload(job_id=job_id, location=uploaded_file_location, status="SUCCESS") + except Exception as e: + message = handle_exception("upload-file") + raise HTTPException(status_code=400, detail=message) + + +@app.post( + "/submit-composition", + # response_model=CompositionRun, + tags=["Composition"], + operation_id="submit-composition", + summary="Submit composition spec for simulation", +) +async def submit_composition( + composition_file: UploadFile = File(..., description="Composition JSON File"), + duration: int = Query(..., description="Duration of simulation"), + # simulators: list[str] = Query(..., description="Simulator package names to use for implementation"), +) -> list[dict]: # CompositionRun: + """We should assume that any file specifications have already been validated and 
remotely uploaded.""" + if not composition_file.filename.endswith('.json') and composition_file.content_type != 'application/json': + raise HTTPException(status_code=400, detail="Invalid file type. Only JSON files are supported.") + + job_id = new_job_id("composition") + try: + file_content: bytes = await composition_file.read() + composite_spec: dict[str, str | dict] = json.loads(file_content) + + # verification by fitting write confirmation into CompositionRun...to verify O phase of IO, garbage in garbage out + write_confirmation: dict[str, str | list[str] | int | dict] = await db_conn_gateway.write( + collection_name=DEFAULT_JOB_COLLECTION_NAME, + status="PENDING", + spec=composite_spec, + job_id=job_id, + last_updated=db_conn_gateway.timestamp(), + simulators=[], + duration=duration + ) + + # return CompositionRun(**write_confirmation) + return submit_simulation( + job_id=job_id, + duration=duration, + last_updated=db_conn_gateway.timestamp(), + simulators=[], + spec=composite_spec + ) + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="Invalid JSON format.") + except Exception as e: + message = handle_exception("submit-composition") + f'-{str(e)}' + logger.error(message) + raise HTTPException(status_code=400, detail=message) + + +@app.get( + "/get-composition-state/{job_id}", + operation_id="get-composition-state", + tags=["Composition"], + summary="Get the composite spec of a given simulation run indexed by job_id.") +async def get_composition_state(job_id: str): + try: + spec = await db_conn_gateway.read(collection_name="result_states", job_id=job_id) + if spec is None: + raise HTTPException(status_code=404, detail="Could not find result state.") + else: + if "_id" in spec.keys(): + spec.pop("_id") + + return spec + except Exception as e: + message = handle_exception("get-composition-state") + f'-{str(e)}' + logger.error(message) + raise HTTPException(status_code=400, detail=message) + + +# -- Data: output data -- + +@app.get( + "/get-output/{job_id}", + response_model=OutputData, + operation_id='get-output', + tags=["Data"], + summary='Get the results of an existing simulation run.') +async def get_output(job_id: str): + # get the job + job = await db_conn_gateway.read(collection_name=DEFAULT_JOB_COLLECTION_NAME, job_id=job_id) + + # parse id and return if job exist + if job is not None: + not_included = ["_id", "spec", "duration", "simulators"] + data = {} + for key in job.keys(): + if key not in not_included: + data[key] = job[key] + + return OutputData(**data) + else: + # otherwise, job does not exists + msg = f"Job with id: {job_id} not found. Please check the job_id and try again." + logger.error(msg) + raise HTTPException(status_code=404, detail=msg) + + +@app.get( + "/get-output-file/{job_id}", + operation_id='get-output-file', + tags=["Data"], + summary='Get the results of an existing simulation run from Smoldyn or Readdy as either a downloadable file or job progression status.' 
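+    # NOTE: the handler resolves the job by checking the completed_jobs, failed_jobs,
+    # in_progress_jobs, and pending_jobs collections, in that order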
+) +async def get_output_file(job_id: str): + # state-case: job is completed + if not job_id.startswith("simulation-execution"): + raise HTTPException(status_code=404, detail="This must be an output file job query starting with 'simulation-execution'.") + job = await db_conn_gateway.read(collection_name="completed_jobs", job_id=job_id) + if job is not None: + # rm mongo index + job.pop('_id', None) + # parse filepath in bucket and create file response + job_data = job + if isinstance(job_data, dict): + remote_fp = job_data.get("results").get("results_file") + if remote_fp is not None: + temp_dest = mkdtemp() + local_fp = download_file_from_bucket(source_blob_path=remote_fp, out_dir=temp_dest, bucket_name=DEFAULT_BUCKET_NAME) + # return downloadable file blob + return FileResponse(path=local_fp, media_type="application/octet-stream", filename=local_fp.split("/")[-1]) # TODO: return special smoldyn file instance + # state-case: job has failed + if job is None: + job = await db_conn_gateway.read(collection_name="failed_jobs", job_id=job_id) + # state-case: job is not in completed: + if job is None: + job = await db_conn_gateway.read(collection_name="in_progress_jobs", job_id=job_id) + # state-case: job is not in progress: + if job is None: + job = await db_conn_gateway.read(collection_name="pending_jobs", job_id=job_id) + # case: job is either failed, in prog, or pending + if job is not None: + # rm mongo index + job.pop('_id', None) + # specify source safely + src = job.get('source', job.get('path')) + if src is not None: + source = src.split('/')[-1] + else: + source = None + + return IncompleteFileJob( + job_id=job_id, + timestamp=job.get('timestamp'), + status=job.get('status'), + source=source + ) + + +# -- Files: submit file IO jobs -- + +@app.post( + "/generate-simularium-file", + operation_id='generate-simularium-file', + tags=["Files"], + summary='Generate a simularium file with a compatible simulation results file from Smoldyn') +async def generate_simularium_file( + uploaded_file: UploadFile = File(..., description="A file containing results that can be parse by Simularium (spatial)."), + box_size: float = Query(..., description="Size of the simulation box as a floating point number."), + filename: str = Query(default=None, description="Name desired for the simularium file. NOTE: pass only the file name without an extension."), + translate_output: bool = Query(default=True, description="Whether to translate the output trajectory prior to converting to simularium. See simulariumio documentation for more details."), + validate_output: bool = Query(default=True, description="Whether to validate the outputs for the simularium file. 
See simulariumio documentation for more details."), + agent_parameters: SimulariumAgentParameters = Body(default=None, description="Parameters for the simularium agents defining either radius or mass and density.") +): + job_id = "files-generate-simularium-file" + str(uuid.uuid4()) + _time = db_conn_gateway.timestamp() + uploaded_file_location = await write_uploaded_file( + job_id=job_id, + uploaded_file=uploaded_file, + bucket_name=DEFAULT_BUCKET_NAME, + extension='.txt' + ) + + # new simularium job in db + if filename is None: + filename = 'simulation' + agent_params = {} + if agent_parameters is not None: + for agent_param in agent_parameters.agents: + agent_params[agent_param.name] = agent_param.serialize() + + new_job_submission = await db_conn_gateway.write( + collection_name=DEFAULT_JOB_COLLECTION_NAME, + status="PENDING", + job_id=job_id, + timestamp=_time, + path=uploaded_file_location, + filename=filename, + box_size=box_size, + translate_output=translate_output, + validate_output=validate_output, + agent_parameters=agent_params if agent_params is not {} else None + ) + gen_id = new_job_submission.get('_id') + if gen_id is not None: + new_job_submission.pop('_id') + + return new_job_submission + + +# -- Health: check health status -- + +@app.get( + "/", + tags=["Health"], + summary="Health check", + response_model=HealthCheckResponse, +) +def check_health() -> HealthCheckResponse: + response = check_client(db_conn_gateway) + return HealthCheckResponse( + version=APP_VERSION, + status="running" if response.status == "PASS" else response + ) + + +# TODO: refactor job id parsing in worker for dispatch +# -- Processes: submit single simulator jobs -- + + +@app.post( + "/run-mem3dg-process", + response_model=Mem3dgRun, + name="Run Mem3dg Process", + operation_id="run-mem3dg-process", + tags=["Processes"], + summary="Run a Mem3dg Process", +) +async def run_mem3dg_process( + duration: int = Query(...), + characteristic_time_step: float = Query(..., example=1), + tension_modulus: float = Query(..., example=0.1), + preferred_area: float = Query(..., example=12.486), + preferred_volume: float = Query(..., example=2.933), + reservoir_volume: float = Query(..., example=1), + osmotic_strength: float = Query(..., example=0.02), + volume: float = Query(..., example=2.9), + damping: float = Query(..., example=0.05), + bending_kbc: float = Query(..., example=0.008), + tolerance: Optional[float] = Query(default=1e-11), + # geometry_type: Optional[str] = None, + # geometry_parameters: Optional[Dict[str, Union[float, int]]] = None, + mesh_file: UploadFile = File(...), +)-> Mem3dgRun: + try: + job_id = 'run-mem3dg-' + str(uuid.uuid4()) + uploaded_file_location = await write_uploaded_file( + job_id=job_id, + uploaded_file=mesh_file, + bucket_name=DEFAULT_BUCKET_NAME, + extension='.ply' + ) + parameters = { + "bending": { + "Kbc": bending_kbc, + } + } + mem3dg_run = await submit_pymem3dg_run( + job_id=job_id, + db_connector=db_conn_gateway, + characteristic_time_step=characteristic_time_step, + tension_modulus=tension_modulus, + preferred_area=preferred_area, + preferred_volume=preferred_volume, + reservoir_volume=reservoir_volume, + osmotic_strength=osmotic_strength, + volume=volume, + damping=damping, + tolerance=tolerance, + mesh_file=uploaded_file_location, + parameters_config=parameters, + duration=duration + ) + print(f'Got the mem3dg run: {mem3dg_run}') + return mem3dg_run + except Exception as e: + message = handle_exception("run-mem3dg-process") + f'-{str(e)}' + logger.error(message) + raise 
HTTPException(status_code=400, detail=message) + + +@app.post( + "/run-amici-process", + response_model=AmiciRun, + name="Run an Amici simulation", + operation_id="run-amici-process", + tags=["Processes"], + summary="Run a Amici simulation.") +async def run_amici_process( + model_file: UploadFile = File(..., description="SBML file"), + start: int = Query(..., description="Start time"), + stop: int = Query(..., description="End time(duration)"), + steps: int = Query(..., description="Number of steps.") +) -> AmiciRun: + run_data = await submit_utc_run( + db_connector=db_conn_gateway, + simulator="amici", + model_file=model_file, + implementation_scope="process", + start=start, + stop=stop, + steps=steps, + context_model=AmiciRun, + logger=logger + ) + + return run_data + + +@app.post( + "/run-cobra-process", + response_model=CobraRun, + name="Run an cobra simulation", + operation_id="run-cobra-process", + tags=["Processes"], + summary="Run a cobra simulation.") +async def run_cobra_process( + model_file: UploadFile = File(..., description="SBML file"), + start: int = Query(..., description="Start time"), + stop: int = Query(..., description="End time(duration)"), + steps: int = Query(..., description="Number of steps.") +) -> CobraRun: + run_data = await submit_utc_run( + db_connector=db_conn_gateway, + simulator="cobra", + model_file=model_file, + implementation_scope="process", + start=start, + stop=stop, + steps=steps, + context_model=CobraRun, + logger=logger + ) + + return run_data + + +@app.post( + "/run-copasi-process", + response_model=CopasiRun, + name="Run an copasi simulation", + operation_id="run-copasi-process", + tags=["Processes"], + summary="Run a copasi simulation.") +async def run_copasi_process( + model_file: UploadFile = File(..., description="SBML file"), + start: int = Query(..., description="Start time"), + stop: int = Query(..., description="End time(duration)"), + steps: int = Query(..., description="Number of steps.") +) -> CopasiRun: + run_data = await submit_utc_run( + db_connector=db_conn_gateway, + simulator="copasi", + model_file=model_file, + implementation_scope="process", + start=start, + stop=stop, + steps=steps, + context_model=CopasiRun, + logger=logger + ) + + return run_data + + +@app.post( + "/run-tellurium-process", + response_model=TelluriumRun, + name="Run an tellurium simulation", + operation_id="run-tellurium-process", + tags=["Processes"], + summary="Run a tellurium simulation.") +async def run_tellurium_process( + model_file: UploadFile = File(..., description="SBML file"), + start: int = Query(..., description="Start time"), + stop: int = Query(..., description="End time(duration)"), + steps: int = Query(..., description="Number of steps.") +) -> TelluriumRun: + run_data = await submit_utc_run( + db_connector=db_conn_gateway, + simulator="tellurium", + model_file=model_file, + implementation_scope="process", + start=start, + stop=stop, + steps=steps, + context_model=TelluriumRun, + logger=logger + ) + + return run_data + + +# -- Steps: submit single simulator jobs -- + +@app.post( + "/run-amici-step", + response_model=AmiciRun, + name="Run an Amici simulation", + operation_id="run-amici-step", + tags=["Steps"], + summary="Run a Amici simulation.") +async def run_amici_step( + model_file: UploadFile = File(..., description="SBML file"), + start: int = Query(..., description="Start time"), + stop: int = Query(..., description="End time(duration)"), + steps: int = Query(..., description="Number of steps.") +) -> AmiciRun: + run_data = await 
submit_utc_run( + db_connector=db_conn_gateway, + simulator="amici", + model_file=model_file, + implementation_scope="step", + start=start, + stop=stop, + steps=steps, + context_model=AmiciRun, + logger=logger + ) + + return run_data + + +@app.post( + "/run-cobra-step", + response_model=CobraRun, + name="Run an cobra simulation", + operation_id="run-cobra-step", + tags=["Steps"], + summary="Run a cobra simulation.") +async def run_cobra_step( + model_file: UploadFile = File(..., description="SBML file"), + start: int = Query(..., description="Start time"), + stop: int = Query(..., description="End time(duration)"), + steps: int = Query(..., description="Number of steps.") +) -> CobraRun: + run_data = await submit_utc_run( + db_connector=db_conn_gateway, + simulator="cobra", + model_file=model_file, + implementation_scope="step", + start=start, + stop=stop, + steps=steps, + context_model=CobraRun, + logger=logger + ) + + return run_data + + +@app.post( + "/run-copasi-step", + response_model=CopasiRun, + name="Run an copasi simulation", + operation_id="run-copasi-step", + tags=["Steps"], + summary="Run a copasi simulation.") +async def run_copasi_step( + model_file: UploadFile = File(..., description="SBML file"), + start: int = Query(..., description="Start time"), + stop: int = Query(..., description="End time(duration)"), + steps: int = Query(..., description="Number of steps.") +) -> CopasiRun: + run_data = await submit_utc_run( + db_connector=db_conn_gateway, + simulator="copasi", + model_file=model_file, + implementation_scope="step", + start=start, + stop=stop, + steps=steps, + context_model=CopasiRun, + logger=logger + ) + + return run_data + + +@app.post( + "/run-readdy-step", + response_model=ReaddyRun, + name="Run a readdy simulation", + operation_id="run-readdy-step", + tags=["Steps"], + summary="Run a readdy simulation.") +async def run_readdy_step( + box_size: List[float] = Query(default=[0.3, 0.3, 0.3], description="Box Size of box"), + duration: int = Query(default=10, description="Simulation Duration"), + dt: float = Query(default=0.0008, description="Interval of step with which simulation runs"), + species_config: List[ReaddySpeciesConfig] = Body( + ..., + description="Species Configuration, specifying species name mapped to diffusion constant", + examples=[ + [ + {"name": "E", "diffusion_constant": 10.0}, + {"name": "S", "diffusion_constant": 10.0}, + {"name": "ES", "diffusion_constant": 10.0}, + {"name": "P", "diffusion_constant": 10.0} + ] + ] + ), + reactions_config: List[ReaddyReactionConfig] = Body( + ..., + description="Reactions Configuration, specifying reaction scheme mapped to reaction constant.", + examples=[ + [ + {"scheme": "fwd: E +(0.03) S -> ES", "rate": 86.78638438}, + {"scheme": "back: ES -> E +(0.03) S", "rate": 1.0}, + {"scheme": "prod: ES -> E +(0.03) P", "rate": 1.0}, + ] + ] + ), + particles_config: List[ReaddyParticleConfig] = Body( + ..., + description="Particles Configuration, specifying initial particle positions for each particle.", + examples=[ + [ + { + "name": "E", + "initial_positions": [ + [-0.11010841, 0.01048227, -0.07514985], + [0.02715631, -0.03829782, 0.14395517], + [0.05522253, -0.11880506, 0.02222362] + ] + }, + { + "name": "S", + "initial_positions": [ + [-0.21010841, 0.21048227, -0.07514985], + [0.02715631, -0.03829782, 0.14395517], + [0.05522253, -0.11880506, 0.02222362] + ] + } + ] + ] + ), + unit_system_config: Dict[str, str] = Body({"length_unit": "micrometer", "time_unit": "second"}, description="Unit system configuration"), + 
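+    # the example payloads above describe a minimal enzyme-kinetics system
+    # (E + S <-> ES -> E + P), matching the reaction schemes in reactions_config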
reaction_handler: str = Query(default="UncontrolledApproximation", description="Reaction handler as per Readdy simulation documentation.") +) -> ReaddyRun: + try: + # get job params + job_id = "simulation-execution-readdy" + str(uuid.uuid4()) + _time = db_conn_gateway.timestamp() + + # instantiate new return + readdy_run = ReaddyRun( + job_id=job_id, + last_updated=_time, + box_size=box_size, + status="PENDING", + duration=duration, + dt=dt, + species_config=species_config, + reactions_config=reactions_config, + particles_config=particles_config, + unit_system_config={"length_unit": "micrometer", "time_unit": "second"}, + reaction_handler="UncontrolledApproximation" + ) + + # insert job + pending_job = await db_conn_gateway.write( + collection_name=DEFAULT_JOB_COLLECTION_NAME, + box_size=readdy_run.box_size, + job_id=readdy_run.job_id, + last_updated=readdy_run.last_updated, + status=readdy_run.status, + duration=readdy_run.duration, + dt=readdy_run.dt, + species_config=[config.serialize() for config in readdy_run.species_config], + reactions_config=[config.serialize() for config in readdy_run.reactions_config], + particles_config=[config.serialize() for config in readdy_run.particles_config], + unit_system_config=readdy_run.unit_system_config, + reaction_handler=readdy_run.reaction_handler + ) + + return readdy_run + except Exception as e: + logger.error(str(e)) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post( + "/run-smoldyn-step", + response_model=SmoldynRun, + name="Run a smoldyn simulation", + operation_id="run-smoldyn-step", + tags=["Steps"], + summary="Run a smoldyn simulation.") +async def run_smoldyn_step( + uploaded_file: UploadFile = File(..., description="Smoldyn Configuration File"), + duration: int = Query(default=None, description="Simulation Duration"), + dt: float = Query(default=None, description="Interval of step with which simulation runs"), + # initial_molecule_state: List = Body(default=None, description="Mapping of species names to initial molecule conditions including counts and location.") +) -> SmoldynRun: + try: + # get job params + job_id = "simulation-execution-smoldyn" + str(uuid.uuid4()) + _time = db_conn_gateway.timestamp() + uploaded_file_location = await write_uploaded_file(job_id=job_id, uploaded_file=uploaded_file, bucket_name=DEFAULT_BUCKET_NAME, extension='.txt') + # instantiate new return + smoldyn_run = SmoldynRun( + job_id=job_id, + last_updated=_time, + status="PENDING", + path=uploaded_file_location, + duration=duration, + dt=dt + ) + # insert job + pending_job = await db_conn_gateway.write( + collection_name="smoldyn_jobs", + job_id=smoldyn_run.job_id, + last_updated=smoldyn_run.last_updated, + status=smoldyn_run.status, + path=smoldyn_run.path, + duration=smoldyn_run.duration, + dt=smoldyn_run.dt + ) + + return smoldyn_run + except Exception as e: + logger.error(str(e)) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post( + "/run-tellurium-step", + response_model=TelluriumRun, + name="Run an tellurium simulation", + operation_id="run-tellurium-step", + tags=["Steps"], + summary="Run a tellurium simulation.") +async def run_tellurium_step( + model_file: UploadFile = File(..., description="SBML file"), + start: int = Query(..., description="Start time"), + stop: int = Query(..., description="End time(duration)"), + steps: int = Query(..., description="Number of steps.") +) -> TelluriumRun: + run_data = await submit_utc_run( + db_connector=db_conn_gateway, + simulator="tellurium", + model_file=model_file, + 
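+        # same shared submission helper as /run-tellurium-process; only
+        # implementation_scope differs between the Processes and Steps endpoints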
implementation_scope="step", + start=start, + stop=stop, + steps=steps, + context_model=TelluriumRun, + logger=logger + ) + + return run_data + + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=3001) + diff --git a/client/compatible.py b/client/compatible.py new file mode 100644 index 0000000..570171d --- /dev/null +++ b/client/compatible.py @@ -0,0 +1,2 @@ +COMPATIBLE_UTC_SIMULATORS = [('amici', '0.11.21'), ('copasi', '0.71'), ('tellurium', '2.2.10')] +COMPATIBLE_VERIFICATION_SIMULATORS = COMPATIBLE_UTC_SIMULATORS.copy() \ No newline at end of file diff --git a/client/handler.py b/client/handler.py new file mode 100644 index 0000000..c5203b0 --- /dev/null +++ b/client/handler.py @@ -0,0 +1,81 @@ +import os + +from tempfile import mkdtemp + +import grpc +from fastapi import UploadFile, HTTPException +from google.protobuf.json_format import MessageToDict + +from common.proto import simulation_pb2_grpc, simulation_pb2 +from shared.environment import LOCAL_GRPC_MAPPING + + +class ClientHandler: + @classmethod + async def parse_uploaded_files_in_spec(cls, model_files: list[UploadFile], config_data: dict) -> list[str]: + temp_files = [] + + # parse config for model specification TODO: generalize this + for uploaded_file in model_files: + # case: has a SedModel config spec (ode, fba, smoldyn) + if "model" in config_data.keys(): + specified_model = config_data["model"]["model_source"] + if uploaded_file.filename == specified_model.split("/")[-1]: + temp_dir = mkdtemp() + temp_file = os.path.join(temp_dir, uploaded_file.filename) + with open(temp_file, "wb") as f: + uploaded = await uploaded_file.read() + f.write(uploaded) + config_data["model"]["model_source"] = temp_file + temp_files.append(temp_file) + # case: has a mesh file config (membrane) + elif "mesh_file" in config_data.keys(): + temp_dir = mkdtemp() + temp_file = os.path.join(temp_dir, uploaded_file.filename) + with open(temp_file, "wb") as f: + uploaded = await uploaded_file.read() + f.write(uploaded) + config_data["mesh_file"] = temp_file + temp_files.append(temp_file) + return temp_files + + @classmethod + def check_document_extension(cls, document: UploadFile) -> None: + if not document.filename.endswith('.json') and document.content_type != 'application/json': + raise HTTPException(status_code=400, detail="Invalid file type. Only JSON files are supported.") + + @classmethod + def submit_run(cls, last_updated: str, duration: int, pickle_path: str, job_id: str, vivarium_id: str) -> list[list[dict[str, str | dict]]]: + with grpc.insecure_channel(LOCAL_GRPC_MAPPING) as channel: + stub = simulation_pb2_grpc.VivariumServiceStub(channel) + request = simulation_pb2.VivariumRequest( + last_updated=last_updated, + duration=duration, + pickle_path=pickle_path, + job_id=job_id, + vivarium_id=vivarium_id + ) + + response_iterator = stub.StreamVivarium(request) + + # TODO: the following block should be generalized (used by many) + results = [] + print(f'got update: {response_iterator.result()}') + # for update in response_iterator: + # print(f'Result: {update}') + # # structured_results = [ + # # MessageToDict(result) # ✅ Convert Protobuf Struct to dict + # # for result in update.results + # # ] + # # results.append(structured_results) + # # # TODO: fit this into the data model! 
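+            # (sketch) this block could instead iterate the stream directly,
+            # mirroring the event_stream consumer in client/main.py:
+            #     for update in response_iterator:
+            #         results.append({
+            #             "job_id": update.job_id,
+            #             "last_updated": update.last_updated,
+            #             "results": [MessageToDict(r) for r in update.results],
+            #         })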
+ # # result = { + # # "job_id": update.job_id, + # # "last_updated": update.last_updated, + # # "results": update.results + # # } + # # results.append(result) + # # print(f'Got result: {result}') + + return results + diff --git a/client/health.py b/client/health.py new file mode 100644 index 0000000..bfe0fa8 --- /dev/null +++ b/client/health.py @@ -0,0 +1,34 @@ +from fastapi import FastAPI + +from shared.data_model import DbClientResponse +from shared.connect import DatabaseConnector +from shared.environment import DEFAULT_DB_TYPE + + +def check_client(db_connector: DatabaseConnector) -> DbClientResponse: + """TODO: generalize this to work with multiple client types. Currently using Mongo.""" + msg = "Pinged your deployment. You successfully connected to MongoDB!" + status = "PASS" + try: + db_connector.client.admin.command('ping') + except Exception as e: + msg = f"Failed to connect to MongoDB:\n{e}" + status = "FAIL" + return DbClientResponse( + message=msg, + db_type=DEFAULT_DB_TYPE, + timestamp=db_connector.timestamp(), + status=status + ) + + +def stop_client(db_connector: DatabaseConnector) -> DbClientResponse: + """TODO: generalize this to work with multiple client types. Currently using Mongo.""" + db_connector.client.close() + + return DbClientResponse( + message=f"{DEFAULT_DB_TYPE} successfully closed!", + db_type=DEFAULT_DB_TYPE, + timestamp=db_connector.timestamp() + ) + diff --git a/client/main.py b/client/main.py new file mode 100644 index 0000000..e30878b --- /dev/null +++ b/client/main.py @@ -0,0 +1,346 @@ +""" +Server gateway implementation. + +Author: Alexander Patrie <@AlexPatrie> +""" + +import json +import os +import shutil +import typing +import uuid +import sys +import zlib +from tempfile import mkdtemp +from typing import * +import asyncio +from functools import partial +# import websockets + + +import dotenv +import grpc +import uvicorn +from fastapi import FastAPI, File, UploadFile, HTTPException, Query, APIRouter, Body, WebSocket, Response, Depends +from google.protobuf.json_format import MessageToDict +from process_bigraph import Process, pp, Composite +from starlette.middleware.cors import CORSMiddleware +from starlette.responses import FileResponse, StreamingResponse +from pydantic import BeforeValidator +from google.protobuf.struct_pb2 import Struct +from google.protobuf.any_pb2 import Any +from vivarium import Vivarium +from vivarium.tests import TOY_PROCESSES, DEMO_PROCESSES # TODO: replace these + +from client.handler import ClientHandler +from client.submit_runs import submit_utc_run, submit_pymem3dg_run +from common.proto import simulation_pb2_grpc, simulation_pb2 +from shared.connect import MongoConnector +from yaml import compose + +from shared.io import write_uploaded_file, download_file_from_bucket, write_local_file +from shared.log_config import setup_logging +from shared.serial import write_pickle, create_vivarium_id, hydrate_pickle, get_pickle, sign_pickle, get_remote_pickle_path +from shared.utils import get_project_version, new_job_id, handle_exception, serialize_numpy, clean_temp_files, timestamp +from shared.environment import ( + ENV_PATH, + DEFAULT_DB_NAME, + DEFAULT_JOB_COLLECTION_NAME, + DEFAULT_BUCKET_NAME, LOCAL_GRPC_MAPPING, TEST_KEY +) +from shared.data_model import ( + BigraphRegistryAddresses, + CompositionNode, + CompositionSpec, + CompositionRun, + OutputData, + ValidatedComposition, + SmoldynRun, + SimulariumAgentParameters, + ReaddySpeciesConfig, + ReaddyReactionConfig, + ReaddyParticleConfig, + ReaddyRun, + AmiciRun, + 
CobraRun, + CopasiRun, + TelluriumRun, + IncompleteFileJob, + APP_SERVERS, + HealthCheckResponse, + ProcessMetadata, + Mem3dgRun, + BigraphSchemaType, + StateData, + FileUpload +) + +from client.health import check_client +from shared.vivarium import CORE, check_composition, convert_process + +logger = setup_logging(__file__) + +# NOTE: create an env config at this filepath if dev +dotenv.load_dotenv(ENV_PATH) + +STANDALONE_GATEWAY = bool(os.getenv("STANDALONE_GATEWAY")) +MONGO_URI = os.getenv("MONGO_URI") if not STANDALONE_GATEWAY else os.getenv("STANDALONE_MONGO_URI") +GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") +TEST_VIVARIUM_ID = os.getenv("TEST_VIVARIUM_ID") + +# -- app constraints and components -- # + +APP_VERSION = get_project_version() +APP_TITLE = "compose-server" +APP_ORIGINS = [ + 'http://127.0.0.1:8000', + 'http://127.0.0.1:4200', + 'http://127.0.0.1:4201', + 'http://127.0.0.1:4202', + 'http://localhost:4200', + 'http://localhost:4201', + 'http://localhost:4202', + 'http://localhost:8000', + 'http://localhost:3001', + 'https://biosimulators.org', + 'https://www.biosimulators.org', + 'https://biosimulators.dev', + 'https://www.biosimulators.dev', + 'https://run.biosimulations.dev', + 'https://run.biosimulations.org', + 'https://biosimulations.dev', + 'https://biosimulations.org', + 'https://bio.libretexts.org', + 'https://compose.biosimulations.org', + '*' +] + +db_conn_gateway = MongoConnector(connection_uri=MONGO_URI, database_id=DEFAULT_DB_NAME) +router = APIRouter() +app = FastAPI(title=APP_TITLE, version=APP_VERSION, servers=APP_SERVERS) +app.add_middleware( + CORSMiddleware, + allow_origins=APP_ORIGINS, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"] +) +app.mongo_client = db_conn_gateway.client + +PyObjectId = Annotated[str, BeforeValidator(str)] + + +def optional_file(document: UploadFile = File(default=None)): + return document + + +class ClientHandler: + @classmethod + async def parse_uploaded_files_in_spec(cls, model_files: list[UploadFile], config_data: dict) -> list[str]: + temp_files = [] + # parse config for model specification TODO: generalize this + for uploaded_file in model_files: + # case: has a SedModel config spec (ode, fba, smoldyn) + if "model" in config_data.keys(): + specified_model = config_data["model"]["model_source"] + if uploaded_file.filename == specified_model.split("/")[-1]: + temp_dir = mkdtemp() + temp_file = os.path.join(temp_dir, uploaded_file.filename) + with open(temp_file, "wb") as f: + uploaded = await uploaded_file.read() + f.write(uploaded) + config_data["model"]["model_source"] = temp_file + temp_files.append(temp_file) + # case: has a mesh file config (membrane) + elif "mesh_file" in config_data.keys(): + temp_dir = mkdtemp() + temp_file = os.path.join(temp_dir, uploaded_file.filename) + with open(temp_file, "wb") as f: + uploaded = await uploaded_file.read() + f.write(uploaded) + config_data["mesh_file"] = temp_file + temp_files.append(temp_file) + return temp_files + + @classmethod + def check_document_extension(cls, document: UploadFile) -> None: + if not document.filename.endswith('.json') and document.content_type != 'application/json': + raise HTTPException(status_code=400, detail="Invalid file type. 
Only JSON files are supported.") + + @classmethod + def get_vivarium(cls, vivarium_id: str) -> Vivarium: + temp_dir = mkdtemp() + vivarium = hydrate_pickle(vivarium_id, temp_dir) + shutil.rmtree(temp_dir) + return vivarium + + +@app.post( + "/new-vivarium", + name="Create new vivarium", + operation_id="new-vivarium", + tags=["Composition"], +) +async def create_new_vivarium(document: UploadFile | str | None = None): + # compile all possible registrations TODO: generalize/streamline this + registered_processes: dict = CORE.process_registry.registry + registered_processes.update(TOY_PROCESSES) + registered_processes.update(DEMO_PROCESSES) + + # assume no document at first + composite_spec: dict[str, str | dict] | None = None + + # handle document upload + if document: + ClientHandler.check_document_extension(document) + + file_content: bytes = await document.read() + composite_spec: dict[str, str | dict] = json.loads(file_content) + + # create new viv instance and add emitter + viv = Vivarium(document=composite_spec, processes=registered_processes, types=CORE.types()) + viv.add_emitter() + + # write pickled viv to bucket and get remote location + new_id: str = create_vivarium_id(viv) + remote_vivarium_pickle_path = write_pickle(viv, new_id) + + return {'vivarium_id': new_id, 'remote_path': remote_vivarium_pickle_path} + + +@app.get( + '/run-vivarium', + name="Run vivarium", + operation_id="run-vivarium", + tags=["Composition"], +) +async def run_vivarium(duration: int, vivarium_id: str = Query(default=None)): + """Streams gRPC responses as they arrive to the FastAPI client.""" + if vivarium_id is None: + new_viv_response = await create_new_vivarium() + vivarium_id = new_viv_response['vivarium_id'] + + pickle_path = get_remote_pickle_path(vivarium_id) + last_updated = timestamp() + job_id = new_job_id('run-vivarium') + + async def event_stream(): + channel = grpc.insecure_channel(LOCAL_GRPC_MAPPING) + try: + stub = simulation_pb2_grpc.VivariumServiceStub(channel) + request = simulation_pb2.VivariumRequest( + last_updated=last_updated, + duration=duration, + pickle_path=pickle_path, + job_id=job_id, + vivarium_id=vivarium_id + ) + response_iterator = stub.StreamVivarium(request) + + yield '{"updates": [' + first = True + for update in response_iterator: + structured_results = [ + MessageToDict(result) for result in update.results + ] + if not first: + yield "," + yield json.dumps({ + "job_id": update.job_id, + "last_updated": update.last_updated, + "results": structured_results + }) + first = False + + yield ']}\n' + + except grpc.RpcError as e: + yield json.dumps({"error": e.details()}) + + finally: + channel.close() + + return StreamingResponse(event_stream(), media_type="application/json") + + +@app.get( + '/get-document', + name="Get document", + operation_id="get-document", + summary="Get the document required to instantiate a new Vivarium instance corresponding to the given vivarium_id", + tags=["Composition"], +) +async def get_document(vivarium_id: str): + # hydrate pickle into vivarium + vivarium: Vivarium = ClientHandler.get_vivarium(vivarium_id) + return vivarium.make_document() + + +@app.post( + '/add-process', + name="Add process", + operation_id="add-process", + tags=["Add Data"] +) +async def add_process( + vivarium_id: str, + process_name: str, + process_id: str, + config: dict[str, typing.Any] | None = None, + inputs: dict[str, typing.Any] | None = None, + outputs: dict[str, typing.Any] | None = None +): + vivarium: Vivarium = ClientHandler.get_vivarium(vivarium_id) + 
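+    # rebuild the Vivarium from its pickled state, apply the change, and
+    # re-pickle it below so later requests see the updated composition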
vivarium.add_process( + name=process_name, + process_id=process_id, + config=config, + inputs=inputs, + outputs=outputs + ) + + write_pickle(vivarium, vivarium_id) + return vivarium.make_document() + + +@app.post( + '/add-object', + name="Add object", + operation_id="add-object", + tags=["Add Data"] +) +async def add_object( + vivarium_id: str, + name: str, + path: list[str] | None = None, + value: typing.Any | None = None, +): + vivarium: Vivarium = ClientHandler.get_vivarium(vivarium_id) + vivarium.add_object( + name=name, + path=path, + value=value, + ) + + write_pickle(vivarium, vivarium_id) + return vivarium.make_document() + + +# @app.get( +# '/_get-pickle', +# operation_id="_get-pickle", +# tags=["Composition"], +# name="_get-pickle", +# response_class=Response +# ) +# async def _get_pickle(vivarium_id: str) -> Response: +# temp_dir = mkdtemp() +# p: bytes = get_pickle(vivarium_id, temp_dir) +# compressed_pickle = zlib.compress(p) +# signed_pickle = sign_pickle(compressed_pickle, TEST_KEY) +# return Response(content=signed_pickle, media_type="application/octet-stream") + + +# if __name__ == "__main__": +# uvicorn.run(app, host="0.0.0.0", port=3001) +# uvicorn client.main:app --reload --host 0.0.0.0 --port 3001 diff --git a/client/openapi_spec.py b/client/openapi_spec.py new file mode 100644 index 0000000..03d748f --- /dev/null +++ b/client/openapi_spec.py @@ -0,0 +1,39 @@ +import json +import os + +import yaml +from fastapi.openapi.utils import get_openapi + +from main import app + + +def main(): + openapi_spec = get_openapi( + title=app.title, + version=app.version, + openapi_version=app.openapi_version, + description=app.description, + routes=app.routes, + servers=app.servers + ) + + # Convert the JSON OpenAPI spec to YAML + openapi_spec_yaml = yaml.dump(json.loads(json.dumps(openapi_spec)), sort_keys=False) + + current_directory = os.path.dirname(os.path.realpath(__file__)) + + # Write the YAML OpenAPI spec to a file in subdirectory spec + openapi_version = app.openapi_version.replace('.', '_') + spec_fp = f"{current_directory}/spec/openapi_{openapi_version}_generated.yaml" + if os.path.exists(spec_fp): + print('Spec exists, deleting...') + os.remove(spec_fp) + + with open(spec_fp, "w") as f: + f.write(openapi_spec_yaml) + + print('New OpenAPI spec for compose_api generated!') + + +if __name__ == "__main__": + main() diff --git a/client/spec/openapi_3_1_0_generated.yaml b/client/spec/openapi_3_1_0_generated.yaml new file mode 100644 index 0000000..5c72a17 --- /dev/null +++ b/client/spec/openapi_3_1_0_generated.yaml @@ -0,0 +1,1703 @@ +openapi: 3.1.0 +info: + title: compose-server + version: 0.0.1 +paths: + /get-process-metadata: + post: + tags: + - Composition + summary: Get the input and output ports, as well as state data from a given + process and corresponding parameter configuration. 
+ operationId: get-process-metadata + parameters: + - name: process_id + in: query + required: true + schema: + type: string + title: Process ID + example: simple-membrane-process + - name: return_composite_state + in: query + required: false + schema: + type: boolean + title: Return Composite State + default: true + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_get-process-metadata' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/ProcessMetadata' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /get-process-bigraph-addresses: + get: + tags: + - Composition + summary: Get process bigraph implementation addresses for composition specifications. + operationId: get-process-bigraph-addresses + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/BigraphRegistryAddresses' + /get-bigraph-schema-types: + get: + tags: + - Composition + summary: Get process bigraph implementation addresses for composition specifications. + operationId: get-bigraph-schema-types + responses: + '200': + description: Successful Response + content: + application/json: + schema: + items: + $ref: '#/components/schemas/BigraphSchemaType' + type: array + title: Response Get-Bigraph-Schema-Types + /validate-composition: + post: + tags: + - Composition + summary: Validate Simulation Experiment Design specification file. + operationId: validate-composition + requestBody: + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_validate-composition' + required: true + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/ValidatedComposition' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /submit-composition: + post: + tags: + - Composition + summary: Submit composition spec for simulation + operationId: submit-composition + parameters: + - name: duration + in: query + required: true + schema: + type: integer + description: Duration of simulation + title: Duration + description: Duration of simulation + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_submit-composition' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/CompositionRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /get-composition-state/{job_id}: + get: + tags: + - Composition + summary: Get the composite spec of a given simulation run indexed by job_id. + operationId: get-composition-state + parameters: + - name: job_id + in: path + required: true + schema: + type: string + title: Job Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /get-output/{job_id}: + get: + tags: + - Data + summary: Get the results of an existing simulation run. 
+ operationId: get-output + parameters: + - name: job_id + in: path + required: true + schema: + type: string + title: Job Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/OutputData' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /get-output-file/{job_id}: + get: + tags: + - Data + summary: Get the results of an existing simulation run from Smoldyn or Readdy + as either a downloadable file or job progression status. + operationId: get-output-file + parameters: + - name: job_id + in: path + required: true + schema: + type: string + title: Job Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /generate-simularium-file: + post: + tags: + - Files + summary: Generate a simularium file with a compatible simulation results file + from Smoldyn + operationId: generate-simularium-file + parameters: + - name: box_size + in: query + required: true + schema: + type: number + description: Size of the simulation box as a floating point number. + title: Box Size + description: Size of the simulation box as a floating point number. + - name: filename + in: query + required: false + schema: + type: string + description: 'Name desired for the simularium file. NOTE: pass only the + file name without an extension.' + title: Filename + description: 'Name desired for the simularium file. NOTE: pass only the file + name without an extension.' + - name: translate_output + in: query + required: false + schema: + type: boolean + description: Whether to translate the output trajectory prior to converting + to simularium. See simulariumio documentation for more details. + default: true + title: Translate Output + description: Whether to translate the output trajectory prior to converting + to simularium. See simulariumio documentation for more details. + - name: validate_output + in: query + required: false + schema: + type: boolean + description: Whether to validate the outputs for the simularium file. See + simulariumio documentation for more details. + default: true + title: Validate Output + description: Whether to validate the outputs for the simularium file. See + simulariumio documentation for more details. 
+ requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_generate-simularium-file' + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /: + get: + tags: + - Health + summary: Health check + operationId: check_health__get + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/HealthCheckResponse' + /run-mem3dg-process: + post: + tags: + - Processes + summary: Run a Mem3dg Process + operationId: run-mem3dg-process + parameters: + - name: duration + in: query + required: true + schema: + type: integer + title: Duration + - name: characteristic_time_step + in: query + required: true + schema: + type: number + title: Characteristic Time Step + - name: tension_modulus + in: query + required: true + schema: + type: number + title: Tension Modulus + - name: preferred_area + in: query + required: true + schema: + type: number + title: Preferred Area + - name: preferred_volume + in: query + required: true + schema: + type: number + title: Preferred Volume + - name: reservoir_volume + in: query + required: true + schema: + type: number + title: Reservoir Volume + - name: osmotic_strength + in: query + required: true + schema: + type: number + title: Osmotic Strength + - name: volume + in: query + required: true + schema: + type: number + title: Volume + - name: damping + in: query + required: true + schema: + type: number + title: Damping + - name: bending_kbc + in: query + required: true + schema: + type: number + title: Bending Kbc + - name: tolerance + in: query + required: false + schema: + anyOf: + - type: number + - type: 'null' + default: 1.0e-11 + title: Tolerance + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_run-mem3dg-process' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/Mem3dgRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /run-amici-process: + post: + tags: + - Processes + summary: Run a Amici simulation. + operationId: run-amici-process + parameters: + - name: start + in: query + required: true + schema: + type: integer + description: Start time + title: Start + description: Start time + - name: stop + in: query + required: true + schema: + type: integer + description: End time(duration) + title: Stop + description: End time(duration) + - name: steps + in: query + required: true + schema: + type: integer + description: Number of steps. + title: Steps + description: Number of steps. + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_run-amici-process' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/AmiciRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /run-cobra-process: + post: + tags: + - Processes + summary: Run a cobra simulation. 
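+      # NOTE: the Amici, Cobra, Copasi, and Tellurium process/step endpoints share the same interface:
+      # start, stop, and steps query parameters plus an uploaded SBML model_file in the request body.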
+ operationId: run-cobra-process + parameters: + - name: start + in: query + required: true + schema: + type: integer + description: Start time + title: Start + description: Start time + - name: stop + in: query + required: true + schema: + type: integer + description: End time(duration) + title: Stop + description: End time(duration) + - name: steps + in: query + required: true + schema: + type: integer + description: Number of steps. + title: Steps + description: Number of steps. + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_run-cobra-process' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/CobraRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /run-copasi-process: + post: + tags: + - Processes + summary: Run a copasi simulation. + operationId: run-copasi-process + parameters: + - name: start + in: query + required: true + schema: + type: integer + description: Start time + title: Start + description: Start time + - name: stop + in: query + required: true + schema: + type: integer + description: End time(duration) + title: Stop + description: End time(duration) + - name: steps + in: query + required: true + schema: + type: integer + description: Number of steps. + title: Steps + description: Number of steps. + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_run-copasi-process' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/CopasiRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /run-tellurium-process: + post: + tags: + - Processes + summary: Run a tellurium simulation. + operationId: run-tellurium-process + parameters: + - name: start + in: query + required: true + schema: + type: integer + description: Start time + title: Start + description: Start time + - name: stop + in: query + required: true + schema: + type: integer + description: End time(duration) + title: Stop + description: End time(duration) + - name: steps + in: query + required: true + schema: + type: integer + description: Number of steps. + title: Steps + description: Number of steps. + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_run-tellurium-process' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TelluriumRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /run-amici-step: + post: + tags: + - Steps + summary: Run a Amici simulation. + operationId: run-amici-step + parameters: + - name: start + in: query + required: true + schema: + type: integer + description: Start time + title: Start + description: Start time + - name: stop + in: query + required: true + schema: + type: integer + description: End time(duration) + title: Stop + description: End time(duration) + - name: steps + in: query + required: true + schema: + type: integer + description: Number of steps. + title: Steps + description: Number of steps. 
+ requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_run-amici-step' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/AmiciRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /run-cobra-step: + post: + tags: + - Steps + summary: Run a cobra simulation. + operationId: run-cobra-step + parameters: + - name: start + in: query + required: true + schema: + type: integer + description: Start time + title: Start + description: Start time + - name: stop + in: query + required: true + schema: + type: integer + description: End time(duration) + title: Stop + description: End time(duration) + - name: steps + in: query + required: true + schema: + type: integer + description: Number of steps. + title: Steps + description: Number of steps. + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_run-cobra-step' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/CobraRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /run-copasi-step: + post: + tags: + - Steps + summary: Run a copasi simulation. + operationId: run-copasi-step + parameters: + - name: start + in: query + required: true + schema: + type: integer + description: Start time + title: Start + description: Start time + - name: stop + in: query + required: true + schema: + type: integer + description: End time(duration) + title: Stop + description: End time(duration) + - name: steps + in: query + required: true + schema: + type: integer + description: Number of steps. + title: Steps + description: Number of steps. + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_run-copasi-step' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/CopasiRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /run-readdy-step: + post: + tags: + - Steps + summary: Run a readdy simulation. + operationId: run-readdy-step + parameters: + - name: box_size + in: query + required: false + schema: + type: array + items: + type: number + description: Box Size of box + default: + - 0.3 + - 0.3 + - 0.3 + title: Box Size + description: Box Size of box + - name: duration + in: query + required: false + schema: + type: integer + description: Simulation Duration + default: 10 + title: Duration + description: Simulation Duration + - name: dt + in: query + required: false + schema: + type: number + description: Interval of step with which simulation runs + default: 0.0008 + title: Dt + description: Interval of step with which simulation runs + - name: reaction_handler + in: query + required: false + schema: + type: string + description: Reaction handler as per Readdy simulation documentation. + default: UncontrolledApproximation + title: Reaction Handler + description: Reaction handler as per Readdy simulation documentation. 
+ requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/Body_run-readdy-step' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/ReaddyRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /run-smoldyn-step: + post: + tags: + - Steps + summary: Run a smoldyn simulation. + operationId: run-smoldyn-step + parameters: + - name: duration + in: query + required: false + schema: + type: integer + description: Simulation Duration + title: Duration + description: Simulation Duration + - name: dt + in: query + required: false + schema: + type: number + description: Interval of step with which simulation runs + title: Dt + description: Interval of step with which simulation runs + requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_run-smoldyn-step' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/SmoldynRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /run-tellurium-step: + post: + tags: + - Steps + summary: Run a tellurium simulation. + operationId: run-tellurium-step + parameters: + - name: start + in: query + required: true + schema: + type: integer + description: Start time + title: Start + description: Start time + - name: stop + in: query + required: true + schema: + type: integer + description: End time(duration) + title: Stop + description: End time(duration) + - name: steps + in: query + required: true + schema: + type: integer + description: Number of steps. + title: Steps + description: Number of steps. 
+ requestBody: + required: true + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_run-tellurium-step' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TelluriumRun' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' +components: + schemas: + AmiciRun: + properties: + job_id: + type: string + title: Job Id + last_updated: + type: string + title: Last Updated + status: + type: string + title: Status + simulator: + type: string + title: Simulator + model_file: + type: string + title: Model File + start: + type: integer + title: Start + stop: + type: integer + title: Stop + steps: + type: integer + title: Steps + params: + type: object + title: Params + type: object + required: + - job_id + - last_updated + - status + - simulator + - model_file + - start + - stop + - steps + title: AmiciRun + BigraphRegistryAddresses: + properties: + version: + type: string + title: Version + registered_addresses: + items: + type: string + type: array + title: Registered Addresses + type: object + required: + - version + - registered_addresses + title: BigraphRegistryAddresses + BigraphSchemaType: + properties: + type_id: + type: string + title: Type Id + default_value: + type: string + title: Default Value + description: + type: string + title: Description + type: object + required: + - type_id + - default_value + - description + title: BigraphSchemaType + Body_generate-simularium-file: + properties: + uploaded_file: + type: string + format: binary + title: Uploaded File + description: A file containing results that can be parse by Simularium (spatial). + agent_parameters: + $ref: '#/components/schemas/SimulariumAgentParameters' + description: Parameters for the simularium agents defining either radius + or mass and density. 
+ type: object + required: + - uploaded_file + title: Body_generate-simularium-file + Body_get-process-metadata: + properties: + config: + type: string + format: binary + title: Process Config + model_files: + items: + type: string + format: binary + type: array + title: Files + type: object + required: + - config + - model_files + title: Body_get-process-metadata + Body_run-amici-process: + properties: + model_file: + type: string + format: binary + title: Model File + description: SBML file + type: object + required: + - model_file + title: Body_run-amici-process + Body_run-amici-step: + properties: + model_file: + type: string + format: binary + title: Model File + description: SBML file + type: object + required: + - model_file + title: Body_run-amici-step + Body_run-cobra-process: + properties: + model_file: + type: string + format: binary + title: Model File + description: SBML file + type: object + required: + - model_file + title: Body_run-cobra-process + Body_run-cobra-step: + properties: + model_file: + type: string + format: binary + title: Model File + description: SBML file + type: object + required: + - model_file + title: Body_run-cobra-step + Body_run-copasi-process: + properties: + model_file: + type: string + format: binary + title: Model File + description: SBML file + type: object + required: + - model_file + title: Body_run-copasi-process + Body_run-copasi-step: + properties: + model_file: + type: string + format: binary + title: Model File + description: SBML file + type: object + required: + - model_file + title: Body_run-copasi-step + Body_run-mem3dg-process: + properties: + mesh_file: + type: string + format: binary + title: Mesh File + type: object + required: + - mesh_file + title: Body_run-mem3dg-process + Body_run-readdy-step: + properties: + species_config: + items: + $ref: '#/components/schemas/ReaddySpeciesConfig' + type: array + title: Species Config + description: Species Configuration, specifying species name mapped to diffusion + constant + examples: + - - diffusion_constant: 10.0 + name: E + - diffusion_constant: 10.0 + name: S + - diffusion_constant: 10.0 + name: ES + - diffusion_constant: 10.0 + name: P + reactions_config: + items: + $ref: '#/components/schemas/ReaddyReactionConfig' + type: array + title: Reactions Config + description: Reactions Configuration, specifying reaction scheme mapped + to reaction constant. + examples: + - - rate: 86.78638438 + scheme: 'fwd: E +(0.03) S -> ES' + - rate: 1.0 + scheme: 'back: ES -> E +(0.03) S' + - rate: 1.0 + scheme: 'prod: ES -> E +(0.03) P' + particles_config: + items: + $ref: '#/components/schemas/ReaddyParticleConfig' + type: array + title: Particles Config + description: Particles Configuration, specifying initial particle positions + for each particle. 
+ examples: + - - initial_positions: + - - -0.11010841 + - 0.01048227 + - -0.07514985 + - - 0.02715631 + - -0.03829782 + - 0.14395517 + - - 0.05522253 + - -0.11880506 + - 0.02222362 + name: E + - initial_positions: + - - -0.21010841 + - 0.21048227 + - -0.07514985 + - - 0.02715631 + - -0.03829782 + - 0.14395517 + - - 0.05522253 + - -0.11880506 + - 0.02222362 + name: S + unit_system_config: + additionalProperties: + type: string + type: object + title: Unit System Config + description: Unit system configuration + default: + length_unit: micrometer + time_unit: second + type: object + required: + - species_config + - reactions_config + - particles_config + title: Body_run-readdy-step + Body_run-smoldyn-step: + properties: + uploaded_file: + type: string + format: binary + title: Uploaded File + description: Smoldyn Configuration File + type: object + required: + - uploaded_file + title: Body_run-smoldyn-step + Body_run-tellurium-process: + properties: + model_file: + type: string + format: binary + title: Model File + description: SBML file + type: object + required: + - model_file + title: Body_run-tellurium-process + Body_run-tellurium-step: + properties: + model_file: + type: string + format: binary + title: Model File + description: SBML file + type: object + required: + - model_file + title: Body_run-tellurium-step + Body_submit-composition: + properties: + spec_file: + type: string + format: binary + title: Spec File + description: Composition JSON File + model_files: + items: + type: string + format: binary + type: array + title: Model Files + description: List of uploaded model files + type: object + required: + - spec_file + - model_files + title: Body_submit-composition + Body_validate-composition: + properties: + spec_file: + type: string + format: binary + title: Spec File + description: Composition JSON File + type: object + required: + - spec_file + title: Body_validate-composition + CobraRun: + properties: + job_id: + type: string + title: Job Id + last_updated: + type: string + title: Last Updated + status: + type: string + title: Status + simulator: + type: string + title: Simulator + model_file: + type: string + title: Model File + start: + type: integer + title: Start + stop: + type: integer + title: Stop + steps: + type: integer + title: Steps + params: + type: object + title: Params + type: object + required: + - job_id + - last_updated + - status + - simulator + - model_file + - start + - stop + - steps + title: CobraRun + CompositionRun: + properties: + job_id: + type: string + title: Job Id + last_updated: + type: string + title: Last Updated + status: + type: string + title: Status + simulators: + items: + type: string + type: array + title: Simulators + duration: + type: integer + title: Duration + spec: + type: object + title: Spec + results: + type: object + title: Results + type: object + required: + - job_id + - last_updated + - status + - simulators + - duration + - spec + title: CompositionRun + CopasiRun: + properties: + job_id: + type: string + title: Job Id + last_updated: + type: string + title: Last Updated + status: + type: string + title: Status + simulator: + type: string + title: Simulator + model_file: + type: string + title: Model File + start: + type: integer + title: Start + stop: + type: integer + title: Stop + steps: + type: integer + title: Steps + params: + type: object + title: Params + type: object + required: + - job_id + - last_updated + - status + - simulator + - model_file + - start + - stop + - steps + title: CopasiRun + 
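+    # NOTE: the per-simulator run schemas (AmiciRun, CobraRun, CopasiRun, TelluriumRun) share one
+    # job-record shape: job_id, last_updated, status, simulator, model_file, start, stop, steps, params.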
HTTPValidationError: + properties: + detail: + items: + $ref: '#/components/schemas/ValidationError' + type: array + title: Detail + type: object + title: HTTPValidationError + HealthCheckResponse: + properties: + version: + type: string + title: Version + status: + type: string + title: Status + message: + type: string + title: Message + default: Welcome to the BioCompose API + swagger_ui: + type: string + title: Swagger Ui + default: https://compose.biosimulations.org/docs + type: object + required: + - version + - status + title: HealthCheckResponse + Mem3dgRun: + properties: + job_id: + type: string + title: Job Id + last_updated: + type: string + title: Last Updated + status: + type: string + title: Status + simulators: + items: + type: string + type: array + title: Simulators + duration: + type: integer + title: Duration + spec: + type: object + title: Spec + results: + type: object + title: Results + type: object + required: + - job_id + - last_updated + - status + - simulators + - duration + - spec + title: Mem3dgRun + OutputData: + properties: + job_id: + type: string + title: Job Id + status: + type: string + title: Status + last_updated: + type: string + title: Last Updated + results: + type: object + title: Results + type: object + required: + - job_id + - status + - last_updated + - results + title: OutputData + ProcessMetadata: + properties: + process_address: + type: string + title: Process Address + input_schema: + type: object + title: Input Schema + output_schema: + type: object + title: Output Schema + initial_state: + type: object + title: Initial State + state: + anyOf: + - type: object + - type: 'null' + title: State + type: object + required: + - process_address + - input_schema + - output_schema + - initial_state + title: ProcessMetadata + ReaddyParticleConfig: + properties: + name: + type: string + title: Name + initial_positions: + items: + items: + type: number + type: array + type: array + title: Initial Positions + type: object + required: + - name + - initial_positions + title: ReaddyParticleConfig + ReaddyReactionConfig: + properties: + scheme: + type: string + title: Scheme + rate: + type: number + title: Rate + type: object + required: + - scheme + - rate + title: ReaddyReactionConfig + ReaddyRun: + properties: + job_id: + type: string + title: Job Id + last_updated: + type: string + title: Last Updated + status: + type: string + title: Status + duration: + type: number + title: Duration + dt: + type: number + title: Dt + box_size: + items: + type: number + type: array + title: Box Size + species_config: + anyOf: + - additionalProperties: + type: number + type: object + - items: + $ref: '#/components/schemas/ReaddySpeciesConfig' + type: array + title: Species Config + particles_config: + anyOf: + - additionalProperties: + items: + items: + type: number + type: array + type: array + type: object + - items: + $ref: '#/components/schemas/ReaddyParticleConfig' + type: array + title: Particles Config + reactions_config: + anyOf: + - additionalProperties: + type: number + type: object + - items: + $ref: '#/components/schemas/ReaddyReactionConfig' + type: array + title: Reactions Config + unit_system_config: + type: object + title: Unit System Config + reaction_handler: + type: string + title: Reaction Handler + type: object + required: + - job_id + - last_updated + - status + - duration + - dt + - box_size + - species_config + - particles_config + - reactions_config + - unit_system_config + - reaction_handler + title: ReaddyRun + ReaddySpeciesConfig: + 
properties: + name: + type: string + title: Name + diffusion_constant: + type: number + title: Diffusion Constant + type: object + required: + - name + - diffusion_constant + title: ReaddySpeciesConfig + SimulariumAgentParameter: + properties: + name: + type: string + title: Name + radius: + anyOf: + - type: number + - type: 'null' + title: Radius + mass: + anyOf: + - type: number + - type: 'null' + title: Mass + density: + anyOf: + - type: number + - type: 'null' + title: Density + type: object + required: + - name + - radius + - mass + - density + title: SimulariumAgentParameter + SimulariumAgentParameters: + properties: + agents: + items: + $ref: '#/components/schemas/SimulariumAgentParameter' + type: array + title: Agents + type: object + required: + - agents + title: SimulariumAgentParameters + SmoldynRun: + properties: + job_id: + type: string + title: Job Id + last_updated: + type: string + title: Last Updated + status: + type: string + title: Status + path: + type: string + title: Path + duration: + type: number + title: Duration + dt: + type: number + title: Dt + type: object + required: + - job_id + - last_updated + - status + - path + - duration + - dt + title: SmoldynRun + TelluriumRun: + properties: + job_id: + type: string + title: Job Id + last_updated: + type: string + title: Last Updated + status: + type: string + title: Status + simulator: + type: string + title: Simulator + model_file: + type: string + title: Model File + start: + type: integer + title: Start + stop: + type: integer + title: Stop + steps: + type: integer + title: Steps + params: + type: object + title: Params + type: object + required: + - job_id + - last_updated + - status + - simulator + - model_file + - start + - stop + - steps + title: TelluriumRun + ValidatedComposition: + properties: + valid: + type: boolean + title: Valid + invalid_nodes: + anyOf: + - items: + additionalProperties: + type: string + type: object + type: array + - type: 'null' + title: Invalid Nodes + type: object + required: + - valid + title: ValidatedComposition + ValidationError: + properties: + loc: + items: + anyOf: + - type: string + - type: integer + type: array + title: Location + msg: + type: string + title: Message + type: + type: string + title: Error Type + type: object + required: + - loc + - msg + - type + title: ValidationError diff --git a/client/submit_runs.py b/client/submit_runs.py new file mode 100644 index 0000000..bdfa46b --- /dev/null +++ b/client/submit_runs.py @@ -0,0 +1,134 @@ +import importlib +import uuid +from logging import Logger +from typing import * + +from fastapi import UploadFile, HTTPException +from process_bigraph import Composite + +from shared.data_model import UtcRun, AmiciRun, CobraRun, CopasiRun, TelluriumRun, ValidatedComposition, Mem3dgRun +from shared.connect import DatabaseConnector +from shared.environment import DEFAULT_JOB_COLLECTION_NAME, DEFAULT_BUCKET_NAME +from shared.io import write_uploaded_file +from shared.utils import deserialize_composition + +from gateway.handlers.states import generate_mem3dg_state + + +async def submit_pymem3dg_run( + job_id: str, + characteristic_time_step: float, + tension_modulus: float, + preferred_area: float, + preferred_volume: float, + reservoir_volume: float, + osmotic_strength: float, + volume: float, + parameters_config: dict[str, float | int], + damping: float, + duration: int, + tolerance: Optional[float] = 1e-11, + geometry_type: Optional[str] = None, + geometry_parameters: Optional[Dict[str, Union[float, int]]] = None, + mesh_file: 
Optional[str] = None, + db_connector: Optional[DatabaseConnector] = None, +): + input_state = generate_mem3dg_state( + characteristic_time_step=characteristic_time_step, + tension_modulus=tension_modulus, + preferred_area=preferred_area, + preferred_volume=preferred_volume, + reservoir_volume=reservoir_volume, + osmotic_strength=osmotic_strength, + volume=volume, + parameters_config=parameters_config, + damping=damping, + tolerance=tolerance, + geometry_type=geometry_type, + geometry_parameters=geometry_parameters, + mesh_file=mesh_file + ) + + mem3dg_job = Mem3dgRun( + job_id=job_id, + last_updated=db_connector.timestamp(), + status="PENDING", + simulators=["pymem3dg"], + duration=duration, + spec=input_state + ) + + # save job to db + if db_connector: + await db_connector.write( + collection_name=DEFAULT_JOB_COLLECTION_NAME, + **mem3dg_job.serialize() + ) + + return mem3dg_job + + +async def submit_utc_run( + db_connector: DatabaseConnector, + simulator: str, + model_file: UploadFile, + implementation_scope: str, # either 'process' or 'step' or 'p' or 's' + start: int, + stop: int, + steps: int, + logger: Logger, + **params +) -> AmiciRun | CobraRun | CopasiRun | TelluriumRun: + """ + :param db_connector: database connector singleton + :param simulator: simulator used + :param model_file: model file (sbml) + :param implementation_scope: one of either: 'process', 'step', 'p', 's' + :param start: simulation start time + :param stop: simulation stop time + :param steps: simulation steps + :param logger: logger from current __file__ + """ + try: + # parse process or step and simulator for correct path routing + if len(implementation_scope) == 1: + implementation_scope += "rocess" if "p" in implementation_scope else "tep" + job_id = f"run-{simulator}-{implementation_scope}-" + str(uuid.uuid4()) + + # upload model file to bucket + remote_model_path = await write_uploaded_file( + job_id=job_id, + uploaded_file=model_file, + bucket_name=DEFAULT_BUCKET_NAME, + extension='.xml' + ) + + # fit/validate data to return structure + data_models = importlib.import_module("shared.data_model") + ContextModel = getattr( + data_models, + simulator.replace(simulator[0], simulator[0].upper()) + ) + + run_data: AmiciRun | CobraRun | CopasiRun | TelluriumRun = ContextModel( + job_id=job_id, + last_updated=db_connector.timestamp(), + status="PENDING", + simulator=simulator, + model_file=remote_model_path, + start=start, + stop=stop, + steps=steps, + params=params + ) + + # save job to db + await db_connector.write( + collection_name=DEFAULT_JOB_COLLECTION_NAME, + **run_data.serialize() + ) + + return run_data + except Exception as e: + logger.error(str(e)) + raise HTTPException(status_code=500, detail=str(e)) diff --git a/common/proto/__init__.py b/common/proto/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/common/proto/simulation.proto b/common/proto/simulation.proto new file mode 100644 index 0000000..9ec81c4 --- /dev/null +++ b/common/proto/simulation.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +import "google/protobuf/struct.proto"; +import "google/protobuf/any.proto"; + +package compose; + +service VivariumService { + rpc StreamVivarium (VivariumRequest) returns (stream SimulationUpdate); +} + +message VivariumRequest { + string last_updated = 1; + int32 duration = 2; + string pickle_path = 3; + string job_id = 4; + string vivarium_id = 5; +} + +message SimulationUpdate { + string job_id = 1; + string last_updated = 2; + repeated google.protobuf.Struct results = 3; +} diff --git 
a/common/proto/simulation_pb2.py b/common/proto/simulation_pb2.py new file mode 100644 index 0000000..60de016 --- /dev/null +++ b/common/proto/simulation_pb2.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: simulation.proto +# Protobuf Python Version: 5.29.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 29, + 0, + '', + 'simulation.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import struct_pb2 as google_dot_protobuf_dot_struct__pb2 +from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10simulation.proto\x12\x07\x63ompose\x1a\x1cgoogle/protobuf/struct.proto\x1a\x19google/protobuf/any.proto\"s\n\x0fVivariumRequest\x12\x14\n\x0clast_updated\x18\x01 \x01(\t\x12\x10\n\x08\x64uration\x18\x02 \x01(\x05\x12\x13\n\x0bpickle_path\x18\x03 \x01(\t\x12\x0e\n\x06job_id\x18\x04 \x01(\t\x12\x13\n\x0bvivarium_id\x18\x05 \x01(\t\"b\n\x10SimulationUpdate\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x14\n\x0clast_updated\x18\x02 \x01(\t\x12(\n\x07results\x18\x03 \x03(\x0b\x32\x17.google.protobuf.Struct2Z\n\x0fVivariumService\x12G\n\x0eStreamVivarium\x12\x18.compose.VivariumRequest\x1a\x19.compose.SimulationUpdate0\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'simulation_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_VIVARIUMREQUEST']._serialized_start=86 + _globals['_VIVARIUMREQUEST']._serialized_end=201 + _globals['_SIMULATIONUPDATE']._serialized_start=203 + _globals['_SIMULATIONUPDATE']._serialized_end=301 + _globals['_VIVARIUMSERVICE']._serialized_start=303 + _globals['_VIVARIUMSERVICE']._serialized_end=393 +# @@protoc_insertion_point(module_scope) diff --git a/common/proto/simulation_pb2_grpc.py b/common/proto/simulation_pb2_grpc.py new file mode 100644 index 0000000..d70fb20 --- /dev/null +++ b/common/proto/simulation_pb2_grpc.py @@ -0,0 +1,97 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +import common.proto.simulation_pb2 as simulation__pb2 + +GRPC_GENERATED_VERSION = '1.70.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in simulation_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' 
+ ) + + +class VivariumServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.StreamVivarium = channel.unary_stream( + '/compose.VivariumService/StreamVivarium', + request_serializer=simulation__pb2.VivariumRequest.SerializeToString, + response_deserializer=simulation__pb2.SimulationUpdate.FromString, + _registered_method=True) + + +class VivariumServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def StreamVivarium(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_VivariumServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'StreamVivarium': grpc.unary_stream_rpc_method_handler( + servicer.StreamVivarium, + request_deserializer=simulation__pb2.VivariumRequest.FromString, + response_serializer=simulation__pb2.SimulationUpdate.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'compose.VivariumService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('compose.VivariumService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class VivariumService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def StreamVivarium(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream( + request, + target, + '/compose.VivariumService/StreamVivarium', + simulation__pb2.VivariumRequest.SerializeToString, + simulation__pb2.SimulationUpdate.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/env.yml b/env.yml new file mode 100644 index 0000000..cee3545 --- /dev/null +++ b/env.yml @@ -0,0 +1,199 @@ +name: compose-server +channels: + - defaults + - conda-forge +dependencies: + - blas=2.131 + - blas-devel=3.9.0 + - blosc=1.21.6 + - bottleneck=1.4.2 + - brotli-python=1.0.9 + - bzip2=1.0.8 + - c-ares=1.34.4 + - ca-certificates=2025.2.25 + - certifi=2025.1.31 + - cftime=1.6.4 + - contourpy=1.3.1 + - cycler=0.11.0 + - fonttools=4.55.3 + - freetype=2.12.1 + - hdf4=4.2.15 + - hdf5=1.14.3 + - kiwisolver=1.4.8 + - krb5=1.21.3 + - lcms2=2.17 + - lerc=4.0.0 + - libaec=1.1.3 + - libblas=3.9.0 + - libcblas=3.9.0 + - libcurl=8.12.1 + - libcxx=19.1.7 + - libdeflate=1.22 + - libedit=3.1.20230828 + - libev=4.33 + - libexpat=2.6.4 + - libffi=3.4.4 + - libgfortran=5.0.0 + - libgfortran5=13.2.0 + - libiconv=1.18 + - libjpeg-turbo=3.0.3 + - liblapack=3.9.0 + - liblapacke=3.9.0 + - liblzma=5.6.4 + - libnetcdf=4.9.2 + - libnghttp2=1.64.0 + - libopenblas=0.3.29 + - libpng=1.6.47 + - libsqlite=3.49.1 + - libssh2=1.11.1 + - libtiff=4.7.0 + - libwebp-base=1.5.0 + - libxcb=1.17.0 + - libxml2=2.13.6 + - libzip=1.11.2 + - libzlib=1.3.1 + - llvm-openmp=19.1.7 + - lz4-c=1.9.4 + - matplotlib-base=3.10.0 + - ncurses=6.5 + - netcdf-cxx4=4.3.1 + - netcdf4=1.7.2 + - numexpr=2.10.1 + - numpy=2.2.2 + - numpy-base=2.2.2 + - openblas=0.3.29 + - openjpeg=2.5.3 + - openssl=3.4.1 + - packaging=24.2 + - 
pandas=2.2.3 + - pillow=11.1.0 + - polyscope=2.3.0 + - pthread-stubs=0.3 + - pymem3dg=0.0.7 + - pyparsing=3.2.0 + - python=3.11.11 + - python-dateutil=2.9.0post0 + - python-tzdata=2023.3 + - python_abi=3.11 + - pytz=2024.1 + - readline=8.2 + - scipy=1.15.1 + - seaborn=0.13.2 + - setuptools=72.1.0 + - six=1.16.0 + - snappy=1.2.1 + - tk=8.6.13 + - tzdata=2025a + - unicodedata2=15.1.0 + - wheel=0.45.1 + - xarray=2024.11.0 + - xorg-libxau=1.0.12 + - xorg-libxdmcp=1.1.5 + - zstd=1.5.7 + - pip: + - annotated-types==0.7.0 + - anyio==4.8.0 + - appdirs==1.4.4 + - asttokens==3.0.0 + - attrs==25.1.0 + - bigraph-schema==0.0.51 + - bigraph-viz==0.1.3 + - biosimulator-processes==0.3.19 + - cachetools==5.5.2 + - chardet==5.2.0 + - charset-normalizer==3.4.1 + - click==8.1.8 + - cobra==0.29.1 + - compose-server==0.0.1 + - copasi-basico==0.83 + - decorator==5.2.1 + - depinfo==2.2.0 + - diskcache==5.6.3 + - dnspython==2.7.0 + - executing==2.2.0 + - fastapi==0.115.11 + - fastjsonschema==2.21.1 + - fire==0.7.0 + - flexcache==0.3 + - flexparser==0.4 + - future==1.0.0 + - google-api-core==2.24.1 + - google-auth==2.38.0 + - google-cloud-core==2.4.2 + - google-cloud-storage==3.1.0 + - google-crc32c==1.6.0 + - google-resumable-media==2.7.2 + - googleapis-common-protos==1.69.1 + - graphviz==0.20.3 + - grpcio==1.70.0 + - grpcio-tools==1.70.0 + - h11==0.14.0 + - h5py==3.13.0 + - httpcore==1.0.7 + - httpx==0.28.1 + - idna==3.10 + - imageio==2.37.0 + - importlib-resources==6.5.2 + - iniconfig==2.0.0 + - ipython==9.0.1 + - ipython-pygments-lexers==1.1.1 + - jedi==0.19.2 + - jsonschema==4.23.0 + - jsonschema-specifications==2024.10.1 + - jupyter-core==5.7.2 + - lxml==5.3.1 + - markdown-it-py==3.0.0 + - matplotlib-inline==0.1.7 + - mdurl==0.1.2 + - mpmath==1.3.0 + - nbformat==5.10.4 + - optlang==1.8.3 + - orjson==3.10.15 + - parsimonious==0.10.0 + - parso==0.8.4 + - pexpect==4.9.0 + - pint==0.24.4 + - pip==25.0.1 + - pip-autoremove==0.10.0 + - platformdirs==4.3.6 + - pluggy==1.5.0 + - process-bigraph==0.0.27 + - prompt-toolkit==3.0.50 + - proto-plus==1.26.0 + - protobuf==5.29.3 + - ptyprocess==0.7.0 + - pure-eval==0.2.3 + - pyasn1==0.6.1 + - pyasn1-modules==0.4.1 + - pydantic==2.10.6 + - pydantic-core==2.27.2 + - pygments==2.19.1 + - pymongo==4.11.2 + - pytest==8.3.5 + - python-copasi==4.45.296 + - python-dotenv==1.0.1 + - python-libsbml==5.20.4 + - python-multipart==0.0.20 + - pyyaml==6.0.2 + - referencing==0.36.2 + - regex==2024.11.6 + - requests==2.32.3 + - rich==13.9.4 + - rpds-py==0.23.1 + - rsa==4.9 + - ruamel-yaml==0.18.10 + - ruamel-yaml-clib==0.2.12 + - sniffio==1.3.1 + - stack-data==0.6.3 + - starlette==0.46.0 + - swiglpk==5.0.12 + - sympy==1.13.3 + - termcolor==2.5.0 + - toml==0.10.2 + - traitlets==5.14.3 + - typing-extensions==4.12.2 + - urllib3==2.3.0 + - uvicorn==0.34.0 + - vivarium-interface==0.0.3 + - wcwidth==0.2.13 + - websockets==15.0.1 diff --git a/environment.yml b/environment.yml index c194119..b4471b2 100644 --- a/environment.yml +++ b/environment.yml @@ -1,11 +1,10 @@ name: compose-server channels: - - defaults - readdy - conda-forge dependencies: - pip - - python=3.10 + # - python=3.10 - poetry - pymem3dg # - readdy diff --git a/gateway/handlers/health.py b/gateway/handlers/health.py index 556307e..bfe0fa8 100644 --- a/gateway/handlers/health.py +++ b/gateway/handlers/health.py @@ -1,7 +1,7 @@ from fastapi import FastAPI from shared.data_model import DbClientResponse -from shared.database import DatabaseConnector +from shared.connect import DatabaseConnector from shared.environment import 
DEFAULT_DB_TYPE diff --git a/gateway/handlers/submit.py b/gateway/handlers/submit.py index d08fe0c..93a9179 100644 --- a/gateway/handlers/submit.py +++ b/gateway/handlers/submit.py @@ -7,15 +7,15 @@ from process_bigraph import Composite from shared.data_model import UtcRun, AmiciRun, CobraRun, CopasiRun, TelluriumRun, ValidatedComposition, Mem3dgRun -from shared.database import DatabaseConnector +from shared.connect import DatabaseConnector from shared.environment import DEFAULT_JOB_COLLECTION_NAME, DEFAULT_BUCKET_NAME from shared.io import write_uploaded_file +from shared.utils import deserialize_composition from gateway.handlers.states import generate_mem3dg_state async def submit_pymem3dg_run( - db_connector: DatabaseConnector, job_id: str, characteristic_time_step: float, tension_modulus: float, @@ -31,6 +31,7 @@ async def submit_pymem3dg_run( geometry_type: Optional[str] = None, geometry_parameters: Optional[Dict[str, Union[float, int]]] = None, mesh_file: Optional[str] = None, + db_connector: Optional[DatabaseConnector] = None, ): input_state = generate_mem3dg_state( characteristic_time_step=characteristic_time_step, @@ -58,10 +59,11 @@ async def submit_pymem3dg_run( ) # save job to db - await db_connector.write( - collection_name=DEFAULT_JOB_COLLECTION_NAME, - **mem3dg_job.serialize() - ) + if db_connector: + await db_connector.write( + collection_name=DEFAULT_JOB_COLLECTION_NAME, + **mem3dg_job.serialize() + ) return mem3dg_job @@ -136,6 +138,15 @@ async def submit_utc_run( def check_composition(document_data: Dict) -> ValidatedComposition: validation = {'valid': True} + + # validation 1 (fit data model) + try: + validation['composition'] = deserialize_composition(document_data) + except: + validation['valid'] = False + validation['composition'] = None + + # validation 2 invalid_nodes = [] for node_name, node_spec in document_data.items(): if "emitter" not in node_name: diff --git a/gateway/main.py b/gateway/main.py index ddceb18..6543a1f 100644 --- a/gateway/main.py +++ b/gateway/main.py @@ -10,17 +10,23 @@ import sys from tempfile import mkdtemp from typing import * +import asyncio +from functools import partial +# import websockets + import dotenv import uvicorn -from fastapi import FastAPI, File, UploadFile, HTTPException, Query, APIRouter, Body +from fastapi import FastAPI, File, UploadFile, HTTPException, Query, APIRouter, Body, WebSocket from process_bigraph import Process, pp, Composite from starlette.middleware.cors import CORSMiddleware from starlette.responses import FileResponse from pydantic import BeforeValidator -from shared.database import MongoConnector -from shared.io import write_uploaded_file, download_file_from_bucket +from shared.connect import MongoConnector +from yaml import compose + +from shared.io import write_uploaded_file, download_file_from_bucket, write_local_file from shared.log_config import setup_logging from shared.utils import get_project_version, new_job_id, handle_exception, serialize_numpy, clean_temp_files from shared.environment import ( @@ -51,7 +57,9 @@ HealthCheckResponse, ProcessMetadata, Mem3dgRun, - BigraphSchemaType + BigraphSchemaType, + StateData, + FileUpload ) from gateway.handlers.submit import submit_utc_run, check_composition, submit_pymem3dg_run from gateway.handlers.health import check_client @@ -106,6 +114,41 @@ app.mongo_client = db_conn_gateway.client PyObjectId = Annotated[str, BeforeValidator(str)] +clients: List[WebSocket] = [] + +# SERVER_WS_URL = "ws://localhost:8001/ws" +# BASE_WS_URL = "ws://{function_name}:8000/ws" 
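+# NOTE: the WebSocket twin-state endpoints below remain commented out in this change;
+# streamed simulation updates are instead served by the gRPC-backed /run-vivarium endpoint in client/main.py.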
+ +# manager = SocketConnectionManager() + +# @app.websocket("/ws") +# async def websocket_endpoint(websocket: WebSocket): +# """Handles incoming WebSocket connections from the server.""" +# await manager.connect(websocket) +# try: +# while True: +# data = await websocket.receive_text() +# print(f"Received from server: {data}") +# except WebSocketDisconnect: +# manager.disconnect(websocket) +# +# +# @app.post("/send-deltas/") +# async def send_deltas(request: StateData): +# """Receives HTTP data and forwards it to the WebSocket server. +# Used for twin representation. +# """ +# try: +# async with websockets.connect(SERVER_WS_URL) as websocket: +# packet: str = json.dumps( +# request.serialize() +# ) +# await websocket.send(packet) +# +# response: str | bytes = await websocket.recv() +# return json.loads(response) +# except Exception as e: +# raise HTTPException(status_code=500, detail=f"WebSocket error: {str(e)}") # -- Composition: submit composition jobs -- @@ -265,6 +308,31 @@ async def validate_composition( raise HTTPException(status_code=400, detail=message) +@app.post( + "/upload-file", + tags=["Files"], + operation_id="upload-file", + summary="Upload a file (ie: model file) to the composition api bucket" +) +async def upload_file( + file: UploadFile = File(..., description="Uploaded File"), + job_id: Optional[str] = Query(default=new_job_id("upload-file"), description="Optional Job ID associated with this upload.") +) -> FileUpload: + """TODO: make auth required for this endpoint.""" + try: + file_ext = os.path.splitext(file.filename)[-1] + uploaded_file_location: str = await write_uploaded_file( + job_id=job_id, + uploaded_file=file, + bucket_name=DEFAULT_BUCKET_NAME, + extension=file_ext + ) + return FileUpload(job_id=job_id, location=uploaded_file_location, status="SUCCESS") + except Exception as e: + message = handle_exception("upload-file") + raise HTTPException(status_code=400, detail=message) + + @app.post( "/submit-composition", response_model=CompositionRun, @@ -273,67 +341,28 @@ async def validate_composition( summary="Submit composition spec for simulation", ) async def submit_composition( - spec_file: UploadFile = File(..., description="Composition JSON File"), - # simulators: List[str] = Query(..., description="Simulator package names to use for implementation"), + composition_file: UploadFile = File(..., description="Composition JSON File"), duration: int = Query(..., description="Duration of simulation"), - model_files: List[UploadFile] = File(..., description="List of uploaded model files"), + # simulators: list[str] = Query(..., description="Simulator package names to use for implementation"), ) -> CompositionRun: - # validate filetype - if not spec_file.filename.endswith('.json') and spec_file.content_type != 'application/json': + """We should assume that any file specifications have already been validated and remotely uploaded.""" + if not composition_file.filename.endswith('.json') and composition_file.content_type != 'application/json': raise HTTPException(status_code=400, detail="Invalid file type. Only JSON files are supported.") job_id = new_job_id("composition") try: - # 1. 
verification by fitting a common datamodel (in this case composition spec) - contents = await spec_file.read() - data: Dict = json.loads(contents) - - simulators: List[str] = [] - for node_name, node_spec in data.items(): - # parse list of simulators required from spec addresses - - address = node_spec['address'] # MUST be like: local:copasi-process - # if "emitter" not in address: - # simulator = address.split(":")[-1].split('-')[0] - # simulators.append(simulator) - - # upload model files as needed (model filepath MUST match that which is in the spec-->./model-file - for model_file in model_files: - spec_model_source = node_spec.get("config").get("model", {}).get("model_source") - if spec_model_source: - if (spec_model_source.split('/')[-1] == model_file.filename): - file_ext = os.path.splitext(spec_model_source)[-1] - uploaded_model_source_location = await write_uploaded_file( - job_id=job_id, - uploaded_file=model_file, - bucket_name=DEFAULT_BUCKET_NAME, - extension=file_ext - ) - data[node_name]["config"]["model"]["model_source"] = uploaded_model_source_location - - # 1a. verification by fitting the individual process specs to an expected structure - nodes: List[CompositionNode] = [] - for node_name, node_spec in data.items(): - node = CompositionNode(name=node_name, **node_spec) - nodes.append(node) - - # 1b. verification by fitting that tree of nodes into an expected structure (which is consumed by pbg.Composite()) - composition = CompositionSpec( - nodes=nodes, - emitter_mode="all", - job_id=job_id - ) + file_content: bytes = await composition_file.read() + composite_spec: dict[str, str | dict] = json.loads(file_content) - # 2. verification by fitting write confirmation into CompositionRun...to verify O phase of IO, garbage in garbage out - write_confirmation: Dict = await db_conn_gateway.write( + # verification by fitting write confirmation into CompositionRun...to verify O phase of IO, garbage in garbage out + write_confirmation: dict[str, str | list[str] | int | dict] = await db_conn_gateway.write( collection_name=DEFAULT_JOB_COLLECTION_NAME, status="PENDING", - spec=composition.spec, - job_id=composition.job_id, + spec=composite_spec, + job_id=job_id, last_updated=db_conn_gateway.timestamp(), - simulators=simulators, - duration=duration, - results={} + simulators=[], + duration=duration ) return CompositionRun(**write_confirmation) @@ -515,6 +544,7 @@ def check_health() -> HealthCheckResponse: # TODO: refactor job id parsing in worker for dispatch # -- Processes: submit single simulator jobs -- + @app.post( "/run-mem3dg-process", response_model=Mem3dgRun, @@ -525,15 +555,15 @@ def check_health() -> HealthCheckResponse: ) async def run_mem3dg_process( duration: int = Query(...), - characteristic_time_step: float = Query(...), - tension_modulus: float = Query(...), - preferred_area: float = Query(...), - preferred_volume: float = Query(...), - reservoir_volume: float = Query(...), - osmotic_strength: float = Query(...), - volume: float = Query(...), - damping: float = Query(...), - bending_kbc: float = Query(...), + characteristic_time_step: float = Query(..., example=1), + tension_modulus: float = Query(..., example=0.1), + preferred_area: float = Query(..., example=12.486), + preferred_volume: float = Query(..., example=2.933), + reservoir_volume: float = Query(..., example=1), + osmotic_strength: float = Query(..., example=0.02), + volume: float = Query(..., example=2.9), + damping: float = Query(..., example=0.05), + bending_kbc: float = Query(..., example=0.008), tolerance: 
Optional[float] = Query(default=1e-11), # geometry_type: Optional[str] = None, # geometry_parameters: Optional[Dict[str, Union[float, int]]] = None, @@ -547,7 +577,6 @@ async def run_mem3dg_process( bucket_name=DEFAULT_BUCKET_NAME, extension='.ply' ) - parameters = { "bending": { "Kbc": bending_kbc, @@ -569,7 +598,7 @@ async def run_mem3dg_process( parameters_config=parameters, duration=duration ) - + print(f'Got the mem3dg run: {mem3dg_run}') return mem3dg_run except Exception as e: message = handle_exception("run-mem3dg-process") + f'-{str(e)}' diff --git a/pyproject.toml b/pyproject.toml index 23f01a6..6dddd53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,20 @@ dependencies = [ "pymongo", "biosimulator-processes", "imageio", - "nbformat" + "nbformat", + "websockets", + "grpcio", + "grpcio-tools" +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-cov", + "deptry" , + "mypy", + "pre-commit", + "tox" ] # also make sure to install readdy and pymem3dg: @@ -32,13 +45,32 @@ packages = [ {include = "common"} ] +# [build-system] +# requires = ["poetry-core"] +# build-backend = "poetry.core.masonry.api" + [build-system] -requires = ["poetry-core"] -build-backend = "poetry.core.masonry.api" +requires = ["hatchling", "setuptools_scm"] +build-backend = "hatchling.build" + +# [build-system] +# requires = ["scikit-build-core[conda]", "setuptools_scm", "hatchling"] +# build-backend = "scikit_build_core.build" + +[tool.hatch.metadata] +allow-direct-references = true + +[tool.hatch.build.targets.wheel] +packages = ["gateway", "shared", "worker", "common", "client", "server"] +# [tool.hatch.envs.default] +# dependencies = [ +# "conda: numpy", +# "conda: scipy", +# "conda: matplotlib", +# "conda: pybind11" +# ] -[tool.pytest.ini_options] -addopts = "-s" -testpaths = ["tests"] -asyncio_mode = "auto" -asyncio_default_fixture_loop_scope = "function" +[tool.setuptools_scm] # Required for automatic versioning +version_scheme = "guess-next-dev" +local_scheme = "node-and-date" diff --git a/server/.VERSION b/server/.VERSION new file mode 100644 index 0000000..8acdd82 --- /dev/null +++ b/server/.VERSION @@ -0,0 +1 @@ +0.0.1 diff --git a/server/Dockerfile-server b/server/Dockerfile-server new file mode 100644 index 0000000..ca8b0ff --- /dev/null +++ b/server/Dockerfile-server @@ -0,0 +1,73 @@ +# FROM condaforge/miniforge3:24.9.2-0 +FROM condaforge/miniforge3:latest + +LABEL org.opencontainers.image.title="bio-compose-server-worker" \ + org.opencontainers.image.description="Base Docker image for BioCompose REST API management, job processing, and datastorage with MongoDB, ensuring scalable and robust performance." 
\ + org.opencontainers.image.url="https://compose.biosimulators.org/" \ + org.opencontainers.image.source="https://github.com/biosimulators/bio-compose-server" \ + org.opencontainers.image.authors="Alexander Patrie , BioSimulators Team " \ + org.opencontainers.image.vendor="BioSimulators Team" + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt-get update \ + && apt-get install -y \ + meson \ + g++ \ + gfortran \ + libblas-dev \ + liblapack-dev \ + libgfortran5 \ + libhdf5-dev \ + libhdf5-serial-dev \ + libatlas-base-dev \ + cmake \ + make \ + git \ + build-essential \ + python3-dev \ + swig \ + libc6-dev \ + libx11-dev \ + libc6 \ + libgl1-mesa-dev \ + pkg-config \ + curl \ + tar \ + libgl1-mesa-glx \ + libice6 \ + libsm6 \ + gnupg \ + libstdc++6 \ + && apt-get clean + +# copy assets +COPY assets/docker/config/.biosimulations.json /.google/.bio-compose.json +COPY assets/docker/config/.pys_usercfg.ini /Pysces/.pys_usercfg.ini +COPY assets/docker/config/.pys_usercfg.ini /root/Pysces/.pys_usercfg.ini +COPY tests/test_fixtures /test_fixtures + +WORKDIR /app + +# copy env configs +COPY ./environment.yml /app/environment.yml +COPY ./pyproject.toml /app/pyproject.toml +COPY ./gateway /app/gateway +COPY ./shared /app/shared +COPY ./worker /app/worker +COPY ./common /app/common + +RUN echo "Server" > /app/README.md \ + && mkdir /app/config + +RUN conda update -n base -c conda-forge conda \ + && conda env create -f /app/environment.yml -y \ + && conda install -n compose-server conda-forge::readdy -y \ + && conda run -n compose-server pip install -e . + +RUN conda env create -f /app/environment.yml -y \ + && conda install -n compose-server conda-forge::readdy -y \ + && conda run -n compose-server pip install -e . + +CMD ["conda", "run", "-n", "compose-server", "python3", "worker/main.py"] + diff --git a/server/__init__.py b/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server/handler.py b/server/handler.py new file mode 100644 index 0000000..e69de29 diff --git a/server/main.py b/server/main.py new file mode 100644 index 0000000..eb2429f --- /dev/null +++ b/server/main.py @@ -0,0 +1,105 @@ +import hashlib +import hmac +import logging +import pickle +import shutil +import time +from concurrent import futures +from tempfile import mkdtemp + +import grpc +from google.protobuf.struct_pb2 import Struct +from vivarium import Vivarium + +from common.proto import simulation_pb2, simulation_pb2_grpc +from shared.environment import TEST_KEY +from shared.log_config import setup_logging +from shared.serial import get_remote_pickle_path, write_pickle, hydrate_pickle +from shared.utils import timestamp + + +logger = setup_logging(__file__) + + +class ServerHandler: + @classmethod + def verify_pickle(cls, signed_data: bytes, secret_key: bytes) -> bytes: + """Verify HMAC and return raw compressed pickle data.""" + signature = signed_data[:32] + data: bytes = signed_data[32:] + + expected_sig = hmac.new(secret_key, data, hashlib.sha256).digest() + if not hmac.compare_digest(signature, expected_sig): + raise ValueError("Pickle signature verification failed") + return data + + @classmethod + def process_run(cls, duration: int, pickle_path: str, job_id: str, vivarium_id: str, buffer: float = 0.05): + try: + tmp = mkdtemp() + vivarium: Vivarium = hydrate_pickle(vivarium_id=vivarium_id, temp_dir=tmp) + + for i in range(duration): + # run simulation for k + vivarium.run(1) # TODO: make this timestep more controllable and smaller + results_k = vivarium.get_results() + if not isinstance(results_k, 
list): + raise Exception("Results could not be parsed as a list.") + + # convert data + proto_results = [] + for result in results_k: + struct = Struct() + struct.update(result) + proto_results.append(struct) + + # stream kth update + update = simulation_pb2.SimulationUpdate( + job_id=job_id, + last_updated=timestamp(), + results=proto_results + ) + yield update + + # write the updated vivarium state to the pickle file + remote_pickle_path = get_remote_pickle_path(vivarium_id) + write_pickle(vivarium_id=vivarium_id, vivarium=vivarium) + + shutil.rmtree(tmp) + except Exception as e: + print(e) + raise grpc.RpcError(grpc.StatusCode.INTERNAL, str(e)) + + +class VivariumService(simulation_pb2_grpc.VivariumServiceServicer): + @property + def buffer(self) -> float: + return 0.25 + + def StreamVivarium(self, request, context): + """Handles a gRPC streaming request from a client.""" + for update in ServerHandler.process_run( + duration=request.duration, + pickle_path=request.pickle_path, + job_id=request.job_id, + vivarium_id=request.vivarium_id + ): + if context.is_active(): + yield update + time.sleep(self.buffer) + else: + print(f'Client is disconnected!') + break + + +def serve(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + simulation_pb2_grpc.add_VivariumServiceServicer_to_server(VivariumService(), server) + server.add_insecure_port("[::]:50051") + server.start() + print("gRPC Server started on port 50051...") + server.wait_for_termination() + + +if __name__ == "__main__": + serve() diff --git a/shared/database.py b/shared/connect.py similarity index 89% rename from shared/database.py rename to shared/connect.py index e7f1731..4db863b 100644 --- a/shared/database.py +++ b/shared/connect.py @@ -1,9 +1,11 @@ +import json from abc import abstractmethod, ABC from dataclasses import dataclass from datetime import datetime from enum import Enum from typing import * +from fastapi import WebSocket from pymongo import MongoClient from pymongo.collection import Collection from pymongo.database import Database @@ -12,6 +14,22 @@ from shared.environment import DEFAULT_JOB_COLLECTION_NAME, DEFAULT_DB_NAME +class SocketConnectionManager: + def __init__(self): + self.active_connections = [] + + async def connect(self, websocket: WebSocket): + await websocket.accept() + self.active_connections.append(websocket) + + def disconnect(self, websocket: WebSocket): + self.active_connections.remove(websocket) + + async def send_data(self, message: dict, websocket: WebSocket): + await websocket.send_text(json.dumps(message)) + return await websocket.receive_text() + + class DatabaseConnector(ABC): """Abstract class that is both serializable and interacts with the database (of any type). 
""" def __init__(self, connection_uri: str, database_id: str, connector_id: str, local: bool = False): diff --git a/shared/data_model.py b/shared/data_model.py index 3bd8eaf..eb575c0 100644 --- a/shared/data_model.py +++ b/shared/data_model.py @@ -2,6 +2,8 @@ # -- worker models -- # import os from dataclasses import dataclass, asdict, field +import abc +import dataclasses as dc from enum import Enum from typing import * @@ -19,15 +21,78 @@ class BaseModel(_BaseModel): @dataclass class BaseClass: """Base Python Dataclass multipurpose class with custom app configuration.""" - def to_dict(self): - return asdict(self) - def serialize(self): return asdict(self) + def set(self, attribute_id: str, value: float | int | str | bool | list | dict): + return setattr(self, attribute_id, value) + + def get(self, attribute_id: str): + return getattr(self, attribute_id) + + @property + def attributes(self) -> list[str]: + return list[self.__dataclass_fields__.keys()] + + +# --- vivarium interface --- + +class DynamicData: + def __init__(self, **params): + """Dynamically define and set state attributes via **data.""" + self._set_attributes(**params) + + def _set_attributes(self, **params): + for k, v in params.items(): + setattr(self, k, v) + + def serialize(self) -> dict: + return self.__dict__ + + def __repr__(self) -> str: + return str(self.serialize()) + + +class StatefulDict(dict): + def __new__(cls, *args, **kwargs): + instance = super().__new__(cls) + return instance + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + +class Result(dict): + def __new__(cls, global_time: float, *args, **kwargs): + instance = super().__new__(cls) + instance.global_time = global_time + return instance + + def __init__(self, global_time: float, *args, **kwargs): + super().__init__(*args, **kwargs) + + +@dataclass +class RunResponse(BaseClass): + job_id: str + last_updated: str + results: list[dict[str, Any] | Result] + + +@dataclass +class Results: + data: list[Result] + # -- requests -- +@dataclass +class FileUpload: + location: str + job_id: str + status: str + + @dataclass class ProcessMetadata(BaseClass): process_address: str @@ -56,7 +121,7 @@ class CompositionRun(Run): simulators: List[str] duration: int spec: Dict[str, Any] - results: Dict[str, Any] = field(default=None) + results: Dict[str, Any] = field(default_factory=dict) @dataclass @@ -215,20 +280,39 @@ def __post_init__(self): self.store = DataStore(self.store) +@dataclass +class Port(BaseClass): + name: str + value: Any + + +@dataclass +class InputPort(Port): + pass + + +@dataclass +class OutputPort(Port): + pass + + @dataclass class CompositionNode(BaseClass): name: str _type: str address: str - config: Dict[str, Any] - inputs: Dict[str, List[str]] - outputs: Optional[Dict[str, List[str]]] = None - - def to_dict(self): - rep = super().to_dict() - rep.pop("name") - if not self.outputs: - rep.pop("outputs") + config: DynamicData + inputs: Optional[list[InputPort]] = field(default_factory=list) + outputs: Optional[list[OutputPort]] = field(default_factory=list) + + def _format_port(self, ports: list[InputPort | OutputPort]) -> dict[str, Any]: + return {port.name: port.value for port in ports} + + def export(self) -> dict[str, Any]: + rep = self.serialize() + del rep['name'] + rep['inputs'] = self._format_port(self.inputs) + rep['outputs'] = self._format_port(self.outputs) return rep @@ -239,14 +323,11 @@ class CompositionSpec(BaseClass): spec = CompositionSpec(nodes=nodes, emitter_mode='ports') composite = Composition({'state': 
spec """ - nodes: List[CompositionNode] - job_id: str - emitter_mode: str = "all" + nodes: list[CompositionNode] - @property - def spec(self): + def export(self): return { - node_spec.name: node_spec.to_dict() + node_spec.name: node_spec.export() for node_spec in self.nodes } @@ -269,6 +350,7 @@ class SmoldynOutput(FileResponse): class ValidatedComposition(BaseClass): valid: bool invalid_nodes: Optional[list[dict[str, str]]] = field(default=None) + composition: Optional[CompositionSpec] = field(default=None) @dataclass @@ -294,6 +376,12 @@ class IncompleteFileJob(BaseClass): source: str +@dataclass +class MembraneConfig(BaseClass): + characteristic_time_step: float + + + class JobStatuses: PENDING = "PENDING" IN_PROGRESS = "IN_PROGRESS" @@ -317,4 +405,56 @@ class JobStatuses: ] +# --- websocket client --- + +# This class represents a change to object values (x, y, z are placeholders) +@dataclass +class Packet(BaseClass): + dx: float + dy: float + dz: float + + +# --- websocket server --- + +class StateData(DynamicData): + pass + + +class AccumulationState(StateData): + def set_data(self, **params): + for k, v in params.items(): + prev = getattr(self, k) + setattr(self, k, prev + v) + + +@dataclass +class Region(BaseClass): + region_id: str + metadata: dict[str, str] + + +@dataclass +class BodyRegionState(AccumulationState): + region: Region + state: AccumulationState + + +@dataclass +class Patient(BaseClass): + name: str + dob: str + age: float + history_metadata: dict[str, str] + + +@dataclass +class BodyState(AccumulationState): + patient: Patient + region_states: list[BodyRegionState] + + + + + diff --git a/shared/environment.py b/shared/environment.py index a8d4107..9285375 100644 --- a/shared/environment.py +++ b/shared/environment.py @@ -19,3 +19,5 @@ DEFAULT_DB_NAME = os.getenv("DB_NAME", "compose_db") DEFAULT_BUCKET_NAME = os.getenv("BUCKET_NAME", "compose_bucket") DEFAULT_JOB_COLLECTION_NAME = os.getenv("JOB_COLLECTION_NAME", "compose_jobs") +LOCAL_GRPC_MAPPING = "localhost:50051" +TEST_KEY = b"supersecurekey" \ No newline at end of file diff --git a/shared/io.py b/shared/io.py index 76eb7e0..d50a397 100644 --- a/shared/io.py +++ b/shared/io.py @@ -13,6 +13,14 @@ from google.cloud import storage +async def write_local_file(temp_dir: str, uploaded_file: UploadFile) -> Path: + temp_file = os.path.join(temp_dir, uploaded_file.filename) + with open(temp_file, "wb") as f: + uploaded = await uploaded_file.read() + f.write(uploaded) + return Path(temp_file) + + def check_upload_file_extension(file: UploadFile, purpose: str, ext: str, message: str = None) -> bool: if not file.filename.endswith(ext): msg = message or f"Files for {purpose} must be passed in {ext} format." 
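For context on the new `shared.io.write_local_file` helper just above: it persists a FastAPI `UploadFile` into a caller-supplied temp directory and returns the resulting `Path`. A minimal usage sketch (not part of this change set; the route name and cleanup strategy are illustrative assumptions):

```python
# Sketch only: using the write_local_file helper added to shared/io.py in this change.
import shutil
from tempfile import mkdtemp

from fastapi import FastAPI, File, UploadFile

from shared.io import write_local_file

app = FastAPI()

@app.post("/upload-model")
async def upload_model(model_file: UploadFile = File(...)) -> dict:
    temp_dir = mkdtemp()                                        # scratch dir for this request
    local_path = await write_local_file(temp_dir, model_file)   # writes the upload to disk, returns a Path
    try:
        size = local_path.stat().st_size
        return {"filename": model_file.filename, "size_bytes": size}
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)              # drop the scratch dir when done
```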
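The reworked composition data model earlier in this change (`Port`/`InputPort`/`OutputPort`, `DynamicData` configs, and `CompositionSpec.export()` replacing `to_dict()`/`spec`) can be exercised as below. This is a sketch assuming the dataclasses exactly as defined in `shared/data_model.py` above; the config and port values mirror the `membrane_composite.json` fixture:

```python
# Sketch only: building a single-node composition with the reworked data model and
# exporting it to the plain-dict document form used elsewhere in this change set.
from shared.data_model import (
    CompositionNode, CompositionSpec, DynamicData, InputPort, OutputPort,
)

node = CompositionNode(
    name="membrane",
    _type="process",
    address="local:simple-membrane-process",
    config=DynamicData(characteristic_time_step=1.0, mesh_file="oblate.ply"),
    inputs=[InputPort(name="geometry", value=["geometry_store"])],
    outputs=[OutputPort(name="geometry", value=["geometry_store"])],
)

spec = CompositionSpec(nodes=[node])

# export() keys the document by node name and flattens each port list into
# {port_name: value}, the shape consumed downstream by process-bigraph documents.
document = spec.export()
print(document["membrane"]["inputs"])   # -> {'geometry': ['geometry_store']}
```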
diff --git a/shared/parse_proto.py b/shared/parse_proto.py new file mode 100644 index 0000000..30b45be --- /dev/null +++ b/shared/parse_proto.py @@ -0,0 +1,24 @@ +import os + +import grpc_tools.protoc + + +def parse_proto() -> None: + root = os.path.dirname(os.path.dirname(__file__)) + proto_dir = os.path.join(root, "common", "proto") + proto_file = os.path.join(proto_dir, "simulation.proto") + + proto_include = os.popen("python -c 'import grpc_tools.protoc; import os; print(os.path.dirname(grpc_tools.protoc.__file__))'").read().strip() + + return grpc_tools.protoc.main([ + "grpc_tools.protoc", + f"-I{proto_dir}", + f"-I{proto_include}/_proto", + f"--python_out={proto_dir}", + f"--grpc_python_out={proto_dir}", + proto_file, + ]) + + +if __name__ == "__main__": + parse_proto() diff --git a/shared/scripts/handle_env.sh b/shared/scripts/handle_env.sh new file mode 100644 index 0000000..c9b443f --- /dev/null +++ b/shared/scripts/handle_env.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +# Initialize Conda in the shell +eval "$(conda shell.bash hook)" +conda activate server + +# Execute the command passed to the container +exec "$@" + diff --git a/shared/serial.py b/shared/serial.py new file mode 100644 index 0000000..70a46b6 --- /dev/null +++ b/shared/serial.py @@ -0,0 +1,60 @@ +import os +import pickle +import uuid +import hashlib +import zlib +from tempfile import mkdtemp + +from vivarium import Vivarium +import hmac + + +from shared.environment import DEFAULT_BUCKET_NAME +from shared.io import download_file, upload_blob + + +def get_pickle(vivarium_id: str, temp_dir: str) -> bytes: + remote_pickle_path = get_remote_pickle_path(vivarium_id) + local_pickle_file = download_file(remote_pickle_path, temp_dir, DEFAULT_BUCKET_NAME) + with open(local_pickle_file, "rb") as f: + return f.read() + + +def hydrate_pickle(vivarium_id: str, temp_dir: str) -> Vivarium: + remote_pickle_path = get_remote_pickle_path(vivarium_id) + local_pickle_file = download_file(remote_pickle_path, temp_dir, DEFAULT_BUCKET_NAME) + with open(local_pickle_file, "rb") as f: + return pickle.load(f) + + +def write_pickle(vivarium: Vivarium, vivarium_id: str) -> str: + upload_prefix = f"file_uploads/vivarium/{vivarium_id}" + bucket_prefix = f"gs://{DEFAULT_BUCKET_NAME}/" + upload_prefix + local_pickle_file = f'.pckl' + save_dest = mkdtemp() + local_path = os.path.join(save_dest, local_pickle_file) + + # write pickle locally + with open(local_path, "wb") as f: + pickle.dump(vivarium, f) + + # upload local pickle to bucket + remote_pickle_path = upload_prefix + local_path.split("/")[-1] + upload_blob(bucket_name=DEFAULT_BUCKET_NAME, source_file_name=local_path, destination_blob_name=remote_pickle_path) + + # return remote path + return remote_pickle_path + + +def get_remote_pickle_path(vivarium_id: str) -> str: + return f"file_uploads/vivarium/{vivarium_id}.pckl" + + +def create_vivarium_id(vivarium: Vivarium) -> str: + return 'vivarium-' + str(uuid.uuid4()) + '-' + str(vivarium.__hash__()) + + +def sign_pickle(data: bytes, secret_key: bytes) -> bytes: + """Signs the data with HMAC to prevent tampering.""" + signature = hmac.new(secret_key, data, hashlib.sha256).digest() + return signature + data diff --git a/shared/utils.py b/shared/utils.py index d2a3c34..26a5353 100644 --- a/shared/utils.py +++ b/shared/utils.py @@ -1,13 +1,47 @@ import os +import pickle import traceback import uuid +from datetime import datetime from enum import Enum, EnumMeta from asyncio import sleep +from tempfile import mkdtemp from typing import * from pprint 
import pformat import h5py import numpy as np +from vivarium import Vivarium + +from shared.data_model import CompositionNode, CompositionSpec +from shared.environment import DEFAULT_BUCKET_NAME +from shared.io import download_file, upload_blob + + +def timestamp() -> str: + return str(datetime.now()) + + +def deserialize_composition(composite_spec: dict) -> CompositionSpec: + """Converts pbg-native composition spec into a structured data model representation.""" + nodes: list[CompositionNode] = [] + for node_name, node_spec in composite_spec.items(): + node = CompositionNode(name=node_name, **node_spec) + nodes.append(node) + nodes: list[CompositionNode] = [CompositionNode(name=node_name, **node_spec) for node_name, node_spec in composite_spec.items()] + + # verification by fitting that tree of nodes into an expected structure (which is consumed by pbg.Composite()) + return CompositionSpec(nodes=nodes) + + +def serialize_composition(composite_spec: CompositionSpec): + return composite_spec.export() + + +def pickle_composition(composite_spec: CompositionSpec, pickle_file: str) -> None: + serialized = composite_spec.export() + with open(pickle_file, "wb") as f: + pickle.dump(serialized, f) def clean_temp_files(temp_files: List[os.PathLike[str] | str]): diff --git a/shared/vivarium.py b/shared/vivarium.py new file mode 100644 index 0000000..c4d8f97 --- /dev/null +++ b/shared/vivarium.py @@ -0,0 +1,115 @@ +import json +from dataclasses import dataclass +from typing import Any + +import process_bigraph as pbg +from google.protobuf.internal.well_known_types import Struct, Any as _Any +from vivarium import Vivarium +from vivarium.tests import TOY_PROCESSES +from bsp import app_registrar + +from common.proto import simulation_pb2 +from shared.data_model import Results, Result, ValidatedComposition +from shared.utils import deserialize_composition + +CORE: pbg.ProcessTypes = app_registrar.core + + +# TODO: possibly have an endpoint /create-vivarium in which a vivarium instance is pickled and saved to db +# this exposes all methods of a stateful vivarium +# /add-process (vivarium_id, name, process_id): unpickle vivarium instance and run vivarium.add_process() +# /add-object (vivarium_id, name, value, path?): "" but for objects + + +def create_vivarium( + document: dict = None, + core: pbg.ProcessTypes = CORE +) -> Vivarium: + processes_to_use = [core.process_registry.registry, TOY_PROCESSES] + vivarium = Vivarium(processes=processes_to_use, document=document) + vivarium.add_emitter() + return vivarium + + +def parse_spec(composite_spec: dict, duration: int, core: pbg.ProcessTypes = CORE) -> Vivarium: + vivarium = Vivarium(core=core) # load in bsp.app_registrar.core + for process_name, spec in composite_spec.items(): + vivarium.add_process( + name=process_name, + process_id=spec.get('address'), + config=spec.get('config'), + inputs=spec.get('inputs'), + outputs=spec.get('outputs') + ) + vivarium.add_emitter() + + return vivarium + + +def run_composition(vivarium: Vivarium, duration: int) -> Results: + vivarium.run(duration) + return Results( + data=[ + Result(**result) + for result in vivarium.get_results() + ] + ) + + +def check_composition(document_data: dict) -> ValidatedComposition | Any: + validation = {'valid': True} + + # validation 1 (fit data model) + try: + validation['composition'] = deserialize_composition(document_data) + except: + validation['valid'] = False + validation['composition'] = None + + # validation 2 + invalid_nodes = [] + for node_name, node_spec in document_data.items(): + 
if "emitter" not in node_name: + try: + assert node_spec["inputs"], f"{node_name} is missing inputs" + assert node_spec["outputs"], f"{node_name} is missing outputs" + except AssertionError as e: + invalid_node = {node_name: str(e)} + invalid_nodes.append(invalid_node) + validation['valid'] = False + + validation['invalid_nodes'] = invalid_nodes if len(invalid_nodes) else None + # return ValidatedComposition(**validation) + return validation + + +# TODO: handle these differently + +def convert_object(key, data): + """Converts a Python dictionary into a gRPC Object message.""" + any_value = _Any() + any_value.Pack(convert_struct(key, data)) + return simulation_pb2.Object(value=any_value) + + +def convert_struct(key, data): + """Converts a Python dictionary into a gRPC Struct message.""" + d = eval(data) if isinstance(data, str) else data + struct = Struct() + struct.update({key: d}) + return struct + + +def convert_process(process_dict): + """Converts a Python dictionary to a gRPC Process message.""" + address = process_dict["address"] + _type = 'step' if 'emitter' in address else 'process' + outputs = process_dict.get('outputs') + return simulation_pb2.Process( + type=process_dict.get('_type', _type), + address=address, + config={k: convert_struct(k, v) for k, v in process_dict.get("config", {}).items()}, + inputs={k: convert_object(k, v) for k, v in process_dict.get("inputs", {}).items()}, + outputs={k: convert_object(k, v) for k, v in outputs.items()} if outputs else {}, + ) + diff --git a/tests/test_fixtures/membrane_composite.json b/tests/test_fixtures/membrane_composite.json index cfabbf5..b2af1fe 100644 --- a/tests/test_fixtures/membrane_composite.json +++ b/tests/test_fixtures/membrane_composite.json @@ -3,27 +3,27 @@ "_type": "process", "address": "local:simple-membrane-process", "config": { - "characteristic_time_step": 1, - "mesh_file": "oblate.ply", - "tension_model": { - "modulus": 0.1, - "preferred_area": 12.4866 - }, - "osmotic_model": { - "preferred_volume": 2.9306666666666668, - "reservoir_volume": 1, - "strength": 0.02, - "volume": 2.9 - }, - "parameters": { - "bending": { - "Kbc": 0.0000822 - }, - "damping": 0.05 - }, - "tolerance": 1e-11, - "console_output": true -}, + "characteristic_time_step": 1, + "mesh_file": "oblate.ply", + "tension_model": { + "modulus": 0.1, + "preferred_area": 12.4866 + }, + "osmotic_model": { + "preferred_volume": 2.9306666666666668, + "reservoir_volume": 1, + "strength": 0.02, + "volume": 2.9 + }, + "parameters": { + "bending": { + "Kbc": 0.0000822 + }, + "damping": 0.05 + }, + "tolerance": 1e-11, + "console_output": true + }, "inputs": { "geometry": [ "geometry_store" diff --git a/tests/test_fixtures/smoldyn-config.json b/tests/test_fixtures/smoldyn-config.json new file mode 100644 index 0000000..3eb3da9 --- /dev/null +++ b/tests/test_fixtures/smoldyn-config.json @@ -0,0 +1,5 @@ +{ + "model": { + "model_source": "Min1.txt" + } +} diff --git a/tests/test_fixtures/test-vivarium-id.txt b/tests/test_fixtures/test-vivarium-id.txt new file mode 100644 index 0000000..f614a08 --- /dev/null +++ b/tests/test_fixtures/test-vivarium-id.txt @@ -0,0 +1 @@ +vivarium-9e865a04-f03d-4091-b7e3-03b1f854115b-381588430 \ No newline at end of file diff --git a/tests/test_fixtures/vivarium-example.json b/tests/test_fixtures/vivarium-example.json new file mode 100644 index 0000000..a3b25e1 --- /dev/null +++ b/tests/test_fixtures/vivarium-example.json @@ -0,0 +1,46 @@ +{ + "state": { + "global_time": "0.0", + "adder": { + "inputs": { + "mass": [ + "x", + "mass" + ] + 
}, + "outputs": { + "mass": [ + "x", + "mass" + ] + }, + "interval": 1.0, + "address": "local:grow", + "config": { + "rate": "0.01" + } + }, + "x": { + "mass": "-11.11" + }, + "emitter": { + "address": "local:ram-emitter", + "config": { + "emit": { + "global_time": "any", + "x": "any" + } + }, + "inputs": { + "global_time": [ + "global_time" + ], + "x": [ + "x" + ] + }, + "outputs": null + } + }, + "composition": "(global_time:float|adder:process[(mass:float),(mass:float)]|x:(mass:float)|emitter:step[(global_time:any|x:any),()])" +} \ No newline at end of file diff --git a/tests/test_worker.py b/tests/test_worker.py index 6049985..8a04063 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -5,12 +5,10 @@ from dotenv import load_dotenv from process_bigraph import pp -from shared.database import MongoConnector +from shared.connect import MongoConnector from shared.environment import DEFAULT_DB_NAME, ENV_PATH, PROJECT_ROOT_PATH from shared.dynamic_env import create_dynamic_environment from shared.log_config import setup_logging -from worker.workers.composition import CompositionWorker - load_dotenv(ENV_PATH) diff --git a/worker/Dockerfile-worker b/worker/Dockerfile-worker index 48a7480..ca8b0ff 100644 --- a/worker/Dockerfile-worker +++ b/worker/Dockerfile-worker @@ -1,4 +1,5 @@ -FROM condaforge/miniforge3:24.9.2-0 +# FROM condaforge/miniforge3:24.9.2-0 +FROM condaforge/miniforge3:latest LABEL org.opencontainers.image.title="bio-compose-server-worker" \ org.opencontainers.image.description="Base Docker image for BioCompose REST API management, job processing, and datastorage with MongoDB, ensuring scalable and robust performance." \ @@ -64,5 +65,9 @@ RUN conda update -n base -c conda-forge conda \ && conda install -n compose-server conda-forge::readdy -y \ && conda run -n compose-server pip install -e . +RUN conda env create -f /app/environment.yml -y \ + && conda install -n compose-server conda-forge::readdy -y \ + && conda run -n compose-server pip install -e . 
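Circling back to `shared/vivarium.py` and the `vivarium-example.json` fixture above: the helpers there rebuild a runnable `Vivarium` from a saved document and wrap its emitter output in the new `Results`/`Result` models. A minimal sketch, assuming `Vivarium` accepts the saved-document shape shown in the fixture and that the `local:grow` toy process is available via `TOY_PROCESSES`:

```python
# Sketch only: rebuilding and running a Vivarium from the saved example document,
# using the helpers added in shared/vivarium.py in this change set.
import json

from shared.vivarium import create_vivarium, run_composition

with open("tests/test_fixtures/vivarium-example.json") as f:
    document = json.load(f)

vivarium = create_vivarium(document=document)    # registers bsp + toy processes, adds an emitter
results = run_composition(vivarium, duration=5)  # wraps vivarium.get_results() in Results/Result
print(results.data[-1])                          # most recently emitted state
```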
+ CMD ["conda", "run", "-n", "compose-server", "python3", "worker/main.py"] diff --git a/worker/dispatch.py b/worker/dispatch.py index fe21a15..b34d26e 100644 --- a/worker/dispatch.py +++ b/worker/dispatch.py @@ -22,6 +22,7 @@ import tempfile import os +from dataclasses import dataclass from typing import Any, Mapping, List import bsp @@ -30,26 +31,43 @@ from bsp.processes.simple_membrane_process import SimpleMembraneProcess from shared.io import download_file_from_bucket -from shared.database import MongoConnector +from shared.connect import MongoConnector from shared.environment import DEFAULT_BUCKET_NAME from shared.log_config import setup_logging from worker.sim_runs.runs import RunsWorker from shared.utils import handle_exception +from shared.data_model import BaseClass, StateData logger = setup_logging(__file__) -class CompositionState(dict): - """That which is exported by Composite.save()""" - def __new__(cls, *args, **kwargs): - return super(CompositionState, cls).__new__(cls, *args, **kwargs) +# class CompositionState(dict): +# """That which is exported by Composite.save()""" +# def __new__(cls, *args, **kwargs): +# return super(CompositionState, cls).__new__(cls, *args, **kwargs) +# +# +# class ResultData(dict): +# """That which is exported by Composite.gather_results()""" +# def __new__(cls, *args, **kwargs): +# return super(ResultData, cls).__new__(cls, *args, **kwargs) -class ResultData(dict): - """That which is exported by Composite.gather_results()""" - def __new__(cls, *args, **kwargs): - return super(ResultData, cls).__new__(cls, *args, **kwargs) +@dataclass +class CompositionState(StateData): + pass + + +@dataclass +class ResultData(StateData): + pass + + +@dataclass +class ServerResponse: + results: ResultData + state: CompositionState class JobDispatcher(object): @@ -64,26 +82,29 @@ def __init__(self, self.timeout = timeout * 60 @property - def current_jobs(self) -> List[Mapping[str, Any]]: - return self.db_connector.get_jobs() + def db_io(self) -> bool: + return self.db_connector is not None async def run(self, limit: int = 5, wait: int = 5): - i = 0 - while i < limit: - for job in self.current_jobs: - job_id = job['job_id'] - if job_id.startswith("run"): - await self.dispatch_run(job) - elif job_id.startswith("composition") or job_id.startswith('run-mem3dg-'): - await self.dispatch_composition(job) - i += 1 - await asyncio.sleep(wait) + if self.db_io: + i = 0 + while i < limit: + for job in self.db_connector.get_jobs(): + job_id = job['job_id'] + if job_id.startswith("run"): + await self.dispatch_run(job) + elif job_id.startswith("composition") or job_id.startswith('run-mem3dg-'): + await self.dispatch_composition(job) + i += 1 + await asyncio.sleep(wait) + else: + raise ValueError("You cannot call this method if a database connector is not provided in this class' constructor.") def create_dynamic_environment(self, job: Mapping[str, Any]) -> int: # TODO: implement this return 0 - async def dispatch_composition(self, job: Mapping[str, Any]): + async def dispatch_composition(self, job: Mapping[str, Any]) -> ServerResponse: job_status = job["status"] job_id = job["job_id"] if job_status.lower() == "pending": @@ -92,7 +113,8 @@ async def dispatch_composition(self, job: Mapping[str, Any]): self.create_dynamic_environment(job) # change job status to IN_PROGRESS - await self.db_connector.update_job(job_id=job_id, status="IN_PROGRESS") + if self.db_io: + await self.db_connector.update_job(job_id=job_id, status="IN_PROGRESS") # get request params and parse remote file uploads 
if needed input_state = job["spec"] @@ -118,19 +140,25 @@ async def dispatch_composition(self, job: Mapping[str, Any]): results = self.generate_composition_results(input_state, duration) state = self.generate_composition_state(composition) - # change status to complete and write results in DB - await self.db_connector.update_job( - job_id=job_id, - status="COMPLETE", - results=results - ) - - # write new result state to states collection - await self.db_connector.write( - collection_name="result_states", - job_id=job_id, - data=state, - last_updated=self.db_connector.timestamp() + if self.db_io: + # change status to complete and write results in DB + await self.db_connector.update_job( + job_id=job_id, + status="COMPLETE", + results=results + ) + + # write new result state to states collection + await self.db_connector.write( + collection_name="result_states", + job_id=job_id, + data=state, + last_updated=self.db_connector.timestamp() + ) + + return ServerResponse( + results=results, + state=state, ) except Exception as e: message = handle_exception(scope=job_id + str(e).strip()) @@ -145,12 +173,16 @@ def generate_composite(self, input_state) -> Composite: ) def generate_composition_results(self, composition: Composite, duration: int) -> ResultData: - # run the composition - composition.run(duration) - - # get the results formatted from emitter - results = composition.gather_results()[("emitter",)] - return ResultData(**results) + try: + # run the composition + composition.run(duration) + + # get the results formatted from emitter + results = composition.gather_results()[("emitter",)] + return ResultData(**results) + except: + msg = handle_exception("composition") + return ResultData(error=msg) def generate_composition_state(self, composition: Composite) -> CompositionState: temp_dir = tempfile.mkdtemp() diff --git a/worker/main.py b/worker/main.py index 121d54c..c507413 100644 --- a/worker/main.py +++ b/worker/main.py @@ -4,7 +4,7 @@ from dotenv import load_dotenv -from shared.database import MongoConnector +from shared.connect import MongoConnector from shared.environment import ENV_PATH, DEFAULT_DB_NAME from shared.log_config import setup_logging @@ -22,10 +22,39 @@ MAX_RETRIES = 30 MONGO_URI = os.getenv("MONGO_URI") GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") +USE_WEBSOCKETS = os.getenv("USE_WEBSOCKETS", "True") # singletons db_connector = MongoConnector(connection_uri=MONGO_URI, database_id=DEFAULT_DB_NAME) dispatcher = JobDispatcher(db_connector=db_connector) +app = FastAPI() + + +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + """Handles incoming WebSocket job requests from the Gateway.""" + await websocket.accept() + logger.info("Worker WebSocket connection established.") + + try: + while True: + data = await websocket.receive_text() + message = json.loads(data) + job_id = message.get("job_id") + job_status = message.get("status") + + if job_status == "PENDING": + logger.info(f"Received job {job_id} for processing.") + + # Run the job dispatcher logic + response = await dispatcher.dispatch_composition(message) + + # Send response back to Gateway + await websocket.send_text(json.dumps({"job_id": job_id, "status": "COMPLETE", "results": response})) + except Exception as e: + logger.error(f"Worker WebSocket error: {e}") + finally: + logger.info("Closing WebSocket connection.") async def main(max_retries=MAX_RETRIES): @@ -40,4 +69,5 @@ async def main(max_retries=MAX_RETRIES): if __name__ == "__main__": - asyncio.run(main()) 
+ asyncio.run(main()) if eval(USE_WEBSOCKETS) \ + else uvicorn.run(app, host="0.0.0.0", port=8001) diff --git a/worker/sim_runs/runs.py b/worker/sim_runs/runs.py index e4fa76d..a73da24 100644 --- a/worker/sim_runs/runs.py +++ b/worker/sim_runs/runs.py @@ -2,7 +2,7 @@ import tempfile from typing import Dict, Mapping, Any -from shared.database import MongoConnector +from shared.connect import MongoConnector from shared.environment import DEFAULT_BUCKET_NAME from shared.io import download_file, format_smoldyn_configuration, write_uploaded_file from shared.data_model import OutputFile
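The pickle transport added in this change is integrity-checked: `shared.serial.sign_pickle` prefixes the payload with a 32-byte HMAC-SHA256 tag, and `ServerHandler.verify_pickle` recomputes and compares that tag before anything is unpickled. A self-contained sketch of the framing (the key below is a placeholder for a real secret loaded from configuration):

```python
# Sketch only: the sign/verify handshake mirrored from shared/serial.py and server/main.py.
import hashlib
import hmac
import pickle

SECRET_KEY = b"supersecurekey"   # placeholder; use a real secret in practice

def sign(data: bytes, secret_key: bytes) -> bytes:
    tag = hmac.new(secret_key, data, hashlib.sha256).digest()   # 32-byte tag
    return tag + data

def verify(signed: bytes, secret_key: bytes) -> bytes:
    tag, data = signed[:32], signed[32:]
    expected = hmac.new(secret_key, data, hashlib.sha256).digest()
    if not hmac.compare_digest(tag, expected):                   # constant-time comparison
        raise ValueError("Pickle signature verification failed")
    return data

payload = pickle.dumps({"vivarium_id": "test", "duration": 10})
assert verify(sign(payload, SECRET_KEY), SECRET_KEY) == payload
```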
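The worker can now also be started under uvicorn with a FastAPI WebSocket endpoint at `/ws` on port 8001 (see the `worker/main.py` hunk above); it dispatches any `PENDING` job message it receives and replies once the dispatcher finishes. A minimal client-side sketch using the `websockets` package added to the project dependencies; the host and the job fields are illustrative, and the real message schema is whatever `JobDispatcher.dispatch_composition` expects:

```python
# Sketch only: a gateway-side client for the worker's new /ws endpoint.
import asyncio
import json

import websockets

async def submit_job(uri: str = "ws://localhost:8001/ws") -> dict:
    job = {
        "job_id": "composition-example",   # illustrative id
        "status": "PENDING",               # only PENDING jobs are dispatched
        "spec": {},                        # composition document for the run
        "duration": 10,
    }
    async with websockets.connect(uri) as ws:
        await ws.send(json.dumps(job))     # hand the job to the worker
        reply = await ws.recv()            # worker answers when dispatch completes
        return json.loads(reply)

if __name__ == "__main__":
    print(asyncio.run(submit_job()))
```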