Skip to content

Commit 8fce1ec

Browse files
committed
Use the concurrency-limited factory throughout
This also gives a chance to reduce some implicit global variables.
1 parent f79ba38 commit 8fce1ec

File tree

4 files changed

+231
-95
lines changed

4 files changed

+231
-95
lines changed

compiler/base/orchestrator/src/coordinator.rs

+57-30
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use futures::{
33
stream::BoxStream,
44
Future, FutureExt, Stream, StreamExt,
55
};
6-
use once_cell::sync::Lazy;
76
use serde::Deserialize;
87
use snafu::prelude::*;
98
use std::{
@@ -821,29 +820,60 @@ enum DemultiplexCommand {
821820
ListenOnce(JobId, oneshot::Sender<WorkerMessage>),
822821
}
823822

823+
#[derive(Debug, Copy, Clone)]
824+
pub struct CoordinatorId {
825+
start: u64,
826+
id: u64,
827+
}
828+
824829
/// Enforces a limited number of concurrent `Coordinator`s.
825830
#[derive(Debug)]
826831
pub struct CoordinatorFactory {
827832
semaphore: Arc<Semaphore>,
833+
834+
start: u64,
835+
id: AtomicU64,
828836
}
829837

830838
impl CoordinatorFactory {
831839
pub fn new(maximum: usize) -> Self {
840+
let semaphore = Arc::new(Semaphore::new(maximum));
841+
842+
let now = std::time::SystemTime::now();
843+
let start = now
844+
.duration_since(std::time::UNIX_EPOCH)
845+
.unwrap_or_default()
846+
.as_secs();
847+
848+
let id = AtomicU64::new(0);
849+
832850
Self {
833-
semaphore: Arc::new(Semaphore::new(maximum)),
851+
semaphore,
852+
start,
853+
id,
834854
}
835855
}
836856

837-
pub async fn build<B>(&self, backend: B) -> LimitedCoordinator<B>
857+
fn next_id(&self) -> CoordinatorId {
858+
let start = self.start;
859+
let id = self.id.fetch_add(1, Ordering::SeqCst);
860+
861+
CoordinatorId { start, id }
862+
}
863+
864+
pub async fn build<B>(&self) -> LimitedCoordinator<B>
838865
where
839-
B: Backend,
866+
B: Backend + From<CoordinatorId>,
840867
{
841868
let semaphore = self.semaphore.clone();
842869
let permit = semaphore
843870
.acquire_owned()
844871
.await
845872
.expect("Unable to acquire permit");
846873

874+
let id = self.next_id();
875+
let backend = B::from(id);
876+
847877
let coordinator = Coordinator::new(backend);
848878

849879
LimitedCoordinator {
@@ -1149,12 +1179,6 @@ where
11491179
}
11501180
}
11511181

1152-
impl Coordinator<DockerBackend> {
1153-
pub fn new_docker() -> Self {
1154-
Self::new(DockerBackend(()))
1155-
}
1156-
}
1157-
11581182
#[derive(Debug)]
11591183
struct Container {
11601184
task: JoinHandle<Result<()>>,
@@ -2581,24 +2605,26 @@ fn basic_secure_docker_command() -> Command {
25812605
)
25822606
}
25832607

2584-
static DOCKER_BACKEND_START: Lazy<u64> = Lazy::new(|| {
2585-
use std::time;
2586-
2587-
let now = time::SystemTime::now();
2588-
now.duration_since(time::UNIX_EPOCH)
2589-
.unwrap_or_default()
2590-
.as_secs()
2591-
});
2592-
2593-
static DOCKER_BACKEND_ID: AtomicU64 = AtomicU64::new(0);
2608+
pub struct DockerBackend {
2609+
id: CoordinatorId,
2610+
instance: AtomicU64,
2611+
}
25942612

2595-
pub struct DockerBackend(());
2613+
impl From<CoordinatorId> for DockerBackend {
2614+
fn from(id: CoordinatorId) -> Self {
2615+
Self {
2616+
id,
2617+
instance: Default::default(),
2618+
}
2619+
}
2620+
}
25962621

25972622
impl DockerBackend {
25982623
fn next_name(&self) -> String {
2599-
let start = *DOCKER_BACKEND_START;
2600-
let id = DOCKER_BACKEND_ID.fetch_add(1, Ordering::SeqCst);
2601-
format!("playground-{start}-{id}")
2624+
let CoordinatorId { start, id } = self.id;
2625+
let instance = self.instance.fetch_add(1, Ordering::SeqCst);
2626+
2627+
format!("playground-{start}-{id}-{instance}")
26022628
}
26032629
}
26042630

@@ -2758,6 +2784,7 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
27582784
mod tests {
27592785
use assertables::*;
27602786
use futures::future::{join, try_join_all};
2787+
use once_cell::sync::Lazy;
27612788
use std::{env, sync::Once};
27622789
use tempdir::TempDir;
27632790

@@ -2781,8 +2808,8 @@ mod tests {
27812808
project_dir: TempDir,
27822809
}
27832810

2784-
impl TestBackend {
2785-
fn new() -> Self {
2811+
impl From<CoordinatorId> for TestBackend {
2812+
fn from(_id: CoordinatorId) -> Self {
27862813
static COMPILE_WORKER_ONCE: Once = Once::new();
27872814

27882815
COMPILE_WORKER_ONCE.call_once(|| {
@@ -2839,12 +2866,12 @@ mod tests {
28392866
static TEST_COORDINATOR_FACTORY: Lazy<CoordinatorFactory> =
28402867
Lazy::new(|| CoordinatorFactory::new(*MAX_CONCURRENT_TESTS));
28412868

2842-
async fn new_coordinator_test() -> LimitedCoordinator<impl Backend> {
2843-
TEST_COORDINATOR_FACTORY.build(TestBackend::new()).await
2869+
async fn new_coordinator_test() -> LimitedCoordinator<TestBackend> {
2870+
TEST_COORDINATOR_FACTORY.build().await
28442871
}
28452872

2846-
async fn new_coordinator_docker() -> LimitedCoordinator<impl Backend> {
2847-
TEST_COORDINATOR_FACTORY.build(DockerBackend(())).await
2873+
async fn new_coordinator_docker() -> LimitedCoordinator<DockerBackend> {
2874+
TEST_COORDINATOR_FACTORY.build().await
28482875
}
28492876

28502877
async fn new_coordinator() -> LimitedCoordinator<impl Backend> {

ui/src/main.rs

+25
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#![deny(rust_2018_idioms)]
22

33
use crate::env::{PLAYGROUND_GITHUB_TOKEN, PLAYGROUND_UI_ROOT};
4+
use orchestrator::coordinator::CoordinatorFactory;
45
use serde::{Deserialize, Serialize};
56
use snafu::prelude::*;
67
use std::{
@@ -13,6 +14,8 @@ use tracing_subscriber::EnvFilter;
1314

1415
const DEFAULT_ADDRESS: &str = "127.0.0.1";
1516
const DEFAULT_PORT: u16 = 5000;
17+
const DEFAULT_COORDINATORS_ONE_OFF_LIMIT: usize = 10;
18+
const DEFAULT_COORDINATORS_WEBSOCKET_LIMIT: usize = 50;
1619

1720
mod env;
1821
mod gist;
@@ -45,6 +48,8 @@ struct Config {
4548
metrics_token: Option<String>,
4649
feature_flags: FeatureFlags,
4750
request_db_path: Option<PathBuf>,
51+
coordinators_one_off_limit: usize,
52+
coordinators_websocket_limit: usize,
4853
port: u16,
4954
root: PathBuf,
5055
}
@@ -102,13 +107,25 @@ impl Config {
102107

103108
let request_db_path = env::var_os("PLAYGROUND_REQUEST_DATABASE").map(Into::into);
104109

110+
let coordinators_one_off_limit = env::var("PLAYGROUND_COORDINATORS_ONE_OFF_LIMIT")
111+
.ok()
112+
.and_then(|l| l.parse().ok())
113+
.unwrap_or(DEFAULT_COORDINATORS_ONE_OFF_LIMIT);
114+
115+
let coordinators_websocket_limit = env::var("PLAYGROUND_COORDINATORS_WEBSOCKET_LIMIT")
116+
.ok()
117+
.and_then(|l| l.parse().ok())
118+
.unwrap_or(DEFAULT_COORDINATORS_WEBSOCKET_LIMIT);
119+
105120
Self {
106121
address,
107122
cors_enabled,
108123
gh_token,
109124
metrics_token,
110125
feature_flags,
111126
request_db_path,
127+
coordinators_one_off_limit,
128+
coordinators_websocket_limit,
112129
port,
113130
root,
114131
}
@@ -145,6 +162,14 @@ impl Config {
145162
request_db.expect("Unable to open request log database")
146163
}
147164

165+
fn coordinator_one_off_factory(&self) -> CoordinatorFactory {
166+
CoordinatorFactory::new(self.coordinators_one_off_limit)
167+
}
168+
169+
fn coordinator_websocket_factory(&self) -> CoordinatorFactory {
170+
CoordinatorFactory::new(self.coordinators_websocket_limit)
171+
}
172+
148173
fn server_socket_addr(&self) -> SocketAddr {
149174
let address = self.address.parse().expect("Invalid address");
150175
SocketAddr::new(address, self.port)

0 commit comments

Comments
 (0)