Skip to content

Commit 1ca84b0

Browse files
authored
Merge pull request #1047 from rust-lang/limited-coordinator-concurrency
Limit the concurrency of the coordinator to avoid trivial denial-of-service
2 parents c7d524c + 8fce1ec commit 1ca84b0

File tree

6 files changed

+296
-153
lines changed

6 files changed

+296
-153
lines changed

compiler/base/orchestrator/src/coordinator.rs

+120-83
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use futures::{
33
stream::BoxStream,
44
Future, FutureExt, Stream, StreamExt,
55
};
6-
use once_cell::sync::Lazy;
76
use serde::Deserialize;
87
use snafu::prelude::*;
98
use std::{
@@ -20,7 +19,7 @@ use tokio::{
2019
join,
2120
process::{Child, ChildStdin, ChildStdout, Command},
2221
select,
23-
sync::{mpsc, oneshot, OnceCell},
22+
sync::{mpsc, oneshot, OnceCell, OwnedSemaphorePermit, Semaphore},
2423
task::{JoinHandle, JoinSet},
2524
time::{self, MissedTickBehavior},
2625
};
@@ -821,6 +820,97 @@ enum DemultiplexCommand {
821820
ListenOnce(JobId, oneshot::Sender<WorkerMessage>),
822821
}
823822

823+
#[derive(Debug, Copy, Clone)]
824+
pub struct CoordinatorId {
825+
start: u64,
826+
id: u64,
827+
}
828+
829+
/// Enforces a limited number of concurrent `Coordinator`s.
830+
#[derive(Debug)]
831+
pub struct CoordinatorFactory {
832+
semaphore: Arc<Semaphore>,
833+
834+
start: u64,
835+
id: AtomicU64,
836+
}
837+
838+
impl CoordinatorFactory {
839+
pub fn new(maximum: usize) -> Self {
840+
let semaphore = Arc::new(Semaphore::new(maximum));
841+
842+
let now = std::time::SystemTime::now();
843+
let start = now
844+
.duration_since(std::time::UNIX_EPOCH)
845+
.unwrap_or_default()
846+
.as_secs();
847+
848+
let id = AtomicU64::new(0);
849+
850+
Self {
851+
semaphore,
852+
start,
853+
id,
854+
}
855+
}
856+
857+
fn next_id(&self) -> CoordinatorId {
858+
let start = self.start;
859+
let id = self.id.fetch_add(1, Ordering::SeqCst);
860+
861+
CoordinatorId { start, id }
862+
}
863+
864+
pub async fn build<B>(&self) -> LimitedCoordinator<B>
865+
where
866+
B: Backend + From<CoordinatorId>,
867+
{
868+
let semaphore = self.semaphore.clone();
869+
let permit = semaphore
870+
.acquire_owned()
871+
.await
872+
.expect("Unable to acquire permit");
873+
874+
let id = self.next_id();
875+
let backend = B::from(id);
876+
877+
let coordinator = Coordinator::new(backend);
878+
879+
LimitedCoordinator {
880+
coordinator,
881+
_permit: permit,
882+
}
883+
}
884+
}
885+
886+
pub struct LimitedCoordinator<T> {
887+
coordinator: Coordinator<T>,
888+
_permit: OwnedSemaphorePermit,
889+
}
890+
891+
impl<T> LimitedCoordinator<T>
892+
where
893+
T: Backend,
894+
{
895+
pub async fn shutdown(self) -> Result<T> {
896+
self.coordinator.shutdown().await
897+
}
898+
}
899+
900+
impl<T> ops::Deref for LimitedCoordinator<T> {
901+
type Target = Coordinator<T>;
902+
903+
fn deref(&self) -> &Self::Target {
904+
&self.coordinator
905+
}
906+
}
907+
908+
impl<T> ops::DerefMut for LimitedCoordinator<T> {
909+
fn deref_mut(&mut self) -> &mut Self::Target {
910+
&mut self.coordinator
911+
}
912+
}
913+
824914
#[derive(Debug)]
825915
pub struct Coordinator<B> {
826916
backend: B,
@@ -844,7 +934,7 @@ impl<B> Coordinator<B>
844934
where
845935
B: Backend,
846936
{
847-
pub async fn new(backend: B) -> Self {
937+
pub fn new(backend: B) -> Self {
848938
let token = CancellationToken::new();
849939

850940
Self {
@@ -1089,12 +1179,6 @@ where
10891179
}
10901180
}
10911181

1092-
impl Coordinator<DockerBackend> {
1093-
pub async fn new_docker() -> Self {
1094-
Self::new(DockerBackend(())).await
1095-
}
1096-
}
1097-
10981182
#[derive(Debug)]
10991183
struct Container {
11001184
task: JoinHandle<Result<()>>,
@@ -2521,24 +2605,26 @@ fn basic_secure_docker_command() -> Command {
25212605
)
25222606
}
25232607

2524-
static DOCKER_BACKEND_START: Lazy<u64> = Lazy::new(|| {
2525-
use std::time;
2526-
2527-
let now = time::SystemTime::now();
2528-
now.duration_since(time::UNIX_EPOCH)
2529-
.unwrap_or_default()
2530-
.as_secs()
2531-
});
2532-
2533-
static DOCKER_BACKEND_ID: AtomicU64 = AtomicU64::new(0);
2608+
pub struct DockerBackend {
2609+
id: CoordinatorId,
2610+
instance: AtomicU64,
2611+
}
25342612

2535-
pub struct DockerBackend(());
2613+
impl From<CoordinatorId> for DockerBackend {
2614+
fn from(id: CoordinatorId) -> Self {
2615+
Self {
2616+
id,
2617+
instance: Default::default(),
2618+
}
2619+
}
2620+
}
25362621

25372622
impl DockerBackend {
25382623
fn next_name(&self) -> String {
2539-
let start = *DOCKER_BACKEND_START;
2540-
let id = DOCKER_BACKEND_ID.fetch_add(1, Ordering::SeqCst);
2541-
format!("playground-{start}-{id}")
2624+
let CoordinatorId { start, id } = self.id;
2625+
let instance = self.instance.fetch_add(1, Ordering::SeqCst);
2626+
2627+
format!("playground-{start}-{id}-{instance}")
25422628
}
25432629
}
25442630

@@ -2697,14 +2783,10 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
26972783
#[cfg(test)]
26982784
mod tests {
26992785
use assertables::*;
2700-
use futures::{
2701-
future::{join, try_join_all},
2702-
Future, FutureExt,
2703-
};
2786+
use futures::future::{join, try_join_all};
27042787
use once_cell::sync::Lazy;
2705-
use std::{env, sync::Once, time::Duration};
2788+
use std::{env, sync::Once};
27062789
use tempdir::TempDir;
2707-
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
27082790

27092791
use super::*;
27102792

@@ -2726,8 +2808,8 @@ mod tests {
27262808
project_dir: TempDir,
27272809
}
27282810

2729-
impl TestBackend {
2730-
fn new() -> Self {
2811+
impl From<CoordinatorId> for TestBackend {
2812+
fn from(_id: CoordinatorId) -> Self {
27312813
static COMPILE_WORKER_ONCE: Once = Once::new();
27322814

27332815
COMPILE_WORKER_ONCE.call_once(|| {
@@ -2781,63 +2863,18 @@ mod tests {
27812863
.unwrap_or(5)
27822864
});
27832865

2784-
static CONCURRENT_TEST_SEMAPHORE: Lazy<Arc<Semaphore>> =
2785-
Lazy::new(|| Arc::new(Semaphore::new(*MAX_CONCURRENT_TESTS)));
2786-
2787-
struct RestrictedCoordinator<T> {
2788-
_permit: OwnedSemaphorePermit,
2789-
coordinator: Coordinator<T>,
2790-
}
2791-
2792-
impl<T> RestrictedCoordinator<T>
2793-
where
2794-
T: Backend,
2795-
{
2796-
async fn with<F, Fut>(f: F) -> Self
2797-
where
2798-
F: FnOnce() -> Fut,
2799-
Fut: Future<Output = Coordinator<T>>,
2800-
{
2801-
let semaphore = CONCURRENT_TEST_SEMAPHORE.clone();
2802-
let permit = semaphore
2803-
.acquire_owned()
2804-
.await
2805-
.expect("Unable to acquire permit");
2806-
let coordinator = f().await;
2807-
Self {
2808-
_permit: permit,
2809-
coordinator,
2810-
}
2811-
}
2812-
2813-
async fn shutdown(self) -> super::Result<T, super::Error> {
2814-
self.coordinator.shutdown().await
2815-
}
2816-
}
2817-
2818-
impl<T> ops::Deref for RestrictedCoordinator<T> {
2819-
type Target = Coordinator<T>;
2820-
2821-
fn deref(&self) -> &Self::Target {
2822-
&self.coordinator
2823-
}
2824-
}
2825-
2826-
impl<T> ops::DerefMut for RestrictedCoordinator<T> {
2827-
fn deref_mut(&mut self) -> &mut Self::Target {
2828-
&mut self.coordinator
2829-
}
2830-
}
2866+
static TEST_COORDINATOR_FACTORY: Lazy<CoordinatorFactory> =
2867+
Lazy::new(|| CoordinatorFactory::new(*MAX_CONCURRENT_TESTS));
28312868

2832-
async fn new_coordinator_test() -> RestrictedCoordinator<impl Backend> {
2833-
RestrictedCoordinator::with(|| Coordinator::new(TestBackend::new())).await
2869+
async fn new_coordinator_test() -> LimitedCoordinator<TestBackend> {
2870+
TEST_COORDINATOR_FACTORY.build().await
28342871
}
28352872

2836-
async fn new_coordinator_docker() -> RestrictedCoordinator<impl Backend> {
2837-
RestrictedCoordinator::with(|| Coordinator::new_docker()).await
2873+
async fn new_coordinator_docker() -> LimitedCoordinator<DockerBackend> {
2874+
TEST_COORDINATOR_FACTORY.build().await
28382875
}
28392876

2840-
async fn new_coordinator() -> RestrictedCoordinator<impl Backend> {
2877+
async fn new_coordinator() -> LimitedCoordinator<impl Backend> {
28412878
#[cfg(not(force_docker))]
28422879
{
28432880
new_coordinator_test().await

compiler/base/orchestrator/src/worker.rs

-2
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,6 @@ pub enum ProcessError {
816816

817817
#[cfg(target_os = "macos")]
818818
mod stats {
819-
use libc;
820819
use mach2::mach_time::{mach_timebase_info, mach_timebase_info_data_t};
821820
use snafu::prelude::*;
822821
use std::mem::MaybeUninit;
@@ -1132,7 +1131,6 @@ pub enum Utf8BufReaderError {
11321131
mod test {
11331132
use std::{
11341133
collections::VecDeque,
1135-
io,
11361134
pin::Pin,
11371135
task::{Context, Poll},
11381136
};

ui/src/main.rs

+25
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#![deny(rust_2018_idioms)]
22

33
use crate::env::{PLAYGROUND_GITHUB_TOKEN, PLAYGROUND_UI_ROOT};
4+
use orchestrator::coordinator::CoordinatorFactory;
45
use serde::{Deserialize, Serialize};
56
use snafu::prelude::*;
67
use std::{
@@ -13,6 +14,8 @@ use tracing_subscriber::EnvFilter;
1314

1415
const DEFAULT_ADDRESS: &str = "127.0.0.1";
1516
const DEFAULT_PORT: u16 = 5000;
17+
const DEFAULT_COORDINATORS_ONE_OFF_LIMIT: usize = 10;
18+
const DEFAULT_COORDINATORS_WEBSOCKET_LIMIT: usize = 50;
1619

1720
mod env;
1821
mod gist;
@@ -45,6 +48,8 @@ struct Config {
4548
metrics_token: Option<String>,
4649
feature_flags: FeatureFlags,
4750
request_db_path: Option<PathBuf>,
51+
coordinators_one_off_limit: usize,
52+
coordinators_websocket_limit: usize,
4853
port: u16,
4954
root: PathBuf,
5055
}
@@ -102,13 +107,25 @@ impl Config {
102107

103108
let request_db_path = env::var_os("PLAYGROUND_REQUEST_DATABASE").map(Into::into);
104109

110+
let coordinators_one_off_limit = env::var("PLAYGROUND_COORDINATORS_ONE_OFF_LIMIT")
111+
.ok()
112+
.and_then(|l| l.parse().ok())
113+
.unwrap_or(DEFAULT_COORDINATORS_ONE_OFF_LIMIT);
114+
115+
let coordinators_websocket_limit = env::var("PLAYGROUND_COORDINATORS_WEBSOCKET_LIMIT")
116+
.ok()
117+
.and_then(|l| l.parse().ok())
118+
.unwrap_or(DEFAULT_COORDINATORS_WEBSOCKET_LIMIT);
119+
105120
Self {
106121
address,
107122
cors_enabled,
108123
gh_token,
109124
metrics_token,
110125
feature_flags,
111126
request_db_path,
127+
coordinators_one_off_limit,
128+
coordinators_websocket_limit,
112129
port,
113130
root,
114131
}
@@ -145,6 +162,14 @@ impl Config {
145162
request_db.expect("Unable to open request log database")
146163
}
147164

165+
fn coordinator_one_off_factory(&self) -> CoordinatorFactory {
166+
CoordinatorFactory::new(self.coordinators_one_off_limit)
167+
}
168+
169+
fn coordinator_websocket_factory(&self) -> CoordinatorFactory {
170+
CoordinatorFactory::new(self.coordinators_websocket_limit)
171+
}
172+
148173
fn server_socket_addr(&self) -> SocketAddr {
149174
let address = self.address.parse().expect("Invalid address");
150175
SocketAddr::new(address, self.port)

ui/src/metrics.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use lazy_static::lazy_static;
22
use orchestrator::coordinator::{self, Channel, CompileTarget, CrateType, Edition, Mode};
33
use prometheus::{
4-
self, register_histogram, register_histogram_vec, register_int_counter,
5-
register_int_counter_vec, register_int_gauge, Histogram, HistogramVec, IntCounter,
6-
IntCounterVec, IntGauge,
4+
register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
5+
register_int_gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
76
};
87
use std::{
98
future::Future,

0 commit comments

Comments
 (0)