Skip to content

Commit 5dfb780

Browse files
rnijvelddavidv1992
authored andcommitted
CSPTP source setup
1 parent f94d840 commit 5dfb780

File tree

13 files changed

+334
-3
lines changed

13 files changed

+334
-3
lines changed

ntp-proto/src/algorithm/kalman/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,21 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug + Send + 'static> TimeSyncC
435435
)
436436
}
437437

438+
fn add_two_way_source(
439+
&mut self,
440+
id: Self::SourceId,
441+
source_config: SourceConfig,
442+
) -> Self::NtpSourceController {
443+
self.sources.insert(id, (None, false));
444+
KalmanSourceController::new(
445+
id,
446+
self.algo_config,
447+
None,
448+
source_config,
449+
AveragingBuffer::default(),
450+
)
451+
}
452+
438453
fn remove_source(&mut self, id: SourceId) {
439454
self.sources.remove(&id);
440455
}

ntp-proto/src/algorithm/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ pub trait TimeSyncController: Sized + Send + 'static {
9090
measurement_noise_estimate: f64,
9191
period: Option<f64>,
9292
) -> Self::OneWaySourceController;
93+
/// Create a new two way source with given identity (used e.g. with CSPTP sources)
94+
fn add_two_way_source(
95+
&mut self,
96+
id: Self::SourceId,
97+
source_config: SourceConfig,
98+
) -> Self::NtpSourceController;
9399
/// Notify the controller that a previous source has gone
94100
fn remove_source(&mut self, id: Self::SourceId);
95101
/// Notify the controller that the status of a source (whether

ntp-proto/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ mod exports {
119119
AcceptSynchronizationError, Measurement, NtpSource, NtpSourceAction,
120120
NtpSourceActionIterator, NtpSourceSnapshot, NtpSourceUpdate, ObservableSourceState,
121121
OneWaySource, OneWaySourceSnapshot, OneWaySourceUpdate, ProtocolVersion, Reach,
122-
SourceNtsData,
122+
SourceNtsData, TwoWaySource,
123123
};
124124
pub use super::system::{
125125
System, SystemAction, SystemActionIterator, SystemSnapshot, SystemSourceUpdate,

ntp-proto/src/system.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::time::Duration;
66
use std::{fmt::Debug, hash::Hash};
77

88
use crate::packet::v5::server_reference_id::{BloomFilter, ServerId};
9-
use crate::source::{NtpSourceUpdate, SourceSnapshot};
9+
use crate::source::{NtpSourceUpdate, SourceSnapshot, TwoWaySource};
1010
use crate::{NtpTimestamp, OneWaySource, OneWaySourceUpdate};
1111
use crate::{
1212
algorithm::{StateUpdate, TimeSyncController},
@@ -324,6 +324,19 @@ impl<SourceId: Hash + Eq + Copy + Debug, Controller: TimeSyncController<SourceId
324324
))
325325
}
326326

327+
#[allow(clippy::type_complexity)]
328+
pub fn create_csptp_source(
329+
&mut self,
330+
id: SourceId,
331+
source_config: SourceConfig,
332+
) -> Result<TwoWaySource<Controller::NtpSourceController>, <Controller::Clock as NtpClock>::Error>
333+
{
334+
self.ensure_controller_control()?;
335+
let controller = self.controller.add_two_way_source(id, source_config);
336+
self.sources.insert(id, None);
337+
Ok(TwoWaySource::new(controller))
338+
}
339+
327340
pub fn handle_source_remove(
328341
&mut self,
329342
id: SourceId,

ntpd/src/daemon/config/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,7 @@ impl Config {
448448
NtpSourceConfig::Sock(_) => count += 1,
449449
#[cfg(feature = "pps")]
450450
NtpSourceConfig::Pps(_) => {} // PPS sources don't count
451+
NtpSourceConfig::Csptp(_) => count += 1,
451452
}
452453
}
453454
count
@@ -483,6 +484,7 @@ impl Config {
483484
NtpSourceConfig::Sock(_) => false,
484485
#[cfg(feature = "pps")]
485486
NtpSourceConfig::Pps(_) => false,
487+
NtpSourceConfig::Csptp(_) => false,
486488
NtpSourceConfig::Standard(config) => {
487489
matches!(config.first.ntp_version, ProtocolVersion::V5)
488490
}

ntpd/src/daemon/config/ntp_source.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,12 @@ impl<'de> Deserialize<'de> for PpsSourceConfig {
400400
}
401401
}
402402

403+
#[derive(Deserialize, Debug, PartialEq, Clone)]
404+
#[serde(rename_all = "kebab-case", deny_unknown_fields)]
405+
pub struct CsptpSourceConfig {
406+
pub address: CsptpAddress,
407+
}
408+
403409
#[derive(Debug, Deserialize, PartialEq, Clone)]
404410
#[serde(tag = "mode")]
405411
pub enum NtpSourceConfig {
@@ -417,6 +423,8 @@ pub enum NtpSourceConfig {
417423
#[cfg(feature = "pps")]
418424
#[serde(rename = "pps")]
419425
Pps(PpsSourceConfig),
426+
#[serde(rename = "csptp")]
427+
Csptp(CsptpSourceConfig),
420428
}
421429

422430
/// A normalized address has a host and a port part. However, the host may be
@@ -463,6 +471,9 @@ pub struct NtpAddress(pub NormalizedAddress);
463471
#[derive(Debug, Clone, PartialEq, Eq)]
464472
pub struct NtsKeAddress(pub NormalizedAddress);
465473

474+
#[derive(Debug, Clone, PartialEq, Eq)]
475+
pub struct CsptpAddress(pub NormalizedAddress);
476+
466477
impl<'de> Deserialize<'de> for NtpAddress {
467478
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
468479
where
@@ -487,6 +498,18 @@ impl<'de> Deserialize<'de> for NtsKeAddress {
487498
}
488499
}
489500

501+
impl<'de> Deserialize<'de> for CsptpAddress {
502+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
503+
where
504+
D: Deserializer<'de>,
505+
{
506+
let s = String::deserialize(deserializer)?;
507+
Ok(NormalizedAddress::from_string_csptp(s)
508+
.map_err(serde::de::Error::custom)?
509+
.into())
510+
}
511+
}
512+
490513
impl From<NormalizedAddress> for NtpAddress {
491514
fn from(addr: NormalizedAddress) -> Self {
492515
Self(addr)
@@ -499,6 +522,12 @@ impl From<NormalizedAddress> for NtsKeAddress {
499522
}
500523
}
501524

525+
impl From<NormalizedAddress> for CsptpAddress {
526+
fn from(addr: NormalizedAddress) -> Self {
527+
Self(addr)
528+
}
529+
}
530+
502531
impl Deref for NtsKeAddress {
503532
type Target = NormalizedAddress;
504533

@@ -515,9 +544,18 @@ impl Deref for NtpAddress {
515544
}
516545
}
517546

547+
impl Deref for CsptpAddress {
548+
type Target = NormalizedAddress;
549+
550+
fn deref(&self) -> &Self::Target {
551+
&self.0
552+
}
553+
}
554+
518555
impl NormalizedAddress {
519556
const NTP_DEFAULT_PORT: u16 = 123;
520557
const NTS_KE_DEFAULT_PORT: u16 = 4460;
558+
const CSPTP_DEFAULT_PORT: u16 = 319;
521559

522560
/// Specifically, this adds the `:123` port if no port is specified
523561
pub(crate) fn from_string_ntp(address: String) -> std::io::Result<Self> {
@@ -545,6 +583,19 @@ impl NormalizedAddress {
545583
})
546584
}
547585

586+
/// Specifically, this adds the `:319` port if no port is specified
587+
pub(crate) fn from_string_csptp(address: String) -> std::io::Result<Self> {
588+
let (server_name, port) = Self::from_string_help(address, Self::CSPTP_DEFAULT_PORT)?;
589+
590+
Ok(Self {
591+
server_name,
592+
port,
593+
594+
#[cfg(test)]
595+
hardcoded_dns_resolve: HardcodedDnsResolve::default(),
596+
})
597+
}
598+
548599
fn from_string_help(address: String, default_port: u16) -> std::io::Result<(String, u16)> {
549600
if address.split(':').count() > 2 {
550601
// IPv6, try to parse it as such
@@ -673,6 +724,7 @@ mod tests {
673724
NtpSourceConfig::Sock(_c) => "".to_string(),
674725
#[cfg(feature = "pps")]
675726
NtpSourceConfig::Pps(_c) => "".to_string(),
727+
NtpSourceConfig::Csptp(c) => c.address.to_string(),
676728
}
677729
}
678730

ntpd/src/daemon/csptp_source.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use std::net::SocketAddr;
2+
3+
use ntp_proto::{NtpClock, NtpDuration, SourceController, TwoWaySource};
4+
use timestamped_socket::{
5+
interface::InterfaceName,
6+
socket::{Connected, Socket},
7+
};
8+
use tracing::{Instrument, Span, instrument};
9+
10+
use crate::daemon::{config::TimestampMode, ntp_source::SourceChannels, spawn::SourceId};
11+
12+
pub(crate) struct CsptpSourceTask<
13+
C: 'static + NtpClock + Send,
14+
Controller: SourceController<MeasurementDelay = NtpDuration>,
15+
> {
16+
index: SourceId,
17+
clock: C,
18+
interface: Option<InterfaceName>,
19+
timestamp_mode: TimestampMode,
20+
name: String,
21+
source_addr: SocketAddr,
22+
socket: Option<Socket<SocketAddr, Connected>>,
23+
channels: SourceChannels<Controller::ControllerMessage, Controller::SourceMessage>,
24+
25+
source: TwoWaySource<Controller>,
26+
}
27+
28+
impl<C: 'static + NtpClock + Send, Controller: SourceController<MeasurementDelay = NtpDuration>>
29+
CsptpSourceTask<C, Controller>
30+
{
31+
#[allow(clippy::too_many_arguments)]
32+
#[instrument(level = tracing::Level::ERROR, name = "CSPTP Source", skip(timestamp_mode, clock, channels, source))]
33+
pub fn spawn(
34+
index: SourceId,
35+
name: String,
36+
source_addr: SocketAddr,
37+
interface: Option<InterfaceName>,
38+
clock: C,
39+
timestamp_mode: TimestampMode,
40+
channels: SourceChannels<Controller::ControllerMessage, Controller::SourceMessage>,
41+
source: TwoWaySource<Controller>,
42+
) -> tokio::task::JoinHandle<()> {
43+
tokio::spawn(
44+
(async move {
45+
let mut process = CsptpSourceTask {
46+
index,
47+
name,
48+
clock,
49+
channels,
50+
interface,
51+
timestamp_mode,
52+
source_addr,
53+
socket: None,
54+
source,
55+
};
56+
})
57+
.instrument(Span::current()),
58+
)
59+
}
60+
}

ntpd/src/daemon/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod clock;
22
pub mod config;
33
mod csptp_server;
4+
mod csptp_source;
45
pub mod keyexchange;
56
mod local_ip_provider;
67
mod ntp_source;

0 commit comments

Comments
 (0)