Skip to content

Commit

Permalink
test: move wait_task_completion function in utils.py for better r…
Browse files Browse the repository at this point in the history
…euse
  • Loading branch information
laurent-laporte-pro committed Jun 30, 2023
1 parent 8ae29c3 commit 7ab0330
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
27 changes: 2 additions & 25 deletions tests/integration/test_studies_upgrade.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,13 @@
import os
import time

import pytest
from antarest.core.tasks.model import TaskStatus
from starlette.testclient import TestClient

from antarest.core.tasks.model import TaskDTO, TaskStatus
from tests.integration.utils import wait_task_completion

RUN_ON_WINDOWS = os.name == "nt"


def wait_task_completion(
client: TestClient,
access_token: str,
task_id: str,
*,
timeout: float = 10,
) -> TaskDTO:
end_time = time.time() + timeout
while time.time() < end_time:
time.sleep(0.1)
res = client.get(
f"/v1/tasks/{task_id}",
headers={"Authorization": f"Bearer {access_token}"},
json={"wait_for_completion": True},
)
assert res.status_code == 200
task = TaskDTO(**res.json())
if task.status not in {TaskStatus.PENDING, TaskStatus.RUNNING}:
return task
raise TimeoutError(f"{timeout} seconds")


class TestStudyUpgrade:
@pytest.mark.skipif(
RUN_ON_WINDOWS, reason="This test runs randomly on Windows"
Expand Down
25 changes: 25 additions & 0 deletions tests/integration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import time
from typing import Callable

from antarest.core.tasks.model import TaskDTO, TaskStatus
from starlette.testclient import TestClient


def wait_for(
predicate: Callable[[], bool], timeout: float = 10, sleep_time: float = 1
Expand All @@ -13,3 +16,25 @@ def wait_for(
return
time.sleep(sleep_time)
raise TimeoutError(f"task is still in progress after {timeout} seconds")


def wait_task_completion(
client: TestClient,
access_token: str,
task_id: str,
*,
timeout: float = 10,
) -> TaskDTO:
end_time = time.time() + timeout
while time.time() < end_time:
time.sleep(0.1)
res = client.get(
f"/v1/tasks/{task_id}",
headers={"Authorization": f"Bearer {access_token}"},
json={"wait_for_completion": True},
)
assert res.status_code == 200
task = TaskDTO(**res.json())
if task.status not in {TaskStatus.PENDING, TaskStatus.RUNNING}:
return task
raise TimeoutError(f"{timeout} seconds")

0 comments on commit 7ab0330

Please sign in to comment.