Skip to content

Commit e0c3be4

Browse files
authored
Add Remote Keypair Loader, a new keypair hotloading mechanism (#51)
* Add Remote Keypair Loader, a new keypair hotloading mechanism * agent.rs: fix comment, remote loader: fix missing spawn, warp paths * remote-loader: Fix key parsing to 64-byte vecs, adjust balance check * integration-tests: add keypair hotloading test * remote_keypair_loader.rs: default address to localhost, warn if changed * stray unused dep * exporter.rs: Explain how we avoid mem bloat in remote loader channel * remote_keypair_loader: yield the state lock before tokio sleep * config.toml: Describe metrics and keypair loader endpoint options * solana.rs: doc how pub keypair read error turns on remote loading * remote-loader: fix wrong default address, fix test error reporting
1 parent 9374b3f commit e0c3be4

File tree

8 files changed

+755
-224
lines changed

8 files changed

+755
-224
lines changed

config/config.toml

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ key_store.root_path = "/path/to/keystore"
2121

2222
### Optional fields ###
2323

24+
# Path to publisher identity keypair. When not found, the network will expect a remote-loaded keypair. see remote_keypair_loader options for details.
25+
# key_store.publish_keypair_path = "publish_key_pair.json" # I exist, remote loading disabled
26+
# key_store.publish_keypair_path = "none" # I do not exist, remote loading activated for the network
27+
2428
# The interval with which to poll account information.
2529
# oracle.poll_interval_duration = "2m"
2630

@@ -56,59 +60,21 @@ key_store.root_path = "/path/to/keystore"
5660
# a value at least as large as (number of products published / number of products in a batch).
5761
# exporter.transaction_monitor.max_transactions = "100"
5862

59-
# Configuration for the optional secondary network this agent will publish data to. In most cases this should be a Solana endpoint.
60-
# [secondary_network]
61-
### Required fields ###
62-
63-
# HTTP(S) endpoint of the RPC node. The Solana public RPC endpoints are rate-limited, so a private node should be used.
64-
# rpc_url = "http://api.devnet.solana.com"
65-
66-
# WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network.
67-
# This can be omitted when oracle.subscriber_enabled is set to false.
68-
# wss_url = "ws://api.devnet.solana.com"
69-
70-
# Timeout for the requests to the RPC
71-
# rpc_timeout = "10s"
72-
73-
# Path to the key store.
74-
# key_store.root_path = "/path/to/keystore"
75-
76-
### Optional fields ###
77-
78-
# The interval with which to poll account information.
79-
# oracle.poll_interval_duration = "2m"
80-
81-
# Whether subscribing to account updates over websocket is enabled
82-
# oracle.subscriber_enabled = true
83-
84-
# Duration of the interval at which to refresh the cached network state (current slot and blockhash).
85-
# It is recommended to set this to slightly less than the network's block time,
86-
# as the slot fetched will be used as the time of the price update.
87-
# exporter.refresh_network_state_interval_duration = "200ms"
88-
89-
# Duration of the interval at which to publish updates
90-
# exporter.publish_interval_duration = "1s"
91-
92-
# Age after which a price update is considered stale and not published
93-
# exporter.staleness_threshold = "5s"
94-
95-
# Maximum size of a batch
96-
# exporter.max_batch_size = 12
97-
98-
# Maximum number of compute units requested by each update_price transaction
99-
# exporter.compute_unit_limit = 20000
63+
# Where to serve the quick-access dashboard and metrics.
64+
# metrics_server.bind_address = "127.0.0.1:8888"
10065

101-
# Price per compute unit offered for update_price transactions
102-
# exporter.compute_unit_price_micro_lamports =
66+
# Where to serve the remote keypair loading endpoint, under "/primary/load_keypair" and "/secondary/load_keypair"
67+
#
68+
# NOTE: non-loopback addresses must be used carefully, making sure the
69+
# connection is not exposed for unauthorized access.
70+
# remote_keypair_loader.bind_address = "127.0.0.1:9001"
10371

104-
# Duration of the interval with which to poll the status of transactions.
105-
# It is recommended to set this to a value close to exporter.publish_interval_duration
106-
# exporter.transaction_monitor.poll_interval_duration = "4s"
72+
# How much whole SOL must a keypair hold to be considered valid for use on a given network. Disabled with 0
73+
# remote_keypair_loader.primary_min_keypair_balance_sol = 1
74+
# remote_keypair_loader.secondary_min_keypair_balance_sol = 1
10775

108-
# Maximum number of recent transactions to monitor. When this number is exceeded,
109-
# the oldest transactions are no longer monitored. It is recommended to set this to
110-
# a value at least as large as (number of products published / number of products in a batch).
111-
# exporter.transaction_monitor.max_transactions = "100"
76+
# Configuration for the optional secondary network this agent will publish data to. In most cases this should be a Solana endpoint. The options correspond to the ones in primary_network
77+
# [secondary_network]
11278

11379
# Configuration for the JRPC API
11480
[pythd_adapter]

integration-tests/poetry.lock

Lines changed: 243 additions & 134 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integration-tests/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ program-admin = { git = "https://github.com/pyth-network/program-admin.git", bra
1414
pytest = "^7.2"
1515
pytest-asyncio = "^0.18.3"
1616
pre-commit = "^2.21.0"
17+
requests = "^2.28.2"
1718
jsonrpc_websocket = "^3.1.4"
1819

1920
[build-system]

integration-tests/tests/test_integration.py

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import json
33
import os
4+
import requests
45
import time
56
from typing import Any, List
67
import pytest
@@ -140,12 +141,19 @@ def spawn(self, cmd, log_dir=None):
140141

141142
with open(stdout_path, 'w') as stdout:
142143
with open(stderr_path, 'w') as stderr:
143-
with subprocess.Popen(cmd.split(), stdout=stdout, stderr=stderr, env=env) as process:
144-
LOGGER.debug(
145-
"Spawned subprocess with command %s logging to %s", cmd, log_dir)
146-
yield
147-
process.terminate()
148-
process.wait()
144+
process = subprocess.Popen(cmd.split(), stdout=stdout, stderr=stderr, env=env)
145+
LOGGER.debug(
146+
"Spawned subprocess with command %s logging to %s", cmd, log_dir)
147+
yield
148+
149+
process.poll() # fills return code if available
150+
151+
if process.returncode is not None and process.returncode != 0:
152+
LOGGER.error("Spawned process \"%s\" finished with error code %d before teardown. See logs in %s", cmd, process.returncode, log_dir)
153+
154+
process.terminate()
155+
process.wait()
156+
149157
stderr.flush()
150158
stdout.flush()
151159

@@ -288,6 +296,9 @@ def agent_publish_keypair(self, agent_keystore_path, sync_accounts):
288296

289297
LOGGER.debug("Airdropping SOL to publish keypair at %s", path)
290298
self.run(f"solana airdrop 100 -k {path} -u localhost")
299+
address = self.run(f"solana address -k {path} -u localhost")
300+
balance = self.run(f"solana balance -k {path} -u localhost")
301+
LOGGER.debug(f"Publisher {address.stdout.strip()} balance: {balance.stdout.strip()}")
291302
time.sleep(8)
292303

293304
@pytest.fixture
@@ -312,13 +323,39 @@ def agent(self, sync_accounts, agent_keystore, tmp_path):
312323
time.sleep(3)
313324
yield
314325

326+
@pytest.fixture
327+
def agent_hotload(self, sync_accounts, agent_keystore, agent_keystore_path, tmp_path):
328+
"""
329+
Spawns an agent without a publish keypair, used for keypair hotloading testing
330+
"""
331+
os.remove(os.path.join(agent_keystore_path, "publish_key_pair.json"))
332+
333+
LOGGER.debug("Building hotload agent binary")
334+
self.run("cargo build --release")
335+
336+
log_dir = os.path.join(tmp_path, "agent_logs")
337+
LOGGER.debug("Launching hotload agent logging to %s", log_dir)
338+
339+
os.environ["RUST_BACKTRACE"] = "full"
340+
os.environ["RUST_LOG"] = "debug"
341+
with self.spawn("../target/release/agent --config agent_conf.toml", log_dir=log_dir):
342+
time.sleep(3)
343+
yield
344+
315345
@pytest_asyncio.fixture
316346
async def client(self, agent):
317347
client = PythAgentClient(address="ws://localhost:8910")
318348
await client.connect()
319349
yield client
320350
await client.close()
321351

352+
@pytest_asyncio.fixture
353+
async def client_hotload(self, agent_hotload):
354+
client = PythAgentClient(address="ws://localhost:8910")
355+
await client.connect()
356+
yield client
357+
await client.close()
358+
322359

323360
class TestUpdatePrice(PythTest):
324361

@@ -348,3 +385,16 @@ async def test_update_price_simple(self, client: PythAgentClient):
348385
assert price_account["price"] == 42
349386
assert price_account["conf"] == 2
350387
assert price_account["status"] == "trading"
388+
389+
@pytest.mark.asyncio
390+
async def test_update_price_simple_with_keypair_hotload(self, client_hotload: PythAgentClient):
391+
# Hotload the keypair into running agent
392+
hl_request = requests.post("http://localhost:9001/primary/load_keypair", json=PUBLISHER_KEYPAIR)
393+
394+
# Verify succesful hotload
395+
assert hl_request.status_code == 200
396+
397+
LOGGER.info("Publisher keypair hotload OK")
398+
399+
# Continue normally with the existing simple scenario
400+
await self.test_update_price_simple(client_hotload)

src/agent.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ Note that there is an Oracle and Exporter for each network, but only one Local S
4848

4949
pub mod metrics;
5050
pub mod pythd;
51+
pub mod remote_keypair_loader;
5152
pub mod solana;
5253
pub mod store;
5354
use {
@@ -99,12 +100,15 @@ impl Agent {
99100
mpsc::channel(self.config.channel_capacities.local_store);
100101
let (pythd_adapter_tx, pythd_adapter_rx) =
101102
mpsc::channel(self.config.channel_capacities.pythd_adapter);
103+
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
104+
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);
102105

103106
// Spawn the primary network
104107
jhs.extend(network::spawn_network(
105108
self.config.primary_network.clone(),
106109
local_store_tx.clone(),
107110
primary_oracle_updates_tx,
111+
primary_keypair_loader_tx,
108112
logger.new(o!("primary" => true)),
109113
)?);
110114

@@ -114,6 +118,7 @@ impl Agent {
114118
config.clone(),
115119
local_store_tx.clone(),
116120
secondary_oracle_updates_tx,
121+
secondary_keypair_loader_tx,
117122
logger.new(o!("primary" => false)),
118123
)?);
119124
}
@@ -153,9 +158,25 @@ impl Agent {
153158
self.config.metrics_server.bind_address,
154159
local_store_tx,
155160
global_store_lookup_tx,
156-
logger,
161+
logger.clone(),
157162
)));
158163

164+
// Spawn the remote keypair loader endpoint for both networks
165+
jhs.append(
166+
&mut remote_keypair_loader::RemoteKeypairLoader::spawn(
167+
primary_keypair_loader_rx,
168+
secondary_keypair_loader_rx,
169+
self.config.primary_network.rpc_url.clone(),
170+
self.config
171+
.secondary_network
172+
.as_ref()
173+
.map(|c| c.rpc_url.clone()),
174+
self.config.remote_keypair_loader.clone(),
175+
logger,
176+
)
177+
.await,
178+
);
179+
159180
// Wait for all tasks to complete
160181
join_all(jhs).await;
161182

@@ -164,11 +185,11 @@ impl Agent {
164185
}
165186

166187
pub mod config {
167-
168188
use {
169189
super::{
170190
metrics,
171191
pythd,
192+
remote_keypair_loader,
172193
solana::network,
173194
},
174195
anyhow::{
@@ -188,12 +209,13 @@ pub mod config {
188209
#[derive(Default, Deserialize, Debug)]
189210
#[serde(default)]
190211
pub struct Config {
191-
pub channel_capacities: ChannelCapacities,
192-
pub primary_network: network::Config,
193-
pub secondary_network: Option<network::Config>,
194-
pub pythd_adapter: pythd::adapter::Config,
195-
pub pythd_api_server: pythd::api::rpc::Config,
196-
pub metrics_server: metrics::Config,
212+
pub channel_capacities: ChannelCapacities,
213+
pub primary_network: network::Config,
214+
pub secondary_network: Option<network::Config>,
215+
pub pythd_adapter: pythd::adapter::Config,
216+
pub pythd_api_server: pythd::api::rpc::Config,
217+
pub metrics_server: metrics::Config,
218+
pub remote_keypair_loader: remote_keypair_loader::Config,
197219
}
198220

199221
impl Config {

0 commit comments

Comments
 (0)