Skip to content

ref(cache): Move buffering of pending envelope to ProjectCache #1879

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a6a4682
ref(cache): Move buffering of pending envelope to ProjectCache
olksdr Feb 23, 2023
5fbe8a9
Fix lint
olksdr Feb 24, 2023
5a9c421
Merge branch 'master' into feat/project-cache-queue
olksdr Feb 24, 2023
1003234
Add CHANGELOG entry
olksdr Feb 24, 2023
d649845
Reuse check_envelope fn
olksdr Feb 24, 2023
5ac4c61
Introduce the index
olksdr Feb 24, 2023
c6554de
Small cleanup and docs
olksdr Feb 25, 2023
716cf9a
Merge remote-tracking branch 'origin/master' into feat/project-cache-…
olksdr Feb 25, 2023
00c3c1e
Simplify taking keys out of the index
olksdr Feb 25, 2023
5296068
Use hash set for index
olksdr Feb 25, 2023
ceb590d
Refactor processing part into separate func
olksdr Feb 27, 2023
4a1c670
Dispose envelopes linked to the expired projects
olksdr Feb 27, 2023
d4f1f8e
Merge branch 'master' into feat/project-cache-queue
olksdr Feb 27, 2023
99d4df0
Merge branch 'master' into feat/project-cache-queue
olksdr Mar 1, 2023
435ac7b
Switch to BTreeSet and address some review comments
olksdr Mar 1, 2023
cb2a1c5
ref: Simplify
jan-auer Mar 1, 2023
da7ebfb
ref: Changes from review
jan-auer Mar 1, 2023
67c94f3
Log only if there is anything to log
olksdr Mar 1, 2023
a143882
Change the condition to filter out invalid states
olksdr Mar 1, 2023
40c2727
Pick envelopes for the incoming root project
olksdr Mar 1, 2023
561dcf0
Merge branch 'master' into feat/project-cache-queue
olksdr Mar 1, 2023
03917a5
simplified predicated, use projects map
olksdr Mar 2, 2023
2d2edfa
Fix dequeue and use the proper project key for processing.
olksdr Mar 2, 2023
0f9b507
Update comments
olksdr Mar 2, 2023
46c14b5
Merge branch 'master' into feat/project-cache-queue
olksdr Mar 2, 2023
bcac670
review comments
olksdr Mar 2, 2023
381c893
review comments
olksdr Mar 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- Revert back the addition of metric names as tag on Sentry errors when relay drops metrics. ([#1873](https://github.com/getsentry/relay/pull/1873))
- Tag the dynamic sampling decision on `count_per_root_project` to measure effective sample rates. ([#1870](https://github.com/getsentry/relay/pull/1870))
- Deprecate fields on the profiling sample format. ([#1878](https://github.com/getsentry/relay/pull/1878))
- Move the pending envelopes buffering into the project cache. ([#1879](https://github.com/getsentry/relay/pull/1879))

## 23.2.0

Expand Down
124 changes: 6 additions & 118 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -20,16 +19,15 @@ use relay_system::BroadcastChannel;

use crate::actors::envelopes::{EnvelopeManager, SendMetrics};
use crate::actors::outcome::{DiscardReason, Outcome};
use crate::actors::processor::{EnvelopeProcessor, ProcessEnvelope};
use crate::actors::project_cache::{
AddSamplingState, CheckedEnvelope, ProjectCache, RequestUpdate,
};
#[cfg(feature = "processing")]
use crate::actors::processor::EnvelopeProcessor;
Comment on lines +22 to +23
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this only targeting processing relays?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the following in the code:

#[cfg(feature = "processing")]
if !was_rate_limited && config.processing_enabled() {
// If there were no cached rate limits active, let the processor check redis:
EnvelopeProcessor::from_registry().send(RateLimitFlushBuckets {
bucket_limiter,
partition_key,
});
return;
}

which already guards some of the code behind the feature flag, so the import must be there as well, otherwise the compiler will complain.

use crate::actors::project_cache::{CheckedEnvelope, ProjectCache, RequestUpdate};
use crate::envelope::Envelope;
use crate::extractors::RequestMeta;

use crate::service::Registry;
use crate::statsd::RelayCounters;
use crate::utils::{self, EnvelopeContext, EnvelopeLimiter, MetricsLimiter, RetryBackoff};
use crate::utils::{EnvelopeContext, EnvelopeLimiter, MetricsLimiter, RetryBackoff};

#[cfg(feature = "processing")]
use crate::actors::processor::RateLimitFlushBuckets;
Expand Down Expand Up @@ -399,8 +397,6 @@ pub struct Project {
config: Arc<Config>,
state: Option<Arc<ProjectState>>,
state_channel: Option<StateChannel>,
pending_validations: VecDeque<(Box<Envelope>, EnvelopeContext)>,
pending_sampling: VecDeque<ProcessEnvelope>,
rate_limits: RateLimits,
last_no_cache: Instant,
}
Expand All @@ -416,8 +412,6 @@ impl Project {
config,
state: None,
state_channel: None,
pending_validations: VecDeque::new(),
pending_sampling: VecDeque::new(),
rate_limits: RateLimits::new(),
last_no_cache: Instant::now(),
}
Expand Down Expand Up @@ -453,7 +447,7 @@ impl Project {
/// Returns the project state if it is not expired.
///
/// Convenience wrapper around [`expiry_state`](Self::expiry_state).
fn valid_state(&self) -> Option<Arc<ProjectState>> {
pub fn valid_state(&self) -> Option<Arc<ProjectState>> {
match self.expiry_state() {
ExpiryState::Updated(state) => Some(state),
ExpiryState::Stale(state) => Some(state),
Expand Down Expand Up @@ -642,93 +636,9 @@ impl Project {
self.get_cached_state(no_cache);
}

/// Validates the envelope and submits the envelope to the next stage.
///
/// If this project is disabled or rate limited, corresponding items are dropped from the
/// envelope. Remaining items in the Envelope are forwarded:
/// - If the envelope needs dynamic sampling, this sends [`AddSamplingState`] to the
/// [`ProjectCache`] to add the required project state.
/// - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`].
fn flush_validation(
&mut self,
envelope: Box<Envelope>,
envelope_context: EnvelopeContext,
project_state: Arc<ProjectState>,
) {
if let Ok(checked) = self.check_envelope(envelope, envelope_context) {
if let Some((envelope, envelope_context)) = checked.envelope {
let mut process = ProcessEnvelope {
envelope,
envelope_context,
project_state,
sampling_project_state: None,
};

if let Some(sampling_key) = utils::get_sampling_key(&process.envelope) {
let own_key = process
.project_state
.get_public_key_config()
.map(|c| c.public_key);

if Some(sampling_key) == own_key {
process.sampling_project_state = Some(process.project_state.clone());
EnvelopeProcessor::from_registry().send(process);
} else {
ProjectCache::from_registry()
.send(AddSamplingState::new(sampling_key, process));
}
} else {
EnvelopeProcessor::from_registry().send(process);
}
}
}
}

/// Enqueues an envelope for validation.
///
/// If the project state is up to date, the message will be immediately sent to the next stage.
/// Otherwise, this queues the envelope and flushes it when the project has been updated.
///
/// This method will trigger an update of the project state internally if the state is stale or
/// outdated.
pub fn enqueue_validation(&mut self, envelope: Box<Envelope>, context: EnvelopeContext) {
match self.get_cached_state(envelope.meta().no_cache()) {
Some(state) if !state.invalid() => self.flush_validation(envelope, context, state),
_ => self.pending_validations.push_back((envelope, context)),
}
}

/// Adds the project state for dynamic sampling and submits the Envelope for processing.
fn flush_sampling(&self, mut message: ProcessEnvelope) {
// Intentionally ignore all errors. Fallback sampling behavior applies in this case.
if let Some(state) = self.valid_state().filter(|state| !state.invalid()) {
// Never use rules from another organization.
if state.organization_id == message.project_state.organization_id {
message.sampling_project_state = Some(state);
}
}

EnvelopeProcessor::from_registry().send(message);
}

/// Enqueues an envelope for adding a dynamic sampling project state.
///
/// If the project state is up to date, the message will be immediately submitted for
/// processing. Otherwise, this queues the envelope and flushes it when the project has been
/// updated.
///
/// This method will trigger an update of the project state internally if the state is stale or
/// outdated.
pub fn enqueue_sampling(&mut self, message: ProcessEnvelope) {
match self.get_cached_state(message.envelope.meta().no_cache()) {
Some(_) => self.flush_sampling(message),
None => self.pending_sampling.push_back(message),
}
}

/// Replaces the internal project state with a new one and triggers pending actions.
///
/// This flushes pending envelopes from [`ValidateEnvelope`] and [`AddSamplingState`] and
/// This flushes pending envelopes from [`ValidateEnvelope`] and
/// notifies all pending receivers from [`get_state`](Self::get_state).
///
/// `no_cache` should be passed from the requesting call. Updates with `no_cache` will always
Expand Down Expand Up @@ -775,16 +685,6 @@ impl Project {
return;
}

// Flush all queued `ValidateEnvelope` messages
while let Some((envelope, context)) = self.pending_validations.pop_front() {
self.flush_validation(envelope, context, state.clone());
}

// Flush all queued `AddSamplingState` messages
while let Some(message) = self.pending_sampling.pop_front() {
self.flush_sampling(message);
}

// Flush all waiting recipients.
relay_log::debug!("project state {} updated", self.project_key);
channel.inner.send(state);
Expand Down Expand Up @@ -921,18 +821,6 @@ impl Project {
}
}

impl Drop for Project {
fn drop(&mut self) {
let count = self.pending_validations.len() + self.pending_sampling.len();
if count > 0 {
relay_log::with_scope(
|scope| scope.set_tag("project_key", self.project_key),
|| relay_log::error!("dropped project with {} envelopes", count),
);
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
Loading