|
3 | 3 | import asyncio
|
4 | 4 | import json
|
5 | 5 | import logging
|
| 6 | +import threading |
6 | 7 | from collections.abc import Iterable
|
7 | 8 | from datetime import datetime, timezone
|
8 | 9 | from typing import Optional
|
@@ -43,12 +44,12 @@ class VmPool:
|
43 | 44 | network: Optional[Network]
|
44 | 45 | snapshot_manager: Optional[SnapshotManager] = None
|
45 | 46 | systemd_manager: SystemDManager
|
46 |
| - creation_semaphore: asyncio.Semaphore |
| 47 | + creation_lock: threading.Lock |
47 | 48 |
|
48 | 49 | def __init__(self):
|
49 | 50 | self.counter = settings.START_ID_INDEX
|
50 | 51 | self.executions = {}
|
51 |
| - self.creation_semaphore = asyncio.Semaphore(1) |
| 52 | + self.creation_lock = threading.Lock() |
52 | 53 |
|
53 | 54 | self.network = (
|
54 | 55 | Network(
|
@@ -89,7 +90,7 @@ async def create_a_vm(
|
89 | 90 | ) -> VmExecution:
|
90 | 91 | """Create a new Aleph Firecracker VM from an Aleph function message."""
|
91 | 92 |
|
92 |
| - await self.creation_semaphore.acquire() |
| 93 | + self.creation_lock.acquire() |
93 | 94 |
|
94 | 95 | # Check if an execution is already present for this VM, then return it.
|
95 | 96 | # Do not `await` in this section.
|
@@ -135,12 +136,10 @@ async def create_a_vm(
|
135 | 136 | self.forget_vm(vm_hash)
|
136 | 137 | raise
|
137 | 138 | finally:
|
138 |
| - self.creation_semaphore.release() |
| 139 | + self.creation_lock.release() |
139 | 140 |
|
140 | 141 | self._schedule_forget_on_stop(execution)
|
141 | 142 |
|
142 |
| - self.creation_semaphore.release() |
143 |
| - |
144 | 143 | return execution
|
145 | 144 |
|
146 | 145 | def get_unique_vm_id(self) -> int:
|
|
0 commit comments