Skip to content

Commit 6f3241a

Browse files
committed
Review comments: parallelize channel send, exponential backoff
1 parent 3486c61 commit 6f3241a

File tree

6 files changed

+43
-27
lines changed

6 files changed

+43
-27
lines changed

pyth-lazer-agent/Cargo.lock

Lines changed: 21 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyth-lazer-agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pyth-lazer-publisher-sdk = "0.1.5"
88
pyth-lazer-protocol = "0.7.2"
99

1010
anyhow = "1.0.98"
11+
backoff = "0.4.0"
1112
bincode = { version = "2.0.1", features = ["serde"] }
1213
clap = { version = "4.5.32", features = ["derive"] }
1314
config = "0.15.11"
@@ -26,7 +27,6 @@ serde_json = "1.0.140"
2627
soketto = { version = "0.8.1", features = ["http"] }
2728
solana-keypair = "2.2.1"
2829
tokio = { version = "1.44.1", features = ["full"] }
29-
tokio-stream = "0.1.17"
3030
tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] }
3131
tokio-util = { version = "0.7.14", features = ["compat"] }
3232
tracing = "0.1.41"

pyth-lazer-agent/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub struct Config {
2121
}
2222

2323
fn default_publish_interval() -> Duration {
24-
Duration::from_millis(50)
24+
Duration::from_micros(500)
2525
}
2626

2727
pub fn load_config(config_path: String) -> anyhow::Result<Config> {

pyth-lazer-agent/src/lazer_publisher.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -137,15 +137,12 @@ impl LazerPublisherTask {
137137
payload: Some(buf),
138138
special_fields: Default::default(),
139139
};
140-
for relayer_sender in self.relayer_senders.iter() {
141-
if let Err(e) = relayer_sender
142-
.sender
143-
.send(signed_lazer_transaction.clone())
144-
.await
145-
{
146-
error!("Error sending transaction to Lazer relayer session: {e:?}");
147-
}
148-
}
140+
futures::future::join_all(
141+
self.relayer_senders
142+
.iter_mut()
143+
.map(|relayer_sender| relayer_sender.sender.send(signed_lazer_transaction.clone())),
144+
)
145+
.await;
149146

150147
self.pending_updates.clear();
151148
Ok(())

pyth-lazer-agent/src/publisher_handle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ async fn try_handle_publisher(
165165
continue;
166166
}
167167
}
168-
} //_ => bail!("Publisher API request set with invalid context"),
168+
}
169169
};
170170

171171
lazer_publisher.push_feed_update(feed_update).await?;

pyth-lazer-agent/src/relayer_session.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use crate::config::CHANNEL_CAPACITY;
22
use anyhow::{Result, bail};
3+
use backoff::ExponentialBackoffBuilder;
4+
use backoff::backoff::Backoff;
35
use futures_util::stream::{SplitSink, SplitStream};
46
use futures_util::{SinkExt, StreamExt};
57
use http::HeaderValue;
@@ -85,8 +87,15 @@ struct RelayerSessionTask {
8587

8688
impl RelayerSessionTask {
8789
pub async fn run(&mut self) {
90+
let initial_interval = Duration::from_millis(100);
91+
let max_interval = Duration::from_secs(5);
92+
let mut backoff = ExponentialBackoffBuilder::new()
93+
.with_initial_interval(initial_interval)
94+
.with_max_interval(max_interval)
95+
.with_max_elapsed_time(None)
96+
.build();
97+
8898
let mut failure_count = 0;
89-
let retry_duration = Duration::from_secs(1);
9099

91100
loop {
92101
match self.run_relayer_connection().await {
@@ -96,13 +105,14 @@ impl RelayerSessionTask {
96105
}
97106
Err(e) => {
98107
failure_count += 1;
108+
let next_backoff = backoff.next_backoff().unwrap_or(max_interval);
99109
tracing::error!(
100110
"relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}",
101111
e,
102112
failure_count,
103-
retry_duration
113+
next_backoff
104114
);
105-
tokio::time::sleep(retry_duration).await;
115+
tokio::time::sleep(next_backoff).await;
106116
}
107117
}
108118
}

0 commit comments

Comments
 (0)