1- use std:: net:: SocketAddr ;
1+ use std:: {
2+ io:: Cursor ,
3+ net:: { Ipv4Addr , SocketAddr } ,
4+ } ;
25
3- use ntp_proto:: { NtpClock , NtpDuration , SourceController , TwoWaySource } ;
6+ use ntp_proto:: {
7+ CsptpPacket , Measurement , NtpClock , NtpDuration , NtpInstant , NtpTimestamp ,
8+ OneWaySourceSnapshot , OneWaySourceUpdate , ReferenceId , SourceController , SystemSourceUpdate ,
9+ TwoWaySource ,
10+ } ;
411use timestamped_socket:: {
512 interface:: InterfaceName ,
6- socket:: { Connected , Socket } ,
13+ socket:: { Connected , RecvResult , Socket , open_ip } ,
714} ;
8- use tracing:: { Instrument , Span , instrument} ;
15+ use tracing:: { Instrument , Span , error , instrument, warn } ;
916
10- use crate :: daemon:: { config:: TimestampMode , ntp_source:: SourceChannels , spawn:: SourceId } ;
17+ use crate :: daemon:: {
18+ config:: TimestampMode ,
19+ exitcode,
20+ ntp_source:: { MsgForSystem , SourceChannels } ,
21+ spawn:: SourceId ,
22+ util:: convert_net_timestamp,
23+ } ;
24+
25+ #[ derive( Debug ) ]
26+ enum SocketResult {
27+ Ok ,
28+ Abort ,
29+ }
1130
1231pub ( crate ) struct CsptpSourceTask <
1332 C : ' static + NtpClock + Send ,
@@ -23,6 +42,9 @@ pub(crate) struct CsptpSourceTask<
2342 channels : SourceChannels < Controller :: ControllerMessage , Controller :: SourceMessage > ,
2443
2544 source : TwoWaySource < Controller > ,
45+
46+ last_send_timestamp : Option < NtpTimestamp > ,
47+ seqid : u16 ,
2648}
2749
2850impl < C : ' static + NtpClock + Send , Controller : SourceController < MeasurementDelay = NtpDuration > >
@@ -52,9 +74,244 @@ impl<C: 'static + NtpClock + Send, Controller: SourceController<MeasurementDelay
5274 source_addr,
5375 socket : None ,
5476 source,
77+ last_send_timestamp : None ,
78+ seqid : 0 ,
5579 } ;
80+
81+ process. run ( ) . await ;
5682 } )
5783 . instrument ( Span :: current ( ) ) ,
5884 )
5985 }
86+
87+ async fn setup_socket ( & mut self ) -> SocketResult {
88+ if self . socket . is_some ( ) {
89+ return SocketResult :: Ok ;
90+ }
91+
92+ let socket_res = match self . interface {
93+ #[ cfg( target_os = "linux" ) ]
94+ Some ( interface) => {
95+ use timestamped_socket:: socket:: open_interface_udp;
96+
97+ open_interface_udp (
98+ interface,
99+ 319 , /*lets os choose*/
100+ self . timestamp_mode . as_interface_mode ( ) ,
101+ None ,
102+ )
103+ . and_then ( |socket| socket. connect ( self . source_addr ) )
104+ }
105+ _ => open_ip (
106+ SocketAddr :: new ( Ipv4Addr :: UNSPECIFIED . into ( ) , 319 ) ,
107+ self . timestamp_mode . as_general_mode ( ) ,
108+ )
109+ . and_then ( |socket| socket. connect ( self . source_addr ) ) ,
110+ } ;
111+
112+ self . socket = match socket_res {
113+ Ok ( socket) => Some ( socket) ,
114+ Err ( error) => {
115+ warn ! ( ?error, "Could not open socket" ) ;
116+ return SocketResult :: Abort ;
117+ }
118+ } ;
119+
120+ SocketResult :: Ok
121+ }
122+
123+ async fn run ( & mut self ) {
124+ let mut buf = [ 0u8 ; 1024 ] ;
125+
126+ let poll_wait = tokio:: time:: sleep ( std:: time:: Duration :: default ( ) ) ;
127+ tokio:: pin!( poll_wait) ;
128+ let mut poll_wait = poll_wait;
129+
130+ #[ allow( clippy:: large_enum_variant) ]
131+ enum SelectResult < Controller : SourceController > {
132+ Timer ,
133+ Recv ( Result < RecvResult < SocketAddr > , std:: io:: Error > ) ,
134+ SystemUpdate (
135+ Result <
136+ SystemSourceUpdate < Controller :: ControllerMessage > ,
137+ tokio:: sync:: broadcast:: error:: RecvError ,
138+ > ,
139+ ) ,
140+ }
141+
142+ loop {
143+ let selected: SelectResult < Controller > = tokio:: select! {
144+ ( ) = & mut poll_wait => {
145+ SelectResult :: Timer
146+ } ,
147+ result = self . channels. system_update_receiver. recv( ) => {
148+ SelectResult :: SystemUpdate ( result)
149+ } ,
150+ result = async { if let Some ( ref mut socket) = self . socket { socket. recv( & mut buf) . await } else { std:: future:: pending( ) . await } } => {
151+ SelectResult :: Recv ( result)
152+ } ,
153+ } ;
154+
155+ match selected {
156+ SelectResult :: Timer => {
157+ if matches ! ( self . setup_socket( ) . await , SocketResult :: Abort ) {
158+ self . channels
159+ . msg_for_system_sender
160+ . send ( MsgForSystem :: NetworkIssue ( self . index ) )
161+ . await
162+ . ok ( ) ;
163+ self . channels
164+ . source_snapshots
165+ . write ( )
166+ . expect ( "Unexpected poisoned mutex" )
167+ . remove ( & self . index ) ;
168+ return ;
169+ }
170+
171+ let mut cursor = Cursor :: new ( buf. as_mut_slice ( ) ) ;
172+ self . seqid = self . seqid . wrapping_add ( 1 ) ;
173+ let mut tlvbuffer = [ 0u8 ; 1024 ] ;
174+ CsptpPacket :: request ( & mut tlvbuffer, self . seqid )
175+ . serialize ( & mut cursor)
176+ . unwrap ( ) ;
177+ let packet_size = cursor. position ( ) as usize ;
178+ let packet = & buf[ ..packet_size] ;
179+
180+ match self . clock . now ( ) {
181+ Err ( e) => {
182+ // we cannot determine the origin_timestamp
183+ error ! ( error = ?e, "There was an error retrieving the current time" ) ;
184+
185+ // report as no permissions, since this seems the most likely
186+ std:: process:: exit ( exitcode:: NOPERM ) ;
187+ }
188+ Ok ( ts) => {
189+ self . last_send_timestamp = Some ( ts) ;
190+ }
191+ }
192+
193+ match self . socket . as_mut ( ) . unwrap ( ) . send ( & packet) . await {
194+ Err ( error) => {
195+ warn ! ( ?error, "poll message could not be sent" ) ;
196+
197+ match error. raw_os_error ( ) {
198+ Some ( libc:: EHOSTDOWN )
199+ | Some ( libc:: EHOSTUNREACH )
200+ | Some ( libc:: ENETDOWN )
201+ | Some ( libc:: ENETUNREACH ) => {
202+ self . channels
203+ . msg_for_system_sender
204+ . send ( MsgForSystem :: NetworkIssue ( self . index ) )
205+ . await
206+ . ok ( ) ;
207+ self . channels
208+ . source_snapshots
209+ . write ( )
210+ . expect ( "Unexpected poisoned mutex" )
211+ . remove ( & self . index ) ;
212+ return ;
213+ }
214+ _ => { }
215+ }
216+ }
217+ Ok ( opt_send_timestamp) => {
218+ self . channels
219+ . source_snapshots
220+ . write ( )
221+ . expect ( "Unexpected poisoned mutex" )
222+ . insert (
223+ self . index ,
224+ self . source . observe (
225+ self . name . clone ( ) ,
226+ self . source_addr . to_string ( ) ,
227+ self . index ,
228+ ) ,
229+ ) ;
230+
231+ // update the last_send_timestamp with the one given by the kernel, if available
232+ self . last_send_timestamp = opt_send_timestamp
233+ . map ( convert_net_timestamp)
234+ . or ( self . last_send_timestamp ) ;
235+ }
236+ }
237+ }
238+ SelectResult :: Recv ( Ok ( RecvResult {
239+ bytes_read,
240+ timestamp,
241+ ..
242+ } ) ) => {
243+ let packet = & buf[ ..bytes_read] ;
244+ let timestamp = timestamp
245+ . map ( convert_net_timestamp)
246+ . unwrap_or_else ( || self . clock . now ( ) . unwrap ( ) ) ;
247+
248+ let Ok ( packet) = CsptpPacket :: deserialize ( & packet) else {
249+ break ;
250+ } ;
251+
252+ let Some ( response_data) = packet. get_csptp_response_data ( ) else {
253+ break ;
254+ } ;
255+ let Some ( remote_send_timestamp) = packet. get_origin_timestamp ( ) else {
256+ break ;
257+ } ;
258+
259+ let Some ( t1) = self . last_send_timestamp . take ( ) else {
260+ break ;
261+ } ;
262+ let t2 = NtpTimestamp :: from_statime ( & response_data. req_ingress_timestamp ) ;
263+ let t3 = NtpTimestamp :: from_statime ( & remote_send_timestamp) ;
264+ let t4 = timestamp;
265+
266+ let measurement = Measurement {
267+ delay : ( t4 - t1) - ( t3 - t2) ,
268+ offset : ( ( t2 - t1) + ( t3 - t4) ) / 2 ,
269+ localtime : t1 + ( t4 - t1) / 2 ,
270+ monotime : NtpInstant :: now ( ) ,
271+ stratum : 1 ,
272+ root_delay : NtpDuration :: ZERO ,
273+ root_dispersion : NtpDuration :: ZERO ,
274+ leap : ntp_proto:: NtpLeapIndicator :: NoWarning ,
275+ precision : 0 ,
276+ } ;
277+
278+ let update = OneWaySourceUpdate {
279+ snapshot : OneWaySourceSnapshot {
280+ source_id : ReferenceId :: PPS ,
281+ stratum : 0 ,
282+ } ,
283+ message : self . source . handle_measurement ( std:: dbg!( measurement) ) ,
284+ } ;
285+
286+ self . channels
287+ . msg_for_system_sender
288+ . send ( MsgForSystem :: OneWaySourceUpdate ( self . index , update) )
289+ . await
290+ . ok ( ) ;
291+
292+ self . channels
293+ . source_snapshots
294+ . write ( )
295+ . expect ( "Unexpected poisoned mutex" )
296+ . insert (
297+ self . index ,
298+ self . source . observe (
299+ self . name . clone ( ) ,
300+ self . source_addr . to_string ( ) ,
301+ self . index ,
302+ ) ,
303+ ) ;
304+
305+ poll_wait
306+ . as_mut ( )
307+ . reset ( tokio:: time:: Instant :: now ( ) + std:: time:: Duration :: from_secs ( 1 ) ) ;
308+ }
309+ SelectResult :: Recv ( Err ( _) ) => { /* ignore for now */ }
310+ SelectResult :: SystemUpdate ( Ok ( update) ) => {
311+ self . source . handle_message ( update. message ) ;
312+ }
313+ SelectResult :: SystemUpdate ( Err ( _) ) => { /* ignore for now */ }
314+ }
315+ }
316+ }
60317}
0 commit comments