Skip to content

Commit 2fb3775

Browse files
authored
fix: disable batch flushing by default (#139)
* fix: disable batch flushing by default * refactor: update readme and add best practices
1 parent f0fd404 commit 2fb3775

File tree

5 files changed

+54
-53
lines changed

5 files changed

+54
-53
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-agent"
3-
version = "2.10.3"
3+
version = "2.10.4"
44
edition = "2021"
55

66
[[bin]]

README.md

Lines changed: 9 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -34,58 +34,22 @@ through the `RUST_LOG` environment variable using the standard
3434
`error|warn|info|debug|trace`.
3535

3636
#### Plain/JSON logging
37-
By default, pyth-agent will print plaintext log statements. This can be switched to structured JSON output with `-l json`.
38-
39-
#### Code location in logs
40-
For debugging purposes, you can specify `-L` to print file/line information with each log statement. This option is disabled by default.
41-
42-
### Key Store Config Migration [v1.x.x LEGACY]
43-
Pyth agent v2.0.0 introduces a simplified program and mapping key configuration. This breaking change alters how you define program/mapping key options in your agent config:
44-
```toml
45-
# Old v1.x.x way
46-
[primary network]
47-
key_store.root_path = "/path/to/keystore"
48-
key_store.publish_keypair_path = "publish_key_pair.json" # Relative path from root_path, "publish_key_pair.json" by default
49-
key_store.program_key_path = "program_key.json" # Relative path from root_path, "program_key.json" by default
50-
key_store.mapping_key_path = "mapping_key.json" # Relative path from root_path, "mapping_key.json" by default
51-
52-
# [...]
53-
54-
# New v2.0.0 way
55-
[primary_network]
56-
key_store.publish_keypair_path = "/path/to/keypair.json" # The root_path is gone, we specify the full path
57-
# Not using separate files anymore
58-
key_store.program_key = "LiteralProgramPubkeyInsideTheConfig" # contents of legacy program_key.json;
59-
key_store.mapping_key = "LiteralMappingPubkeyInsideTheConfig" # contents of legacy mapping_key.json
60-
61-
# [...]
62-
63-
```
64-
65-
#### Automatic Migration
66-
If you are upgrading to agent v2.0.0 with an existing config, you can use the provided automatic migrator program:
67-
```shell
68-
# Build
69-
$ cargo build --release
70-
# Run the migrator, making sure that the key store with previous keys is reachable
71-
$ target/release/agent-migrate-config -c <existing_config_file>.toml > my_new_config.toml
72-
```
73-
74-
#### `Could not open {mapping|program|...} key file`
75-
This error can appear if some of your program/mapping/publish key
76-
files are not reachable under their `key_store.*` setting values.
77-
78-
Ensure that your current working directory is correct for reaching the
79-
key store path inside your config. You may also migrate manually by
80-
changing `key_store.*_key_path` and `key_store.publish_keypair_path`
81-
options by hand, as described in the config example above.
37+
Pyth agent will print logs in plaintext in terminal and JSON format in non-terminal environments (e.g. when writing to a file).
8238

8339
## Run
8440
`cargo run --release -- --config <your_config.toml>` will build and run the agent in a single step.
8541

8642
## Publishing API
8743
A running agent will expose a WebSocket serving the JRPC publishing API documented [here](https://docs.pyth.network/documentation/publish-data/pyth-client-websocket-api). See `config/config.toml` for related settings.
8844

45+
## Best practices
46+
If your publisher is publishing updates to more than 50 price feeds, it is recommended that you do the following to reduce the connection overhead to the agent:
47+
- Batch your messages together and send them as a single request to the agent (as an array of messages). The agent will respond to the batch messages
48+
with a single response containing an array of individual responses (in the same order). If batching is not possible, you can disable the `instant_flush` option
49+
in the configuration file to let agent send the responses every `flush_interval` seconds.
50+
- Do not use subscribe to the price schedule. Instead, define a schedule on the client side and send the messages based on your own schedule. Ideally
51+
you should send price updates as soon as you have them to increase the latency of the data on the Pyth Network.
52+
8953
# Development
9054
## Unit Testing
9155
A collection of Rust unit tests is provided, ran with `cargo test`.

config/config.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,15 @@ listen_address = "127.0.0.1:8910"
1414
# received from the Price state.
1515
# notify_price_sched_tx_buffer = 10000
1616

17+
# Whether flush messages and responses to the client immediately. Once disabled the
18+
# messages will be flushed every `flush_interval_duration`. Disabling it is useful if
19+
# there are many messages to be sent between the client and the server to avoid overloading
20+
# the connection.
21+
# instant_flush = true
22+
1723
# Flush interval for responses and notifications. This is the maximum time the
18-
# server will wait before flushing the messages to the client.
24+
# server will wait before flushing the messages to the client. It will have no
25+
# effect if `instant_flush` is set to true.
1926
# flush_interval_duration = "50ms"
2027

2128
# Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint.

src/agent/pyth/rpc.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use {
1818
anyhow,
1919
Result,
2020
},
21+
futures::future::OptionFuture,
2122
futures_util::{
2223
stream::{
2324
SplitSink,
@@ -50,7 +51,10 @@ use {
5051
sync::Arc,
5152
time::Duration,
5253
},
53-
tokio::sync::mpsc,
54+
tokio::{
55+
sync::mpsc,
56+
time::Interval,
57+
},
5458
tracing::instrument,
5559
warp::{
5660
ws::{
@@ -111,11 +115,18 @@ enum ConnectionError {
111115
WebsocketConnectionClosed,
112116
}
113117

118+
#[derive(Debug)]
119+
enum FlushStrategy {
120+
Instant,
121+
Interval(Interval),
122+
}
123+
114124
async fn handle_connection<S>(
115125
ws_conn: WebSocket,
116126
state: Arc<S>,
117127
notify_price_tx_buffer: usize,
118128
notify_price_sched_tx_buffer: usize,
129+
instant_flush: bool,
119130
flush_interval_duration: Duration,
120131
) where
121132
S: state::Prices,
@@ -129,7 +140,10 @@ async fn handle_connection<S>(
129140
let (mut notify_price_sched_tx, mut notify_price_sched_rx) =
130141
mpsc::channel(notify_price_sched_tx_buffer);
131142

132-
let mut flush_interval = tokio::time::interval(flush_interval_duration);
143+
let mut flush_strategy = match instant_flush {
144+
true => FlushStrategy::Instant,
145+
false => FlushStrategy::Interval(tokio::time::interval(flush_interval_duration)),
146+
};
133147

134148
loop {
135149
if let Err(err) = handle_next(
@@ -140,7 +154,7 @@ async fn handle_connection<S>(
140154
&mut notify_price_rx,
141155
&mut notify_price_sched_tx,
142156
&mut notify_price_sched_rx,
143-
&mut flush_interval,
157+
&mut flush_strategy,
144158
)
145159
.await
146160
{
@@ -156,6 +170,7 @@ async fn handle_connection<S>(
156170
}
157171
}
158172

173+
#[allow(clippy::too_many_arguments)]
159174
async fn handle_next<S>(
160175
state: &S,
161176
ws_tx: &mut SplitSink<WebSocket, Message>,
@@ -164,11 +179,17 @@ async fn handle_next<S>(
164179
notify_price_rx: &mut mpsc::Receiver<NotifyPrice>,
165180
notify_price_sched_tx: &mut mpsc::Sender<NotifyPriceSched>,
166181
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
167-
flush_interval: &mut tokio::time::Interval,
182+
flush_strategy: &mut FlushStrategy,
168183
) -> Result<()>
169184
where
170185
S: state::Prices,
171186
{
187+
let optional_flush_tick: OptionFuture<_> = match flush_strategy {
188+
FlushStrategy::Instant => None,
189+
FlushStrategy::Interval(interval) => Some(interval.tick()),
190+
}
191+
.into();
192+
172193
tokio::select! {
173194
msg = ws_rx.next() => {
174195
match msg {
@@ -196,9 +217,14 @@ where
196217
feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
197218
.await
198219
}
199-
_ = flush_interval.tick() => {
220+
Some(_) = optional_flush_tick => {
200221
flush(ws_tx).await
201222
}
223+
}?;
224+
225+
match flush_strategy {
226+
FlushStrategy::Interval(_) => Ok(()),
227+
FlushStrategy::Instant => flush(ws_tx).await,
202228
}
203229
}
204230

@@ -413,6 +439,8 @@ pub struct Config {
413439
/// Size of the buffer of each Server's channel on which `notify_price_sched` events are
414440
/// received from the Price state.
415441
pub notify_price_sched_tx_buffer: usize,
442+
/// Whether to flush immediately after sending a message or notification.
443+
pub instant_flush: bool,
416444
/// Flush interval duration for the notifications.
417445
#[serde(with = "humantime_serde")]
418446
pub flush_interval_duration: Duration,
@@ -424,6 +452,7 @@ impl Default for Config {
424452
listen_address: "127.0.0.1:8910".to_string(),
425453
notify_price_tx_buffer: 10000,
426454
notify_price_sched_tx_buffer: 10000,
455+
instant_flush: true,
427456
flush_interval_duration: Duration::from_millis(50),
428457
}
429458
}
@@ -465,6 +494,7 @@ where
465494
state,
466495
config.notify_price_tx_buffer,
467496
config.notify_price_sched_tx_buffer,
497+
config.instant_flush,
468498
config.flush_interval_duration,
469499
)
470500
.await

0 commit comments

Comments
 (0)