Skip to content

Commit ed42c2a

Browse files
authored
feat(runtime): support CORES_MIN, CORES_MAX, and CORES_MAX_RATIO (#3731)
When the proxy boots up, it needs to select a number of I/O worker threads to allocate to the runtime. This change adds a new environment variable that allows this value to scale based on the number of CPUs available on the host. A CORES_MAX_RATIO value of 1.0 will allocate one worker thread per CPU core. A lesser value will allocate fewer worker threads. Values are rounded to the nearest whole number. The CORES_MIN value sets a lower bound on the number of worker threads to use. The CORES_MAX value sets an upper bound.
1 parent 13a7916 commit ed42c2a

File tree

7 files changed

+232
-21
lines changed

7 files changed

+232
-21
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,6 +1305,7 @@ dependencies = [
13051305
"linkerd-opencensus",
13061306
"linkerd-opentelemetry",
13071307
"linkerd-tonic-stream",
1308+
"linkerd-workers",
13081309
"rangemap",
13091310
"regex",
13101311
"thiserror 2.0.12",
@@ -2693,6 +2694,10 @@ dependencies = [
26932694
"tracing",
26942695
]
26952696

2697+
[[package]]
2698+
name = "linkerd-workers"
2699+
version = "0.1.0"
2700+
26962701
[[package]]
26972702
name = "linkerd2-proxy"
26982703
version = "0.1.0"

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ members = [
8787
"linkerd/tracing",
8888
"linkerd/transport-header",
8989
"linkerd/transport-metrics",
90+
"linkerd/workers",
9091
"linkerd2-proxy",
9192
"opencensus-proto",
9293
"opentelemetry-proto",

linkerd/app/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ linkerd-error = { path = "../error" }
2727
linkerd-opencensus = { path = "../opencensus" }
2828
linkerd-opentelemetry = { path = "../opentelemetry" }
2929
linkerd-tonic-stream = { path = "../tonic-stream" }
30+
linkerd-workers = { path = "../workers" }
3031
rangemap = "1"
3132
regex = "1"
3233
thiserror = "2"

linkerd/app/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub use linkerd_app_core::{metrics, trace, transport::BindTcp, BUILD_INFO};
2929
use linkerd_app_gateway as gateway;
3030
use linkerd_app_inbound::{self as inbound, Inbound};
3131
use linkerd_app_outbound::{self as outbound, Outbound};
32+
pub use linkerd_workers::Workers;
3233
use std::pin::Pin;
3334
use tokio::{
3435
sync::mpsc,

linkerd/workers/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "linkerd-workers"
3+
version = "0.1.0"
4+
edition = "2021"
5+
publish = false
6+
description = "CPU core allocation logic for Linkerd"
7+
8+
[dependencies]

linkerd/workers/src/lib.rs

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
//! Core allocation logic for Linkerd's worker threads.
2+
3+
use std::num::NonZeroUsize;
4+
5+
/// Constraints used to pick how many worker threads a runtime should run.
#[derive(Copy, Clone, Debug)]
pub struct Workers {
    /// Hard ceiling on the result: [`Workers::cores`] never returns more than this.
    pub available: NonZeroUsize,
    /// Fraction multiplied by `available` (and rounded to the nearest whole
    /// number) to derive an upper bound. Only consulted when `max_cores` is
    /// `None`.
    pub max_ratio: Option<f64>,
    /// Explicit upper bound; when set, `max_ratio` is ignored.
    pub max_cores: Option<NonZeroUsize>,
    /// Lower bound. It wins over the configured upper bound, but not over
    /// `available`.
    pub min_cores: NonZeroUsize,
}

impl Workers {
    /// Resolves the constraints into a concrete core count.
    ///
    /// The upper bound is chosen from the first of these that applies:
    /// 1. `max_cores`, when set;
    /// 2. `available * max_ratio`, rounded to the nearest whole number, when
    ///    a ratio is set and the product does not round to zero;
    /// 3. a single core.
    ///
    /// That bound is then raised to at least `min_cores` and finally capped
    /// at `available`.
    pub fn cores(&self) -> NonZeroUsize {
        // Ratio-derived bound. A product that rounds to zero produces no bound
        // at all, so the caller falls through to the single-core default.
        let ratio_bound = || {
            let ratio = self.max_ratio?;
            let scaled = (self.available.get() as f64 * ratio).round() as usize;
            NonZeroUsize::new(scaled)
        };

        let upper = match self.max_cores {
            Some(explicit) => explicit,
            None => ratio_bound().unwrap_or(NonZeroUsize::MIN),
        };

        // Order matters: apply the minimum first and the available-CPU cap
        // last, so the result can never exceed `available`.
        upper.max(self.min_cores).min(self.available)
    }
}
43+
44+
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Workers` from plain integers so each test reads as one line.
    ///
    /// `max` of `None` leaves the explicit cap unset; `ratio` of `None` leaves
    /// the ratio unset.
    fn workers(available: usize, min: usize, max: Option<usize>, ratio: Option<f64>) -> Workers {
        Workers {
            available: NonZeroUsize::new(available).unwrap(),
            max_ratio: ratio,
            max_cores: max.map(|n| NonZeroUsize::new(n).unwrap()),
            min_cores: NonZeroUsize::new(min).unwrap(),
        }
    }

    #[test]
    fn min_cores_exceeds_max_cores() {
        let cores = workers(8, 4, Some(2), None).cores();
        assert_eq!(cores.get(), 4);
    }

    #[test]
    fn available_limits_max_cores() {
        let cores = workers(2, 1, Some(4), None).cores();
        assert_eq!(cores.get(), 2);
    }

    #[test]
    fn max_ratio_calculates_cores() {
        // 10 * 0.5 = 5
        let cores = workers(10, 1, None, Some(0.5)).cores();
        assert_eq!(cores.get(), 5);
    }

    #[test]
    fn max_cores_overrides_ratio() {
        let cores = workers(10, 1, Some(3), Some(0.5)).cores();
        assert_eq!(cores.get(), 3);
    }

    #[test]
    fn min_cores_exceeds_ratio_calculation() {
        // The ratio yields a bound of 5, which the minimum of 6 overrides.
        let cores = workers(10, 6, None, Some(0.5)).cores();
        assert_eq!(cores.get(), 6);
    }

    #[test]
    fn fallback_to_min_cores_when_no_max() {
        let cores = workers(8, 2, None, None).cores();
        assert_eq!(cores.get(), 2);
    }

    #[test]
    fn single_cpu_environment() {
        let cores = workers(1, 2, Some(4), None).cores();
        assert_eq!(cores.get(), 1);
    }

    #[test]
    fn ratio() {
        // (available CPUs, ratio, expected cores)
        let cases = [
            // 10 * 0.31 = 3.1, which rounds to 3
            (10, 0.31, 3),
            // 10 * 0.35 = 3.5, which rounds to 4
            (10, 0.35, 4),
            // 8 * 0.25 is exactly 2
            (8, 0.25, 2),
            // 96 * 1.0 uses all 96 cores
            (96, 1.0, 96),
        ];

        for (available, ratio, expected) in cases {
            let cores = workers(available, 1, None, Some(ratio)).cores();
            assert_eq!(cores.get(), expected);
        }
    }
}

linkerd2-proxy/src/rt.rs

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
use std::num::NonZeroUsize;
2+
3+
use linkerd_app::Workers;
14
use tokio::runtime::{Builder, Runtime};
2-
use tracing::{info, warn};
5+
use tracing::{debug, info, warn};
36

47
pub(crate) fn build() -> Runtime {
58
// The proxy creates an additional admin thread, but it would be wasteful to
@@ -9,45 +12,74 @@ pub(crate) fn build() -> Runtime {
912
//
1013
// The basic scheduler is used when the threaded scheduler would provide no
1114
// benefit.
12-
let mut cores = std::env::var("LINKERD2_PROXY_CORES")
15+
16+
let min_cores = std::env::var("LINKERD2_PROXY_CORES_MIN")
1317
.ok()
1418
.and_then(|v| {
15-
let opt = v.parse::<usize>().ok().filter(|n| *n > 0);
19+
let opt = v.parse::<usize>().ok().and_then(NonZeroUsize::new);
1620
if opt.is_none() {
17-
warn!(LINKERD2_PROXY_CORES = %v, "Ignoring invalid configuration");
21+
warn!(LINKERD2_PROXY_CORES_MIN = %v, "Ignoring invalid configuration");
1822
}
1923
opt
2024
})
21-
.unwrap_or(0);
25+
.or_else(|| {
26+
std::env::var("LINKERD2_PROXY_CORES").ok().and_then(|v| {
27+
let opt = v.parse::<usize>().ok().and_then(NonZeroUsize::new);
28+
if opt.is_none() {
29+
warn!(LINKERD2_PROXY_CORES = %v, "Ignoring invalid configuration");
30+
}
31+
opt
32+
})
33+
})
34+
.unwrap_or_else(|| NonZeroUsize::new(1).unwrap());
2235

23-
let cpus = num_cpus::get();
24-
debug_assert!(cpus > 0, "At least one CPU must be available");
25-
if cores > cpus {
26-
warn!(
27-
cpus,
28-
LINKERD2_PROXY_CORES = cores,
29-
"Ignoring configuration due to insufficient resources"
30-
);
31-
cores = cpus;
32-
}
36+
let max_cores = std::env::var("LINKERD2_PROXY_CORES_MAX")
37+
.ok()
38+
.and_then(|v| {
39+
let opt = v.parse::<usize>().ok().and_then(NonZeroUsize::new);
40+
if opt.is_none() {
41+
warn!(LINKERD2_PROXY_CORES_MAX = %v, "Ignoring invalid configuration");
42+
}
43+
opt
44+
});
45+
46+
let cores_ratio = std::env::var("LINKERD2_PROXY_CORES_MAX_RATIO")
47+
.ok()
48+
.and_then(|v| {
49+
let opt = v.parse::<f64>().ok().filter(|n| *n > 0.0 && *n <= 1.0);
50+
if opt.is_none() {
51+
warn!(LINKERD2_PROXY_CORES_MAX_RATIO = %v, "Ignoring invalid configuration");
52+
}
53+
opt
54+
});
55+
56+
let available_cpus = num_cpus::get();
57+
debug_assert!(available_cpus > 0, "At least one CPU must be available");
58+
let workers = Workers {
59+
available: NonZeroUsize::new(available_cpus)
60+
.unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
61+
max_ratio: cores_ratio,
62+
min_cores,
63+
max_cores,
64+
};
65+
debug!(?workers);
3366

34-
match cores {
35-
// `0` is unexpected, but it's a wild world out there.
36-
0 | 1 => {
67+
match workers.cores().get() {
68+
1 => {
3769
info!("Using single-threaded proxy runtime");
3870
Builder::new_current_thread()
3971
.enable_all()
4072
.thread_name("proxy")
4173
.build()
4274
.expect("failed to build basic runtime!")
4375
}
44-
num_cpus => {
76+
cores => {
4577
info!(%cores, "Using multi-threaded proxy runtime");
4678
Builder::new_multi_thread()
4779
.enable_all()
4880
.thread_name("proxy")
49-
.worker_threads(num_cpus)
50-
.max_blocking_threads(num_cpus)
81+
.worker_threads(cores)
82+
.max_blocking_threads(cores)
5183
.build()
5284
.expect("failed to build threaded runtime!")
5385
}

0 commit comments

Comments
 (0)