Skip to content

Commit 4985864

Browse files
authored
Send publisher permissions using a watch channel (#118)
Fixes heap growth if Exporter::update_our_prices is not called as often as updates
1 parent dfebb4d commit 4985864

File tree

3 files changed

+41
-58
lines changed

3 files changed

+41
-58
lines changed

src/agent/solana.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ pub mod network {
88

99
use {
1010
super::{
11-
super::{
12-
store,
13-
store::global,
11+
super::store::{
12+
self,
13+
global,
1414
},
1515
exporter,
1616
key_store::{
@@ -29,8 +29,8 @@ pub mod network {
2929
std::time::Duration,
3030
tokio::{
3131
sync::{
32-
mpsc,
3332
mpsc::Sender,
33+
watch,
3434
},
3535
task::JoinHandle,
3636
},
@@ -86,8 +86,7 @@ pub mod network {
8686
logger: Logger,
8787
) -> Result<Vec<JoinHandle<()>>> {
8888
// Publisher permissions updates between oracle and exporter
89-
let (publisher_permissions_tx, publisher_permissions_rx) =
90-
mpsc::channel(config.oracle.updates_channel_capacity);
89+
let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default());
9190

9291
// Spawn the Oracle
9392
let mut jhs = oracle::spawn_oracle(

src/agent/solana/exporter.rs

Lines changed: 28 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ pub fn spawn_exporter(
172172
network: Network,
173173
rpc_url: &str,
174174
rpc_timeout: Duration,
175-
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
175+
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
176176
key_store: KeyStore,
177177
local_store_tx: Sender<store::local::Message>,
178178
global_store_tx: Sender<store::global::Lookup>,
@@ -260,7 +260,7 @@ pub struct Exporter {
260260
inflight_transactions_tx: Sender<Signature>,
261261

262262
/// publisher => { permissioned_price => market hours } as read by the oracle module
263-
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
263+
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
264264

265265
/// Currently known permissioned prices of this publisher along with their market hours
266266
our_prices: HashMap<Pubkey, MarketSchedule>,
@@ -287,7 +287,7 @@ impl Exporter {
287287
global_store_tx: Sender<store::global::Lookup>,
288288
network_state_rx: watch::Receiver<NetworkState>,
289289
inflight_transactions_tx: Sender<Signature>,
290-
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
290+
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
291291
keypair_request_tx: mpsc::Sender<KeypairRequest>,
292292
logger: Logger,
293293
) -> Self {
@@ -556,51 +556,34 @@ impl Exporter {
556556
/// The loop ensures that we clear the channel and use
557557
/// only the final, latest message; try_recv() is
558558
/// non-blocking.
559-
///
560-
/// Note: This behavior is similar to
561-
/// tokio::sync::watch::channel(), which was not appropriate here
562-
/// because its internal RwLock would complain about not being
563-
/// Send with the HashMap<HashSet<Pubkey>> inside.
564-
/// TODO(2023-05-05): Debug the watch::channel() compilation errors
565559
fn update_our_prices(&mut self, publish_pubkey: &Pubkey) {
566-
loop {
567-
match self.publisher_permissions_rx.try_recv() {
568-
Ok(publisher_permissions) => {
569-
self.our_prices = publisher_permissions.get(publish_pubkey) .cloned()
570-
.unwrap_or_else( || {
571-
warn!(
572-
self.logger,
573-
"Exporter: No permissioned prices were found for the publishing keypair on-chain. This is expected only on startup.";
574-
"publish_pubkey" => publish_pubkey.to_string(),
575-
);
576-
HashMap::new()
577-
});
578-
trace!(
579-
self.logger,
580-
"Exporter: read permissioned price accounts from channel";
581-
"new_value" => format!("{:?}", self.our_prices),
582-
);
583-
}
584-
// Expected failures when channel is empty
585-
Err(TryRecvError::Empty) => {
586-
trace!(
587-
self.logger,
588-
"Exporter: No more permissioned price accounts in channel, using cached value";
589-
);
590-
break;
591-
}
592-
// Unexpected failures (channel closed, internal errors etc.)
593-
Err(other) => {
594-
warn!(
595-
self.logger,
596-
"Exporter: Updating permissioned price accounts failed unexpectedly, using cached value";
597-
"cached_value" => format!("{:?}", self.our_prices),
598-
"error" => other.to_string(),
599-
);
600-
break;
601-
}
560+
match self.publisher_permissions_rx.has_changed() {
561+
Ok(true) => {}
562+
Ok(false) => return,
563+
Err(other) => {
564+
warn!(
565+
self.logger,
566+
"Exporter: Updating permissioned price accounts failed unexpectedly, using cached value";
567+
"cached_value" => format!("{:?}", self.our_prices),
568+
"error" => other.to_string(),
569+
);
570+
return;
602571
}
603572
}
573+
574+
self.our_prices = self
575+
.publisher_permissions_rx
576+
.borrow_and_update()
577+
.get(publish_pubkey)
578+
.cloned()
579+
.unwrap_or_else(|| {
580+
warn!(
581+
self.logger,
582+
"Exporter: No permissioned prices were found for the publishing keypair on-chain. This is expected only on startup.";
583+
"publish_pubkey" => publish_pubkey.to_string(),
584+
);
585+
HashMap::new()
586+
});
604587
}
605588

606589
async fn fetch_local_store_contents(&self) -> Result<HashMap<PriceIdentifier, PriceInfo>> {

src/agent/solana/oracle.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ use {
4343
time::Duration,
4444
},
4545
tokio::{
46-
sync::mpsc,
46+
sync::{
47+
mpsc,
48+
watch,
49+
},
4750
task::JoinHandle,
4851
time::Interval,
4952
},
@@ -204,7 +207,7 @@ pub fn spawn_oracle(
204207
wss_url: &str,
205208
rpc_timeout: Duration,
206209
global_store_update_tx: mpsc::Sender<global::Update>,
207-
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
210+
publisher_permissions_tx: watch::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
208211
key_store: KeyStore,
209212
logger: Logger,
210213
) -> Vec<JoinHandle<()>> {
@@ -419,7 +422,7 @@ struct Poller {
419422
data_tx: mpsc::Sender<Data>,
420423

421424
/// Updates about permissioned price accounts from oracle to exporter
422-
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
425+
publisher_permissions_tx: watch::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
423426

424427
/// The RPC client to use to poll data from the RPC node
425428
rpc_client: RpcClient,
@@ -439,7 +442,7 @@ struct Poller {
439442
impl Poller {
440443
pub fn new(
441444
data_tx: mpsc::Sender<Data>,
442-
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
445+
publisher_permissions_tx: watch::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
443446
rpc_url: &str,
444447
rpc_timeout: Duration,
445448
commitment: CommitmentLevel,
@@ -481,9 +484,7 @@ impl Poller {
481484
let fresh_data = self.poll().await?;
482485

483486
self.publisher_permissions_tx
484-
.send(fresh_data.publisher_permissions.clone())
485-
.await
486-
.context("Updating permissioned price accounts for exporter")?;
487+
.send_replace(fresh_data.publisher_permissions.clone());
487488

488489
self.data_tx
489490
.send(fresh_data)

0 commit comments

Comments
 (0)