Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
capture: add overflow_enabled option (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored May 27, 2024
1 parent 871441b commit ebaf596
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 23 deletions.
3 changes: 3 additions & 0 deletions capture/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub struct Config {
pub redis_url: String,
pub otel_url: Option<String>,

#[envconfig(default = "false")]
pub overflow_enabled: bool,

#[envconfig(default = "100")]
pub overflow_per_second_limit: NonZeroU32,

Expand Down
42 changes: 24 additions & 18 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,30 @@ where
.register("rdkafka".to_string(), Duration::seconds(30))
.await;

let partition = OverflowLimiter::new(
config.overflow_per_second_limit,
config.overflow_burst_limit,
config.overflow_forced_keys,
);
if config.export_prometheus {
let partition = partition.clone();
tokio::spawn(async move {
partition.report_metrics().await;
});
}
{
// Ensure that the rate limiter state does not grow unbounded
let partition = partition.clone();
tokio::spawn(async move {
partition.clean_state().await;
});
}
let partition = match config.overflow_enabled {
false => None,
true => {
let partition = OverflowLimiter::new(
config.overflow_per_second_limit,
config.overflow_burst_limit,
config.overflow_forced_keys,
);
if config.export_prometheus {
let partition = partition.clone();
tokio::spawn(async move {
partition.report_metrics().await;
});
}
{
// Ensure that the rate limiter state does not grow unbounded
let partition = partition.clone();
tokio::spawn(async move {
partition.clean_state().await;
});
}
Some(partition)
}
};
let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

Expand Down
14 changes: 9 additions & 5 deletions capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl rdkafka::ClientContext for KafkaContext {
#[derive(Clone)]
pub struct KafkaSink {
producer: FutureProducer<KafkaContext>,
partition: OverflowLimiter,
partition: Option<OverflowLimiter>,
main_topic: String,
historical_topic: String,
}
Expand All @@ -89,7 +89,7 @@ impl KafkaSink {
pub fn new(
config: KafkaConfig,
liveness: HealthHandle,
partition: OverflowLimiter,
partition: Option<OverflowLimiter>,
) -> anyhow::Result<KafkaSink> {
info!("connecting to Kafka brokers at {}...", config.kafka_hosts);

Expand Down Expand Up @@ -150,7 +150,11 @@ impl KafkaSink {
DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
DataType::AnalyticsMain => {
// TODO: deprecate capture-led overflow or move logic in handler
if self.partition.is_limited(&event_key) {
let is_limited = match &self.partition {
None => false,
Some(partition) => partition.is_limited(&event_key),
};
if is_limited {
(&self.main_topic, None) // Analytics overflow goes to the main topic without locality
} else {
(&self.main_topic, Some(event_key.as_str()))
Expand Down Expand Up @@ -280,11 +284,11 @@ mod tests {
let handle = registry
.register("one".to_string(), Duration::seconds(30))
.await;
let limiter = OverflowLimiter::new(
let limiter = Some(OverflowLimiter::new(
NonZeroU32::new(10).unwrap(),
NonZeroU32::new(10).unwrap(),
None,
);
));
let cluster = MockCluster::new(1).expect("failed to create mock brokers");
let config = config::KafkaConfig {
kafka_producer_linger_ms: 0,
Expand Down
1 change: 1 addition & 0 deletions capture/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
print_sink: false,
address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
redis_url: "redis://localhost:6379/".to_string(),
overflow_enabled: false,
overflow_burst_limit: NonZeroU32::new(5).unwrap(),
overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
overflow_forced_keys: None,
Expand Down
54 changes: 54 additions & 0 deletions capture/tests/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ async fn it_overflows_events_on_burst() -> Result<()> {

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.overflow_enabled = true;
config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

Expand Down Expand Up @@ -223,6 +224,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.overflow_enabled = true;
config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

Expand Down Expand Up @@ -254,6 +256,58 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
Ok(())
}

/// With `overflow_enabled` set to `false`, a burst of events sharing one
/// token/distinct_id pair must all be produced with their `token:distinct_id`
/// message key, even though the burst exceeds the configured overflow limits.
#[tokio::test]
async fn it_skips_overflows_when_disabled() -> Result<()> {
    setup_tracing();

    let token = random_string("token", 16);
    let distinct_id = random_string("id", 16);

    let topic = EphemeralTopic::new().await;

    // Same tight limits as the burst-overflow test, but with the limiter switched off.
    let mut config = DEFAULT_CONFIG.clone();
    config.kafka.kafka_topic = topic.topic_name().to_string();
    config.overflow_enabled = false;
    config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

    let server = ServerHandle::for_config(config).await;

    // Three events for the same key, sent as a single batch.
    let payload = json!([
        {
            "token": token,
            "event": "event1",
            "distinct_id": distinct_id
        },
        {
            "token": token,
            "event": "event2",
            "distinct_id": distinct_id
        },
        {
            "token": token,
            "event": "event3",
            "distinct_id": distinct_id
        }
    ]);

    let response = server.capture_events(payload.to_string()).await;
    assert_eq!(StatusCode::OK, response.status());

    // Every message keeps its key: the third one should have triggered overflow
    // under these limits, but has not because the limiter is disabled.
    let expected_key = format!("{}:{}", token, distinct_id);
    for _ in 0..3 {
        assert_eq!(topic.next_message_key()?.unwrap(), expected_key);
    }
    Ok(())
}

#[tokio::test]
async fn it_trims_distinct_id() -> Result<()> {
setup_tracing();
Expand Down

0 comments on commit ebaf596

Please sign in to comment.