Skip to content

Commit

Permalink
Merge pull request #1809 from AntaresSimulatorTeam/release/2.15.5
Browse files Browse the repository at this point in the history
  • Loading branch information
skamril authored Nov 15, 2023
2 parents 0537e58 + ffafeba commit 465db27
Show file tree
Hide file tree
Showing 57 changed files with 3,210 additions and 825 deletions.
27 changes: 27 additions & 0 deletions alembic/versions/d495746853cc_add_owner_id_to_jobresult.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Add owner_id to JobResult
Revision ID: d495746853cc
Revises: e65e0c04606b
Create Date: 2023-10-19 13:16:29.969047
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "d495746853cc"
down_revision = "e65e0c04606b"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the ``owner_id`` column to ``job_result`` and reference ``identities.id``.

    The column and its foreign key are created inside a single batch
    operation on the ``job_result`` table. The foreign key is named
    ``fk_job_result_owner_id`` and is declared with ``ON DELETE SET NULL``.
    """
    # Describe the new column first, then apply both changes in one batch.
    owner_id_column = sa.Column("owner_id", sa.Integer(), default=None)
    with op.batch_alter_table("job_result", schema=None) as batch:
        batch.add_column(owner_id_column)
        batch.create_foreign_key(
            "fk_job_result_owner_id",
            "identities",
            ["owner_id"],
            ["id"],
            ondelete="SET NULL",
        )


def downgrade() -> None:
    """Remove the ``owner_id`` column from the ``job_result`` table."""
    # NOTE(review): the foreign key is not dropped explicitly here; this
    # presumably relies on the backend (or the batch table re-creation)
    # discarding it together with the column -- confirm for every supported
    # database.
    with op.batch_alter_table("job_result", schema=None) as batch:
        batch.drop_column("owner_id")
4 changes: 2 additions & 2 deletions antarest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

# Standard project metadata

__version__ = "2.15.4"
__version__ = "2.15.5"
__author__ = "RTE, Antares Web Team"
__date__ = "2023-10-25"
__date__ = "2023-11-16"
# noinspection SpellCheckingInspection
__credits__ = "(c) Réseau de Transport de l’Électricité (RTE)"

Expand Down
5 changes: 5 additions & 0 deletions antarest/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def __init__(self, message: str) -> None:
super().__init__(HTTPStatus.EXPECTATION_FAILED, message)


class VariantGenerationTimeoutError(HTTPException):
    """HTTP error reported with status ``REQUEST_TIMEOUT`` (408).

    NOTE(review): judging by its name, this is meant to signal that the
    generation of a variant study took too long -- confirm against callers.
    """

    def __init__(self, message: str) -> None:
        # `message` is forwarded as the second argument of the base exception.
        super().__init__(HTTPStatus.REQUEST_TIMEOUT, message)


class NoParentStudyError(HTTPException):
    """HTTP error reported with status ``NOT_FOUND`` (404).

    NOTE(review): judging by its name, this is raised when a study has no
    parent study -- confirm against callers.
    """

    def __init__(self, message: str) -> None:
        # `message` is forwarded as the second argument of the base exception.
        super().__init__(HTTPStatus.NOT_FOUND, message)
Expand Down
17 changes: 17 additions & 0 deletions antarest/core/interfaces/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@


class CacheConstants(Enum):
    """
    Constants used to identify cache entries.

    - `RAW_STUDY`: key used to store JSON (or bytes) objects.
      This cache is used by the `RawStudyService` or `VariantStudyService` to store
      values that are retrieved from the filesystem.
      Note: this cache is unlikely to be hit in practice, because it is only used
      to fetch data inside a study when the URL is "" and the depth is -1.

    - `STUDY_FACTORY`: key used to store objects of type `FileStudyTreeConfigDTO`.
      This cache is used by the `create_from_fs` function when retrieving the configuration
      of a study from the data on the disk.

    - `STUDY_LISTING`: key used to store objects of type `StudyMetadataDTO`.
      This cache is used by the `get_studies_information` function to store the list of studies.
    """

    RAW_STUDY = "RAW_STUDY"
    STUDY_FACTORY = "STUDY_FACTORY"
    STUDY_LISTING = "STUDY_LISTING"
Expand Down
10 changes: 9 additions & 1 deletion antarest/core/tasks/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from fastapi import HTTPException

from antarest.core.tasks.model import TaskJob, TaskListFilter
from antarest.core.tasks.model import TaskJob, TaskListFilter, TaskStatus
from antarest.core.utils.fastapi_sqlalchemy import db
from antarest.core.utils.utils import assert_this

Expand Down Expand Up @@ -82,3 +82,11 @@ def delete(self, tid: str) -> None:
if task:
db.session.delete(task)
db.session.commit()

def update_timeout(self, task_id: str, timeout: int) -> None:
    """
    Mark a task as timed out.

    Sets the task status to `TaskStatus.TIMEOUT`, records a result message
    mentioning the timeout, flags the result as failed, and commits the
    session.

    Args:
        task_id: Identifier of the task to update.
        timeout: Timeout in seconds; only used to build the result message.

    If no task with this identifier exists (for instance because it was
    deleted while being awaited), nothing is updated.
    """
    task = db.session.get(TaskJob, task_id)
    if task is None:
        # `Session.get` returns None for an unknown primary key. Do nothing
        # rather than fail with an AttributeError, as `delete` does for a
        # missing task.
        return
    task.status = TaskStatus.TIMEOUT
    task.result_msg = f"Task '{task_id}' timeout after {timeout} seconds"
    task.result_status = False
    db.session.commit()
41 changes: 26 additions & 15 deletions antarest/core/tasks/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
TaskUpdateNotifier = Callable[[str], None]
Task = Callable[[TaskUpdateNotifier], TaskResult]

DEFAULT_AWAIT_MAX_TIMEOUT = 172800 # 48 hours
"""Default timeout for `await_task` in seconds."""


class ITaskService(ABC):
@abstractmethod
Expand Down Expand Up @@ -74,7 +77,7 @@ def list_tasks(self, task_filter: TaskListFilter, request_params: RequestParamet
raise NotImplementedError()

@abstractmethod
def await_task(self, task_id: str, timeout_sec: Optional[int] = None) -> None:
def await_task(self, task_id: str, timeout_sec: int = DEFAULT_AWAIT_MAX_TIMEOUT) -> None:
raise NotImplementedError()


Expand All @@ -83,9 +86,6 @@ def noop_notifier(message: str) -> None:
"""This function is used in tasks when no notification is required."""


DEFAULT_AWAIT_MAX_TIMEOUT = 172800


class TaskJobService(ITaskService):
def __init__(
self,
Expand Down Expand Up @@ -141,6 +141,7 @@ def _send_worker_task(logger_: TaskUpdateNotifier) -> TaskResult:
task_type,
)
while not task_result_wrapper:
logger.info("💤 Sleeping 1 second...")
time.sleep(1)
self.event_bus.remove_listener(listener_id)
return task_result_wrapper[0]
Expand Down Expand Up @@ -283,22 +284,31 @@ def list_db_tasks(self, task_filter: TaskListFilter, request_params: RequestPara
user = None if request_params.user.is_site_admin() else request_params.user.impersonator
return self.repo.list(task_filter, user)

def await_task(self, task_id: str, timeout_sec: Optional[int] = None) -> None:
logger.info(f"Awaiting task {task_id}")
def await_task(self, task_id: str, timeout_sec: int = DEFAULT_AWAIT_MAX_TIMEOUT) -> None:
if task_id in self.tasks:
self.tasks[task_id].result(timeout_sec or DEFAULT_AWAIT_MAX_TIMEOUT)
try:
logger.info(f"🤔 Awaiting task '{task_id}' {timeout_sec}s...")
self.tasks[task_id].result(timeout_sec)
logger.info(f"📌 Task '{task_id}' done.")
except Exception as exc:
logger.critical(f"🤕 Task '{task_id}' failed: {exc}.")
raise
else:
logger.warning(f"Task {task_id} not handled by this worker, will poll for task completion from db")
end = time.time() + (timeout_sec or DEFAULT_AWAIT_MAX_TIMEOUT)
logger.warning(f"Task '{task_id}' not handled by this worker, will poll for task completion from db")
end = time.time() + timeout_sec
while time.time() < end:
with db():
task = self.repo.get(task_id)
if not task:
logger.error(f"Awaited task {task_id} was not found")
break
if task is None:
logger.error(f"Awaited task '{task_id}' was not found")
return
if TaskStatus(task.status).is_final():
break
time.sleep(2)
return
logger.info("💤 Sleeping 2 seconds...")
time.sleep(2)
logger.error(f"Timeout while awaiting task '{task_id}'")
with db():
self.repo.update_timeout(task_id, timeout_sec)

def _run_task(
self,
Expand Down Expand Up @@ -412,5 +422,6 @@ def _update_task_status(
task.result_status = result
task.result = command_result
if status.is_final():
task.completion_date = datetime.datetime.now(datetime.timezone.utc)
# Do not use the `timezone.utc` timezone to preserve a naive datetime.
task.completion_date = datetime.datetime.utcnow()
self.repo.save(task)
47 changes: 39 additions & 8 deletions antarest/core/tasks/web.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import concurrent.futures
import http
import logging
from typing import Any

from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException

from antarest.core.config import Config
from antarest.core.jwt import JWTUser
from antarest.core.requests import RequestParameters
from antarest.core.tasks.model import TaskListFilter
from antarest.core.tasks.service import TaskJobService
from antarest.core.tasks.model import TaskDTO, TaskListFilter
from antarest.core.tasks.service import DEFAULT_AWAIT_MAX_TIMEOUT, TaskJobService
from antarest.core.utils.web import APITag
from antarest.login.auth import Auth

Expand All @@ -17,13 +19,13 @@
def create_tasks_api(service: TaskJobService, config: Config) -> APIRouter:
"""
Endpoints login implementation
Args:
service: login facade service
config: server config
jwt: jwt manager
Returns:
API router
"""
bp = APIRouter(prefix="/v1")
auth = Auth(config)
Expand All @@ -36,17 +38,46 @@ def list_tasks(
request_params = RequestParameters(user=current_user)
return service.list_tasks(filter, request_params)

@bp.get("/tasks/{task_id}", tags=[APITag.tasks])
@bp.get("/tasks/{task_id}", tags=[APITag.tasks], response_model=TaskDTO)
def get_task(
task_id: str,
wait_for_completion: bool = False,
with_logs: bool = False,
timeout: int = DEFAULT_AWAIT_MAX_TIMEOUT,
current_user: JWTUser = Depends(auth.get_current_user),
) -> Any:
) -> TaskDTO:
"""
Retrieve information about a specific task.
Args:
- `task_id`: Unique identifier of the task.
- `wait_for_completion`: Set to `True` to wait for task completion.
- `with_logs`: Set to `True` to retrieve the job logs (Antares Solver logs).
- `timeout`: Maximum time in seconds to wait for task completion.
Raises:
- 408 REQUEST_TIMEOUT: when the request times out while waiting for task completion.
Returns:
TaskDTO: Information about the specified task.
"""
request_params = RequestParameters(user=current_user)
task_status = service.status_task(task_id, request_params, with_logs)

if wait_for_completion and not task_status.status.is_final():
service.await_task(task_id)
# Ensure 0 <= timeout <= 48 h
timeout = min(max(0, timeout), DEFAULT_AWAIT_MAX_TIMEOUT)
try:
service.await_task(task_id, timeout_sec=timeout)
except concurrent.futures.TimeoutError as exc: # pragma: no cover
# Note that if the task does not complete within the specified time,
# the task will continue running but the user will receive a timeout.
# In this case, it is the user's responsibility to cancel the task.
raise HTTPException(
status_code=http.HTTPStatus.REQUEST_TIMEOUT,
detail="The request timed out while waiting for task completion.",
) from exc

return service.status_task(task_id, request_params, with_logs)

@bp.put("/tasks/{task_id}/cancel", tags=[APITag.tasks])
Expand Down
1 change: 1 addition & 0 deletions antarest/core/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def retry(func: Callable[[], T], attempts: int = 10, interval: float = 0.5) -> T
attempt += 1
return func()
except Exception as e:
logger.info(f"💤 Sleeping {interval} second(s)...")
time.sleep(interval)
caught_exception = e
raise caught_exception or ShouldNotHappenException()
Expand Down
Loading

0 comments on commit 465db27

Please sign in to comment.