Skip to content

Commit b1d288d

Browse files
committed
Merge branch 'mike/lazer-fixes-2' of github.com:pyth-network/pyth-agent into mike/testing-1
# Conflicts: # src/agent/services/lazer_exporter.rs
2 parents e8dc499 + f61ab5e commit b1d288d

File tree

2 files changed

+118
-44
lines changed

2 files changed

+118
-44
lines changed

src/agent/metrics.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,12 @@ impl ProductGlobalMetrics {
119119
let symbol_string = maybe_symbol
120120
.map(|x| x.into())
121121
.unwrap_or(format!("unknown_{}", product_key));
122-
tracing::info!("pythnet symbol: {} pubkey: {} hex: {}", symbol_string, product_key, hex::encode(product_key));
122+
tracing::info!(
123+
"pythnet symbol: {} pubkey: {} hex: {}",
124+
symbol_string,
125+
product_key,
126+
hex::encode(product_key)
127+
);
123128

124129
#[deny(unused_variables)]
125130
let Self { update_count } = self;

src/agent/services/lazer_exporter.rs

Lines changed: 112 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -350,13 +350,14 @@ mod lazer_exporter {
350350
},
351351
},
352352
std::{
353-
collections::{
354-
HashMap,
355-
HashSet,
356-
},
353+
collections::HashMap,
357354
sync::Arc,
355+
time::Duration,
356+
},
357+
tokio::sync::{
358+
broadcast::Sender,
359+
mpsc,
358360
},
359-
tokio::sync::broadcast::Sender,
360361
url::Url,
361362
};
362363

@@ -369,31 +370,43 @@ mod lazer_exporter {
369370
S: LocalStore,
370371
S: Send + Sync + 'static,
371372
{
372-
let mut lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
373+
// We can't publish to Lazer without symbols, so crash the process if it fails.
374+
let mut lazer_symbols = match get_lazer_symbol_map(&config.history_url).await {
375+
Ok(symbol_map) => {
376+
if symbol_map.is_empty() {
377+
panic!("Retrieved zero Lazer symbols from {}", config.history_url);
378+
}
379+
symbol_map
380+
}
381+
Err(_) => {
382+
tracing::error!(
383+
"Failed to retrieve Lazer symbols from {}",
384+
config.history_url
385+
);
386+
panic!(
387+
"Failed to retrieve Lazer symbols from {}",
388+
config.history_url
389+
);
390+
}
391+
};
392+
373393
tracing::info!(
374394
"Retrieved {} Lazer feeds with hermes symbols from symbols endpoint: {}",
375395
lazer_symbols.len(),
376396
&config.history_url
377397
);
378-
for symbol in lazer_symbols.iter() {
379-
tracing::info!(
380-
"history endpoint identifier: {:?} hex: {:?} symbol response: {:?}",
381-
symbol.0,
382-
symbol.0.to_hex(),
383-
symbol.1
384-
);
385-
}
398+
399+
let (symbols_sender, mut symbols_receiver) = mpsc::channel(1);
400+
tokio::spawn(get_lazer_symbols_task(
401+
config.history_url.clone(),
402+
config.symbol_fetch_interval_duration.clone(),
403+
symbols_sender,
404+
));
386405

387406
let mut publish_interval = tokio::time::interval(config.publish_interval_duration);
388-
let mut symbol_fetch_interval =
389-
tokio::time::interval(config.symbol_fetch_interval_duration);
390-
let mut found_identifiers = HashSet::new();
391407

392408
loop {
393409
tokio::select! {
394-
_ = symbol_fetch_interval.tick() => {
395-
lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
396-
},
397410
_ = publish_interval.tick() => {
398411
let publisher_timestamp = MessageField::some(Timestamp::now());
399412
let mut publisher_update = PublisherUpdate {
@@ -405,11 +418,6 @@ mod lazer_exporter {
405418

406419
// TODO: This read locks and clones local::Store::prices, which may not meet performance needs.
407420
for (identifier, price_info) in state.get_all_price_infos().await {
408-
if !found_identifiers.contains(&identifier) {
409-
tracing::info!("pythnet identifier: {:?} hex: {:?} price_info: {:?}", identifier, identifier.to_hex(), price_info);
410-
found_identifiers.insert(identifier);
411-
}
412-
413421
if let Some(symbol) = lazer_symbols.get(&identifier) {
414422
let source_timestamp_micros = price_info.timestamp.and_utc().timestamp_micros();
415423
let source_timestamp = MessageField::some(Timestamp {
@@ -465,33 +473,94 @@ mod lazer_exporter {
465473
tracing::error!("Error sending transaction to relayer receivers: {e}");
466474
}
467475
}
476+
},
477+
latest_symbol_map = symbols_receiver.recv() => {
478+
match latest_symbol_map {
479+
Some(symbol_map) => {
480+
tracing::info!("Refreshing Lazer symbol map with {} symbols", symbol_map.len());
481+
lazer_symbols = symbol_map
482+
}
483+
None => {
484+
// agent can continue but will eventually have a stale symbol set unless the process is cycled.
485+
tracing::error!("Lazer symbol refresh channel closed")
486+
}
487+
}
488+
},
489+
}
490+
}
491+
}
492+
493+
async fn get_lazer_symbols_task(
494+
history_url: Url,
495+
fetch_interval_duration: Duration,
496+
sender: mpsc::Sender<HashMap<pyth_sdk::Identifier, SymbolResponse>>,
497+
) {
498+
let mut symbol_fetch_interval = tokio::time::interval(fetch_interval_duration);
499+
500+
loop {
501+
tokio::select! {
502+
_ = symbol_fetch_interval.tick() => {
503+
tracing::info!("Refreshing Lazer symbol map from history service...");
504+
match get_lazer_symbol_map(&history_url).await {
505+
Ok(symbol_map) => {
506+
if symbol_map.is_empty() {
507+
tracing::error!("Retrieved zero Lazer symbols from {}", history_url);
508+
continue;
509+
}
510+
match sender.send(symbol_map).await {
511+
Ok(_) => (),
512+
Err(e) => {
513+
// agent can continue but will eventually have a stale symbol set unless the process is cycled.
514+
tracing::error!("Error sending refreshed symbol map to exporter task: {e}");
515+
}
516+
}
517+
},
518+
Err(_) => {
519+
tracing::error!("Failed to retrieve Lazer symbols from {} in refresh task", history_url);
520+
}
521+
}
468522
}
469523
}
470524
}
471525
}
472526

473527
async fn get_lazer_symbol_map(
474528
history_url: &Url,
475-
) -> HashMap<pyth_sdk::Identifier, SymbolResponse> {
476-
match fetch_symbols(history_url).await {
477-
Ok(symbols) => symbols
478-
.into_iter()
479-
.filter_map(|symbol| {
480-
let hermes_id = symbol.hermes_id.clone()?;
481-
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
482-
Ok(id) => Some((id, symbol)),
483-
Err(e) => {
484-
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
485-
None
486-
}
487-
}
488-
})
489-
.collect(),
490-
Err(e) => {
491-
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
492-
HashMap::new()
529+
) -> anyhow::Result<HashMap<pyth_sdk::Identifier, SymbolResponse>> {
530+
const NUM_RETRIES: usize = 3;
531+
const RETRY_INTERVAL: Duration = Duration::from_secs(1);
532+
let mut retry_count = 0;
533+
534+
while retry_count < NUM_RETRIES {
535+
match fetch_symbols(history_url).await {
536+
Ok(symbols) => {
537+
let symbol_map = symbols
538+
.into_iter()
539+
.filter_map(|symbol| {
540+
let hermes_id = symbol.hermes_id.clone()?;
541+
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
542+
Ok(id) => Some((id, symbol)),
543+
Err(e) => {
544+
tracing::warn!(
545+
"Failed to parse hermes_id {}: {e:?}",
546+
hermes_id
547+
);
548+
None
549+
}
550+
}
551+
})
552+
.collect();
553+
return Ok(symbol_map);
554+
}
555+
Err(e) => {
556+
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
557+
558+
retry_count += 1;
559+
tokio::time::sleep(RETRY_INTERVAL).await;
560+
}
493561
}
494562
}
563+
anyhow::bail!("Lazer symbol map fetch failed after {NUM_RETRIES} attempts");
495564
}
496565
}
497566

0 commit comments

Comments
 (0)