Skip to content

Commit c8090c8

Browse files
committed
chore: update otel process ctx protocol
The spec has been updated with the following minor change: - `published_at_ns` is renamed to `monotonic_published_at_ns` and use the BOOTTIME clock instead of the default one - the timestamp becomes the only point of synchronization (instead of using the signature during the first publication)
1 parent b04809b commit c8090c8

File tree

2 files changed

+64
-60
lines changed

2 files changed

+64
-60
lines changed

libdd-library-config/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,4 @@ serial_test = "3.2"
3232

3333
[target.'cfg(unix)'.dependencies]
3434
memfd = { version = "0.6" }
35-
rustix = { version = "1.1.3", features = ["param", "mm", "process", "fs"] }
35+
rustix = { version = "1.1.3", features = ["param", "mm", "process", "fs", "time"] }

libdd-library-config/src/otel_process_ctx.rs

Lines changed: 63 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub mod linux {
2222
atomic::{fence, AtomicU64, Ordering},
2323
Mutex, MutexGuard,
2424
},
25-
time::{SystemTime, UNIX_EPOCH},
25+
time::Duration,
2626
};
2727

2828
use anyhow::Context;
@@ -31,6 +31,7 @@ pub mod linux {
3131
fs::{ftruncate, memfd_create, MemfdFlags},
3232
mm::{madvise, mmap, mmap_anonymous, munmap, Advice, MapFlags, ProtFlags},
3333
process::{getpid, set_virtual_memory_region_name, Pid},
34+
time::{clock_gettime, ClockId},
3435
};
3536

3637
use libdd_trace_protobuf::opentelemetry::proto::common::v1::ProcessContext;
@@ -52,21 +53,17 @@ pub mod linux {
5253
/// based synchronization requires the use of atomics to have any effect (see [Mandatory
5354
/// atomic](https://doc.rust-lang.org/std/sync/atomic/fn.fence.html#mandatory-atomic))
5455
///
55-
/// We use `signature` as a release notification for publication, and `published_at_ns` for
56-
/// updates. Ideally, those should be two `AtomicU64`, but this isn't compatible with
57-
/// `#[repr(C, packed)]`, since `AtomicU64` can't be used in a packed structure for alignment
58-
/// reason (what's more, their alignment might be bigger than the one of `u64` on some
59-
/// platforms).
60-
///
61-
/// In practice, given the page size and the layout of `MappingHeader`, the alignment should
62-
/// match (we statically test for it anyway). We can then use [`AtomicU64::from_ptr`] to create
63-
/// an atomic view of those fields when synchronization is needed.
56+
/// We use `monotonic_published_at_ns` for synchronization with the reader. Ideally, it should
57+
/// be an `AtomicU64`, but this is incompatible with `#[repr(C, packed)]` by default, as it
58+
/// could be misaligned. In our case, given the page size and the layout of `MappingHeader`, it
59+
/// is actually 8-bytes aligned: we use [`AtomicU64::from_ptr`] to create an atomic view when
60+
/// synchronization is needed.
6461
#[repr(C, packed)]
6562
struct MappingHeader {
6663
signature: [u8; 8],
6764
version: u32,
6865
payload_size: u32,
69-
published_at_ns: u64,
66+
monotonic_published_at_ns: u64,
7067
payload_ptr: *const u8,
7168
}
7269

@@ -232,7 +229,7 @@ pub mod linux {
232229
unsafe { madvise(mapping.start_addr, size, Advice::LinuxDontFork) }
233230
.context("madvise MADVISE_DONTFORK failed")?;
234231

235-
let published_at_ns = time_now_ns().ok_or_else(|| {
232+
let published_at_ns = since_boottime_ns().ok_or_else(|| {
236233
anyhow::anyhow!("failed to get current time for process context publication")
237234
})?;
238235

@@ -245,27 +242,26 @@ pub mod linux {
245242
ptr::write(
246243
header,
247244
MappingHeader {
248-
// signature will be set atomically at last
249-
signature: [0; 8],
245+
signature: *SIGNATURE,
250246
version: PROCESS_CTX_VERSION,
251247
payload_size: payload
252248
.len()
253249
.try_into()
254250
.context("payload size overflowed")?,
255-
published_at_ns,
251+
// will be set atomically at last
252+
monotonic_published_at_ns: 0,
256253
payload_ptr: payload.as_ptr(),
257254
},
258255
);
259-
// We typically want to avoid the compiler and the hardware to re-order the write to
260-
// the signature (which should be last according to the
256+
// We typically want to avoid the compiler and the hardware to re-order the write
257+
// to the `monotonic_published_at_ns` (which should be last according to the
261258
// specification) with the writes to other fields of the header.
262259
//
263260
// To do so, we implement synchronization during publication _as if the reader were
264261
// another thread of this program_, using atomics and fences.
265262
fence(Ordering::SeqCst);
266-
AtomicU64::from_ptr((*header).signature.as_mut_ptr().cast::<u64>())
267-
// To avoid shuffling bytes, we must use the native endianness
268-
.store(u64::from_ne_bytes(*SIGNATURE), Ordering::Relaxed);
263+
AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns))
264+
.store(published_at_ns, Ordering::Relaxed);
269265
}
270266

271267
let _ = mapping.set_name();
@@ -281,28 +277,27 @@ pub mod linux {
281277
fn update(&mut self, payload: Vec<u8>) -> anyhow::Result<()> {
282278
let header = self.mapping.start_addr as *mut MappingHeader;
283279

284-
let published_at_ns = time_now_ns()
280+
let monotonic_published_at_ns = since_boottime_ns()
285281
.ok_or_else(|| anyhow::anyhow!("could not get the current timestamp"))?;
286282
let payload_size = payload.len().try_into().map_err(|_| {
287-
anyhow::anyhow!("couldn't update process protocol: new payload too large")
283+
anyhow::anyhow!("couldn't update process context: new payload too large")
288284
})?;
289285

290-
// Safety
286+
// Safety:
291287
//
292288
// [^atomic-u64-alignment]: Page size is at minimum 4KB and will be always 8 bytes
293-
// aligned even on exotic platforms. The respective offsets of `signature` and
294-
// `published_at_ns` are 0 and 16 bytes, so they are 8-bytes aligned (`AtomicU64` has
295-
// both a size and align of 8 bytes).
289+
// aligned even on exotic platforms. The offset `monotonic_published_at_ns` is 16
290+
// bytes, so it's 8-bytes aligned (`AtomicU64` has both a size and align of 8 bytes).
296291
//
297292
// The header memory is valid for both read and writes.
298293
let published_at_atomic =
299-
unsafe { AtomicU64::from_ptr(addr_of_mut!((*header).published_at_ns)) };
294+
unsafe { AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) };
300295

301296
// A process shouldn't try to concurrently update its own context
302297
//
303-
// Note: be careful of early return while `published_at` is still zero, as this would
304-
// effectively "lock" any future publishing. Move throwing code above this swap, or
305-
// properly restore the previous value if the former can't be done.
298+
// Note: be careful of early return while `monotonic_published_at` is still zero, as
299+
// this would effectively "lock" any future publishing. Move throwing code above this
300+
// swap, or properly restore the previous value if the former can't be done.
306301
if published_at_atomic.swap(0, Ordering::Relaxed) == 0 {
307302
return Err(anyhow::anyhow!(
308303
"concurrent update of the process context is not supported"
@@ -320,7 +315,7 @@ pub mod linux {
320315
}
321316

322317
fence(Ordering::SeqCst);
323-
published_at_atomic.store(published_at_ns, Ordering::Relaxed);
318+
published_at_atomic.store(monotonic_published_at_ns, Ordering::Relaxed);
324319

325320
Ok(())
326321
}
@@ -335,11 +330,10 @@ pub mod linux {
335330
size_of::<MappingHeader>()
336331
}
337332

338-
fn time_now_ns() -> Option<u64> {
339-
SystemTime::now()
340-
.duration_since(UNIX_EPOCH)
341-
.ok()
342-
.and_then(|d| u64::try_from(d.as_nanos()).ok())
333+
/// Returns the value of the monotonic BOOTTIME clock in nanoseconds.
334+
fn since_boottime_ns() -> Option<u64> {
335+
let duration = Duration::try_from(clock_gettime(ClockId::Boottime)).ok()?;
336+
u64::try_from(duration.as_nanos()).ok()
343337
}
344338

345339
/// Locks the context handle. Returns a uniform error if the lock has been poisoned.
@@ -424,6 +418,7 @@ pub mod linux {
424418
use std::{
425419
fs::File,
426420
io::{BufRead, BufReader},
421+
ptr::{self, addr_of_mut},
427422
sync::atomic::{fence, AtomicU64, Ordering},
428423
};
429424

@@ -447,19 +442,32 @@ pub mod linux {
447442
|| name.starts_with("[anon:OTEL_CTX]")
448443
}
449444

450-
/// Reads the signature from a memory address to verify it's an OTEL_CTX mapping. This also
451-
/// establish proper synchronization/memory ordering through atomics since the reader is
452-
/// the same process in this test setup.
453-
fn verify_signature_at(addr: usize) -> bool {
454-
let ptr: *mut u64 = std::ptr::with_exposed_provenance_mut(addr);
455-
// Safety: We're reading from our own process memory at an address
456-
// we found in /proc/self/maps. This should be safe as long as the
457-
// mapping exists and has read permissions.
445+
/// Establishes proper synchronization/memory ordering with the writer, checking that
446+
/// `monotonic_published_at` is not zero and that the signature is correct. Returns a
447+
/// pointer to the initialized header in case of success.
448+
fn verify_mapping_at(addr: usize) -> anyhow::Result<*const MappingHeader> {
449+
let header: *mut MappingHeader = ptr::with_exposed_provenance_mut(addr);
450+
// Safety: we're reading from our own process memory at an address we found in
451+
// /proc/self/maps. This should be safe as long as the mapping exists and has read
452+
// permissions.
458453
//
459-
// For the alignment constraint of `AtomicU64`, see [atomic-u64-alignment].
460-
let signature = unsafe { AtomicU64::from_ptr(ptr).load(Ordering::Relaxed) };
454+
// For the alignment constraint of `AtomicU64`, see [^atomic-u64-alignment].
455+
let published_at = unsafe {
456+
AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns))
457+
.load(Ordering::Relaxed)
458+
};
459+
ensure!(published_at != 0, "monotonic_published_at_ns is zero: couldn't read an initialized header in the candidate mapping");
461460
fence(Ordering::SeqCst);
462-
&signature.to_ne_bytes() == super::SIGNATURE
461+
462+
// Safety: if `monotonic_published_at_ns` is non-zero, the header is properly
463+
// initialized and thus readable.
464+
let signature = unsafe { &header.as_ref().unwrap().signature };
465+
ensure!(
466+
signature == super::SIGNATURE,
467+
"invalid signature in the candidate mapping"
468+
);
469+
470+
Ok(header)
463471
}
464472

465473
/// Find the OTEL_CTX mapping in /proc/self/maps
@@ -487,15 +495,8 @@ pub mod linux {
487495
/// This searches `/proc/self/maps` for an OTEL_CTX mapping and decodes its contents.
488496
pub fn read_process_context() -> anyhow::Result<MappingHeader> {
489497
let mapping_addr = find_otel_mapping()?;
490-
let header_ptr = mapping_addr as *const MappingHeader;
491-
492-
// Note: verifying the signature ensures proper synchronization
493-
ensure!(
494-
verify_signature_at(mapping_addr),
495-
"verification of the signature failed"
496-
);
497-
498-
// Safety: we found this address in /proc/self/maps and verified the signature
498+
let header_ptr = verify_mapping_at(mapping_addr)?;
499+
// Safety: the pointer returned by `verify_mapping_at` points to an initialized header
499500
Ok(unsafe { std::ptr::read(header_ptr) })
500501
}
501502

@@ -524,10 +525,13 @@ pub mod linux {
524525
header.payload_size == payload_v1.len() as u32,
525526
"wrong payload size"
526527
);
527-
assert!(header.published_at_ns > 0, "published_at_ns is zero");
528+
assert!(
529+
header.monotonic_published_at_ns > 0,
530+
"monotonic_published_at_ns is zero"
531+
);
528532
assert!(read_payload == payload_v1.as_bytes(), "payload mismatch");
529533

530-
let published_at_ns_v1 = header.published_at_ns;
534+
let published_at_ns_v1 = header.monotonic_published_at_ns;
531535
// Ensure the clock advances so the updated timestamp is strictly greater
532536
std::thread::sleep(std::time::Duration::from_nanos(10));
533537

@@ -551,7 +555,7 @@ pub mod linux {
551555
"wrong payload size"
552556
);
553557
assert!(
554-
header.published_at_ns > published_at_ns_v1,
558+
header.monotonic_published_at_ns > published_at_ns_v1,
555559
"published_at_ns should be strictly greater after update"
556560
);
557561
assert!(read_payload == payload_v2.as_bytes(), "payload mismatch");

0 commit comments

Comments
 (0)