Skip to content

Commit 20a8fee

Browse files
committed
Drop subscriptions which have closed channels
1 parent 7ef1ac8 commit 20a8fee

File tree

1 file changed

+11
-0
lines changed

1 file changed

+11
-0
lines changed

src/agent/pythd/adapter.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ impl Adapter {
206206

207207
pub async fn run(&mut self) {
208208
loop {
209+
self.drop_closed_subscriptions();
209210
tokio::select! {
210211
Some(message) = self.message_rx.recv() => {
211212
if let Err(err) = self.handle_message(message).await {
@@ -510,6 +511,16 @@ impl Adapter {
510511
Ok(())
511512
}
512513

514+
fn drop_closed_subscriptions(&mut self) {
515+
for subscriptions in self.notify_price_subscriptions.values_mut() {
516+
subscriptions.retain(|subscription| !subscription.notify_price_tx.is_closed())
517+
}
518+
519+
for subscriptions in self.notify_price_sched_subscriptions.values_mut() {
520+
subscriptions.retain(|subscription| !subscription.notify_price_sched_tx.is_closed())
521+
}
522+
}
523+
513524
async fn handle_update_price(
514525
&self,
515526
account: &solana_sdk::pubkey::Pubkey,

0 commit comments

Comments
 (0)