1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
- Add logic to extract event json from userdata in prosperodumps. ([#4755](https://github.com/getsentry/relay/pull/4755))
- Add browser name/version to logs. ([#4757](https://github.com/getsentry/relay/pull/4757))
- Accept standalone spans in the V2 format. This feature is still highly experimental! ([#4771](https://github.com/getsentry/relay/pull/4771))
- Enable filtering sessions by IP address, release, and user agent. ([#4745](https://github.com/getsentry/relay/pull/4745))

**Internal**:

17 changes: 17 additions & 0 deletions relay-event-schema/src/protocol/session.rs
@@ -2,6 +2,7 @@ use std::fmt::{self, Display};
use std::time::SystemTime;

use chrono::{DateTime, Utc};
use relay_protocol::Getter;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

@@ -269,6 +270,14 @@ impl SessionLike for SessionUpdate {
}
}

// Dummy implementation of `Getter` to satisfy the bound of `should_filter`.
// We don't actually want to use `get_value` at this time.
impl Getter for SessionUpdate {
fn get_value(&self, _path: &str) -> Option<relay_protocol::Val<'_>> {
None
}
}

#[allow(clippy::trivially_copy_pass_by_ref)]
fn is_zero(val: &u32) -> bool {
*val == 0
@@ -353,6 +362,14 @@ impl SessionAggregates {
}
}

// Dummy implementation of `Getter` to satisfy the bound of `should_filter`.
// We don't actually want to use `get_value` at this time.
impl Getter for SessionAggregates {
fn get_value(&self, _path: &str) -> Option<relay_protocol::Val<'_>> {
None
}
}

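Both no-op `Getter` implementations above exist only to satisfy the trait bound on `should_filter`: generic, path-based filter conditions resolve fields through `get_value`, so answering `None` for every path means such conditions can never match a session, which is the stated intent of not using `get_value` yet. A minimal, self-contained sketch of that reasoning (local trait and type names, not Relay's actual API):

trait Getter {
    fn get_value(&self, path: &str) -> Option<&str>;
}

struct NoopSession;

impl Getter for NoopSession {
    // Mirrors the dummy impls above: every lookup fails.
    fn get_value(&self, _path: &str) -> Option<&str> {
        None
    }
}

// Stand-in for a generic filter condition such as `session.release == "..."`.
fn condition_matches(item: &impl Getter, path: &str, expected: &str) -> bool {
    item.get_value(path) == Some(expected)
}

fn main() {
    // With a no-op getter, no path-based condition can ever match.
    assert!(!condition_matches(&NoopSession, "session.release", "myapp@1.0.0"));
}
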
#[cfg(test)]
mod tests {

85 changes: 84 additions & 1 deletion relay-filter/src/interface.rs
@@ -3,7 +3,8 @@
use url::Url;

use relay_event_schema::protocol::{
Csp, Event, EventType, Exception, LogEntry, Replay, Span, Values,
Csp, Event, EventType, Exception, LogEntry, Replay, SessionAggregates, SessionUpdate, Span,
Values,
};

/// A data item to which filters can be applied.
@@ -176,3 +177,85 @@ impl Filterable for Span {
None
}
}

impl Filterable for SessionUpdate {
fn csp(&self) -> Option<&Csp> {
None
}

fn exceptions(&self) -> Option<&Values<Exception>> {
None
}

fn ip_addr(&self) -> Option<&str> {
self.attributes
.ip_address
.as_ref()
.map(|addr| addr.as_str())
}

fn logentry(&self) -> Option<&LogEntry> {
None
}

fn release(&self) -> Option<&str> {
Some(&self.attributes.release)
}

fn transaction(&self) -> Option<&str> {
None
}

fn url(&self) -> Option<Url> {
None
}

fn user_agent(&self) -> Option<&str> {
self.attributes.user_agent.as_deref()
}

fn header(&self, _header_name: &str) -> Option<&str> {
None
}
}

impl Filterable for SessionAggregates {
fn csp(&self) -> Option<&Csp> {
None
}

fn exceptions(&self) -> Option<&Values<Exception>> {
None
}

fn ip_addr(&self) -> Option<&str> {
self.attributes
.ip_address
.as_ref()
.map(|addr| addr.as_str())
}

fn logentry(&self) -> Option<&LogEntry> {
None
}

fn release(&self) -> Option<&str> {
Some(&self.attributes.release)
}

fn transaction(&self) -> Option<&str> {
None
}

fn url(&self) -> Option<Url> {
None
}

fn user_agent(&self) -> Option<&str> {
self.attributes.user_agent.as_deref()
}

fn header(&self, _header_name: &str) -> Option<&str> {
None
}
}
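
The two `Filterable` implementations above expose only the fields a session actually carries: IP address, release, and user agent, the same three fields the changelog entry calls out. Every other accessor returns `None`, so filters keyed on URLs, CSP reports, exceptions, or log entries have nothing to match against. A stripped-down model of this accessor-based dispatch (illustrative names, not Relay's actual API):

trait FilterableLike {
    fn release(&self) -> Option<&str>;
    fn url(&self) -> Option<&str>;
}

struct SessionItem<'a> {
    release: &'a str,
}

impl FilterableLike for SessionItem<'_> {
    fn release(&self) -> Option<&str> {
        Some(self.release)
    }
    fn url(&self) -> Option<&str> {
        None // sessions carry no URL, so URL-based filters never match them
    }
}

// Drop the item if its release is on the blocked list.
fn filter_by_release(item: &impl FilterableLike, blocked: &[&str]) -> bool {
    item.release().is_some_and(|r| blocked.contains(&r))
}

// Drop the item if its URL contains a blocked host.
fn filter_by_url(item: &impl FilterableLike, blocked_hosts: &[&str]) -> bool {
    item.url()
        .is_some_and(|u| blocked_hosts.iter().any(|h| u.contains(*h)))
}

fn main() {
    let session = SessionItem { release: "myapp@1.0.0" };
    assert!(filter_by_release(&session, &["myapp@1.0.0"]));
    assert!(!filter_by_url(&session, &["localhost"]));
}
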
21 changes: 14 additions & 7 deletions relay-server/src/services/processor.rs
@@ -2010,24 +2010,26 @@ impl EnvelopeProcessorService {
async fn process_sessions(
&self,
managed_envelope: &mut TypedEnvelope<SessionGroup>,
project_info: Arc<ProjectInfo>,
rate_limits: Arc<RateLimits>,
config: &Config,
project_info: &ProjectInfo,
rate_limits: &RateLimits,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
let mut extracted_metrics = ProcessingExtractedMetrics::new();

session::process(
managed_envelope,
&self.inner.global_config.current(),
config,
&mut extracted_metrics,
&project_info,
&self.inner.config,
project_info,
);

self.enforce_quotas(
managed_envelope,
Annotated::empty(),
&mut extracted_metrics,
&project_info,
&rate_limits,
project_info,
rate_limits,
)
.await?;

@@ -2287,7 +2289,12 @@ impl EnvelopeProcessorService {
reservoir_counters
)
}
ProcessingGroup::Session => run!(process_sessions, project_info, rate_limits),
ProcessingGroup::Session => run!(
process_sessions,
&self.inner.config.clone(),
&project_info,
&rate_limits
),
ProcessingGroup::Standalone => run!(
process_standalone,
self.inner.config.clone(),
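
A note on the new `ProcessingGroup::Session` arm above: `process_sessions` now borrows its inputs, and `&self.inner.config.clone()` satisfies the `&Config` parameter because, assuming `inner.config` is an `Arc<Config>` (the plain `.clone()` passed to `process_standalone` suggests it is), the clone is just a reference-count bump and `&Arc<Config>` deref-coerces to `&Config` at the call site. A small self-contained illustration of that coercion:

use std::sync::Arc;

struct Config {
    max_secs_in_future: i64,
}

fn process(config: &Config) -> i64 {
    config.max_secs_in_future
}

fn main() {
    let shared = Arc::new(Config { max_secs_in_future: 60 });
    // Cloning the Arc bumps a reference count; `&Arc<Config>` then
    // deref-coerces to `&Config` at the call boundary.
    assert_eq!(process(&shared.clone()), 60);
    // Borrowing the Arc directly coerces the same way, without the clone.
    assert_eq!(process(&shared), 60);
}
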
133 changes: 88 additions & 45 deletions relay-server/src/services/processor/session.rs
@@ -5,11 +5,12 @@ use std::net;

use chrono::{DateTime, Duration as SignedDuration, Utc};
use relay_config::Config;
use relay_dynamic_config::SessionMetricsConfig;
use relay_dynamic_config::{GlobalConfig, SessionMetricsConfig};
use relay_event_normalization::ClockDriftProcessor;
use relay_event_schema::protocol::{
IpAddr, SessionAggregates, SessionAttributes, SessionStatus, SessionUpdate,
};
use relay_filter::ProjectFiltersConfig;
use relay_metrics::Bucket;
use relay_statsd::metric;

@@ -19,48 +20,55 @@ use crate::services::projects::project::ProjectInfo;
use crate::statsd::RelayTimers;
use crate::utils::{ItemAction, TypedEnvelope};

#[derive(Debug, Clone, Copy)]
struct SessionProcessingConfig<'a> {
pub global_config: &'a GlobalConfig,
pub config: &'a Config,
pub filters_config: &'a ProjectFiltersConfig,
pub metrics_config: &'a SessionMetricsConfig,
pub client: Option<&'a str>,
pub client_addr: Option<std::net::IpAddr>,
pub received: DateTime<Utc>,
pub clock_drift_processor: &'a ClockDriftProcessor,
}

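Grouping the per-envelope state into `SessionProcessingConfig` lets `process_session` and `process_session_aggregates` take three arguments each instead of eight. Since every field is either a shared reference or a `Copy` value, the struct can derive `Copy` and be passed by value to both helpers without cloning. A minimal self-contained illustration of that pattern (names here are invented for the example):

#[derive(Clone, Copy)]
struct RenderConfig<'a> {
    title: &'a str,
    width: usize,
}

fn banner(cfg: RenderConfig<'_>) -> String {
    // Destructure the config up front, as the processing functions do below.
    let RenderConfig { title, width } = cfg;
    format!("{title:=^w$}", w = width)
}

fn main() {
    let title = String::from(" sessions ");
    let cfg = RenderConfig { title: &title, width: 24 };
    println!("{}", banner(cfg)); // `cfg` is `Copy`: still usable afterwards
    println!("{}", banner(cfg));
}
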
/// Validates all sessions and session aggregates in the envelope, if any.
///
/// Both are removed from the envelope if they contain invalid JSON or if their timestamps
/// are out of range after clock drift correction.
pub fn process(
managed_envelope: &mut TypedEnvelope<SessionGroup>,
global_config: &GlobalConfig,
config: &Config,
extracted_metrics: &mut ProcessingExtractedMetrics,
project_info: &ProjectInfo,
config: &Config,
) {
let received = managed_envelope.received_at();
let metrics_config = project_info.config().session_metrics;
let envelope = managed_envelope.envelope_mut();
let client = envelope.meta().client().map(|x| x.to_owned());
let client_addr = envelope.meta().client_addr();

let clock_drift_processor =
ClockDriftProcessor::new(envelope.sent_at(), received).at_least(MINIMUM_CLOCK_DRIFT);

let spc = SessionProcessingConfig {
global_config,
config,
filters_config: &project_info.config().filter_settings,
metrics_config: &project_info.config().session_metrics,
client: client.as_deref(),
client_addr,
received,
clock_drift_processor: &clock_drift_processor,
};

let mut session_extracted_metrics = Vec::new();
managed_envelope.retain_items(|item| {
let should_keep = match item.ty() {
ItemType::Session => process_session(
item,
config,
received,
client.as_deref(),
client_addr,
metrics_config,
&clock_drift_processor,
&mut session_extracted_metrics,
),
ItemType::Sessions => process_session_aggregates(
item,
config,
received,
client.as_deref(),
client_addr,
metrics_config,
&clock_drift_processor,
&mut session_extracted_metrics,
),
ItemType::Session => process_session(item, spc, &mut session_extracted_metrics),
ItemType::Sessions => {
process_session_aggregates(item, spc, &mut session_extracted_metrics)
}
_ => true, // Keep all other item types
};
if should_keep {
@@ -141,14 +149,20 @@ fn is_valid_session_timestamp(
#[allow(clippy::too_many_arguments)]
fn process_session(
item: &mut Item,
config: &Config,
received: DateTime<Utc>,
client: Option<&str>,
client_addr: Option<net::IpAddr>,
metrics_config: SessionMetricsConfig,
clock_drift_processor: &ClockDriftProcessor,
session_processing_config: SessionProcessingConfig,
extracted_metrics: &mut Vec<Bucket>,
) -> bool {
let SessionProcessingConfig {
global_config,
config,
filters_config,
metrics_config,
client,
client_addr,
received,
clock_drift_processor,
} = session_processing_config;

let mut changed = false;
let payload = item.payload();
let max_secs_in_future = config.max_secs_in_future();
@@ -212,6 +226,17 @@
return false;
}

if relay_filter::should_filter(
&session,
client_addr,
filters_config,
global_config.filters(),
)
.is_err()
{
return false;
};

// Extract metrics if they haven't been extracted by a prior Relay
if metrics_config.is_enabled()
&& !item.metrics_extracted()
@@ -257,14 +282,20 @@
#[allow(clippy::too_many_arguments)]
fn process_session_aggregates(
item: &mut Item,
config: &Config,
received: DateTime<Utc>,
client: Option<&str>,
client_addr: Option<net::IpAddr>,
metrics_config: SessionMetricsConfig,
clock_drift_processor: &ClockDriftProcessor,
session_processing_config: SessionProcessingConfig,
extracted_metrics: &mut Vec<Bucket>,
) -> bool {
let SessionProcessingConfig {
global_config,
config,
filters_config,
metrics_config,
client,
client_addr,
received,
clock_drift_processor,
} = session_processing_config;

let mut changed = false;
let payload = item.payload();
let max_secs_in_future = config.max_secs_in_future();
@@ -312,6 +343,17 @@
}
}

if relay_filter::should_filter(
&session,
client_addr,
filters_config,
global_config.filters(),
)
.is_err()
{
return false;
};

// Extract metrics if they haven't been extracted by a prior Relay
if metrics_config.is_enabled() && !item.metrics_extracted() {
for aggregate in &session.aggregates {
@@ -364,16 +406,17 @@

impl TestProcessSessionArguments<'_> {
fn run_session_producer(&mut self) -> bool {
process_session(
&mut self.item,
&Config::default(),
self.received,
self.client,
self.client_addr,
self.metrics_config,
&self.clock_drift_processor,
&mut self.extracted_metrics,
)
let spc = SessionProcessingConfig {
global_config: &Default::default(),
config: &Default::default(),
filters_config: &Default::default(),
metrics_config: &self.metrics_config,
client: self.client,
client_addr: self.client_addr,
received: self.received,
clock_drift_processor: &self.clock_drift_processor,
};
process_session(&mut self.item, spc, &mut self.extracted_metrics)
}
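
One subtlety in the helper above: the `&Default::default()` fields borrow temporaries, yet `spc` is used in the following statement. This works because Rust extends the lifetime of temporaries borrowed in the tail expression of a `let` initializer, and that extension reaches through braced struct literals, so the defaulted values live as long as `spc`. A standalone illustration:

#[derive(Default)]
struct Options {
    retries: u32,
}

struct Borrowed<'a> {
    options: &'a Options,
}

fn consume(b: Borrowed<'_>) -> u32 {
    b.options.retries
}

fn main() {
    // The temporary created by `Default::default()` is extended to the scope
    // of `b`, not dropped at the end of this statement.
    let b = Borrowed {
        options: &Default::default(),
    };
    assert_eq!(consume(b), 0);
}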

fn default() -> Self {