Skip to content

Commit 1b02c4f

Browse files
committed
Refactor, address review
1 parent a5000cc commit 1b02c4f

File tree

8 files changed

+209
-233
lines changed

8 files changed

+209
-233
lines changed

pyth-lazer-agent/config/config.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
relayer_urls = ["ws://localhost:1235/v1/transaction", "ws://localhost:1335/v1/transaction"]
1+
relayer_urls = ["ws://relayer-0.pyth-lazer.dourolabs.app/v1/transaction", "ws://relayer-0.pyth-lazer.dourolabs.app/v1/transaction"]
22
authorization_token = "token1"
33
publish_keypair_path = "/path/to/solana/id.json"
4-
listen_address = "0.0.0.0:1234"
5-
publish_interval_duration = "50ms"
4+
listen_address = "0.0.0.0:1776"
5+
publish_interval_duration = "0.5ms"

pyth-lazer-agent/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@ use std::time::Duration;
44

55
use config::{Environment, File};
66
use derivative::Derivative;
7-
use pyth_lazer_protocol::router::PublisherId;
87
use serde::Deserialize;
98
use url::Url;
109

1110
#[derive(Deserialize, Derivative, Clone, PartialEq)]
1211
#[derivative(Debug)]
1312
pub struct Config {
1413
pub listen_address: SocketAddr,
15-
pub publisher_id: PublisherId,
1614
pub relayer_urls: Vec<Url>,
15+
#[derivative(Debug = "ignore")]
1716
pub authorization_token: String,
17+
#[derivative(Debug = "ignore")]
1818
pub publish_keypair_path: PathBuf,
1919
#[serde(with = "humantime_serde", default = "default_publish_interval")]
2020
pub publish_interval_duration: Duration,

pyth-lazer-agent/src/http_server.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,9 @@ pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()>
3434

3535
loop {
3636
let stream_addr = listener.accept().await;
37-
let config_clone = config.clone();
3837
let lazer_publisher_clone = lazer_publisher.clone();
3938
tokio::spawn(async {
40-
if let Err(err) =
41-
try_handle_connection(stream_addr, config_clone, lazer_publisher_clone).await
42-
{
39+
if let Err(err) = try_handle_connection(stream_addr, lazer_publisher_clone).await {
4340
warn!("error while handling connection: {err:?}");
4441
}
4542
});
@@ -48,7 +45,6 @@ pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()>
4845

4946
async fn try_handle_connection(
5047
stream_addr: io::Result<(TcpStream, SocketAddr)>,
51-
config: Config,
5248
lazer_publisher: LazerPublisher,
5349
) -> Result<()> {
5450
let (stream, remote_addr) = stream_addr?;
@@ -59,12 +55,7 @@ async fn try_handle_connection(
5955
TokioIo::new(stream),
6056
service_fn(move |r| {
6157
let request = RelayerRequest(r);
62-
request_handler(
63-
request,
64-
remote_addr,
65-
config.clone(),
66-
lazer_publisher.clone(),
67-
)
58+
request_handler(request, remote_addr, lazer_publisher.clone())
6859
}),
6960
)
7061
.with_upgrades()
@@ -76,7 +67,6 @@ async fn try_handle_connection(
7667
async fn request_handler(
7768
request: RelayerRequest,
7869
remote_addr: SocketAddr,
79-
config: Config,
8070
lazer_publisher: LazerPublisher,
8171
) -> Result<Response<FullBody>, BoxedError> {
8272
let path = request.0.uri().path();
@@ -107,7 +97,6 @@ async fn request_handler(
10797
Request::PublisherV1 | Request::PublisherV2 => {
10898
let publisher_connection_context = PublisherConnectionContext {
10999
request_type,
110-
publisher_id: config.publisher_id,
111100
_remote_addr: remote_addr,
112101
};
113102
tokio::spawn(handle_publisher(

pyth-lazer-agent/src/lazer_publisher.rs

Lines changed: 38 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use crate::config::{CHANNEL_CAPACITY, Config};
2+
use crate::relayer_session::RelayerSender;
23
use anyhow::{Context, Result, bail};
34
use ed25519_dalek::{Signer, SigningKey};
4-
use futures_util::stream::{SplitSink, SplitStream};
5-
use futures_util::{SinkExt, StreamExt};
6-
use http::HeaderValue;
75
use protobuf::well_known_types::timestamp::Timestamp;
86
use protobuf::{Message, MessageField};
97
use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PublisherUpdate};
@@ -13,93 +11,34 @@ use pyth_lazer_publisher_sdk::transaction::{
1311
Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
1412
};
1513
use solana_keypair::read_keypair_file;
16-
use std::time::Duration;
17-
use tokio::net::TcpStream;
1814
use tokio::{
1915
select,
2016
sync::mpsc::{self, Receiver, Sender},
2117
time::interval,
2218
};
23-
use tokio_stream::StreamMap;
24-
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
25-
use tokio_tungstenite::{
26-
MaybeTlsStream, WebSocketStream, connect_async_with_config,
27-
tungstenite::Message as TungsteniteMessage,
28-
};
29-
use tracing::{error, instrument};
30-
use url::Url;
31-
32-
struct RelayerSender {
33-
ws_senders: Vec<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>>,
34-
}
19+
use tracing::error;
3520

36-
impl RelayerSender {
37-
async fn send_price_update(
38-
&mut self,
39-
signed_lazer_transaction: &SignedLazerTransaction,
40-
) -> Result<()> {
41-
tracing::debug!("price_update: {:?}", signed_lazer_transaction);
42-
let buf = signed_lazer_transaction.write_to_bytes()?;
43-
for sender in self.ws_senders.iter_mut() {
44-
sender.send(TungsteniteMessage::from(buf.clone())).await?;
45-
sender.flush().await?;
46-
}
47-
Ok(())
48-
}
49-
}
50-
51-
async fn connect_to_relayer(
52-
mut url: Url,
53-
token: &str,
54-
) -> Result<(
55-
SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>,
56-
SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
57-
)> {
58-
tracing::info!("connecting to the relayer at {}", url);
59-
url.set_path("/v1/transaction");
60-
let mut req = url.clone().into_client_request()?;
61-
let headers = req.headers_mut();
62-
headers.insert(
63-
"Authorization",
64-
HeaderValue::from_str(&format!("Bearer {}", token))?,
65-
);
66-
let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
67-
Ok(ws_stream.split())
68-
}
69-
70-
async fn connect_to_relayers(
71-
config: &Config,
72-
) -> Result<(
73-
RelayerSender,
74-
Vec<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
75-
)> {
76-
let mut relayer_senders = Vec::new();
77-
let mut relayer_receivers = Vec::new();
78-
for url in config.relayer_urls.clone() {
79-
let (relayer_sender, relayer_receiver) =
80-
connect_to_relayer(url, &config.authorization_token).await?;
81-
relayer_senders.push(relayer_sender);
82-
relayer_receivers.push(relayer_receiver);
83-
}
84-
let sender = RelayerSender {
85-
ws_senders: relayer_senders,
86-
};
87-
tracing::info!("connected to relayers: {:?}", config.relayer_urls);
88-
Ok((sender, relayer_receivers))
89-
}
90-
91-
#[derive(Debug, Clone)]
21+
#[derive(Clone)]
9222
pub struct LazerPublisher {
9323
sender: Sender<FeedUpdate>,
9424
}
9525

9626
impl LazerPublisher {
9727
pub async fn new(config: &Config) -> Self {
28+
let relayer_senders = futures::future::join_all(
29+
config
30+
.relayer_urls
31+
.iter()
32+
.map(async |url| RelayerSender::new(url, &config.authorization_token).await),
33+
)
34+
.await;
35+
9836
let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
9937
let mut task = LazerPublisherTask {
10038
config: config.clone(),
10139
receiver,
10240
pending_updates: Vec::new(),
41+
relayer_senders,
10342
};
10443
tokio::spawn(async move { task.run().await });
10544
Self { sender }
@@ -116,43 +55,11 @@ struct LazerPublisherTask {
11655
config: Config,
11756
receiver: Receiver<FeedUpdate>,
11857
pending_updates: Vec<FeedUpdate>,
58+
relayer_senders: Vec<RelayerSender>,
11959
}
12060

12161
impl LazerPublisherTask {
122-
pub async fn run(&mut self) {
123-
let mut failure_count = 0;
124-
let retry_duration = Duration::from_secs(1);
125-
126-
loop {
127-
match self.run_relayer_connection().await {
128-
Ok(()) => {
129-
tracing::info!("lazer_publisher graceful shutdown");
130-
return;
131-
}
132-
Err(e) => {
133-
failure_count += 1;
134-
tracing::error!(
135-
"lazer_publisher failed with error: {:?}, failure_count: {}; retrying in {:?}",
136-
e,
137-
failure_count,
138-
retry_duration
139-
);
140-
tokio::time::sleep(retry_duration).await;
141-
}
142-
}
143-
}
144-
}
145-
146-
#[instrument(skip(self), fields(component = "lazer_publisher"))]
147-
pub async fn run_relayer_connection(&mut self) -> Result<()> {
148-
// Establish relayer connections
149-
// Relayer will drop the connection if no data received in 5s
150-
let (mut relayer_sender, relayer_receivers) = connect_to_relayers(&self.config).await?;
151-
let mut stream_map = StreamMap::new();
152-
for (i, receiver) in relayer_receivers.into_iter().enumerate() {
153-
stream_map.insert(self.config.relayer_urls[i].clone(), receiver);
154-
}
155-
62+
fn load_signing_key(&self) -> Result<SigningKey> {
15663
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
15764
let publish_keypair = match read_keypair_file(&self.config.publish_keypair_path) {
15865
Ok(k) => k,
@@ -162,12 +69,23 @@ impl LazerPublisherTask {
16269
publish_keypair_path = self.config.publish_keypair_path.display().to_string(),
16370
"Reading publish keypair returned an error. ",
16471
);
165-
bail!("Reading publish keypair returned an error. ");
72+
bail!("Reading publish keypair returned an error.");
16673
}
16774
};
16875

169-
let signing_key = SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
170-
.context("Failed to create signing key from keypair")?;
76+
SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
77+
.context("Failed to create signing key from keypair")
78+
}
79+
80+
pub async fn run(&mut self) {
81+
let signing_key = match self.load_signing_key() {
82+
Ok(signing_key) => signing_key,
83+
Err(e) => {
84+
tracing::error!("Failed to load signing key: {e:?}");
85+
// Can't proceed on key failure
86+
panic!("Failed to load signing key: {e:?}");
87+
}
88+
};
17189

17290
let mut publish_interval = interval(self.config.publish_interval_duration);
17391
loop {
@@ -176,34 +94,15 @@ impl LazerPublisherTask {
17694
self.pending_updates.push(feed_update);
17795
}
17896
_ = publish_interval.tick() => {
179-
if let Err(err) = self.publish(&signing_key, &mut relayer_sender).await {
97+
if let Err(err) = self.batch_transaction(&signing_key).await {
18098
error!("Failed to publish updates: {}", err);
18199
}
182100
}
183-
// Handle messages from the relayers, such as errors if we send a bad update
184-
mapped_msg = stream_map.next() => {
185-
match mapped_msg {
186-
Some((relayer_url, Ok(msg))) => {
187-
tracing::debug!("Received message from relayer at {relayer_url}: {msg:?}");
188-
}
189-
Some((relayer_url, Err(e))) => {
190-
tracing::error!("Error receiving message from at relayer {relayer_url}: {e:?}");
191-
}
192-
None => {
193-
tracing::error!("relayer connection closed");
194-
bail!("relayer connection closed");
195-
}
196-
}
197-
}
198101
}
199102
}
200103
}
201104

202-
async fn publish(
203-
&mut self,
204-
signing_key: &SigningKey,
205-
relayer_sender: &mut RelayerSender,
206-
) -> Result<()> {
105+
async fn batch_transaction(&mut self, signing_key: &SigningKey) -> Result<()> {
207106
if self.pending_updates.is_empty() {
208107
return Ok(());
209108
}
@@ -238,12 +137,14 @@ impl LazerPublisherTask {
238137
payload: Some(buf),
239138
special_fields: Default::default(),
240139
};
241-
if let Err(e) = relayer_sender
242-
.send_price_update(&signed_lazer_transaction)
243-
.await
244-
{
245-
tracing::error!("Error publishing update to Lazer relayer: {e:?}");
246-
bail!("Failed to publish update to Lazer relayer: {e:?}");
140+
for relayer_sender in self.relayer_senders.iter() {
141+
if let Err(e) = relayer_sender
142+
.sender
143+
.send(signed_lazer_transaction.clone())
144+
.await
145+
{
146+
error!("Error sending transaction to Lazer relayer session: {e:?}");
147+
}
247148
}
248149

249150
self.pending_updates.clear();

pyth-lazer-agent/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod config;
1010
mod http_server;
1111
mod lazer_publisher;
1212
mod publisher_handle;
13+
mod relayer_session;
1314
mod websocket_utils;
1415

1516
#[derive(Parser)]

0 commit comments

Comments
 (0)