Skip to content

Commit

Permalink
Revert "Update background task collection (ref #661)"
Browse files Browse the repository at this point in the history
This reverts commit 3646988.
  • Loading branch information
tcompa committed May 3, 2023
1 parent 3646988 commit d577886
Showing 1 changed file with 72 additions and 70 deletions.
142 changes: 72 additions & 70 deletions fractal_server/app/api/v1/task.py
Original file line number Diff line number Diff line change
@@ -35,9 +35,7 @@
from ....tasks.collection import get_log_path
from ....tasks.collection import inspect_package
from ...db import AsyncSession
from ...db import DBSyncSession
from ...db import get_db
from ...db import get_sync_db
from ...models import State
from ...models import Task
from ...security import current_active_superuser
@@ -62,91 +60,95 @@ async def _background_collect_pip(
directory.
"""

with get_sync_db() as db:
# Note: anext(get_db()) is only available for python>=3.10
db = await get_db().__anext__()

state: State = db.get(State, state_id)
state: State = await db.get(State, state_id)

logger_name = task_pkg.package.replace("/", "_")
logger = set_logger(
logger_name=logger_name,
log_file_path=get_log_path(venv_path),
)
logger_name = task_pkg.package.replace("/", "_")
logger = set_logger(
logger_name=logger_name,
log_file_path=get_log_path(venv_path),
)

logger.debug("Start background task collection")
data = TaskCollectStatus(**state.data)
data.info = None
logger.debug("Start background task collection")
data = TaskCollectStatus(**state.data)
data.info = None

try:
# install
logger.debug("Task-collection status: installing")
data.status = "installing"

state.data = data.sanitised_dict()
db.merge(state)
db.commit()
task_list = await create_package_environment_pip(
venv_path=venv_path,
task_pkg=task_pkg,
logger_name=logger_name,
)
try:
# install
logger.debug("Task-collection status: installing")
data.status = "installing"

# collect
logger.debug("Task-collection status: collecting")
data.status = "collecting"
state.data = data.sanitised_dict()
db.merge(state)
db.commit()
tasks = _insert_tasks(task_list=task_list, db=db)

# finalise
logger.debug("Task-collection status: finalising")
collection_path = get_collection_path(venv_path)
data.task_list = tasks
with collection_path.open("w") as f:
json.dump(data.sanitised_dict(), f)

# Update DB
data.status = "OK"
data.log = get_collection_log(venv_path)
state.data = data.sanitised_dict()
db.add(state)
db.merge(state)
db.commit()

# Write last logs to file
logger.debug("Task-collection status: OK")
logger.info("Background task collection completed successfully")
close_logger(logger)
state.data = data.sanitised_dict()
await db.merge(state)
await db.commit()
task_list = await create_package_environment_pip(
venv_path=venv_path,
task_pkg=task_pkg,
logger_name=logger_name,
)

except Exception as e:
# Write last logs to file
logger.debug("Task-collection status: fail")
logger.info(f"Background collection failed. Original error: {e}")
close_logger(logger)
# collect
logger.debug("Task-collection status: collecting")
data.status = "collecting"
state.data = data.sanitised_dict()
await db.merge(state)
await db.commit()
tasks = await _insert_tasks(task_list=task_list, db=db)

# finalise
logger.debug("Task-collection status: finalising")
collection_path = get_collection_path(venv_path)
data.task_list = tasks
with collection_path.open("w") as f:
json.dump(data.sanitised_dict(), f)

# Update DB
data.status = "OK"
data.log = get_collection_log(venv_path)
state.data = data.sanitised_dict()
db.add(state)
await db.merge(state)
await db.commit()

# Write last logs to file
logger.debug("Task-collection status: OK")
logger.info("Background task collection completed successfully")
close_logger(logger)
await db.close()

# Update db
data.status = "fail"
data.info = f"Original error: {e}"
data.log = get_collection_log(venv_path)
state.data = data.sanitised_dict()
db.merge(state)
db.commit()
except Exception as e:
# Write last logs to file
logger.debug("Task-collection status: fail")
logger.info(f"Background collection failed. Original error: {e}")
close_logger(logger)

# Delete corrupted package dir
shell_rmtree(venv_path)
# Update db
data.status = "fail"
data.info = f"Original error: {e}"
data.log = get_collection_log(venv_path)
state.data = data.sanitised_dict()
await db.merge(state)
await db.commit()
await db.close()

# Delete corrupted package dir
shell_rmtree(venv_path)


def _insert_tasks(
async def _insert_tasks(
task_list: list[TaskCreate],
db: DBSyncSession,
db: AsyncSession,
) -> list[Task]:
"""
Insert tasks into database
"""
task_db_list = [Task.from_orm(t) for t in task_list]
db.add_all(task_db_list)
db.commit()
[db.refresh(t) for t in task_db_list]
await db.commit()
await asyncio.gather(*[db.refresh(t) for t in task_db_list])
await db.close()
return task_db_list


0 comments on commit d577886

Please sign in to comment.