Commit 7fea375

ref(kafka): Make routing key optional, instead of randomizing it
1 parent 9fd1ce3 commit 7fea375

5 files changed: +49 -44 lines changed

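In short, callers that previously had to pass a 16-byte routing key (falling back to a freshly generated Uuid::new_v4() when no natural key existed) now pass Option<[u8; 16]>, and a nil UUID is mapped to None. A minimal sketch of that convention, assuming the uuid crate already used in the workspace (the helper below is illustrative, not part of the commit):

use uuid::Uuid;

// Illustrative helper: a nil or absent UUID surfaces as `None` instead of
// being replaced by a random UUID at the call site.
fn routing_key(hint: Option<Uuid>) -> Option<[u8; 16]> {
    hint.filter(|uuid| !uuid.is_nil())
        .map(|uuid| uuid.into_bytes())
}

// routing_key(None) == None
// routing_key(Some(Uuid::nil())) == None
// routing_key(Some(event_id)) == Some(event_id.into_bytes())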

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default.

relay-kafka/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -23,7 +23,6 @@ serde_json = { workspace = true, optional = true }
 thiserror = { workspace = true }
 sentry-kafka-schemas = { workspace = true, default-features = false, optional = true }
 parking_lot = { workspace = true }
-uuid = { workspace = true }
 hashbrown = { workspace = true }
 
 [dev-dependencies]

relay-kafka/src/producer/mod.rs

Lines changed: 29 additions & 21 deletions
@@ -11,7 +11,6 @@ use rdkafka::message::Header;
 use rdkafka::producer::{BaseRecord, Producer as _};
 use relay_statsd::metric;
 use thiserror::Error;
-use uuid::Uuid;
 
 use crate::config::{KafkaParams, KafkaTopic};
 use crate::debounced::Debounced;
@@ -77,7 +76,7 @@ pub enum ClientError {
 /// Describes the type which can be sent using kafka producer provided by this crate.
 pub trait Message {
     /// Returns the partitioning key for this kafka message determining.
-    fn key(&self) -> [u8; 16];
+    fn key(&self) -> Option<[u8; 16]>;
 
     /// Returns the type of the message.
     fn variant(&self) -> &'static str;
@@ -188,7 +187,7 @@ impl KafkaClient {
     pub fn send(
         &self,
         topic: KafkaTopic,
-        key: [u8; 16],
+        key: Option<[u8; 16]>,
         headers: Option<&BTreeMap<String, String>>,
         variant: &str,
         payload: &[u8],
@@ -305,7 +304,7 @@ impl Producer {
     /// Sends the payload to the correct producer for the current topic.
     fn send(
         &self,
-        key: [u8; 16],
+        key: Option<[u8; 16]>,
         headers: Option<&BTreeMap<String, String>>,
         variant: &str,
         payload: &[u8],
@@ -328,28 +327,37 @@ impl Producer {
             })
             .collect::<KafkaHeaders>();
 
-        let mut key = key;
-        if let Some(ref limiter) = self.rate_limiter {
-            if limiter.try_increment(now, key, 1) < 1 {
-                metric!(
-                    counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
-                    variant = variant,
-                    topic = topic_name,
-                );
-
-                key = Uuid::new_v4().into_bytes();
-                headers.insert(Header {
-                    key: "sentry-reshuffled",
-                    value: Some("1"),
-                });
+        let key = match (key, self.rate_limiter.as_ref()) {
+            (Some(key), Some(limiter)) => {
+                let is_limited = limiter.try_increment(now, key, 1) < 1;
+
+                if is_limited {
+                    metric!(
+                        counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
+                        variant = variant,
+                        topic = topic_name,
+                    );
+
+                    headers.insert(Header {
+                        key: "sentry-reshuffled",
+                        value: Some("1"),
+                    });
+
+                    None
+                } else {
+                    Some(key)
+                }
             }
-        }
-
-        let mut record = BaseRecord::to(topic_name).key(&key).payload(payload);
+            (key, _) => key,
+        };
 
+        let mut record = BaseRecord::to(topic_name).payload(payload);
         if let Some(headers) = headers.into_inner() {
             record = record.headers(headers);
         }
+        if let Some(key) = key.as_ref() {
+            record = record.key(key);
+        }
 
         self.metrics.debounce(now, || {
             metric!(
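The practical effect in the producer: a partition key is only attached to the BaseRecord when one is present, so unkeyed (and rate-limited) messages fall through to librdkafka's partitioner rather than being forced onto a partition via a random UUID key. A self-contained sketch of that construction, with placeholder arguments that are not taken from the commit:

use rdkafka::producer::BaseRecord;

// Sketch mirroring the hunk above: attach the key only if one exists.
fn build_record<'a>(
    topic_name: &'a str,
    key: Option<&'a [u8; 16]>,
    payload: &'a [u8],
) -> BaseRecord<'a, [u8; 16], [u8]> {
    let mut record = BaseRecord::to(topic_name).payload(payload);
    if let Some(key) = key {
        record = record.key(key);
    }
    record
}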

relay-server/src/services/outcome.rs

Lines changed: 7 additions & 4 deletions
@@ -1151,10 +1151,13 @@ impl OutcomeBroker {
             KafkaTopic::Outcomes
         };
 
-        let result =
-            producer
-                .client
-                .send(topic, key.into_bytes(), None, "outcome", payload.as_bytes());
+        let result = producer.client.send(
+            topic,
+            Some(key.into_bytes()),
+            None,
+            "outcome",
+            payload.as_bytes(),
+        );
 
         match result {
             Ok(_) => Ok(()),

relay-server/src/services/store.rs

Lines changed: 13 additions & 17 deletions
@@ -1595,34 +1595,30 @@ impl Message for KafkaMessage<'_> {
     }
 
     /// Returns the partitioning key for this kafka message determining.
-    fn key(&self) -> [u8; 16] {
-        let mut uuid = match self {
-            Self::Event(message) => message.event_id.0,
-            Self::Attachment(message) => message.event_id.0,
-            Self::AttachmentChunk(message) => message.event_id.0,
-            Self::UserReport(message) => message.event_id.0,
-            Self::ReplayEvent(message) => message.replay_id.0,
-            Self::Span { message, .. } => message.trace_id.0,
+    fn key(&self) -> Option<[u8; 16]> {
+        match self {
+            Self::Event(message) => Some(message.event_id.0),
+            Self::Attachment(message) => Some(message.event_id.0),
+            Self::AttachmentChunk(message) => Some(message.event_id.0),
+            Self::UserReport(message) => Some(message.event_id.0),
+            Self::ReplayEvent(message) => Some(message.replay_id.0),
+            Self::Span { message, .. } => Some(message.trace_id.0),
 
             // Monitor check-ins use the hinted UUID passed through from the Envelope.
             //
             // XXX(epurkhiser): In the future it would be better if all KafkaMessage's would
             // recieve the routing_key_hint form their envelopes.
-            Self::CheckIn(message) => message.routing_key_hint.unwrap_or_else(Uuid::nil),
+            Self::CheckIn(message) => message.routing_key_hint,
 
             // Random partitioning
             Self::Profile(_)
             | Self::Log { .. }
             | Self::ReplayRecordingNotChunked(_)
             | Self::ProfileChunk(_)
-            | Self::Metric { .. } => Uuid::nil(),
-        };
-
-        if uuid.is_nil() {
-            uuid = Uuid::new_v4();
+            | Self::Metric { .. } => None,
         }
-
-        *uuid.as_bytes()
+        .filter(|uuid| !uuid.is_nil())
+        .map(|uuid| uuid.into_bytes())
     }
 
     fn headers(&self) -> Option<&BTreeMap<String, String>> {
@@ -1719,7 +1715,7 @@ mod tests {
         for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
             let res = producer
                 .client
-                .send(topic, *b"0123456789abcdef", None, "foo", b"");
+                .send(topic, Some(*b"0123456789abcdef"), None, "foo", b"");
 
             assert!(matches!(res, Err(ClientError::InvalidTopicName)));
         }
