Skip to content

Commit fab07b4

Browse files
authored
Unchanged price publishing optimization (#73)
* exporter: Add unchanged price publishing threshold * exporter.rs: Fix negative time delta in unchanged data filtering
1 parent 4ab94d4 commit fab07b4

File tree

3 files changed

+61
-14
lines changed

3 files changed

+61
-14
lines changed

config/config.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ key_store.root_path = "/path/to/keystore"
109109
# Age after which a price update is considered stale and not published
110110
# exporter.staleness_threshold = "5s"
111111

112+
# Wait at least this long before publishing an unchanged price
113+
# state; unchanged price state means only timestamp has changed
114+
# with other state identical to last published state.
115+
# exporter.unchanged_publish_threshold = "5s"
116+
112117
# Maximum size of a batch
113118
# exporter.max_batch_size = 12
114119

src/agent/solana/exporter.rs

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ use {
2424
join_all,
2525
},
2626
key_store::KeyStore,
27-
pyth_sdk::{
28-
Identifier,
29-
UnixTimestamp,
30-
},
27+
pyth_sdk::Identifier,
3128
pyth_sdk_solana::state::PriceStatus,
3229
serde::{
3330
Deserialize,
@@ -111,6 +108,10 @@ pub struct Config {
111108
/// Age after which a price update is considered stale and not published
112109
#[serde(with = "humantime_serde")]
113110
pub staleness_threshold: Duration,
111+
/// Wait at least this long before publishing an unchanged price
112+
/// state; unchanged price state means only timestamp has changed
113+
/// with other state identical to last published state.
114+
pub unchanged_publish_threshold: Duration,
114115
/// Maximum size of a batch
115116
pub max_batch_size: usize,
116117
/// Capacity of the channel between the Exporter and the Transaction Monitor
@@ -131,6 +132,7 @@ impl Default for Config {
131132
refresh_network_state_interval_duration: Duration::from_millis(200),
132133
publish_interval_duration: Duration::from_secs(1),
133134
staleness_threshold: Duration::from_secs(5),
135+
unchanged_publish_threshold: Duration::from_secs(5),
134136
max_batch_size: 12,
135137
inflight_transactions_channel_capacity: 10000,
136138
transaction_monitor: Default::default(),
@@ -212,8 +214,10 @@ pub struct Exporter {
212214
/// Channel on which to communicate with the local store
213215
local_store_tx: Sender<store::local::Message>,
214216

215-
/// The last time an update was published for each price identifier
216-
last_published_at: HashMap<PriceIdentifier, UnixTimestamp>,
217+
/// The last state published for each price identifier. Used to
218+
/// rule out stale data and prevent repetitive publishing of
219+
/// unchanged prices.
220+
last_published_state: HashMap<PriceIdentifier, PriceInfo>,
217221

218222
/// Watch receiver channel to access the current network state
219223
network_state_rx: watch::Receiver<NetworkState>,
@@ -252,7 +256,7 @@ impl Exporter {
252256
publish_interval,
253257
key_store,
254258
local_store_tx,
255-
last_published_at: HashMap::new(),
259+
last_published_state: HashMap::new(),
256260
network_state_rx,
257261
inflight_transactions_tx,
258262
publisher_permissions_rx,
@@ -289,16 +293,38 @@ impl Exporter {
289293
async fn publish_updates(&mut self) -> Result<()> {
290294
let local_store_contents = self.fetch_local_store_contents().await?;
291295

296+
let now = Utc::now().timestamp();
297+
292298
// Filter the contents to only include information we haven't already sent,
293299
// and to ignore stale information.
294300
let fresh_updates = local_store_contents
295301
.iter()
296302
.filter(|(identifier, info)| {
297-
*self.last_published_at.get(identifier).unwrap_or(&0) < info.timestamp
303+
// Filter out timestamps older than what we already published
304+
if let Some(last_info) = self.last_published_state.get(identifier) {
305+
last_info.timestamp < info.timestamp
306+
} else {
307+
true // No prior data found, letting the price through
308+
}
298309
})
299310
.filter(|(_identifier, info)| {
300-
(Utc::now().timestamp() - info.timestamp)
301-
< self.config.staleness_threshold.as_secs() as i64
311+
// Filter out timestamps that are old
312+
(now - info.timestamp) < self.config.staleness_threshold.as_secs() as i64
313+
})
314+
.filter(|(identifier, info)| {
315+
// Filter out unchanged price data if the max delay wasn't reached
316+
317+
if let Some(last_info) = self.last_published_state.get(identifier) {
318+
if (info.timestamp - last_info.timestamp)
319+
> self.config.unchanged_publish_threshold.as_secs() as i64
320+
{
321+
true // max delay since last published state reached, we publish anyway
322+
} else {
323+
!last_info.cmp_no_timestamp(*info) // Filter out if data is unchanged
324+
}
325+
} else {
326+
true // No prior data found, letting the price through
327+
}
302328
})
303329
.collect::<Vec<_>>();
304330

@@ -348,7 +374,7 @@ impl Exporter {
348374
// Note: This message is not an error. Some
349375
// publishers have different permissions on
350376
// primary/secondary networks
351-
info!(
377+
debug!(
352378
self.logger,
353379
"Exporter: Attempted to publish a price without permission, skipping";
354380
"unpermissioned_price_account" => key_from_id.to_string(),
@@ -373,13 +399,13 @@ impl Exporter {
373399
.publish_interval_duration
374400
.div_f64(num_batches as f64),
375401
);
376-
let mut batch_timestamps = HashMap::new();
402+
let mut batch_state = HashMap::new();
377403
let mut batch_futures = vec![];
378404
for batch in batches {
379405
batch_futures.push(self.publish_batch(batch, &publish_keypair));
380406

381407
for (identifier, info) in batch {
382-
batch_timestamps.insert(**identifier, info.timestamp);
408+
batch_state.insert(**identifier, (**info).clone());
383409
}
384410

385411
batch_send_interval.tick().await;
@@ -392,7 +418,7 @@ impl Exporter {
392418
.into_iter()
393419
.collect::<Result<Vec<_>>>()?;
394420

395-
self.last_published_at.extend(batch_timestamps);
421+
self.last_published_state.extend(batch_state);
396422

397423
Ok(())
398424
}

src/agent/store/local.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,22 @@ pub struct PriceInfo {
3333
pub timestamp: UnixTimestamp,
3434
}
3535

36+
impl PriceInfo {
37+
/// Returns false if any non-timestamp fields differ with `other`. Used for last published state comparison in exporter.
38+
pub fn cmp_no_timestamp(&self, other: &Self) -> bool {
39+
// Prevent forgetting to use a new field if we expand the type.
40+
#[deny(unused_variables)]
41+
let Self {
42+
status,
43+
price,
44+
conf,
45+
timestamp: _,
46+
} = self;
47+
48+
status == &other.status && price == &other.price && conf == &other.conf
49+
}
50+
}
51+
3652
#[derive(Debug)]
3753
pub enum Message {
3854
Update {

0 commit comments

Comments
 (0)