Skip to content

Commit dfebb4d

Browse files
authored
refactor: migrate adapter service to api (#117)
1 parent 0d6b36a commit dfebb4d

File tree

13 files changed

+743
-1600
lines changed

13 files changed

+743
-1600
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ async-trait = "0.1.79"
1818
warp = { version = "0.3.6", features = ["websocket"] }
1919
tokio = { version = "1.37.0", features = ["full"] }
2020
tokio-stream = "0.1.15"
21+
futures = { version = "0.3.30" }
2122
futures-util = { version = "0.3.30", default-features = false, features = [
2223
"sink",
2324
] }

src/agent.rs

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,16 @@ pub mod store;
7373
use {
7474
self::{
7575
config::Config,
76-
pythd::api::rpc,
76+
pythd::{
77+
adapter::notifier,
78+
api::rpc,
79+
},
7780
solana::network,
7881
},
7982
anyhow::Result,
8083
futures_util::future::join_all,
8184
slog::Logger,
85+
std::sync::Arc,
8286
tokio::sync::{
8387
broadcast,
8488
mpsc,
@@ -113,8 +117,7 @@ impl Agent {
113117

114118
// Create the channels
115119
// TODO: make all components listen to shutdown signal
116-
let (shutdown_tx, shutdown_rx) =
117-
broadcast::channel(self.config.channel_capacities.shutdown);
120+
let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown);
118121
let (primary_oracle_updates_tx, primary_oracle_updates_rx) =
119122
mpsc::channel(self.config.channel_capacities.primary_oracle_updates);
120123
let (secondary_oracle_updates_tx, secondary_oracle_updates_rx) =
@@ -123,8 +126,6 @@ impl Agent {
123126
mpsc::channel(self.config.channel_capacities.global_store_lookup);
124127
let (local_store_tx, local_store_rx) =
125128
mpsc::channel(self.config.channel_capacities.local_store);
126-
let (pythd_adapter_tx, pythd_adapter_rx) =
127-
mpsc::channel(self.config.channel_capacities.pythd_adapter);
128129
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
129130
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);
130131

@@ -152,34 +153,38 @@ impl Agent {
152153
)?);
153154
}
154155

156+
// Create the Pythd Adapter.
157+
let adapter = Arc::new(pythd::adapter::Adapter::new(
158+
self.config.pythd_adapter.clone(),
159+
global_store_lookup_tx.clone(),
160+
local_store_tx.clone(),
161+
logger.clone(),
162+
));
163+
164+
// Create the Notifier task for the Pythd RPC.
165+
jhs.push(tokio::spawn(notifier(
166+
adapter.clone(),
167+
shutdown_tx.subscribe(),
168+
)));
169+
155170
// Spawn the Global Store
156171
jhs.push(store::global::spawn_store(
157172
global_store_lookup_rx,
158173
primary_oracle_updates_rx,
159174
secondary_oracle_updates_rx,
160-
pythd_adapter_tx.clone(),
175+
adapter.clone(),
161176
logger.clone(),
162177
));
163178

164179
// Spawn the Local Store
165180
jhs.push(store::local::spawn_store(local_store_rx, logger.clone()));
166181

167-
// Spawn the Pythd Adapter
168-
jhs.push(pythd::adapter::spawn_adapter(
169-
self.config.pythd_adapter.clone(),
170-
pythd_adapter_rx,
171-
global_store_lookup_tx.clone(),
172-
local_store_tx.clone(),
173-
shutdown_tx.subscribe(),
174-
logger.clone(),
175-
));
176-
177182
// Spawn the Pythd API Server
178183
jhs.push(tokio::spawn(rpc::run(
179184
self.config.pythd_api_server.clone(),
180185
logger.clone(),
181-
pythd_adapter_tx,
182-
shutdown_rx,
186+
adapter,
187+
shutdown_tx.subscribe(),
183188
)));
184189

185190
// Spawn the metrics server

0 commit comments

Comments
 (0)