diff --git a/CHANGELOG.md b/CHANGELOG.md index 611bea10e29..f7069193298 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Add logic to extract event json from userdata in prosperodumps. ([#4755](https://github.com/getsentry/relay/pull/4755) - Add browser name/version to logs. ([#4757](https://github.com/getsentry/relay/pull/4757)) - Accept standalone spans in the V2 format. This feature is still highly experimental! ([#4771](https://github.com/getsentry/relay/pull/4771)) +- Enable filtering sessions by IP address, release, and user agent. ([#4745](https://github.com/getsentry/relay/pull/4745)) **Internal**: diff --git a/relay-event-schema/src/protocol/session.rs b/relay-event-schema/src/protocol/session.rs index 85c7becb400..edd4d38e4ae 100644 --- a/relay-event-schema/src/protocol/session.rs +++ b/relay-event-schema/src/protocol/session.rs @@ -2,6 +2,7 @@ use std::fmt::{self, Display}; use std::time::SystemTime; use chrono::{DateTime, Utc}; +use relay_protocol::Getter; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -269,6 +270,14 @@ impl SessionLike for SessionUpdate { } } +// Dummy implementation of `Getter` to satisfy the bound of `should_filter`. +// We don't actually want to use `get_value` at this time. +impl Getter for SessionUpdate { + fn get_value(&self, _path: &str) -> Option<relay_protocol::Val<'_>> { + None + } +} + #[allow(clippy::trivially_copy_pass_by_ref)] fn is_zero(val: &u32) -> bool { *val == 0 @@ -353,6 +362,14 @@ impl SessionAggregates { } } +// Dummy implementation of `Getter` to satisfy the bound of `should_filter`. 
+// We don't actually want to use `get_value` at this time. +impl Getter for SessionAggregates { + fn get_value(&self, _path: &str) -> Option<relay_protocol::Val<'_>> { + None + } +} + #[cfg(test)] mod tests { diff --git a/relay-filter/src/interface.rs b/relay-filter/src/interface.rs index 2246eec3dce..24802ace2e5 100644 --- a/relay-filter/src/interface.rs +++ b/relay-filter/src/interface.rs @@ -3,7 +3,8 @@ use url::Url; use relay_event_schema::protocol::{ - Csp, Event, EventType, Exception, LogEntry, Replay, Span, Values, + Csp, Event, EventType, Exception, LogEntry, Replay, SessionAggregates, SessionUpdate, Span, + Values, }; /// A data item to which filters can be applied. @@ -176,3 +177,85 @@ impl Filterable for Span { None } } + +impl Filterable for SessionUpdate { + fn csp(&self) -> Option<&Csp> { + None + } + + fn exceptions(&self) -> Option<&Values<Exception>> { + None + } + + fn ip_addr(&self) -> Option<&str> { + self.attributes + .ip_address + .as_ref() + .map(|addr| addr.as_str()) + } + + fn logentry(&self) -> Option<&LogEntry> { + None + } + + fn release(&self) -> Option<&str> { + Some(&self.attributes.release) + } + + fn transaction(&self) -> Option<&str> { + None + } + + fn url(&self) -> Option<Url> { + None + } + + fn user_agent(&self) -> Option<&str> { + self.attributes.user_agent.as_deref() + } + + fn header(&self, _header_name: &str) -> Option<&str> { + None + } +} + +impl Filterable for SessionAggregates { + fn csp(&self) -> Option<&Csp> { + None + } + + fn exceptions(&self) -> Option<&Values<Exception>> { + None + } + + fn ip_addr(&self) -> Option<&str> { + self.attributes + .ip_address + .as_ref() + .map(|addr| addr.as_str()) + } + + fn logentry(&self) -> Option<&LogEntry> { + None + } + + fn release(&self) -> Option<&str> { + Some(&self.attributes.release) + } + + fn transaction(&self) -> Option<&str> { + None + } + + fn url(&self) -> Option<Url> { + None + } + + fn user_agent(&self) -> Option<&str> { + self.attributes.user_agent.as_deref() + } + + fn header(&self, _header_name: &str) -> 
Option<&str> { + None + } +} diff --git a/relay-filter/src/lib.rs b/relay-filter/src/lib.rs index 7697429520e..8f5e6c968e3 100644 --- a/relay-filter/src/lib.rs +++ b/relay-filter/src/lib.rs @@ -44,6 +44,10 @@ pub use interface::Filterable; /// /// If the event should be filtered, the `Err` returned contains a filter reason. /// The reason is the message returned by the first filter that didn't pass. +/// +/// The `client_ip` parameter is the "client IP" extracted from the envelope. It's +/// used for client IP filtering and should not be confused with a "user IP" that may +/// be contained in the item, which is used for localhost filtering. pub fn should_filter<F: Filterable>( item: &F, client_ip: Option<IpAddr>, diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index bf5effaf99b..64c4d3da7e0 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2010,24 +2010,26 @@ impl EnvelopeProcessorService { async fn process_sessions( &self, managed_envelope: &mut TypedEnvelope<SessionGroup>, - project_info: Arc<ProjectInfo>, - rate_limits: Arc<RateLimits>, + config: &Config, + project_info: &ProjectInfo, + rate_limits: &RateLimits, ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> { let mut extracted_metrics = ProcessingExtractedMetrics::new(); session::process( managed_envelope, + &self.inner.global_config.current(), + config, &mut extracted_metrics, - &project_info, - &self.inner.config, + project_info, ); self.enforce_quotas( managed_envelope, Annotated::empty(), &mut extracted_metrics, - &project_info, - &rate_limits, + project_info, + rate_limits, ) .await?; @@ -2287,7 +2289,12 @@ impl EnvelopeProcessorService { reservoir_counters ) } - ProcessingGroup::Session => run!(process_sessions, project_info, rate_limits), + ProcessingGroup::Session => run!( + process_sessions, + &self.inner.config.clone(), + &project_info, + &rate_limits + ), ProcessingGroup::Standalone => run!( process_standalone, self.inner.config.clone(), diff --git 
a/relay-server/src/services/processor/session.rs b/relay-server/src/services/processor/session.rs index 80313b89d68..8fb5be13349 100644 --- a/relay-server/src/services/processor/session.rs +++ b/relay-server/src/services/processor/session.rs @@ -5,11 +5,12 @@ use std::net; use chrono::{DateTime, Duration as SignedDuration, Utc}; use relay_config::Config; -use relay_dynamic_config::SessionMetricsConfig; +use relay_dynamic_config::{GlobalConfig, SessionMetricsConfig}; use relay_event_normalization::ClockDriftProcessor; use relay_event_schema::protocol::{ IpAddr, SessionAggregates, SessionAttributes, SessionStatus, SessionUpdate, }; +use relay_filter::ProjectFiltersConfig; use relay_metrics::Bucket; use relay_statsd::metric; @@ -19,18 +20,30 @@ use crate::services::projects::project::ProjectInfo; use crate::statsd::RelayTimers; use crate::utils::{ItemAction, TypedEnvelope}; +#[derive(Debug, Clone, Copy)] +struct SessionProcessingConfig<'a> { + pub global_config: &'a GlobalConfig, + pub config: &'a Config, + pub filters_config: &'a ProjectFiltersConfig, + pub metrics_config: &'a SessionMetricsConfig, + pub client: Option<&'a str>, + pub client_addr: Option<net::IpAddr>, + pub received: DateTime<Utc>, + pub clock_drift_processor: &'a ClockDriftProcessor, +} + /// Validates all sessions and session aggregates in the envelope, if any. /// /// Both are removed from the envelope if they contain invalid JSON or if their timestamps /// are out of range after clock drift correction. 
pub fn process( managed_envelope: &mut TypedEnvelope<SessionGroup>, + global_config: &GlobalConfig, + config: &Config, extracted_metrics: &mut ProcessingExtractedMetrics, project_info: &ProjectInfo, - config: &Config, ) { let received = managed_envelope.received_at(); - let metrics_config = project_info.config().session_metrics; let envelope = managed_envelope.envelope_mut(); let client = envelope.meta().client().map(|x| x.to_owned()); let client_addr = envelope.meta().client_addr(); @@ -38,29 +51,24 @@ pub fn process( let clock_drift_processor = ClockDriftProcessor::new(envelope.sent_at(), received).at_least(MINIMUM_CLOCK_DRIFT); + let spc = SessionProcessingConfig { + global_config, + config, + filters_config: &project_info.config().filter_settings, + metrics_config: &project_info.config().session_metrics, + client: client.as_deref(), + client_addr, + received, + clock_drift_processor: &clock_drift_processor, + }; + let mut session_extracted_metrics = Vec::new(); managed_envelope.retain_items(|item| { let should_keep = match item.ty() { - ItemType::Session => process_session( - item, - config, - received, - client.as_deref(), - client_addr, - metrics_config, - &clock_drift_processor, - &mut session_extracted_metrics, - ), - ItemType::Sessions => process_session_aggregates( - item, - config, - received, - client.as_deref(), - client_addr, - metrics_config, - &clock_drift_processor, - &mut session_extracted_metrics, - ), + ItemType::Session => process_session(item, spc, &mut session_extracted_metrics), + ItemType::Sessions => { + process_session_aggregates(item, spc, &mut session_extracted_metrics) + } _ => true, // Keep all other item types }; if should_keep { @@ -141,14 +149,20 @@ fn is_valid_session_timestamp( #[allow(clippy::too_many_arguments)] fn process_session( item: &mut Item, - config: &Config, - received: DateTime<Utc>, - client: Option<&str>, - client_addr: Option<net::IpAddr>, - metrics_config: SessionMetricsConfig, - clock_drift_processor: &ClockDriftProcessor, + 
session_processing_config: SessionProcessingConfig, extracted_metrics: &mut Vec<Bucket>, ) -> bool { + let SessionProcessingConfig { + global_config, + config, + filters_config, + metrics_config, + client, + client_addr, + received, + clock_drift_processor, + } = session_processing_config; + let mut changed = false; let payload = item.payload(); let max_secs_in_future = config.max_secs_in_future(); @@ -212,6 +226,17 @@ fn process_session( return false; } + if relay_filter::should_filter( + &session, + client_addr, + filters_config, + global_config.filters(), + ) + .is_err() + { + return false; + }; + // Extract metrics if they haven't been extracted by a prior Relay if metrics_config.is_enabled() && !item.metrics_extracted() @@ -257,14 +282,20 @@ fn process_session( #[allow(clippy::too_many_arguments)] fn process_session_aggregates( item: &mut Item, - config: &Config, - received: DateTime<Utc>, - client: Option<&str>, - client_addr: Option<net::IpAddr>, - metrics_config: SessionMetricsConfig, - clock_drift_processor: &ClockDriftProcessor, + session_processing_config: SessionProcessingConfig, extracted_metrics: &mut Vec<Bucket>, ) -> bool { + let SessionProcessingConfig { + global_config, + config, + filters_config, + metrics_config, + client, + client_addr, + received, + clock_drift_processor, + } = session_processing_config; + let mut changed = false; let payload = item.payload(); let max_secs_in_future = config.max_secs_in_future(); @@ -312,6 +343,17 @@ fn process_session_aggregates( } } + if relay_filter::should_filter( + &session, + client_addr, + filters_config, + global_config.filters(), + ) + .is_err() + { + return false; + }; + // Extract metrics if they haven't been extracted by a prior Relay if metrics_config.is_enabled() && !item.metrics_extracted() { for aggregate in &session.aggregates { @@ -364,16 +406,17 @@ mod tests { impl TestProcessSessionArguments<'_> { fn run_session_producer(&mut self) -> bool { - process_session( - &mut self.item, - &Config::default(), - self.received, - 
self.client, - self.client_addr, - self.metrics_config, - &self.clock_drift_processor, - &mut self.extracted_metrics, - ) + let spc = SessionProcessingConfig { + global_config: &Default::default(), + config: &Default::default(), + filters_config: &Default::default(), + metrics_config: &self.metrics_config, + client: self.client, + client_addr: self.client_addr, + received: self.received, + clock_drift_processor: &self.clock_drift_processor, + }; + process_session(&mut self.item, spc, &mut self.extracted_metrics) } fn default() -> Self { diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py index 8ff2dc13d26..4c69816bbbb 100644 --- a/tests/integration/fixtures/__init__.py +++ b/tests/integration/fixtures/__init__.py @@ -250,13 +250,13 @@ def send_envelope(self, project_id, envelope, headers=None, dsn_key_idx=0): response.raise_for_status() return response - def send_session(self, project_id, payload, item_headers=None): + def send_session(self, project_id, payload, item_headers=None, headers=None): envelope = Envelope() envelope.add_session(payload) if item_headers: item = envelope.items[0] item.headers = {**item.headers, **item_headers} - self.send_envelope(project_id, envelope) + self.send_envelope(project_id, envelope, headers=headers) def send_transaction( self, @@ -292,10 +292,10 @@ def send_replay_event(self, project_id, payload, item_headers=None): self.send_envelope(project_id, envelope) - def send_session_aggregates(self, project_id, payload): + def send_session_aggregates(self, project_id, payload, headers=None): envelope = Envelope() envelope.add_item(Item(payload=PayloadRef(json=payload), type="sessions")) - self.send_envelope(project_id, envelope) + self.send_envelope(project_id, envelope, headers=headers) def send_client_report(self, project_id, payload): envelope = Envelope() diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py index 33347937915..e67032399c8 100644 --- 
a/tests/integration/test_session.py +++ b/tests/integration/test_session.py @@ -236,3 +236,129 @@ def test_session_aggregates_invalid_release( ) sessions_consumer.assert_empty() + + +def test_session_filtering(mini_sentry, relay_with_processing, sessions_consumer): + relay = relay_with_processing() + sessions_consumer = sessions_consumer() + + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + filter_settings = project_config["config"]["filterSettings"] + filter_settings["webCrawlers"] = {"isEnabled": True} + filter_settings["localhost"] = {"isEnabled": True} + filter_settings["clientIps"] = {"blacklistedIps": ["1.2.3.0/24"]} + filter_settings["releases"] = {"releases": ["sentry-bad*"]} + + timestamp = datetime.now(tz=timezone.utc) + # should get filtered because web crawler filtering is enabled. + relay.send_session( + project_id, + { + "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", + "timestamp": timestamp.isoformat(), + "started": timestamp.isoformat(), + "attrs": {"user_agent": "BingBot"}, + }, + ) + + # should get filtered because localhost filtering is enabled. + relay.send_session( + project_id, + { + "sid": "8333339f-5675-4f89-a9a0-1c935255ab59", + "timestamp": timestamp.isoformat(), + "started": timestamp.isoformat(), + "attrs": {"ip_address": "127.0.0.1"}, + }, + ) + + # should get filtered because client IP filtering is enabled. + relay.send_session( + project_id, + { + "sid": "8333339f-5675-4f89-a9a0-1c935255ab59", + "timestamp": timestamp.isoformat(), + "started": timestamp.isoformat(), + "attrs": {}, + }, + headers={"X-Forwarded-For": "1.2.3.4"}, + ) + + # should get filtered because release filtering is enabled. + relay.send_session( + project_id, + { + "sid": "8333339f-5675-4f89-a9a0-1c935255ab59", + "timestamp": timestamp.isoformat(), + "started": timestamp.isoformat(), + "attrs": {"release": "sentry-bad@1.0.0"}, + }, + ) + + # should get filtered because web crawler filtering is enabled. 
+ relay.send_session_aggregates( + project_id, + { + "aggregates": [ + { + "started": timestamp.isoformat(), + "did": "foobarbaz", + "exited": 2, + "errored": 3, + }, + ], + "attrs": {"user_agent": "BingBot"}, + }, + ) + + # should get filtered because localhost filtering is enabled. + relay.send_session_aggregates( + project_id, + { + "aggregates": [ + { + "started": timestamp.isoformat(), + "did": "foobarbaz", + "exited": 2, + "errored": 3, + }, + ], + "attrs": {"ip_address": "127.0.0.1"}, + }, + ) + + # should get filtered because client IP filtering is enabled. + relay.send_session_aggregates( + project_id, + { + "aggregates": [ + { + "started": timestamp.isoformat(), + "did": "foobarbaz", + "exited": 2, + "errored": 3, + }, + ], + "attrs": {}, + }, + headers={"X-Forwarded-For": "1.2.3.4"}, + ) + + # should get filtered because release filtering is enabled. + relay.send_session_aggregates( + project_id, + { + "aggregates": [ + { + "started": timestamp.isoformat(), + "did": "foobarbaz", + "exited": 2, + "errored": 3, + }, + ], + "attrs": {"release": "sentry-bad@1.0.0"}, + }, + ) + + sessions_consumer.assert_empty()