1
- use std:: collections:: VecDeque ;
2
1
use std:: sync:: Arc ;
3
2
use std:: time:: Duration ;
4
3
@@ -20,16 +19,15 @@ use relay_system::BroadcastChannel;
20
19
21
20
use crate :: actors:: envelopes:: { EnvelopeManager , SendMetrics } ;
22
21
use crate :: actors:: outcome:: { DiscardReason , Outcome } ;
23
- use crate :: actors:: processor:: { EnvelopeProcessor , ProcessEnvelope } ;
24
- use crate :: actors:: project_cache:: {
25
- AddSamplingState , CheckedEnvelope , ProjectCache , RequestUpdate ,
26
- } ;
22
+ #[ cfg( feature = "processing" ) ]
23
+ use crate :: actors:: processor:: EnvelopeProcessor ;
24
+ use crate :: actors:: project_cache:: { CheckedEnvelope , ProjectCache , RequestUpdate } ;
27
25
use crate :: envelope:: Envelope ;
28
26
use crate :: extractors:: RequestMeta ;
29
27
30
28
use crate :: service:: Registry ;
31
29
use crate :: statsd:: RelayCounters ;
32
- use crate :: utils:: { self , EnvelopeContext , EnvelopeLimiter , MetricsLimiter , RetryBackoff } ;
30
+ use crate :: utils:: { EnvelopeContext , EnvelopeLimiter , MetricsLimiter , RetryBackoff } ;
33
31
34
32
#[ cfg( feature = "processing" ) ]
35
33
use crate :: actors:: processor:: RateLimitFlushBuckets ;
@@ -399,8 +397,6 @@ pub struct Project {
399
397
config : Arc < Config > ,
400
398
state : Option < Arc < ProjectState > > ,
401
399
state_channel : Option < StateChannel > ,
402
- pending_validations : VecDeque < ( Box < Envelope > , EnvelopeContext ) > ,
403
- pending_sampling : VecDeque < ProcessEnvelope > ,
404
400
rate_limits : RateLimits ,
405
401
last_no_cache : Instant ,
406
402
}
@@ -416,8 +412,6 @@ impl Project {
416
412
config,
417
413
state : None ,
418
414
state_channel : None ,
419
- pending_validations : VecDeque :: new ( ) ,
420
- pending_sampling : VecDeque :: new ( ) ,
421
415
rate_limits : RateLimits :: new ( ) ,
422
416
last_no_cache : Instant :: now ( ) ,
423
417
}
@@ -453,7 +447,7 @@ impl Project {
453
447
/// Returns the project state if it is not expired.
454
448
///
455
449
/// Convenience wrapper around [`expiry_state`](Self::expiry_state).
456
- fn valid_state ( & self ) -> Option < Arc < ProjectState > > {
450
+ pub fn valid_state ( & self ) -> Option < Arc < ProjectState > > {
457
451
match self . expiry_state ( ) {
458
452
ExpiryState :: Updated ( state) => Some ( state) ,
459
453
ExpiryState :: Stale ( state) => Some ( state) ,
@@ -642,93 +636,9 @@ impl Project {
642
636
self . get_cached_state ( no_cache) ;
643
637
}
644
638
645
- /// Validates the envelope and submits the envelope to the next stage.
646
- ///
647
- /// If this project is disabled or rate limited, corresponding items are dropped from the
648
- /// envelope. Remaining items in the Envelope are forwarded:
649
- /// - If the envelope needs dynamic sampling, this sends [`AddSamplingState`] to the
650
- /// [`ProjectCache`] to add the required project state.
651
- /// - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`].
652
- fn flush_validation (
653
- & mut self ,
654
- envelope : Box < Envelope > ,
655
- envelope_context : EnvelopeContext ,
656
- project_state : Arc < ProjectState > ,
657
- ) {
658
- if let Ok ( checked) = self . check_envelope ( envelope, envelope_context) {
659
- if let Some ( ( envelope, envelope_context) ) = checked. envelope {
660
- let mut process = ProcessEnvelope {
661
- envelope,
662
- envelope_context,
663
- project_state,
664
- sampling_project_state : None ,
665
- } ;
666
-
667
- if let Some ( sampling_key) = utils:: get_sampling_key ( & process. envelope ) {
668
- let own_key = process
669
- . project_state
670
- . get_public_key_config ( )
671
- . map ( |c| c. public_key ) ;
672
-
673
- if Some ( sampling_key) == own_key {
674
- process. sampling_project_state = Some ( process. project_state . clone ( ) ) ;
675
- EnvelopeProcessor :: from_registry ( ) . send ( process) ;
676
- } else {
677
- ProjectCache :: from_registry ( )
678
- . send ( AddSamplingState :: new ( sampling_key, process) ) ;
679
- }
680
- } else {
681
- EnvelopeProcessor :: from_registry ( ) . send ( process) ;
682
- }
683
- }
684
- }
685
- }
686
-
687
- /// Enqueues an envelope for validation.
688
- ///
689
- /// If the project state is up to date, the message will be immediately sent to the next stage.
690
- /// Otherwise, this queues the envelope and flushes it when the project has been updated.
691
- ///
692
- /// This method will trigger an update of the project state internally if the state is stale or
693
- /// outdated.
694
- pub fn enqueue_validation ( & mut self , envelope : Box < Envelope > , context : EnvelopeContext ) {
695
- match self . get_cached_state ( envelope. meta ( ) . no_cache ( ) ) {
696
- Some ( state) if !state. invalid ( ) => self . flush_validation ( envelope, context, state) ,
697
- _ => self . pending_validations . push_back ( ( envelope, context) ) ,
698
- }
699
- }
700
-
701
- /// Adds the project state for dynamic sampling and submits the Envelope for processing.
702
- fn flush_sampling ( & self , mut message : ProcessEnvelope ) {
703
- // Intentionally ignore all errors. Fallback sampling behavior applies in this case.
704
- if let Some ( state) = self . valid_state ( ) . filter ( |state| !state. invalid ( ) ) {
705
- // Never use rules from another organization.
706
- if state. organization_id == message. project_state . organization_id {
707
- message. sampling_project_state = Some ( state) ;
708
- }
709
- }
710
-
711
- EnvelopeProcessor :: from_registry ( ) . send ( message) ;
712
- }
713
-
714
- /// Enqueues an envelope for adding a dynamic sampling project state.
715
- ///
716
- /// If the project state is up to date, the message will be immediately submitted for
717
- /// processing. Otherwise, this queues the envelope and flushes it when the project has been
718
- /// updated.
719
- ///
720
- /// This method will trigger an update of the project state internally if the state is stale or
721
- /// outdated.
722
- pub fn enqueue_sampling ( & mut self , message : ProcessEnvelope ) {
723
- match self . get_cached_state ( message. envelope . meta ( ) . no_cache ( ) ) {
724
- Some ( _) => self . flush_sampling ( message) ,
725
- None => self . pending_sampling . push_back ( message) ,
726
- }
727
- }
728
-
729
639
/// Replaces the internal project state with a new one and triggers pending actions.
730
640
///
731
- /// This flushes pending envelopes from [`ValidateEnvelope`] and [`AddSamplingState`] and
641
+ /// This flushes pending envelopes from [`ValidateEnvelope`] and
732
642
/// notifies all pending receivers from [`get_state`](Self::get_state).
733
643
///
734
644
/// `no_cache` should be passed from the requesting call. Updates with `no_cache` will always
@@ -775,16 +685,6 @@ impl Project {
775
685
return ;
776
686
}
777
687
778
- // Flush all queued `ValidateEnvelope` messages
779
- while let Some ( ( envelope, context) ) = self . pending_validations . pop_front ( ) {
780
- self . flush_validation ( envelope, context, state. clone ( ) ) ;
781
- }
782
-
783
- // Flush all queued `AddSamplingState` messages
784
- while let Some ( message) = self . pending_sampling . pop_front ( ) {
785
- self . flush_sampling ( message) ;
786
- }
787
-
788
688
// Flush all waiting recipients.
789
689
relay_log:: debug!( "project state {} updated" , self . project_key) ;
790
690
channel. inner . send ( state) ;
@@ -921,18 +821,6 @@ impl Project {
921
821
}
922
822
}
923
823
924
- impl Drop for Project {
925
- fn drop ( & mut self ) {
926
- let count = self . pending_validations . len ( ) + self . pending_sampling . len ( ) ;
927
- if count > 0 {
928
- relay_log:: with_scope (
929
- |scope| scope. set_tag ( "project_key" , self . project_key ) ,
930
- || relay_log:: error!( "dropped project with {} envelopes" , count) ,
931
- ) ;
932
- }
933
- }
934
- }
935
-
936
824
#[ cfg( test) ]
937
825
mod tests {
938
826
use std:: sync:: Arc ;
0 commit comments