Skip to content

Commit d2c231e

Browse files
committed
WIP: Add API to update long-running VM allocations
1 parent d3cfcc2 commit d2c231e

File tree

4 files changed

+73
-2
lines changed

4 files changed

+73
-2
lines changed

vm_supervisor/resources.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
12
from datetime import datetime, timezone
23
from functools import lru_cache
4+
from typing import Set, Optional
35
from typing import Tuple
46

57
import cpuinfo
@@ -117,3 +119,9 @@ async def about_system_usage(request: web.Request):
117119
return web.json_response(
118120
text=usage.json(exclude_none=True),
119121
)
122+
123+
124+
class Allocation(BaseModel):
125+
long_running_vms: Set[str]
126+
on_demand_vms: Optional[Set[str]] = None
127+
jobs: Optional[Set] = None

vm_supervisor/run.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,3 +216,31 @@ async def run_code_on_event(vm_hash: VmHash, event, pubsub: PubSub):
216216
execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT)
217217
else:
218218
await execution.stop()
219+
220+
221+
async def start_long_running(vm_hash: VmHash, pubsub: PubSub) -> VmExecution:
222+
execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash)
223+
224+
if not execution:
225+
execution = await create_vm_execution(vm_hash=vm_hash)\
226+
227+
# If the VM was already running in lambda mode, it should not expire
228+
# as long as it is also scheduled as long-running
229+
execution.marked_as_long_running = True
230+
execution.cancel_expiration()
231+
232+
await execution.becomes_ready()
233+
234+
# if settings.WATCH_FOR_UPDATES:
235+
# # FIXME: Is this added for every request ?
236+
# execution.start_watching_for_updates(pubsub=request.app["pubsub"])
237+
238+
return execution
239+
240+
241+
async def stop_long_running(vm_hash: VmHash) -> Optional[VmExecution]:
242+
logger.info(f"Stopping long running {vm_hash}")
243+
execution = await pool.get_running_vm(vm_hash)
244+
if execution:
245+
await execution.stop()
246+
return execution

vm_supervisor/supervisor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
status_check_fastapi,
2727
about_execution_records,
2828
status_check_version,
29+
update_allocations,
2930
)
3031

3132
logger = logging.getLogger(__name__)
@@ -53,6 +54,7 @@ async def server_version_middleware(
5354
web.get("/about/executions/records", about_execution_records),
5455
web.get("/about/usage/system", about_system_usage),
5556
web.get("/about/config", about_config),
57+
web.post("/control/allocations", update_allocations),
5658
web.get("/status/check/fastapi", status_check_fastapi),
5759
web.get("/status/check/version", status_check_version),
5860
web.route("*", "/vm/{ref}{suffix:.*}", run_code_from_path),

vm_supervisor/views.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,20 @@
33
import os.path
44
from string import Template
55
from typing import Awaitable, Optional
6-
from packaging.version import Version, InvalidVersion
76

87
import aiodns
98
import aiohttp
109
from aiohttp import web
1110
from aiohttp.web_exceptions import HTTPNotFound
11+
from packaging.version import Version, InvalidVersion
12+
from pydantic import ValidationError
1213

1314
from . import status, __version__
1415
from .conf import settings
1516
from .metrics import get_execution_records
1617
from .models import VmHash
17-
from .run import run_code_on_request, pool
18+
from .resources import Allocation
19+
from .run import run_code_on_request, pool, start_long_running
1820
from .utils import b32_to_b16, get_ref_from_dns, dumps_for_json
1921

2022
logger = logging.getLogger(__name__)
@@ -166,3 +168,34 @@ async def status_check_version(request: web.Request):
166168
)
167169
else:
168170
return web.HTTPForbidden(text=f"Outdated: version {current} < {reference}")
171+
172+
173+
async def update_allocations(request: web.Request):
174+
# TODO: Add some form of authentication
175+
try:
176+
data = await request.json()
177+
allocation = Allocation(**data)
178+
except ValidationError as error:
179+
return web.json_response(
180+
data=error.json(), status=web.HTTPBadRequest.status_code
181+
)
182+
183+
pubsub = request.app["pubsub"]
184+
185+
for vm_hash in allocation.long_running_vms:
186+
vm_hash = VmHash(vm_hash)
187+
logger.info(f"Starting long running VM {vm_hash}")
188+
await start_long_running(vm_hash, pubsub)
189+
190+
for execution in pool.get_long_running_executions():
191+
if execution.vm_hash not in allocation.long_running_vms:
192+
logger.info(f"Stopping long running VM {execution.vm_hash}")
193+
await execution.stop()
194+
execution.marked_as_long_running = False
195+
196+
if allocation.on_demand_vms:
197+
logger.info("Not supported yet: 'allocation.on_demand_vms'")
198+
if allocation.jobs:
199+
logger.info("Not supported yet: 'allocation.on_demand_vms'")
200+
201+
return web.json_response(data={"success": True})

0 commit comments

Comments
 (0)