Commit 9fd1ce3

Dav1dde and loewenheim authored
ref(kafka): Optimize allocations when creating Kafka headers (#4739)
`OwnedHeaders` actually allocates; with a recent refactor this means we now allocate for every message, even if there are no headers to be added. The wrapper type now handles allocations properly and also optimizes for the case where we add the rate-limited header.

Co-authored-by: Sebastian Zivota <[email protected]>
1 parent 35f08da commit 9fd1ce3

2 files changed: 83 additions & 16 deletions
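
The idea in brief, as a hedged sketch rather than the committed code (the real implementation is the `KafkaHeaders` type in the diff below): `OwnedHeaders` allocates its underlying header list up front, so building one per message is wasted work whenever no headers end up attached. The wrapper instead holds an `Option<OwnedHeaders>` and only allocates once the first header is actually inserted. The type and method names here are illustrative only.

use rdkafka::message::{Header, OwnedHeaders};

// Sketch of the lazy-allocation idea only; the committed type is `KafkaHeaders` in utils.rs below.
struct LazyHeaders(Option<OwnedHeaders>);

impl LazyHeaders {
    fn insert(&mut self, header: Header<'_, &str>) {
        // Allocate the rdkafka header list only once a header actually arrives.
        let headers = self.0.take().unwrap_or_else(OwnedHeaders::new);
        self.0 = Some(headers.insert(header));
    }
}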

relay-kafka/src/producer/mod.rs

Lines changed: 17 additions & 16 deletions
@@ -1,4 +1,4 @@
-//! This module contains the kafka producer related code.
+//! This module contains the Kafka producer related code.

 use std::borrow::Cow;
 use std::collections::{BTreeMap, HashMap};
@@ -7,7 +7,7 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};

 use rdkafka::ClientConfig;
-use rdkafka::message::{Header, OwnedHeaders};
+use rdkafka::message::Header;
 use rdkafka::producer::{BaseRecord, Producer as _};
 use relay_statsd::metric;
 use thiserror::Error;
@@ -16,6 +16,7 @@ use uuid::Uuid;
 use crate::config::{KafkaParams, KafkaTopic};
 use crate::debounced::Debounced;
 use crate::limits::KafkaRateLimits;
+use crate::producer::utils::KafkaHeaders;
 use crate::statsd::{KafkaCounters, KafkaGauges, KafkaHistograms};

 mod utils;
@@ -318,15 +319,14 @@ impl Producer {
             topic = topic_name,
         );

-        let mut kafka_headers = OwnedHeaders::new();
-        if let Some(headers) = headers {
-            for (key, value) in headers {
-                kafka_headers = kafka_headers.insert(Header {
-                    key,
-                    value: Some(value),
-                });
-            }
-        }
+        let mut headers = headers
+            .unwrap_or(&BTreeMap::new())
+            .iter()
+            .map(|(key, value)| Header {
+                key,
+                value: Some(value),
+            })
+            .collect::<KafkaHeaders>();

         let mut key = key;
         if let Some(ref limiter) = self.rate_limiter {
@@ -338,17 +338,18 @@ impl Producer {
                 );

                 key = Uuid::new_v4().into_bytes();
-                kafka_headers = kafka_headers.insert(Header {
+                headers.insert(Header {
                     key: "sentry-reshuffled",
                     value: Some("1"),
                 });
             }
         }

-        let record = BaseRecord::to(topic_name)
-            .key(&key)
-            .payload(payload)
-            .headers(kafka_headers);
+        let mut record = BaseRecord::to(topic_name).key(&key).payload(payload);
+
+        if let Some(headers) = headers.into_inner() {
+            record = record.headers(headers);
+        }

         self.metrics.debounce(now, || {
             metric!(
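
For illustration, a hypothetical sketch of the new call site in the common case (it assumes the `KafkaHeaders` type added in utils.rs below; the topic, key, and payload values are made up, not taken from Relay): when nothing was collected, `into_inner()` returns `None`, `.headers()` is never called on the `BaseRecord`, and no header list is allocated.

use rdkafka::message::Header;
use rdkafka::producer::BaseRecord;

// Illustrative only, not part of the commit.
fn build_record(topic_name: &str) -> BaseRecord<'_, str, str> {
    // No headers were provided, so nothing is collected and nothing is allocated.
    let headers: KafkaHeaders = std::iter::empty::<Header<'_, &str>>().collect();

    let mut record = BaseRecord::to(topic_name).key("key").payload("payload");
    if let Some(headers) = headers.into_inner() {
        // Not reached in this sketch: the iterator was empty, so no `OwnedHeaders` exists.
        record = record.headers(headers);
    }
    record
}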

relay-kafka/src/producer/utils.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,77 @@
11
use std::error::Error;
22

3+
use rdkafka::message::{Header, OwnedHeaders, ToBytes};
34
use rdkafka::producer::{DeliveryResult, ProducerContext};
45
use rdkafka::{ClientContext, Message};
56
use relay_statsd::metric;
67

78
use crate::statsd::{KafkaCounters, KafkaGauges};
89

10+
/// A thin wrapper around [`OwnedHeaders`].
11+
///
12+
/// Unlike [`OwnedHeaders`], this will not allocate on creation.
13+
/// Allocations are tuned for the use-case in a [`super::Producer`].
14+
pub struct KafkaHeaders(Option<OwnedHeaders>);
15+
16+
impl KafkaHeaders {
17+
pub fn new() -> Self {
18+
Self(None)
19+
}
20+
21+
pub fn insert<V>(&mut self, header: Header<'_, &V>)
22+
where
23+
V: ToBytes + ?Sized,
24+
{
25+
self.extend(Some(header));
26+
}
27+
28+
pub fn into_inner(self) -> Option<OwnedHeaders> {
29+
self.0
30+
}
31+
}
32+
33+
impl<'a, 'b, V> Extend<Header<'a, &'b V>> for KafkaHeaders
34+
where
35+
V: ToBytes + ?Sized,
36+
{
37+
fn extend<T: IntoIterator<Item = Header<'a, &'b V>>>(&mut self, iter: T) {
38+
let mut iter = iter.into_iter();
39+
40+
// Probe if the iterator is empty, if it is empty, no need to do anything.
41+
let Some(first) = iter.next() else {
42+
return;
43+
};
44+
45+
let mut headers = self.0.take().unwrap_or_else(|| {
46+
// Get a size hint from the iterator, +2 for the already removed
47+
// first element and reserving space for 1 extra header which is conditionally
48+
// added by the `Producer` in this crate.
49+
//
50+
// This means we might allocate a little bit too much, but we never have to resize
51+
// and allocate a second time, a good trade-off.
52+
let size = iter.size_hint().0 + 2;
53+
OwnedHeaders::new_with_capacity(size)
54+
});
55+
headers = headers.insert(first);
56+
for remaining in iter {
57+
headers = headers.insert(remaining);
58+
}
59+
60+
self.0 = Some(headers);
61+
}
62+
}
63+
64+
impl<'a, 'b, V> FromIterator<Header<'a, &'b V>> for KafkaHeaders
65+
where
66+
V: ToBytes + ?Sized,
67+
{
68+
fn from_iter<I: IntoIterator<Item = Header<'a, &'b V>>>(iter: I) -> Self {
69+
let mut c = Self::new();
70+
c.extend(iter);
71+
c
72+
}
73+
}
74+
975
/// Kafka client and producer context that logs statistics and producer errors.
1076
#[derive(Debug)]
1177
pub struct Context;
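
For reference, a hypothetical usage sketch of the new wrapper (this test, its name, and the "trace-id"/"abc" values are made up; only "sentry-reshuffled" comes from the commit): collecting an empty iterator never builds an `OwnedHeaders`, while a non-empty collection allocates once, with spare room for the conditionally added rate-limit header.

use rdkafka::message::{Header, Headers, OwnedHeaders};

// Hypothetical test, not part of the commit.
#[test]
fn kafka_headers_allocate_lazily() {
    // An empty collection never builds an `OwnedHeaders` at all.
    let empty: KafkaHeaders = std::iter::empty::<Header<'_, &str>>().collect();
    assert!(empty.into_inner().is_none());

    // A non-empty collection allocates once, with spare room for the rate-limit header.
    let mut headers: KafkaHeaders = [Header { key: "trace-id", value: Some("abc") }]
        .into_iter()
        .collect();
    headers.insert(Header { key: "sentry-reshuffled", value: Some("1") });

    let owned: OwnedHeaders = headers.into_inner().expect("at least one header was inserted");
    assert_eq!(owned.count(), 2);
}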
