Skip to content

Commit 87e4d86

Browse files
authored
channel config: add slog buffer capacity, doc capacities in config (#56)
1 parent b3f5b64 commit 87e4d86

File tree

3 files changed

+78
-26
lines changed

3 files changed

+78
-26
lines changed

config/config.toml

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,45 @@ key_store.root_path = "/path/to/keystore"
2121

2222
### Optional fields ###
2323

24-
# Path to publisher identity keypair. When not found, the network will expect a remote-loaded keypair. see remote_keypair_loader options for details.
24+
# Channel capacities. These refer to async messaging channels
25+
# internally used by the agent's subroutines
26+
27+
# Capacity of the channel used to broadcast shutdown events to all
28+
# components
29+
# channel_capacities.shutdown = 10000
30+
31+
# Capacity of the channel used to send updates from the primary Oracle
32+
# to the Global Store
33+
# channel_capacities.primary_oracle_updates = 10000
34+
35+
# Capacity of the channel used to send updates from the secondary
36+
# Oracle to the Global Store
37+
# channel_capacities.secondary_oracle_updates = 10000
38+
39+
# Capacity of the channel the Pythd API Adapter uses to send lookup
40+
# requests to the Global Store
41+
# channel_capacities.global_store_lookup = 10000
42+
43+
# Capacity of the channel the Pythd API Adapter uses to communicate
44+
# with the Local Store
45+
# channel_capacities.local_store_lookup = 10000
46+
47+
# Capacity of the channel on which the Local Store receives messages
48+
# channel_capacities.local_store = 10000
49+
50+
# Capacity of the channel on which the Pythd API Adapter receives
51+
# messages
52+
# channel_capacities.pythd_adapter = 10000
53+
54+
# Capacity of the slog logging channel. Adjust this value if you see
55+
# complaints about channel capacity from slog
56+
# channel_capacities.logger_buffer = 10000
57+
58+
59+
# Path to publisher identity keypair. When the specified path is not
60+
# found on startup, the relevant primary/secondary network will expect
61+
# a remote-loaded keypair. See remote_keypair_loader options for
62+
# details.
2563
# key_store.publish_keypair_path = "publish_key_pair.json" # I exist, remote loading disabled
2664
# key_store.publish_keypair_path = "none" # I do not exist, remote loading activated for the network
2765

src/agent.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
+--------------------------------+ +--------------------------------+
44
| RPC Node, e.g. Pythnet | | RPC Node, e.g. Solana Mainnet |
55
+--------------------------------+ +--------------------------------+
6-
| ^ ^ |
6+
| ^ ^ |
77
+---|---------------------|------+ +---|---------------------|------+
88
| | Primary Network | | | Secondary Network | |
99
| v | | | | v |
@@ -39,14 +39,14 @@ Publisher data write path:
3939
- The Adapter then transforms this into the Pyth SDK data structures and sends it to the Local Store.
4040
- The Local Store holds the latest price data the user has submitted for each price feed.
4141
- The Exporters periodically query the Local Store for the latest user-submitted data,
42-
and send it to the RPC node.
42+
and send it to the RPC node.
4343
4444
Publisher data read path:
4545
- The Oracles continually fetch data from the RPC node, and pass this to the Global Store.
4646
- The Global Store holds a unified view of the latest observed data from both networks, in the Pyth SDK data structures.
4747
- When a user queries for this data using the Pythd JRPC Websocket API, the Adapter fetches
48-
the latest data from the Global Store. It transforms this from the Pyth SDK data structures into the
49-
Pythd JRPC Websocket API data structures.
48+
the latest data from the Global Store. It transforms this from the Pyth SDK data structures into the
49+
Pythd JRPC Websocket API data structures.
5050
- The Pythd JRPC Websocket API then sends this data to the user.
5151
5252
Remote Keypair Loading:
@@ -270,6 +270,8 @@ pub mod config {
270270
pub local_store: usize,
271271
/// Capacity of the channel on which the Pythd API Adapter receives messages
272272
pub pythd_adapter: usize,
273+
/// Capacity of the slog logging channel. Adjust this value if you see complaints about channel capacity from slog
274+
pub logger_buffer: usize,
273275
}
274276

275277
impl Default for ChannelCapacities {
@@ -282,6 +284,7 @@ pub mod config {
282284
local_store_lookup: 10000,
283285
local_store: 10000,
284286
pythd_adapter: 10000,
287+
logger_buffer: 10000,
285288
}
286289
}
287290
}

src/bin/agent.rs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use {
2-
anyhow::Result,
2+
anyhow::{
3+
Context,
4+
Result,
5+
},
36
clap::Parser,
47
pyth_agent::agent::{
58
config::Config,
@@ -11,6 +14,7 @@ use {
1114
Drain,
1215
Logger,
1316
},
17+
slog_async::Async,
1418
slog_envlogger::LogBuilder,
1519
std::{
1620
env,
@@ -28,29 +32,36 @@ struct Arguments {
2832
}
2933

3034
#[tokio::main]
31-
async fn main() {
32-
let logger = slog::Logger::root(
33-
slog_async::Async::default(
34-
LogBuilder::new(
35-
slog_term::FullFormat::new(slog_term::TermDecorator::new().stdout().build())
36-
.build()
37-
.fuse(),
38-
)
39-
.parse(&env::var("RUST_LOG").unwrap_or("info".to_string()))
40-
.build(),
41-
)
42-
.fuse(),
43-
o!(),
44-
);
45-
46-
if let Err(err) = start(logger.clone()).await {
35+
async fn main() -> Result<()> {
36+
// Parse config early for logging channel capacity
37+
let config = Config::new(Arguments::parse().config).context("Could not parse config")?;
38+
39+
// A plain slog drain that sits inside an async drain instance
40+
let inner_drain = LogBuilder::new(
41+
slog_term::FullFormat::new(slog_term::TermDecorator::new().stdout().build())
42+
.build()
43+
.fuse(), // Yell loud on logger internal errors
44+
)
45+
.parse(&env::var("RUST_LOG").unwrap_or("info".to_string()))
46+
.build();
47+
48+
// The top level async drain
49+
let async_drain = Async::new(inner_drain)
50+
.chan_size(config.channel_capacities.logger_buffer)
51+
.build()
52+
.fuse();
53+
54+
let logger = slog::Logger::root(async_drain, o!());
55+
56+
if let Err(err) = start(config, logger.clone()).await {
4757
error!(logger, "{:#}", err; "error" => format!("{:?}", err));
58+
return Err(err);
4859
}
60+
61+
Ok(())
4962
}
5063

51-
async fn start(logger: Logger) -> Result<()> {
52-
Agent::new(Config::new(Arguments::parse().config)?)
53-
.start(logger)
54-
.await;
64+
async fn start(config: Config, logger: Logger) -> Result<()> {
65+
Agent::new(config).start(logger).await;
5566
Ok(())
5667
}

0 commit comments

Comments
 (0)