
feat: Lazer publisher agent #2718


Merged

merged 6 commits on Jun 8, 2025

Conversation

merolish (Contributor)

Summary

Based on the Lazer relayer, this binary receives incoming update streams from a publisher, signs them, and sends the signed transactions on to the Lazer relayer itself.

Rationale

Managing transaction signatures on the publisher side.

How has this been tested?

  • Current tests cover my changes
  • Added new tests
  • Manually tested the code


@darunrs (Contributor) commented May 22, 2025

At a high level, is there a reason we can't put this in the Pyth Publisher SDK? Will review this in more detail later today when I get the time.

@merolish (Contributor Author)

The SDK is a library crate, and this is a binary that will also have some Docker/devops tooling. No strong feelings though.

@darunrs (Contributor) left a comment

Gave it a preliminary review. Broadly speaking, it seems this PR follows the idea of getting publishers to run the agent first, then having them send the new type to a new endpoint afterward?

Retaining the V1 and V2 endpoints keeps the confusion around all the types. I don't think giving them a new, reasonably easy type to work with is that large a lift after getting them to run the agent, which is probably the harder part. But yeah, curious to hear what you had in mind in terms of the onboarding process.

@@ -0,0 +1,5 @@
relayer_urls = ["ws://localhost:1235/v1/transaction", "ws://localhost:1335/v1/transaction"]
Contributor

These are the local Tilt Lazer URLs. What's your plan for preparing staging and prod URLs/configs? Different config files for each stage?

Contributor Author

Yeah, I shouldn't have checked this in here; I'll provide some sort of reference config and figure out how to run this with Tilt (which is what I've been doing for local testing).

Comment on lines +2 to +3
authorization_token = "token1"
publish_keypair_path = "/path/to/solana/id.json"
Contributor

These should be CLI args or something.

Contributor Author

Yep, good call.

Collaborator

As a principle, I'd like us to be as similar to the agent as possible with respect to config, so let's leave it here.

authorization_token = "token1"
publish_keypair_path = "/path/to/solana/id.json"
listen_address = "0.0.0.0:1234"
publish_interval_duration = "50ms"
Contributor

We submit once every 0.5ms. So 500 microseconds.

}

impl RelayerSender {
async fn send_price_update(
Contributor

This is sending a transaction, not a price update?
We should be correct in terms of semantics: a SignedLazerTransaction contains the LazerTransaction. A LazerTransaction can be a PublisherUpdate. The PublisherUpdate contains a batch of FeedUpdates. Each FeedUpdate targets one feed ID and can be one of a set of update types, such as PriceUpdate or FundingRateUpdate.
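For illustration, a minimal sketch of that nesting (the field names and numeric types here are placeholders; the real definitions are the Lazer protobuf types):

```rust
// Illustrative only; the actual types come from the Lazer protobuf definitions.
struct SignedLazerTransaction {
    signature: Vec<u8>,
    payload: LazerTransaction,
}

enum LazerTransaction {
    PublisherUpdate(PublisherUpdate),
    // ... other transaction variants
}

struct PublisherUpdate {
    updates: Vec<FeedUpdate>, // a batch of per-feed updates
}

struct FeedUpdate {
    feed_id: u32,
    update: FeedUpdateKind, // one update kind per feed entry
}

enum FeedUpdateKind {
    Price(PriceUpdate),
    FundingRate(FundingRateUpdate),
}

struct PriceUpdate {
    price: i64,
    source_timestamp_us: u64,
}

struct FundingRateUpdate {
    rate: i64,
    source_timestamp_us: u64,
}
```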

Contributor Author

Yeah, the log line was just copy-pasted from other agent/publisher code; that's my mistake. I understand and agree with your description above, and indeed the intent is to send an entire SignedLazerTransaction on each call here.

Comment on lines 30 to 31
//pub const MAX_EXPIRY_TIME_US: u64 = 5_000_000; // 5s
//pub const MAX_TIMESTAMP_ERROR_US: u64 = 50_000; // 5ms
Contributor

Remove? These seem to be leftover comments.

Contributor Author

Right, these are from the relayer code which does timestamp validation, but I'm not sure if we should bother with that in the agent.

Comment on lines 96 to 97
_ = ping_interval.tick() => {
send_ping(&mut ws_sender).await;
Contributor

Pinging them is not necessary, as the agent won't be able to collect and send this info anywhere. Maybe you can put it behind a flag to log the ping results, but for now it's not necessary.

Comment on lines 100 to 102
_ = &mut publisher_timeout => {
bail!("no updates received from publisher after {:?}", PUBLISHER_TIMEOUT);
}
Contributor

What's the value in disconnecting them from the agent? These are both being run on their end.

Contributor Author

Holdover code from the relayer.

Comment on lines 110 to 112
if let Some(Incoming::Pong(_)) = receive_type {
continue;
}
Contributor

Let's remove the ping code if it's not going to do anything.

}
}

pub async fn send_ping<T: AsyncRead + AsyncWrite + Unpin>(sender: &mut Sender<T>) {
Contributor

Same deal here. Don't need it if we don't use it.

continue;
}
}
} //_ => bail!("Publisher API request set with invalid context"),
Contributor

What is this?

Contributor Author

Oh, that's left over from the relayer code, will remove.

Comment on lines 140 to 148
for relayer_sender in self.relayer_senders.iter() {
    if let Err(e) = relayer_sender
        .sender
        .send(signed_lazer_transaction.clone())
        .await
    {
        error!("Error sending transaction to Lazer relayer session: {e:?}");
    }
}
Contributor

Is this sync? Does it wait for one relayer send to finish before running the next one? In that case we should fix it.
Ideally concurrent, at least async.

Contributor Author

This is just sending to the channel for each relayer thread to pick up, but I guess I shouldn't call await for that.

Contributor

It is sync, though the docs do mention "This method will never block the current thread." Regardless, it'd be nice to do them concurrently.

Contributor Author (@merolish, Jun 5, 2025)

All right, then I'm going to use futures::future::join_all whenever possible for fan-out cases like this.

Contributor

Ah, that's the wrong mpsc sender anyway; we're using tokio's, and that one is async. But yeah, use join_all. Be careful about failure cases though: honestly, sending into an MPSC should not fail, but if it does, we still want to be able to send to the rest. I guess it's possible for the channel to close due to filling up and so on.
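A minimal sketch of that fan-out, assuming tokio mpsc senders and futures' join_all (the helper name and message type are illustrative):

```rust
use futures::future::join_all;
use tokio::sync::mpsc::Sender;

// Send one message to every relayer channel concurrently; a failed send
// (closed channel) is logged and does not stop the remaining sends.
async fn fan_out<T: Clone>(senders: &[Sender<T>], message: T) {
    let results = join_all(senders.iter().map(|s| s.send(message.clone()))).await;
    for (i, result) in results.into_iter().enumerate() {
        if result.is_err() {
            eprintln!("send to relayer session {i} failed: channel closed");
        }
    }
}
```

Note that Sender::send only needs a shared reference, so iterating with iter() (rather than iter_mut()) is enough here.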

impl RelayerSessionTask {
pub async fn run(&mut self) {
let mut failure_count = 0;
let retry_duration = Duration::from_secs(1);
Contributor

Might want to have exponential backoff; one second of downtime for a random disconnect is not good.
Also make it configurable.

Contributor

Pavel used the backoff crate for configuring exponential retries. Perhaps you can use that?

Contributor Author

Ah, I see it used in a couple places in lazer, will do that here.

Contributor Author

Done.

@darunrs (Contributor) left a comment

Nice, I think this is pretty good. Some small comments. After that, I think we'll be good to go.

Comment on lines 23 to 25
fn default_publish_interval() -> Duration {
Duration::from_millis(50)
}
Contributor

Again, this is not the interval we want to publish at. We want to publish at a 0.5ms interval. On tick, we send any new feed updates over. The interval applies overall, not per feed.

If no new data has arrived, we can skip the tick, up until perhaps 25ms, at which point we instead send all current feed data over at once.

The publisher update rate will now have to count each update with a unique source timestamp, so we'll probably need to update some metrics for this.

IMO, the above makes sense as an approach.

Contributor Author

Yeah, I'd updated my own un-checked-in config and not the default here 😄.

So we'll maintain the most recent update per feed and then send an entire snapshot if we hit this expiry period?

I don't quite understand the wording of the last statement.

Contributor

Hmm, no. I guess on each tick you check whether something hasn't been updated/sent for 25ms, and you send whatever you have. Otherwise, just keep sending only the new data that has come in.

The logic on tick is:

  1. Collect all new updates
  2. Collect the latest data of feeds not updated for 25ms
  3. Mark all feeds whose data is being sent as now sent, in whatever cache we are using
  4. Send the data

Something like that. Basically, we make sure we always send data on 0.5ms ticks if new data is coming in, and we always send data at least every 25ms for each feed.
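A rough sketch of that tick logic, assuming a simple per-feed cache keyed by feed ID (the FeedUpdate type and its fields are placeholders, not the PR's actual types):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Placeholder for the real Lazer feed update type.
#[derive(Clone)]
struct FeedUpdate {
    feed_id: u32,
    price: i64,
    source_timestamp_us: u64,
}

struct FeedState {
    latest: FeedUpdate,
    dirty: bool,               // new data arrived since the last send
    last_sent: Option<Instant>,
}

const RESEND_AFTER: Duration = Duration::from_millis(25);

// Called on every publish tick (e.g. every 0.5ms): send feeds with fresh data,
// plus any feed that has not been sent for 25ms, and mark them as sent.
fn collect_batch(feeds: &mut HashMap<u32, FeedState>, now: Instant) -> Vec<FeedUpdate> {
    let mut batch = Vec::new();
    for state in feeds.values_mut() {
        let stale = state
            .last_sent
            .map_or(true, |sent| now.duration_since(sent) >= RESEND_AFTER);
        if state.dirty || stale {
            batch.push(state.latest.clone());
            state.dirty = false;
            state.last_sent = Some(now);
        }
    }
    batch
}
```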

Contributor Author

So, if a feed hasn't been published for 25ms we retransmit its latest update? Makes sense, although I admit I don't see what this sort of heartbeating buys us at this step in the data flow.

Contributor (@darunrs, Jun 6, 2025)

It's hard, as a server, to know for sure whether a client is behaving appropriately. If the client consistently sends something over, we can expect it is functioning properly; the connection being open is not sufficient to know this. We also still want to trigger aggregations and such. If the data itself (by source timestamp) gets old enough, it will be expired anyway.

Contributor

@darunrs 0.5ms doesn't sound right. That means 2000 updates per second per relayer. 50ms is also too much, I think we should fine-tune this number. I'm thinking somewhere between 2-10 ms.


#[derive(Parser)]
#[command(version)]
struct Cli {
Contributor

Yeah I can agree with that. Ali actually mentioned wanting to keep the interface the same as Pyth Agent and that's a good point too.



pub struct PublisherConnectionContext {
pub request_type: http_server::Request,
pub _remote_addr: SocketAddr,
Contributor

Remove the leading underscore.

Contributor Author

Just did this temporarily for clippy because we don't log or metric this anywhere yet.

Contributor

Contributor Author

The field is initialized in the struct but is never read.

Contributor

Ah ok. If we don't intend to use the field, and we don't have a follow up PR already planned that enables the use of that field, let's remove it.

Contributor Author

Perhaps at some point we log it or it goes into a metric?

@darunrs (Contributor) left a comment

Nice! LGTM other than two smaller comments.

There aren't really any tests written here. I would put down a Linear task to add unit tests where possible, and also manually test it.

}
futures::future::join_all(
    self.relayer_senders
        .iter_mut()
Contributor

Is iter_mut() needed? send should only need a regular reference.

Contributor Author

Nope. Was experimenting with AI suggestions and left this in. Removing.

let mut failure_count = 0;
let retry_duration = Duration::from_secs(1);

loop {
Contributor

Instead of having a manual loop, use backoff's retry. Something like this:
retry(expo_backoff_config, || { match connection().await { Ok(()) => 'shutdown', Err(err) => handle_err } })
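A hedged sketch of what that could look like with the backoff crate's async retry (the connect_and_run session function and its error handling are assumptions, not the PR's actual code):

```rust
use backoff::{future::retry, Error, ExponentialBackoff};

// Reconnect to the relayer with exponential backoff instead of a fixed 1s sleep.
async fn run_session(url: String) {
    let result = retry(ExponentialBackoff::default(), || async {
        connect_and_run(&url)
            .await
            .map_err(Error::transient) // treat every connection error as retryable
    })
    .await;

    if let Err(e) = result {
        eprintln!("relayer session {url}: gave up after retries: {e:?}");
    }
}

// Stand-in for the actual connection handler.
async fn connect_and_run(_url: &str) -> Result<(), std::io::Error> {
    Ok(())
}
```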

Contributor Author

Ah, nice.

RUN apt update && apt install -y libssl-dev && apt clean all

COPY --from=builder /pyth-lazer-agent/target/release/pyth-lazer-agent /pyth-lazer-agent/
COPY --from=builder /pyth-lazer-agent/config/* /pyth-lazer-agent/config/
Collaborator

should we add this?

Contributor Author

I followed what pyth-agent does, although I'm not sure of the best way to manage config here. Also, we'll need to add the key file.

}

fn default_publish_interval() -> Duration {
Duration::from_micros(500)
Collaborator

isn't this too aggressive?

Contributor Author

As per Darun's comments above, it sounded like half a millisecond was the target frequency. I don't have background on our requirements here.

Contributor (@darunrs, Jun 6, 2025)

The background on this is that the frequency needs to support 1ms feeds. Granted, we don't have many currently, but the update interval needs to be short enough to provide granularity here. The relayer-side batching itself currently uses 0.5ms batches.

let signing_key = SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
.context("Failed to create signing key from keypair")?;

let mut publish_interval = interval(self.config.publish_interval_duration);
Collaborator

interval is pinned on its own internally (and implements Unpin).
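A tiny illustration of that point, assuming a plain tokio interval drives the publish loop (the tick body and counter are placeholders):

```rust
use std::time::Duration;
use tokio::time::interval;

#[tokio::main]
async fn main() {
    // Interval is Unpin, so tick() can be awaited directly inside select!
    // without pinning the interval first.
    let mut publish_interval = interval(Duration::from_micros(500));
    let mut ticks = 0u32;
    loop {
        tokio::select! {
            _ = publish_interval.tick() => {
                // in the agent, pending updates would be batched and sent here
                ticks += 1;
                if ticks >= 3 {
                    break;
                }
            }
        }
    }
}
```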

)
.await;

self.pending_updates.clear();
Collaborator

since you release the task a line before, here you might clear some pending updates that you haven't sent yet.

Contributor Author

Really? pending_updates only gets pushed to in the other arm of the select!; I'd have thought that arm and batch_transaction were atomic with respect to each other.

@merolish (Contributor Author) commented Jun 7, 2025

@darunrs I've done ad hoc integration testing using example publisher -> lazer agent -> relayer in Tilt, but yes, it's time for me to get into the habit of adding mod tests to any new code; I'll try to get some in this weekend.
