1
+ use std:: collections:: VecDeque ;
1
2
use std:: sync:: Arc ;
2
3
use std:: time:: Duration ;
3
4
@@ -19,15 +20,16 @@ use relay_system::BroadcastChannel;
19
20
20
21
use crate :: actors:: envelopes:: { EnvelopeManager , SendMetrics } ;
21
22
use crate :: actors:: outcome:: { DiscardReason , Outcome } ;
22
- #[ cfg( feature = "processing" ) ]
23
- use crate :: actors:: processor:: EnvelopeProcessor ;
24
- use crate :: actors:: project_cache:: { CheckedEnvelope , ProjectCache , RequestUpdate } ;
23
+ use crate :: actors:: processor:: { EnvelopeProcessor , ProcessEnvelope } ;
24
+ use crate :: actors:: project_cache:: {
25
+ AddSamplingState , CheckedEnvelope , ProjectCache , RequestUpdate ,
26
+ } ;
25
27
use crate :: envelope:: Envelope ;
26
28
use crate :: extractors:: RequestMeta ;
27
29
28
30
use crate :: service:: Registry ;
29
31
use crate :: statsd:: RelayCounters ;
30
- use crate :: utils:: { EnvelopeContext , EnvelopeLimiter , MetricsLimiter , RetryBackoff } ;
32
+ use crate :: utils:: { self , EnvelopeContext , EnvelopeLimiter , MetricsLimiter , RetryBackoff } ;
31
33
32
34
#[ cfg( feature = "processing" ) ]
33
35
use crate :: actors:: processor:: RateLimitFlushBuckets ;
@@ -397,6 +399,8 @@ pub struct Project {
397
399
config : Arc < Config > ,
398
400
state : Option < Arc < ProjectState > > ,
399
401
state_channel : Option < StateChannel > ,
402
+ pending_validations : VecDeque < ( Box < Envelope > , EnvelopeContext ) > ,
403
+ pending_sampling : VecDeque < ProcessEnvelope > ,
400
404
rate_limits : RateLimits ,
401
405
last_no_cache : Instant ,
402
406
}
@@ -412,6 +416,8 @@ impl Project {
412
416
config,
413
417
state : None ,
414
418
state_channel : None ,
419
+ pending_validations : VecDeque :: new ( ) ,
420
+ pending_sampling : VecDeque :: new ( ) ,
415
421
rate_limits : RateLimits :: new ( ) ,
416
422
last_no_cache : Instant :: now ( ) ,
417
423
}
@@ -447,7 +453,7 @@ impl Project {
447
453
/// Returns the project state if it is not expired.
448
454
///
449
455
/// Convenience wrapper around [`expiry_state`](Self::expiry_state).
450
- pub fn valid_state ( & self ) -> Option < Arc < ProjectState > > {
456
+ fn valid_state ( & self ) -> Option < Arc < ProjectState > > {
451
457
match self . expiry_state ( ) {
452
458
ExpiryState :: Updated ( state) => Some ( state) ,
453
459
ExpiryState :: Stale ( state) => Some ( state) ,
@@ -636,9 +642,93 @@ impl Project {
636
642
self . get_cached_state ( no_cache) ;
637
643
}
638
644
645
+ /// Validates the envelope and submits the envelope to the next stage.
646
+ ///
647
+ /// If this project is disabled or rate limited, corresponding items are dropped from the
648
+ /// envelope. Remaining items in the Envelope are forwarded:
649
+ /// - If the envelope needs dynamic sampling, this sends [`AddSamplingState`] to the
650
+ /// [`ProjectCache`] to add the required project state.
651
+ /// - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`].
652
+ fn flush_validation (
653
+ & mut self ,
654
+ envelope : Box < Envelope > ,
655
+ envelope_context : EnvelopeContext ,
656
+ project_state : Arc < ProjectState > ,
657
+ ) {
658
+ if let Ok ( checked) = self . check_envelope ( envelope, envelope_context) {
659
+ if let Some ( ( envelope, envelope_context) ) = checked. envelope {
660
+ let mut process = ProcessEnvelope {
661
+ envelope,
662
+ envelope_context,
663
+ project_state,
664
+ sampling_project_state : None ,
665
+ } ;
666
+
667
+ if let Some ( sampling_key) = utils:: get_sampling_key ( & process. envelope ) {
668
+ let own_key = process
669
+ . project_state
670
+ . get_public_key_config ( )
671
+ . map ( |c| c. public_key ) ;
672
+
673
+ if Some ( sampling_key) == own_key {
674
+ process. sampling_project_state = Some ( process. project_state . clone ( ) ) ;
675
+ EnvelopeProcessor :: from_registry ( ) . send ( process) ;
676
+ } else {
677
+ ProjectCache :: from_registry ( )
678
+ . send ( AddSamplingState :: new ( sampling_key, process) ) ;
679
+ }
680
+ } else {
681
+ EnvelopeProcessor :: from_registry ( ) . send ( process) ;
682
+ }
683
+ }
684
+ }
685
+ }
686
+
687
+ /// Enqueues an envelope for validation.
688
+ ///
689
+ /// If the project state is up to date, the message will be immediately sent to the next stage.
690
+ /// Otherwise, this queues the envelope and flushes it when the project has been updated.
691
+ ///
692
+ /// This method will trigger an update of the project state internally if the state is stale or
693
+ /// outdated.
694
+ pub fn enqueue_validation ( & mut self , envelope : Box < Envelope > , context : EnvelopeContext ) {
695
+ match self . get_cached_state ( envelope. meta ( ) . no_cache ( ) ) {
696
+ Some ( state) if !state. invalid ( ) => self . flush_validation ( envelope, context, state) ,
697
+ _ => self . pending_validations . push_back ( ( envelope, context) ) ,
698
+ }
699
+ }
700
+
701
+ /// Adds the project state for dynamic sampling and submits the Envelope for processing.
702
+ fn flush_sampling ( & self , mut message : ProcessEnvelope ) {
703
+ // Intentionally ignore all errors. Fallback sampling behavior applies in this case.
704
+ if let Some ( state) = self . valid_state ( ) . filter ( |state| !state. invalid ( ) ) {
705
+ // Never use rules from another organization.
706
+ if state. organization_id == message. project_state . organization_id {
707
+ message. sampling_project_state = Some ( state) ;
708
+ }
709
+ }
710
+
711
+ EnvelopeProcessor :: from_registry ( ) . send ( message) ;
712
+ }
713
+
714
+ /// Enqueues an envelope for adding a dynamic sampling project state.
715
+ ///
716
+ /// If the project state is up to date, the message will be immediately submitted for
717
+ /// processing. Otherwise, this queues the envelope and flushes it when the project has been
718
+ /// updated.
719
+ ///
720
+ /// This method will trigger an update of the project state internally if the state is stale or
721
+ /// outdated.
722
+ pub fn enqueue_sampling ( & mut self , message : ProcessEnvelope ) {
723
+ match self . get_cached_state ( message. envelope . meta ( ) . no_cache ( ) ) {
724
+ Some ( _) => self . flush_sampling ( message) ,
725
+ None => self . pending_sampling . push_back ( message) ,
726
+ }
727
+ }
728
+
639
729
/// Replaces the internal project state with a new one and triggers pending actions.
640
730
///
641
- /// This flushes pending envelopes from [`ValidateEnvelope`] and
731
+ /// This flushes pending envelopes from [`ValidateEnvelope`] and [`AddSamplingState`] and
642
732
/// notifies all pending receivers from [`get_state`](Self::get_state).
643
733
///
644
734
/// `no_cache` should be passed from the requesting call. Updates with `no_cache` will always
@@ -685,6 +775,16 @@ impl Project {
685
775
return ;
686
776
}
687
777
778
+ // Flush all queued `ValidateEnvelope` messages
779
+ while let Some ( ( envelope, context) ) = self . pending_validations . pop_front ( ) {
780
+ self . flush_validation ( envelope, context, state. clone ( ) ) ;
781
+ }
782
+
783
+ // Flush all queued `AddSamplingState` messages
784
+ while let Some ( message) = self . pending_sampling . pop_front ( ) {
785
+ self . flush_sampling ( message) ;
786
+ }
787
+
688
788
// Flush all waiting recipients.
689
789
relay_log:: debug!( "project state {} updated" , self . project_key) ;
690
790
channel. inner . send ( state) ;
@@ -821,6 +921,18 @@ impl Project {
821
921
}
822
922
}
823
923
924
+ impl Drop for Project {
925
+ fn drop ( & mut self ) {
926
+ let count = self . pending_validations . len ( ) + self . pending_sampling . len ( ) ;
927
+ if count > 0 {
928
+ relay_log:: with_scope (
929
+ |scope| scope. set_tag ( "project_key" , self . project_key ) ,
930
+ || relay_log:: error!( "dropped project with {} envelopes" , count) ,
931
+ ) ;
932
+ }
933
+ }
934
+ }
935
+
824
936
#[ cfg( test) ]
825
937
mod tests {
826
938
use std:: sync:: Arc ;
0 commit comments