Skip to content

Commit b3f6d60

Browse files
committed
Implemented connection limiting on the incomming connections.
1 parent f10f83c commit b3f6d60

3 files changed

Lines changed: 17 additions & 1 deletion

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ rust-version = "1.87" # MSRV
1414
normal = [ "ntp-proto", "rustls-platform-verifier", "rustls-pemfile2", "rustls", "serde", "tokio-rustls", "toml", "tracing", "tracing-subscriber" ]
1515

1616
[dependencies]
17-
tokio = { version = "1.32", features = ["rt-multi-thread", "io-util", "fs", "net", "macros", "time" ] }
17+
tokio = { version = "1.32", features = ["rt-multi-thread", "io-util", "fs", "net", "macros", "time", "sync" ] }
1818
toml = { version = ">=0.6.0,<0.9.0", default-features = false, features = ["parse"] }
1919
tracing = "0.1.37"
2020
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["std", "fmt", "ansi"] }

src/config.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,19 +105,27 @@ struct BareNtsPoolKeConfig {
105105
listen: SocketAddr,
106106
/// Which upstream servers to use.
107107
key_exchange_servers: Box<[KeyExchangeServer]>,
108+
/// Maximum amount of parallel connections (incoming)
109+
#[serde(default = "default_max_connections")]
110+
max_connections: usize,
108111
}
109112

110113
fn default_nts_ke_timeout() -> u64 {
111114
1000
112115
}
113116

117+
fn default_max_connections() -> usize {
118+
100
119+
}
120+
114121
#[derive(Clone)]
115122
pub struct NtsPoolKeConfig {
116123
pub server_tls: TlsAcceptor,
117124
pub upstream_tls: TlsConnector,
118125
pub listen: SocketAddr,
119126
pub key_exchange_servers: Box<[KeyExchangeServer]>,
120127
pub key_exchange_timeout: Duration,
128+
pub max_connections: usize,
121129
}
122130

123131
fn load_certificates(
@@ -192,6 +200,7 @@ impl<'de> Deserialize<'de> for NtsPoolKeConfig {
192200
listen: bare.listen,
193201
key_exchange_servers: bare.key_exchange_servers,
194202
key_exchange_timeout: std::time::Duration::from_millis(bare.key_exchange_timeout),
203+
max_connections: bare.max_connections,
195204
})
196205
}
197206
}

src/pool_ke.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,16 @@ impl NtsPoolKe {
7070

7171
async fn serve(self: Arc<Self>) -> std::io::Result<()> {
7272
let listener = TcpListener::bind(self.config.listen).await?;
73+
let connectionpermits = Arc::new(tokio::sync::Semaphore::new(self.config.max_connections));
7374

7475
info!("listening on '{:?}'", listener.local_addr());
7576

7677
loop {
78+
let permit = connectionpermits
79+
.clone()
80+
.acquire_owned()
81+
.await
82+
.expect("Semaphore shouldn't be closed");
7783
let (client_stream, source_address) = listener.accept().await?;
7884
let self_clone = self.clone();
7985

@@ -88,6 +94,7 @@ impl NtsPoolKe {
8894
Ok(Err(err)) => ::tracing::debug!(?err, ?source_address, "NTS Pool KE failed"),
8995
Ok(Ok(())) => ::tracing::debug!(?source_address, "NTS Pool KE completed"),
9096
}
97+
drop(permit);
9198
});
9299
}
93100
}

0 commit comments

Comments
 (0)