Skip to content

Commit c85fb5b

Browse files
authored
Merge pull request #376 from AdExNetwork/issue-319-event-aggregator-changes
Issue 319 event aggregator changes
2 parents 033f5d9 + 9afbe02 commit c85fb5b

File tree

5 files changed

+57
-61
lines changed

5 files changed

+57
-61
lines changed

sentry/src/analytics_recorder.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
11
use crate::epoch;
2-
use crate::payout::get_payout;
32
use crate::Session;
43
use primitives::sentry::Event;
54
use primitives::sentry::{ChannelReport, PublisherReport};
6-
use primitives::{BigNum, Channel};
5+
use primitives::{BigNum, Channel, ValidatorId};
76
use redis::aio::MultiplexedConnection;
87
use redis::pipe;
98
use slog::{error, Logger};
109

10+
// Records only payout events
1111
pub async fn record(
1212
mut conn: MultiplexedConnection,
1313
channel: Channel,
1414
session: Session,
15-
events: Vec<Event>,
15+
events: Vec<(Event, Option<(ValidatorId, BigNum)>)>,
1616
logger: Logger,
1717
) {
1818
let mut db = pipe();
1919

2020
events
2121
.iter()
22-
.filter(|&ev| ev.is_click_event() || ev.is_impression_event())
23-
.for_each(|event: &Event| match event {
22+
.filter(|(ev, _)| ev.is_click_event() || ev.is_impression_event())
23+
.for_each(|(event, payout)| match event {
2424
Event::Impression {
2525
publisher,
2626
ad_unit,
@@ -34,17 +34,13 @@ pub async fn record(
3434
referrer,
3535
} => {
3636
let divisor = BigNum::from(10u64.pow(18));
37-
38-
let pay_amount = match get_payout(&logger, &channel, event, &session) {
39-
Ok(Some((_, payout))) => payout.div_floor(&divisor)
37+
let pay_amount = match payout {
38+
Some((_, payout)) => payout
39+
.div_floor(&divisor)
4040
.to_f64()
4141
.expect("Should always have a payout in f64 after division"),
4242
// This should never happen, as the conditions we are checking for in the .filter are the same as getPayout's
43-
Ok(None) => return,
44-
Err(err) => {
45-
error!(&logger, "Getting the payout failed: {}", &err; "module" => "analytics-recorder", "err" => ?err);
46-
return
47-
},
43+
None => return,
4844
};
4945

5046
if let Some(ad_unit) = ad_unit {

sentry/src/event_aggregator.rs

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::db::event_aggregate::insert_event_aggregate;
44
use crate::db::DbPool;
55
use crate::db::{get_channel_by_id, update_targeting_rules};
66
use crate::event_reducer;
7+
use crate::payout::get_payout;
78
use crate::Application;
89
use crate::ResponseError;
910
use crate::Session;
@@ -13,7 +14,7 @@ use chrono::Utc;
1314
use lazy_static::lazy_static;
1415
use primitives::adapter::Adapter;
1516
use primitives::sentry::{Event, EventAggregate};
16-
use primitives::{Channel, ChannelId};
17+
use primitives::{BigNum, Channel, ChannelId, ValidatorId};
1718
use slog::{error, Logger};
1819
use std::collections::HashMap;
1920
use std::env;
@@ -64,13 +65,13 @@ async fn store(db: &DbPool, channel_id: &ChannelId, logger: &Logger, recorder: R
6465
}
6566

6667
impl EventAggregator {
67-
pub async fn record<'a, A: Adapter>(
68+
pub async fn record<A: Adapter>(
6869
&self,
69-
app: &'a Application<A>,
70+
app: &Application<A>,
7071
channel_id: &ChannelId,
7172
session: &Session,
7273
auth: Option<&Auth>,
73-
events: &'a [Event],
74+
events: Vec<Event>,
7475
) -> Result<(), ResponseError> {
7576
let recorder = self.recorder.clone();
7677
let aggr_throttle = app.config.aggr_throttle;
@@ -97,7 +98,6 @@ impl EventAggregator {
9798
// insert into
9899
channel_recorder.insert(channel_id.to_owned(), record);
99100

100-
//
101101
// spawn async task that persists
102102
// the channel events to database
103103
if aggr_throttle > 0 {
@@ -131,7 +131,7 @@ impl EventAggregator {
131131
auth,
132132
&app.config.ip_rate_limit,
133133
&record.channel,
134-
events,
134+
&events,
135135
)
136136
.await
137137
.map_err(|e| match e {
@@ -155,18 +155,29 @@ impl EventAggregator {
155155
update_targeting_rules(&app.pool, &channel_id, &new_rules).await?;
156156
}
157157

158-
events.iter().for_each(|ev| {
159-
match event_reducer::reduce(
160-
&app.logger,
161-
&record.channel,
162-
&mut record.aggregate,
163-
ev,
164-
&session,
165-
) {
166-
Ok(_) => {}
167-
Err(err) => error!(&app.logger, "Event Reducer failed"; "error" => ?err ),
168-
}
169-
});
158+
// Pre-computing all payouts once
159+
let events_with_payout: Vec<(Event, Option<(ValidatorId, BigNum)>)> = events
160+
.iter()
161+
.filter(|ev| ev.is_click_event() || ev.is_impression_event())
162+
.map(|ev| {
163+
let payout = match get_payout(&app.logger, &record.channel, &ev, &session) {
164+
Ok(payout) => payout,
165+
Err(err) => return Err(err),
166+
};
167+
168+
match event_reducer::reduce(&record.channel, &mut record.aggregate, &ev, &payout) {
169+
Ok(_) => {}
170+
Err(err) => error!(&app.logger, "Event Reducred failed"; "error" => ?err),
171+
}
172+
173+
Ok((ev.clone(), payout))
174+
})
175+
.collect::<Result<_, _>>()?;
176+
177+
// We don't want to save empty aggregates
178+
if record.aggregate.events.is_empty() {
179+
return Ok(());
180+
}
170181

171182
// only time we don't have session is during
172183
// an unauthenticated close event
@@ -175,7 +186,7 @@ impl EventAggregator {
175186
redis.clone(),
176187
record.channel.clone(),
177188
session.clone(),
178-
events.to_owned().to_vec(),
189+
events_with_payout,
179190
app.logger.clone(),
180191
));
181192
}

sentry/src/event_reducer.rs

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,35 @@
1-
use crate::{payout::get_payout, Session};
21
use primitives::{
32
sentry::{AggregateEvents, Event, EventAggregate},
43
BigNum, Channel, ValidatorId,
54
};
6-
use slog::Logger;
75

86
pub(crate) fn reduce(
9-
logger: &Logger,
107
channel: &Channel,
118
initial_aggr: &mut EventAggregate,
129
ev: &Event,
13-
session: &Session,
10+
payout: &Option<(ValidatorId, BigNum)>,
1411
) -> Result<(), Box<dyn std::error::Error>> {
1512
let event_type = ev.to_string();
13+
1614
match ev {
1715
Event::Impression { publisher, .. } => {
1816
let impression = initial_aggr.events.get(&event_type);
19-
let payout = get_payout(logger, &channel, &ev, session)?;
2017
let merge = merge_payable_event(
2118
impression,
22-
payout.unwrap_or_else(|| (*publisher, Default::default())),
19+
payout
20+
.to_owned()
21+
.unwrap_or_else(|| (*publisher, Default::default())),
2322
);
2423

2524
initial_aggr.events.insert(event_type, merge);
2625
}
2726
Event::Click { publisher, .. } => {
2827
let clicks = initial_aggr.events.get(&event_type);
29-
let payout = get_payout(logger, &channel, &ev, session)?;
3028
let merge = merge_payable_event(
3129
clicks,
32-
payout.unwrap_or_else(|| (*publisher, Default::default())),
30+
payout
31+
.to_owned()
32+
.unwrap_or_else(|| (*publisher, Default::default())),
3333
);
3434

3535
initial_aggr.events.insert(event_type, merge);
@@ -77,15 +77,13 @@ fn merge_payable_event(
7777
mod test {
7878
use super::*;
7979
use chrono::Utc;
80-
use primitives::util::tests::{
81-
discard_logger,
82-
prep_db::{DUMMY_CHANNEL, IDS},
80+
use primitives::{
81+
util::tests::prep_db::{DUMMY_CHANNEL, IDS},
82+
BigNum,
8383
};
84-
use primitives::BigNum;
8584

8685
#[test]
8786
fn test_reduce() {
88-
let logger = discard_logger();
8987
let mut channel: Channel = DUMMY_CHANNEL.clone();
9088
channel.deposit_amount = 100.into();
9189
// make immutable again
@@ -103,16 +101,9 @@ mod test {
103101
ad_slot: None,
104102
referrer: None,
105103
};
106-
107-
let session = Session {
108-
ip: Default::default(),
109-
country: None,
110-
referrer_header: None,
111-
os: None,
112-
};
113-
104+
let payout = Some((IDS["publisher"], BigNum::from(1)));
114105
for i in 0..101 {
115-
reduce(&logger, &channel, &mut event_aggr, &event, &session)
106+
reduce(&channel, &mut event_aggr, &event, &payout)
116107
.expect(&format!("Should be able to reduce event #{}", i));
117108
}
118109

@@ -132,9 +123,7 @@ mod test {
132123
assert_eq!(event_counts, &BigNum::from(101));
133124

134125
let event_payouts = impression_event
135-
.event_counts
136-
.as_ref()
137-
.expect("there should be event_counts set")
126+
.event_payouts
138127
.get(&IDS["publisher"])
139128
.expect("There should be myAwesomePublisher event_payouts key");
140129
assert_eq!(event_payouts, &BigNum::from(101));

sentry/src/payout.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use primitives::{
99
use slog::{error, Logger};
1010
use std::cmp::{max, min};
1111

12-
type Result = std::result::Result<Option<(ValidatorId, BigNum)>, Error>;
12+
pub type Result = std::result::Result<Option<(ValidatorId, BigNum)>, Error>;
1313

1414
pub fn get_payout(logger: &Logger, channel: &Channel, event: &Event, session: &Session) -> Result {
1515
let event_type = event.to_string();

sentry/src/routes/channel.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,14 @@ pub async fn insert_events<A: Adapter + 'static>(
201201
let channel_id = ChannelId::from_hex(route_params.index(0))?;
202202

203203
let body_bytes = hyper::body::to_bytes(req_body).await?;
204-
let request_body = serde_json::from_slice::<HashMap<String, Vec<Event>>>(&body_bytes)?;
204+
let mut request_body = serde_json::from_slice::<HashMap<String, Vec<Event>>>(&body_bytes)?;
205205

206206
let events = request_body
207-
.get("events")
207+
.remove("events")
208208
.ok_or_else(|| ResponseError::BadRequest("invalid request".to_string()))?;
209209

210210
app.event_aggregator
211-
.record(app, &channel_id, session, auth, &events)
211+
.record(app, &channel_id, session, auth, events)
212212
.await?;
213213

214214
Ok(Response::builder()

0 commit comments

Comments
 (0)