diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7d891978aae..cbff191580d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@
 - Revert back the addition of metric names as tag on Sentry errors when relay drops metrics. ([#1873](https://github.com/getsentry/relay/pull/1873))
 - Tag the dynamic sampling decision on `count_per_root_project` to measure effective sample rates. ([#1870](https://github.com/getsentry/relay/pull/1870))
 - Deprecate fields on the profiling sample format. ([#1878](https://github.com/getsentry/relay/pull/1878))
+- Move the pending envelopes buffering into the project cache. ([#1879](https://github.com/getsentry/relay/pull/1879))
 
 ## 23.2.0
diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs
index aba6a626dd2..6cf0dbd154c 100644
--- a/relay-server/src/actors/project.rs
+++ b/relay-server/src/actors/project.rs
@@ -1,4 +1,3 @@
-use std::collections::VecDeque;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -20,16 +19,15 @@ use relay_system::BroadcastChannel;
 
 use crate::actors::envelopes::{EnvelopeManager, SendMetrics};
 use crate::actors::outcome::{DiscardReason, Outcome};
-use crate::actors::processor::{EnvelopeProcessor, ProcessEnvelope};
-use crate::actors::project_cache::{
-    AddSamplingState, CheckedEnvelope, ProjectCache, RequestUpdate,
-};
+#[cfg(feature = "processing")]
+use crate::actors::processor::EnvelopeProcessor;
+use crate::actors::project_cache::{CheckedEnvelope, ProjectCache, RequestUpdate};
 use crate::envelope::Envelope;
 use crate::extractors::RequestMeta;
 use crate::service::Registry;
 use crate::statsd::RelayCounters;
-use crate::utils::{self, EnvelopeContext, EnvelopeLimiter, MetricsLimiter, RetryBackoff};
+use crate::utils::{EnvelopeContext, EnvelopeLimiter, MetricsLimiter, RetryBackoff};
 
 #[cfg(feature = "processing")]
 use crate::actors::processor::RateLimitFlushBuckets;
@@ -399,8 +397,6 @@ pub struct Project {
     config: Arc<Config>,
     state: Option<Arc<ProjectState>>,
     state_channel: Option<StateChannel>,
-    pending_validations: VecDeque<(Box<Envelope>, EnvelopeContext)>,
-    pending_sampling: VecDeque<ProcessEnvelope>,
     rate_limits: RateLimits,
     last_no_cache: Instant,
 }
@@ -416,8 +412,6 @@ impl Project {
             config,
             state: None,
             state_channel: None,
-            pending_validations: VecDeque::new(),
-            pending_sampling: VecDeque::new(),
             rate_limits: RateLimits::new(),
             last_no_cache: Instant::now(),
         }
@@ -453,7 +447,7 @@ impl Project {
     /// Returns the project state if it is not expired.
    ///
    /// Convenience wrapper around [`expiry_state`](Self::expiry_state).
-    fn valid_state(&self) -> Option<Arc<ProjectState>> {
+    pub fn valid_state(&self) -> Option<Arc<ProjectState>> {
         match self.expiry_state() {
             ExpiryState::Updated(state) => Some(state),
             ExpiryState::Stale(state) => Some(state),
@@ -642,93 +636,9 @@ impl Project {
         self.get_cached_state(no_cache);
     }
 
-    /// Validates the envelope and submits the envelope to the next stage.
-    ///
-    /// If this project is disabled or rate limited, corresponding items are dropped from the
-    /// envelope. Remaining items in the Envelope are forwarded:
-    ///  - If the envelope needs dynamic sampling, this sends [`AddSamplingState`] to the
-    ///    [`ProjectCache`] to add the required project state.
-    ///  - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`].
-    fn flush_validation(
-        &mut self,
-        envelope: Box<Envelope>,
-        envelope_context: EnvelopeContext,
-        project_state: Arc<ProjectState>,
-    ) {
-        if let Ok(checked) = self.check_envelope(envelope, envelope_context) {
-            if let Some((envelope, envelope_context)) = checked.envelope {
-                let mut process = ProcessEnvelope {
-                    envelope,
-                    envelope_context,
-                    project_state,
-                    sampling_project_state: None,
-                };
-
-                if let Some(sampling_key) = utils::get_sampling_key(&process.envelope) {
-                    let own_key = process
-                        .project_state
-                        .get_public_key_config()
-                        .map(|c| c.public_key);
-
-                    if Some(sampling_key) == own_key {
-                        process.sampling_project_state = Some(process.project_state.clone());
-                        EnvelopeProcessor::from_registry().send(process);
-                    } else {
-                        ProjectCache::from_registry()
-                            .send(AddSamplingState::new(sampling_key, process));
-                    }
-                } else {
-                    EnvelopeProcessor::from_registry().send(process);
-                }
-            }
-        }
-    }
-
-    /// Enqueues an envelope for validation.
-    ///
-    /// If the project state is up to date, the message will be immediately sent to the next stage.
-    /// Otherwise, this queues the envelope and flushes it when the project has been updated.
-    ///
-    /// This method will trigger an update of the project state internally if the state is stale or
-    /// outdated.
-    pub fn enqueue_validation(&mut self, envelope: Box<Envelope>, context: EnvelopeContext) {
-        match self.get_cached_state(envelope.meta().no_cache()) {
-            Some(state) if !state.invalid() => self.flush_validation(envelope, context, state),
-            _ => self.pending_validations.push_back((envelope, context)),
-        }
-    }
-
-    /// Adds the project state for dynamic sampling and submits the Envelope for processing.
-    fn flush_sampling(&self, mut message: ProcessEnvelope) {
-        // Intentionally ignore all errors. Fallback sampling behavior applies in this case.
-        if let Some(state) = self.valid_state().filter(|state| !state.invalid()) {
-            // Never use rules from another organization.
-            if state.organization_id == message.project_state.organization_id {
-                message.sampling_project_state = Some(state);
-            }
-        }
-
-        EnvelopeProcessor::from_registry().send(message);
-    }
-
-    /// Enqueues an envelope for adding a dynamic sampling project state.
-    ///
-    /// If the project state is up to date, the message will be immediately submitted for
-    /// processing. Otherwise, this queues the envelope and flushes it when the project has been
-    /// updated.
-    ///
-    /// This method will trigger an update of the project state internally if the state is stale or
-    /// outdated.
-    pub fn enqueue_sampling(&mut self, message: ProcessEnvelope) {
-        match self.get_cached_state(message.envelope.meta().no_cache()) {
-            Some(_) => self.flush_sampling(message),
-            None => self.pending_sampling.push_back(message),
-        }
-    }
-
     /// Replaces the internal project state with a new one and triggers pending actions.
     ///
-    /// This flushes pending envelopes from [`ValidateEnvelope`] and [`AddSamplingState`] and
+    /// This flushes pending envelopes from [`ValidateEnvelope`] and
     /// notifies all pending receivers from [`get_state`](Self::get_state).
     ///
     /// `no_cache` should be passed from the requesting call. Updates with `no_cache` will always
@@ -775,16 +685,6 @@ impl Project {
             return;
         }
 
-        // Flush all queued `ValidateEnvelope` messages
-        while let Some((envelope, context)) = self.pending_validations.pop_front() {
-            self.flush_validation(envelope, context, state.clone());
-        }
-
-        // Flush all queued `AddSamplingState` messages
-        while let Some(message) = self.pending_sampling.pop_front() {
-            self.flush_sampling(message);
-        }
-
         // Flush all waiting recipients.
         relay_log::debug!("project state {} updated", self.project_key);
         channel.inner.send(state);
@@ -921,18 +821,6 @@ impl Project {
     }
 }
 
-impl Drop for Project {
-    fn drop(&mut self) {
-        let count = self.pending_validations.len() + self.pending_sampling.len();
-        if count > 0 {
-            relay_log::with_scope(
-                |scope| scope.set_tag("project_key", self.project_key),
-                || relay_log::error!("dropped project with {} envelopes", count),
-            );
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs
index 3133c0e21a1..4d8f9fbd613 100644
--- a/relay-server/src/actors/project_cache.rs
+++ b/relay-server/src/actors/project_cache.rs
@@ -1,3 +1,4 @@
+use std::collections::{BTreeMap, BTreeSet};
 use std::sync::Arc;
 
 use tokio::sync::mpsc;
@@ -12,7 +13,7 @@ use relay_statsd::metric;
 use relay_system::{Addr, FromMessage, Interface, Sender, Service};
 
 use crate::actors::outcome::DiscardReason;
-use crate::actors::processor::ProcessEnvelope;
+use crate::actors::processor::{EnvelopeProcessor, ProcessEnvelope};
 use crate::actors::project::{Project, ProjectSender, ProjectState};
 use crate::actors::project_local::{LocalProjectSource, LocalProjectSourceService};
 use crate::actors::project_upstream::{UpstreamProjectSource, UpstreamProjectSourceService};
@@ -131,11 +132,12 @@ impl CheckEnvelope {
 /// [`CheckEnvelope`]. Once the envelope has been validated, remaining items are forwarded to the
 /// next stage:
 ///
-/// - If the envelope needs dynamic sampling, this sends [`AddSamplingState`] to the
-///   [`ProjectCache`] to add the required project state.
+/// - If the envelope needs dynamic sampling, and the project state is not cached or is out of
+///   date, the envelope is spooled and we continue once the state is fetched.
 /// - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`].
 ///
 /// [`EnvelopeProcessor`]: crate::actors::processor::EnvelopeProcessor
+#[derive(Debug)]
 pub struct ValidateEnvelope {
     envelope: Box<Envelope>,
     context: EnvelopeContext,
@@ -147,27 +149,6 @@ impl ValidateEnvelope {
     }
 }
 
-/// Adds the project state for dynamic sampling and sends the envelope to processing.
-///
-/// If the project state is up to date, the envelope will be immediately submitted for processing.
-/// Otherwise, this queues the envelope and flushes it when the project has been updated.
-///
-/// This message will trigger an update of the project state internally if the state is stale or
-/// outdated.
-pub struct AddSamplingState {
-    project_key: ProjectKey,
-    message: ProcessEnvelope,
-}
-
-impl AddSamplingState {
-    pub fn new(project_key: ProjectKey, message: ProcessEnvelope) -> Self {
-        Self {
-            project_key,
-            message,
-        }
-    }
-}
-
 pub struct UpdateRateLimits {
     project_key: ProjectKey,
     rate_limits: RateLimits,
@@ -206,7 +187,6 @@ pub enum ProjectCache {
         Sender<Result<CheckedEnvelope, DiscardReason>>,
     ),
     ValidateEnvelope(ValidateEnvelope),
-    AddSamplingState(AddSamplingState),
     UpdateRateLimits(UpdateRateLimits),
     InsertMetrics(InsertMetrics),
     MergeBuckets(MergeBuckets),
@@ -267,14 +247,6 @@ impl FromMessage<ValidateEnvelope> for ProjectCache {
     }
 }
 
-impl FromMessage<AddSamplingState> for ProjectCache {
-    type Response = relay_system::NoResponse;
-
-    fn from_message(message: AddSamplingState, _: ()) -> Self {
-        Self::AddSamplingState(message)
-    }
-}
-
 impl FromMessage<UpdateRateLimits> for ProjectCache {
     type Response = relay_system::NoResponse;
 
@@ -400,6 +372,86 @@ struct UpdateProjectState {
     no_cache: bool,
 }
 
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
+struct QueueKey {
+    own_key: ProjectKey,
+    sampling_key: ProjectKey,
+}
+
+impl QueueKey {
+    fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self {
+        Self {
+            own_key,
+            sampling_key,
+        }
+    }
+}
+
+/// The queue (buffer) of incoming envelopes.
+#[derive(Debug, Default)]
+struct Queue {
+    /// Contains the buffered envelopes, keyed by [`QueueKey`].
+    buffer: BTreeMap<QueueKey, Vec<(Box<Envelope>, EnvelopeContext)>>,
+    /// Index of the buffered [`QueueKey`]s by project key.
+    index: BTreeMap<ProjectKey, BTreeSet<QueueKey>>,
+}
+
+impl Queue {
+    /// Creates an empty queue.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the value to the queue for the provided key.
+    pub fn enqueue(&mut self, key: QueueKey, value: (Box<Envelope>, EnvelopeContext)) {
+        self.index.entry(key.own_key).or_default().insert(key);
+        self.index.entry(key.sampling_key).or_default().insert(key);
+        self.buffer.entry(key).or_default().push(value);
+    }
+
+    /// Removes and returns the buffered envelopes whose [`QueueKey`] satisfies the predicate.
+    pub fn dequeue<P>(
+        &mut self,
+        partial_key: &ProjectKey,
+        predicate: P,
+    ) -> Vec<(Box<Envelope>, EnvelopeContext)>
+    where
+        P: Fn(&QueueKey) -> bool,
+    {
+        let mut result = Vec::new();
+
+        let mut queue_keys = self.index.remove(partial_key).unwrap_or_default();
+        let mut index = BTreeSet::new();
+
+        while let Some(queue_key) = queue_keys.pop_first() {
+            // Collect the envelopes for the keys that satisfy the predicate; keys that fail the
+            // predicate are put back into the index.
+            if predicate(&queue_key) {
+                if let Some(envelopes) = self.buffer.remove(&queue_key) {
+                    result.extend(envelopes);
+                }
+            } else {
+                index.insert(queue_key);
+            }
+        }
+
+        if !index.is_empty() {
+            self.index.insert(*partial_key, index);
+        }
+
+        result
+    }
+}
+
+impl Drop for Queue {
+    fn drop(&mut self) {
+        let count: usize = self.buffer.values().map(|v| v.len()).sum();
+        if count > 0 {
+            relay_log::error!("dropped queue with {} envelopes", count);
+        }
+    }
+}
+
 /// Main broker of the [`ProjectCacheService`].
 ///
 /// This handles incoming public messages, merges resolved project states, and maintains the actual
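As an aside, here is a minimal, self-contained sketch of the two-sided indexing idea behind `Queue` (not part of the patch; `MiniQueue`, `PairKey`, and the `u32`/`String` stand-ins for project keys and envelopes are hypothetical simplifications): `enqueue` registers each pair under both of its project keys, and `dequeue` drains only the entries whose counterpart satisfies the predicate, re-indexing the rest.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-in for `ProjectKey`.
type Key = u32;

/// Hypothetical stand-in for `QueueKey`: the envelope's own project plus its sampling project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
struct PairKey {
    own_key: Key,
    sampling_key: Key,
}

/// Simplified buffer: `String` payloads instead of `(Box<Envelope>, EnvelopeContext)`.
#[derive(Default)]
struct MiniQueue {
    buffer: BTreeMap<PairKey, Vec<String>>,
    index: BTreeMap<Key, BTreeSet<PairKey>>,
}

impl MiniQueue {
    fn enqueue(&mut self, key: PairKey, value: String) {
        // Index the pair under both projects so that an update of either one can find it.
        self.index.entry(key.own_key).or_default().insert(key);
        self.index.entry(key.sampling_key).or_default().insert(key);
        self.buffer.entry(key).or_default().push(value);
    }

    fn dequeue(&mut self, partial_key: &Key, predicate: impl Fn(&PairKey) -> bool) -> Vec<String> {
        let mut result = Vec::new();
        let mut keys = self.index.remove(partial_key).unwrap_or_default();
        let mut remaining = BTreeSet::new();

        while let Some(key) = keys.pop_first() {
            if predicate(&key) {
                // The counterpart is ready: drain the buffered items.
                if let Some(items) = self.buffer.remove(&key) {
                    result.extend(items);
                }
            } else {
                // The counterpart is still missing: keep the pair indexed under this project.
                remaining.insert(key);
            }
        }

        if !remaining.is_empty() {
            self.index.insert(*partial_key, remaining);
        }
        result
    }
}

fn main() {
    let mut queue = MiniQueue::default();
    queue.enqueue(PairKey { own_key: 1, sampling_key: 2 }, "needs project 2".into());
    queue.enqueue(PairKey { own_key: 1, sampling_key: 1 }, "self-sampled".into());

    // Project 1 just resolved, project 2 is still missing: only the self-sampled entry flushes.
    let ready = queue.dequeue(&1, |key| key.sampling_key == 1);
    assert_eq!(ready, vec!["self-sampled".to_string()]);

    // Once project 2 resolves as well, the remaining entry can be drained.
    let rest = queue.dequeue(&2, |_| true);
    assert_eq!(rest, vec!["needs project 2".to_string()]);
}
```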
@@ -412,6 +464,7 @@ struct ProjectCacheBroker {
     garbage_disposal: GarbageDisposal<Project>,
     source: ProjectSource,
     state_tx: mpsc::UnboundedSender<UpdateProjectState>,
+    pending_envelopes: Queue,
 }
 
 impl ProjectCacheBroker {
@@ -431,7 +484,15 @@
         // Defer dropping the projects to a dedicated thread:
         let mut count = 0;
-        for (_, project) in expired {
+        for (project_key, project) in expired {
+            // Dequeue all the envelopes linked to the expired project; they will be dropped
+            // once this for loop exits with an `Invalid(Internal)` outcome.
+            let envelopes = self.pending_envelopes.dequeue(&project_key, |_| true);
+            relay_log::with_scope(
+                |scope| scope.set_tag("project_key", project_key),
+                || relay_log::error!("evicted project with {} envelopes", envelopes.len()),
+            );
+
             self.garbage_disposal.dispose(project);
             count += 1;
         }
@@ -439,7 +500,7 @@
 
         // Log garbage queue size:
         let queue_size = self.garbage_disposal.queue_size() as f64;
-        relay_statsd::metric!(gauge(RelayGauges::ProjectCacheGarbageQueueSize) = queue_size);
+        metric!(gauge(RelayGauges::ProjectCacheGarbageQueueSize) = queue_size);
 
         metric!(timer(RelayTimers::ProjectStateEvictionDuration) = eviction_start.elapsed());
     }
@@ -460,6 +521,10 @@
         })
     }
 
+    /// Updates the [`Project`] with the received [`ProjectState`].
+    ///
+    /// If the project state is valid, the internal `pending_envelopes` queue is also checked for
+    /// envelopes buffered for this project, which can now be processed.
     fn merge_state(&mut self, message: UpdateProjectState) {
         let UpdateProjectState {
             project_key,
@@ -468,7 +533,38 @@
         } = message;
 
         self.get_or_create_project(project_key)
-            .update_state(state, no_cache)
+            .update_state(state.clone(), no_cache);
+
+        // Envelopes need to remain in the queue while Relay receives invalid states from upstream.
+        if state.invalid() {
+            return;
+        }
+
+        let envelopes = self.pending_envelopes.dequeue(&project_key, |queue_key| {
+            let partial_key = if queue_key.own_key == project_key {
+                queue_key.sampling_key
+            } else {
+                queue_key.own_key
+            };
+
+            // We return false if the project is not cached or its state is invalid, true otherwise.
+            // We only have to check `partial_key` here, because we already know that the state for
+            // `project_key` is valid and loaded.
+            self.projects
+                .get(&partial_key)
+                // Make sure we only use a cached and valid state.
+                .and_then(|p| p.valid_state())
+                .map_or(false, |s| !s.invalid())
+        });
+
+        // Flush envelopes for which both states have been resolved.
+        for (envelope, envelope_context) in envelopes {
+            let sampling_state = utils::get_sampling_key(&envelope)
+                .and_then(|key| self.projects.get(&key))
+                .and_then(|p| p.valid_state());
+
+            self.handle_processing(state.clone(), sampling_state, envelope, envelope_context);
+        }
     }
 
     fn handle_request_update(&mut self, message: RequestUpdate) {
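The predicate passed to `dequeue` above only has to inspect the counterpart project of each pair, because the state of the project that just updated is already known to be valid. A hedged restatement of that check with hypothetical names (`counterpart_ready`, plain `u32` keys, and a `bool` map standing in for cached project states):

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for `ProjectKey`.
type Key = u32;

#[derive(Clone, Copy)]
struct PairKey {
    own_key: Key,
    sampling_key: Key,
}

/// Returns true if the *other* project of the pair already has a valid cached state.
///
/// `updated_key` is the project whose state just arrived, so only the counterpart has to be
/// checked before the buffered envelopes may be flushed.
fn counterpart_ready(pair: &PairKey, updated_key: Key, valid_states: &BTreeMap<Key, bool>) -> bool {
    let partial_key = if pair.own_key == updated_key {
        pair.sampling_key
    } else {
        pair.own_key
    };

    // A missing or invalid state keeps the pair buffered.
    valid_states.get(&partial_key).copied().unwrap_or(false)
}

fn main() {
    let mut valid_states = BTreeMap::new();
    valid_states.insert(1, true); // project 1 just resolved

    let pair = PairKey { own_key: 1, sampling_key: 2 };

    // Project 2 has no cached state yet, so nothing is flushed for this pair.
    assert!(!counterpart_ready(&pair, 1, &valid_states));

    // After project 2 resolves, the pair qualifies for flushing.
    valid_states.insert(2, true);
    assert!(counterpart_ready(&pair, 1, &valid_states));
}
```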
@@ -519,30 +615,90 @@
         &mut self,
         message: CheckEnvelope,
     ) -> Result<CheckedEnvelope, DiscardReason> {
-        let project = self.get_or_create_project(message.envelope.meta().public_key());
-
+        let CheckEnvelope { envelope, context } = message;
+        let project = self.get_or_create_project(envelope.meta().public_key());
         // Preload the project cache so that it arrives a little earlier in processing. However,
         // do not pass `no_cache`. In case the project is rate limited, we do not want to force
         // a full reload. Fetching must not block the store request.
         project.prefetch(false);
+        project.check_envelope(envelope, context)
+    }
+
+    /// Handles the processing of the provided envelope.
+    fn handle_processing(
+        &mut self,
+        state: Arc<ProjectState>,
+        sampling_state: Option<Arc<ProjectState>>,
+        envelope: Box<Envelope>,
+        envelope_context: EnvelopeContext,
+    ) {
+        let project_key = envelope.meta().public_key();
+        // The `Envelope` and `EnvelopeContext` will be dropped if `Project::check_envelope()`
+        // returns an error; that error is also intentionally ignored here.
+        let Some(Ok(checked)) = self
+            .projects
+            .get_mut(&project_key)
+            .map(|p| p.check_envelope(envelope, envelope_context)) else { return; };
+
+        if let Some((envelope, envelope_context)) = checked.envelope {
+            let mut process = ProcessEnvelope {
+                envelope,
+                envelope_context,
+                project_state: state.clone(),
+                sampling_project_state: None,
+            };
 
-        project.check_envelope(message.envelope, message.context)
+            if let Some(sampling_state) = sampling_state {
+                if state.organization_id == sampling_state.organization_id {
+                    process.sampling_project_state = Some(sampling_state)
+                }
+            }
+
+            EnvelopeProcessor::from_registry().send(process);
+        }
     }
 
+    /// Checks an incoming envelope and decides whether to process it immediately or buffer it.
+    ///
+    /// A few conditions are checked here:
+    /// - If there is no dynamic sampling key and the project is already cached, we go straight to
+    ///   processing; otherwise the envelope is buffered.
+    /// - If a dynamic sampling key is provided and both the root and the sampling projects are
+    ///   cached, the envelope is processed; otherwise it is buffered.
+    ///
+    /// This means that when the caches are hot, all incoming envelopes are processed without any
+    /// delay. If a project state cannot be fetched, envelopes keep buffering until the state is
+    /// eventually updated.
+    ///
+    /// The flushing of the buffered envelopes happens in `update_state`.
     fn handle_validate_envelope(&mut self, message: ValidateEnvelope) {
-        // Preload the project cache for dynamic sampling in parallel to the main one.
-        if let Some(sampling_key) = utils::get_sampling_key(&message.envelope) {
-            self.get_or_create_project(sampling_key)
-                .prefetch(message.envelope.meta().no_cache());
-        }
+        let ValidateEnvelope { envelope, context } = message;
+
+        // Fetch the project state for our key and make sure it's not invalid.
+        let own_key = envelope.meta().public_key();
+        let project_state = self
+            .get_or_create_project(own_key)
+            .get_cached_state(envelope.meta().no_cache())
+            .filter(|st| !st.invalid());
+
+        // Also fetch the project state for the sampling key and make sure it's not invalid.
+        let sampling_key = utils::get_sampling_key(&envelope);
+        let sampling_state = sampling_key.and_then(|key| {
+            self.get_or_create_project(key)
+                .get_cached_state(envelope.meta().no_cache())
+                .filter(|st| !st.invalid())
+        });
 
-        self.get_or_create_project(message.envelope.meta().public_key())
-            .enqueue_validation(message.envelope, message.context);
-    }
+        // Trigger processing once we have a project state and we either have a sampling project
+        // state or we do not need one.
+        if let Some(state) = project_state {
+            if sampling_state.is_some() || sampling_key.is_none() {
+                return self.handle_processing(state, sampling_state, envelope, context);
+            }
+        }
 
-    fn handle_add_sampling_state(&mut self, message: AddSamplingState) {
-        self.get_or_create_project(message.project_key)
-            .enqueue_sampling(message.message);
+        let key = QueueKey::new(own_key, sampling_key.unwrap_or(own_key));
+        self.pending_envelopes.enqueue(key, (envelope, context));
     }
 
     fn handle_rate_limits(&mut self, message: UpdateRateLimits) {
@@ -578,7 +734,6 @@
                 sender.send(self.handle_check_envelope(message))
             }
             ProjectCache::ValidateEnvelope(message) => self.handle_validate_envelope(message),
-            ProjectCache::AddSamplingState(message) => self.handle_add_sampling_state(message),
             ProjectCache::UpdateRateLimits(message) => self.handle_rate_limits(message),
             ProjectCache::InsertMetrics(message) => self.handle_insert_metrics(message),
             ProjectCache::MergeBuckets(message) => self.handle_merge_buckets(message),
@@ -622,6 +777,7 @@ impl Service for ProjectCacheService {
                     garbage_disposal: GarbageDisposal::new(),
                     source: ProjectSource::start(config, redis),
                     state_tx,
+                    pending_envelopes: Queue::new(),
                 };
 
                 loop {
diff --git a/tests/integration/test_query.py b/tests/integration/test_query.py
index 2c23cf18c58..dc4e1854bc0 100644
--- a/tests/integration/test_query.py
+++ b/tests/integration/test_query.py
@@ -18,7 +18,7 @@ def test_local_project_config(mini_sentry, relay):
     project_id = 42
     config = mini_sentry.basic_project_config(project_id)
     relay_config = {
-        "cache": {"file_interval": 1, "project_expiry": 0, "project_grace_period": 0}
+        "cache": {"file_interval": 1, "project_expiry": 1, "project_grace_period": 0}
     }
     relay = relay(mini_sentry, relay_config, wait_health_check=False)
     relay.config_dir.mkdir("projects").join("42.json").write(
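To summarize the buffering decision introduced in `handle_validate_envelope` above: process immediately when the own project state is cached and either no sampling is required or the sampling state is cached too; otherwise buffer. A minimal sketch under those assumptions (the `Decision` enum and `decide` helper are illustrative, not part of the patch; the real code works on `Arc<ProjectState>` and `EnvelopeContext`):

```rust
/// Simplified decision: process now, or buffer until the missing state arrives.
#[derive(Debug, PartialEq)]
enum Decision {
    Process { with_sampling_state: bool },
    Buffer,
}

/// `own_state`/`sampling_state` are `Some(..)` only when a valid state is cached.
/// `sampling_key` is `Some(..)` when the envelope requires dynamic sampling.
fn decide(own_state: Option<()>, sampling_key: Option<u32>, sampling_state: Option<()>) -> Decision {
    if own_state.is_some() && (sampling_state.is_some() || sampling_key.is_none()) {
        Decision::Process {
            with_sampling_state: sampling_state.is_some(),
        }
    } else {
        // Missing states were already requested via `get_cached_state`; the envelope waits in the
        // queue and is flushed from `merge_state` once the update arrives.
        Decision::Buffer
    }
}

fn main() {
    // Own project cached, no sampling needed: process immediately.
    assert_eq!(decide(Some(()), None, None), Decision::Process { with_sampling_state: false });
    // Own project cached, sampling project still missing: buffer.
    assert_eq!(decide(Some(()), Some(17), None), Decision::Buffer);
    // Both states cached: process with the sampling state attached.
    assert_eq!(decide(Some(()), Some(17), Some(())), Decision::Process { with_sampling_state: true });
    // Own project missing: always buffer.
    assert_eq!(decide(None, None, None), Decision::Buffer);
}
```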