@@ -6,7 +6,7 @@ use crate::{
6
6
} ;
7
7
8
8
use axum:: extract:: ws:: { Message , WebSocket } ;
9
- use futures:: { Future , FutureExt , StreamExt , TryFutureExt } ;
9
+ use futures:: { future :: Fuse , Future , FutureExt , StreamExt , TryFutureExt } ;
10
10
use orchestrator:: {
11
11
coordinator:: { self , Coordinator , DockerBackend } ,
12
12
DropErrorDetailsExt ,
@@ -16,6 +16,7 @@ use std::{
16
16
collections:: BTreeMap ,
17
17
convert:: TryFrom ,
18
18
mem,
19
+ pin:: pin,
19
20
sync:: {
20
21
atomic:: { AtomicU64 , Ordering } ,
21
22
Arc ,
@@ -351,9 +352,8 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
351
352
}
352
353
353
354
let mut manager = CoordinatorManager :: new ( ) . await ;
354
- tokio:: pin! {
355
- let session_timeout = time:: sleep( CoordinatorManager :: SESSION_TIMEOUT ) ;
356
- }
355
+ let mut session_timeout = pin ! ( time:: sleep( CoordinatorManager :: SESSION_TIMEOUT ) ) ;
356
+ let mut idle_timeout = pin ! ( Fuse :: terminated( ) ) ;
357
357
358
358
let mut active_executions = BTreeMap :: new ( ) ;
359
359
let mut active_execution_gc_interval = time:: interval ( Duration :: from_secs ( 30 ) ) ;
@@ -394,6 +394,12 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
394
394
395
395
// We don't care if there are no running tasks
396
396
Some ( task) = manager. join_next( ) => {
397
+ // The last task has completed which means we are a
398
+ // candidate for idling in a little while.
399
+ if manager. is_empty( ) {
400
+ idle_timeout. set( time:: sleep( CoordinatorManager :: IDLE_TIMEOUT ) . fuse( ) ) ;
401
+ }
402
+
397
403
let ( error, meta) = match task {
398
404
Ok ( Ok ( ( ) ) ) => continue ,
399
405
Ok ( Err ( error) ) => error,
@@ -425,7 +431,7 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
425
431
. collect( ) ;
426
432
} ,
427
433
428
- _ = time :: sleep ( CoordinatorManager :: IDLE_TIMEOUT ) , if manager. is_empty( ) => {
434
+ _ = & mut idle_timeout , if manager. is_empty( ) => {
429
435
let idled = manager. idle( ) . await . context( StreamingCoordinatorIdleSnafu ) ;
430
436
431
437
let Err ( error) = idled else { continue } ;
0 commit comments