ref(cache): Move buffering of pending envelope to ProjectCache #1879
Conversation
The logic seems sound to me, just some general questions / code suggestions. Let me know if I made any wrong assumptions.
if let Ok(CheckedEnvelope {
    envelope: Some((envelope, envelope_context)),
    ..
}) = self
Is it OK to swallow the error case here?
That's how this was done before, but we can also rethink this behaviour now.
for qkey in keys.drain() {
    if f(&qkey) {
        if let Some(envelopes) = self.buffer.remove(&qkey) {
            result.extend(envelopes);
It would be nice if this function could return an iterator of envelopes, something like

keys.drain().map(|qkey| {
    // ...
    envelopes.into_iter()
}).flatten()

Just need to be careful to consume the iterator when calling dequeue before garbage disposal.
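For illustration, here is a self-contained sketch of what an iterator-returning dequeue could look like. The types are toy placeholders (u64 and String standing in for relay's QueueKey and boxed Envelope), not the actual implementation; it mainly shows the caveat above, namely that the returned iterator mutably borrows the buffer and must be consumed before the buffer is touched again.

use std::collections::{BTreeSet, HashMap};

// Toy stand-ins for relay's key and envelope types; this is only a sketch of
// the suggested shape, not the real code.
type QueueKey = u64;
type Envelope = String;

struct Buffer {
    index: HashMap<u64, BTreeSet<QueueKey>>,
    buffer: HashMap<QueueKey, Vec<Envelope>>,
}

impl Buffer {
    /// Lazily yields matching envelopes instead of collecting them into a Vec.
    /// The iterator holds a mutable borrow of the buffer, so it must be fully
    /// consumed before `self` is used again.
    fn dequeue<'a, P>(
        &'a mut self,
        partial_key: &u64,
        mut predicate: P,
    ) -> impl Iterator<Item = Envelope> + 'a
    where
        P: FnMut(&QueueKey) -> bool + 'a,
    {
        let keys = self.index.remove(partial_key).unwrap_or_default();
        let buffer = &mut self.buffer;
        keys.into_iter()
            .filter(move |qkey| predicate(qkey))
            .filter_map(move |qkey| buffer.remove(&qkey))
            .flatten()
    }
}

fn main() {
    let mut buf = Buffer {
        index: HashMap::from([(1, BTreeSet::from([10, 11]))]),
        buffer: HashMap::from([(10, vec!["a".to_string()]), (11, vec!["b".to_string()])]),
    };
    // The iterator must be collected/consumed while the mutable borrow is alive.
    let drained: Vec<Envelope> = buf.dequeue(&1, |_qkey| true).collect();
    assert_eq!(drained, vec!["a".to_string(), "b".to_string()]);
}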
Still have to think about this a bit.
Maybe we can use drain_filter?
@iker-barriocanal that's a nightly feature, unfortunately.
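For reference, since drain_filter is nightly-only, a stable alternative is to partition the key set with the predicate. This is only a toy sketch (QueueKey is a placeholder type here, not relay's actual key): the matching keys can be dequeued and the remainder re-inserted into the index.

use std::collections::BTreeSet;

// Placeholder for relay's QueueKey.
type QueueKey = u64;

/// Splits the keys into (ready, still_waiting) without nightly `drain_filter`:
/// matching keys get dequeued, the rest can be re-inserted into the index.
fn split_keys<P>(
    keys: BTreeSet<QueueKey>,
    mut predicate: P,
) -> (BTreeSet<QueueKey>, BTreeSet<QueueKey>)
where
    P: FnMut(&QueueKey) -> bool,
{
    keys.into_iter().partition(|qkey| predicate(qkey))
}

fn main() {
    let keys: BTreeSet<QueueKey> = (1..=6).collect();
    let (ready, waiting) = split_keys(keys, |key| key % 2 == 0);
    assert_eq!(ready.into_iter().collect::<Vec<_>>(), vec![2, 4, 6]);
    assert_eq!(waiting.len(), 3);
}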
This is not a blocker for the current PR, but generally speaking, if all you want to do is iterate over a sequence once, there is no reason to copy its data over to a new vector.
For our case specifically, once we change to disk spooling, we will need some form of batching anyway, because the dequeued data will be too large to copy into one big vector.
You're right.
And I also think it is not important for now, but we will definitely have to change this behaviour once we start handling disk access and might have many more envelopes in the persistent buffer.
I'm a bit confused with this PR, so mostly questions.
#[cfg(feature = "processing")] | ||
use crate::actors::processor::EnvelopeProcessor; |
Why is this only targeting processing relays?
We have the following in the code:
relay/relay-server/src/actors/project.rs
Lines 798 to 807 in 381c893
#[cfg(feature = "processing")] | |
if !was_rate_limited && config.processing_enabled() { | |
// If there were no cached rate limits active, let the processor check redis: | |
EnvelopeProcessor::from_registry().send(RateLimitFlushBuckets { | |
bucket_limiter, | |
partition_key, | |
}); | |
return; | |
} |
which already guards some of the code behind the feature flag, so the import must be gated the same way, otherwise the compiler will complain.
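A minimal standalone illustration of that point (not relay code, just the same cfg pattern): the import is only referenced inside the feature-gated block, so it carries the same cfg attribute; without the gate, builds that disable the feature would warn about an unused import.

// Toy example of the cfg-gated import pattern described above.
#[cfg(feature = "processing")]
use std::collections::HashMap;

fn flush_buckets() {
    #[cfg(feature = "processing")]
    {
        // This block, and therefore the import above, only exists when the
        // `processing` feature is enabled.
        let _buckets: HashMap<String, u32> = HashMap::new();
    }
}

fn main() {
    flush_buckets();
}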
self.index.entry(key.key).or_default().insert(key);
self.index.entry(key.sampling_key).or_default().insert(key);
I don't understand the purpose of another entry with key.sampling_key, why are we doing that?
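For context, the index maps every individual ProjectKey to the set of QueueKeys it appears in, so a state update for either the envelope's own project or the sampling project can find the queued envelopes. A tiny sketch with placeholder types (a tuple stands in for QueueKey, u32 for ProjectKey), based on the PR description rather than the exact code:

use std::collections::{BTreeSet, HashMap};

// Placeholder key types: u32 stands in for ProjectKey, the tuple for
// QueueKey { key, sampling_key }.
type ProjectKey = u32;
type QueueKey = (ProjectKey, ProjectKey);

fn main() {
    let mut index: HashMap<ProjectKey, BTreeSet<QueueKey>> = HashMap::new();
    let qkey: QueueKey = (1, 2); // own project 1, sampling project 2

    // Insert the composite key under *both* projects...
    index.entry(qkey.0).or_default().insert(qkey);
    index.entry(qkey.1).or_default().insert(qkey);

    // ...so a state update for either project finds the queued envelopes.
    assert!(index[&1].contains(&qkey));
    assert!(index[&2].contains(&qkey));
}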
// We return false if project is not cached or its state is invalid.
self.projects
    .get(&queue_key.sampling_key)
Why queue_key.sampling_key instead of queue_key.key?
{
    let mut result = Vec::new();

    let mut keys = self.index.remove(partial_key).unwrap_or_default();

Suggested change:
- let mut keys = self.index.remove(partial_key).unwrap_or_default();
+ let mut queue_keys = self.index.remove(partial_key).unwrap_or_default();
pub fn dequeue<P>(
    &mut self,
    partial_key: &ProjectKey,
    f: P,

Suggested change:
- f: P,
+ predicate: P,
let sampling_state = sampling_key.and_then(|key| {
    self.get_or_create_project(key)
        .get_cached_state(envelope.meta().no_cache())
        .filter(|st| !st.invalid())
});
This can be moved inside if let Some(state) =.
Bumping this comment. There's no need to get_or_create_project the sampling state when the project_state is None.
In other places we use the projects map directly, and from what I see this is now the only place where we call self.get_or_create_project(...).get_cached_state(...), which initiates the upstream request to update the sampling state if it's not in the cache.
So if the project_state is None, we most probably request an update for it and just queue the envelope for now, while at the same time already requesting the sampling state so that we can process the queued envelopes once the states are updated.
But maybe I'm missing something.
Sorry, you are right. I overlooked that get_cached_state triggers a fetch from the upstream.
Please read the comments (especially the one about root_key). Nothing blocking though.
@@ -374,13 +374,16 @@ struct UpdateProjectState {

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
struct QueueKey {
    key: ProjectKey,
    root_key: ProjectKey,
I'm afraid root is too ambiguous here, because it's also used to refer to the "root" of the trace, which is the sampling project. I have no better name to be honest. Maybe own_key or project_key?
// We return false if project is not cached or its state is invalid.
// We return false if project is not cached or its state is invalid, true otherwise.

Suggested change:
  // We return false if project is not cached or its state is invalid, true otherwise.
+ // We only have to check `partial_key`, because we already know that the `project_key`'s `state` is valid and loaded.
    .and_then(|key| self.projects.get(&key))
    .and_then(|p| p.valid_state());

self.handle_processing(state.clone(), sampling_state, envelope, envelope_context)

Suggested change:
- self.handle_processing(state.clone(), sampling_state, envelope, envelope_context)
+ self.handle_processing(state.clone(), sampling_state, envelope, envelope_context);
let ValidateEnvelope { envelope, context } = message;

// Fetch the project state for our key and make sure it's not invalid.
let root_key = envelope.meta().public_key();
See comment on ambiguity of "root project".
relay/relay-server/src/actors/processor.rs
Line 224 in e7c862e
/// Metrics associated with the sampling project (a.k.a. root or head project) |
@@ -131,11 +133,12 @@ impl CheckEnvelope {
/// [`CheckEnvelope`]. Once the envelope has been validated, remaining items are forwarded to the
/// next stage:
///
/// - If the envelope needs dynamic sampling, this sends [`AddSamplingState`] to the
///   [`ProjectCache`] to add the required project state.
/// - If the envelope needs dynamic sampling, and the project state is not cached or out of the
nit: We could rewrite parts of the paragraph before. It talks about stages, but now we just wait for both project states concurrently (or simultaneously) and then move on to CheckEnvelope.
@@ -400,6 +373,92 @@ struct UpdateProjectState {
    no_cache: bool,
}

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
cc @jjbayer, do we want to define the order of derives in the coding guideline?
@jan-auer what rule would you suggest? Derives are order-sensitive, so there are some limits (see rust-lang/rustfmt#1867 (comment)).
…e" (#1906) Reverts #1879 Introduced regression: [RELAY-2NFR](https://sentry.my.sentry.io/organizations/sentry/issues/393031/events/7dae7f8e304e456ea5355a2169f3a075/?project=4) The project key on the ProjectState was missing.
These changes move the buffering of incoming envelopes into the ProjectCache.

The current implementation still keeps the so-called queue in memory, using a HashMap with a composite key QueueKey { key, sampling_key }, where sampling_key can be the same as key if no sampling project is identified. The values for these keys are Vecs of boxed Envelopes with their EnvelopeContexts.

Once we get an update for a project state, we check all variants of QueueKey that contain the current ProjectKey, and if all the project states are cached we try to flush the buffered envelopes indexed by these QueueKeys.

The envelopes will be buffered if:

This change also removes all the buffering from the Project and reduces its responsibility. Now it just keeps its own state and configuration, and the envelope handling is done outside of it.