Skip to content

Commit d698287

Browse files
authored
Merge pull request #19 from input-output-hk/sg/monitor
Monitor
2 parents ecab9ea + b8c8f52 commit d698287

File tree

15 files changed

+376
-46
lines changed

15 files changed

+376
-46
lines changed

modules/clock/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
[package]
44
name = "caryatid_module_clock"
5-
version = "0.12.0"
5+
version = "0.13.0"
66
edition = "2021"
77
authors = ["Paul Clark <[email protected]>"]
88
description = "Clock module for Caryatid"
99
license = "Apache-2.0"
1010

1111
[dependencies]
12-
caryatid_sdk = { version="0.12", path = "../../sdk" }
12+
caryatid_sdk = { version="0.13", path = "../../sdk" }
1313
anyhow = "1.0"
1414
tokio = { version = "1", features = ["full"] }
1515
serde_json = "1.0"

modules/clock/src/clock.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! Generates regular clock.tick events
33
44
use anyhow::Result;
5-
use caryatid_sdk::{module, Context, MessageBounds, Module};
5+
use caryatid_sdk::{module, Context, MessageBounds};
66
use chrono::{DateTime, Utc};
77
use config::Config;
88
use std::sync::Arc;
@@ -70,6 +70,7 @@ impl<M: From<ClockTickMessage> + MessageBounds> Clock<M> {
7070
mod tests {
7171
use super::*;
7272
use caryatid_sdk::mock_bus::MockBus;
73+
use caryatid_sdk::Module;
7374
use config::{Config, FileFormat};
7475
use tokio::sync::{mpsc, watch::Sender, Notify};
7576
use tokio::time::{timeout, Duration};
@@ -99,6 +100,7 @@ mod tests {
99100
struct TestSetup {
100101
module: Arc<dyn Module<Message>>,
101102
context: Arc<Context<Message>>,
103+
startup_watch: Sender<bool>,
102104
}
103105

104106
impl TestSetup {
@@ -120,12 +122,9 @@ mod tests {
120122
// Create mock bus
121123
let bus = Arc::new(MockBus::<Message>::new(&config));
122124

125+
let startup_watch = Sender::new(false);
123126
// Create a context
124-
let context = Arc::new(Context::new(
125-
config.clone(),
126-
bus.clone(),
127-
Sender::<bool>::new(false),
128-
));
127+
let context = Arc::new(Context::new(config.clone(), bus, startup_watch.subscribe()));
129128

130129
// Create the clock
131130
let clock = Clock::<Message> {
@@ -136,11 +135,12 @@ mod tests {
136135
Self {
137136
module: Arc::new(clock),
138137
context,
138+
startup_watch,
139139
}
140140
}
141141

142142
fn start(&self) {
143-
let _ = self.context.startup_watch.send(true);
143+
let _ = self.startup_watch.send(true);
144144
}
145145
}
146146

modules/playback/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
[package]
44
name = "caryatid_module_playback"
5-
version = "0.9.0"
5+
version = "0.10.0"
66
edition = "2021"
77
authors = ["Alex Woods <[email protected]>"]
88
description = "Message playback module for Caryatid"
99
license = "Apache-2.0"
1010

1111
[dependencies]
12-
caryatid_sdk = { version="0.12", path = "../../sdk" }
12+
caryatid_sdk = { version="0.13", path = "../../sdk" }
1313
anyhow = "1.0"
1414
serde_json = "1.0"
1515
config = "0.15.11"

modules/record/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
[package]
44
name = "caryatid_module_record"
5-
version = "0.9.0"
5+
version = "0.10.0"
66
edition = "2021"
77
authors = ["Alex Woods <[email protected]>"]
88
description = "Message recording module for Caryatid"
99
license = "Apache-2.0"
1010

1111
[dependencies]
12-
caryatid_sdk = { version="0.12", path = "../../sdk" }
12+
caryatid_sdk = { version="0.13", path = "../../sdk" }
1313
anyhow = "1.0"
1414
serde_json = "1.0"
1515
config = "0.15.11"

modules/rest_server/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
[package]
44
name = "caryatid_module_rest_server"
5-
version = "0.14.0"
5+
version = "0.15.0"
66
edition = "2021"
77
authors = ["Paul Clark <[email protected]>"]
88
description = "REST server module for Caryatid"
99
license = "Apache-2.0"
1010

1111
[dependencies]
12-
caryatid_sdk = { version="0.12", path = "../../sdk" }
12+
caryatid_sdk = { version="0.13", path = "../../sdk" }
1313
anyhow = "1.0"
1414
serde_json = "1.0"
1515
config = "0.15.11"

modules/rest_server/src/rest_server.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! Provides a REST endpoint which integrates with the message bus
33
44
use anyhow::Result;
5-
use caryatid_sdk::{module, Context, MessageBounds, Module};
5+
use caryatid_sdk::{module, Context, MessageBounds};
66
use config::Config;
77
use std::{collections::HashMap, sync::Arc};
88
use tracing::{error, info};
@@ -176,6 +176,7 @@ impl<M: From<RESTRequest> + GetRESTResponse + MessageBounds> RESTServer<M> {
176176
mod tests {
177177
use super::*;
178178
use caryatid_sdk::mock_bus::MockBus;
179+
use caryatid_sdk::Module;
179180
use config::{Config, FileFormat};
180181
use futures::future;
181182
use hyper::Client;
@@ -227,6 +228,7 @@ mod tests {
227228
struct TestSetup {
228229
module: Arc<dyn Module<Message>>,
229230
context: Arc<Context<Message>>,
231+
startup_watch: Sender<bool>,
230232
}
231233

232234
impl TestSetup {
@@ -249,10 +251,11 @@ mod tests {
249251
let mock_bus = Arc::new(MockBus::<Message>::new(&config));
250252

251253
// Create a context
254+
let startup_watch = Sender::new(false);
252255
let context = Arc::new(Context::new(
253256
config.clone(),
254-
mock_bus.clone(),
255-
Sender::<bool>::new(false),
257+
mock_bus,
258+
startup_watch.subscribe(),
256259
));
257260

258261
// Create the server
@@ -264,11 +267,12 @@ mod tests {
264267
Self {
265268
module: Arc::new(rest_server),
266269
context,
270+
startup_watch,
267271
}
268272
}
269273

270274
fn start(&self) {
271-
let _ = self.context.startup_watch.send(true);
275+
let _ = self.startup_watch.send(true);
272276
}
273277
}
274278

modules/spy/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
[package]
44
name = "caryatid_module_spy"
5-
version = "0.12.0"
5+
version = "0.13.0"
66
edition = "2021"
77
authors = ["Paul Clark <[email protected]>"]
88
description = "Spy module for Caryatid"
99
license = "Apache-2.0"
1010

1111
[dependencies]
12-
caryatid_sdk = { version="0.12", path = "../../sdk" }
12+
caryatid_sdk = { version="0.13", path = "../../sdk" }
1313
anyhow = "1.0"
1414
tokio = { version = "1", features = ["full"] }
1515
serde_json = "1.0"

monitor.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Module monitor
2+
3+
Caryatid supports a "monitor" to see the activity of the different modules and topics in a process. It's implemented as an overlay to the message bus, which tracks reads/writes to each topic in an in-memory `DashMap`.
4+
5+
This monitor is useful for debugging slowness/stoppages, or for understanding the interactions between modules. It's not intended to serve as a form of instrumentation.
6+
7+
Below is how to enable it:
8+
```toml
9+
[monitor]
10+
output = "monitor.json" # Write to a file named monitor.json
11+
frequency_secs = 5.0 # every 5 seconds
12+
```
13+
14+
Every module shows a list of "reads" (streams it reads from) and "writes" (streams it writes to).
15+
16+
We track this information for reads:
17+
- `read`: how many messages this module has read from this topic.
18+
- `unread`: how many messages are available for this module on this topic. This is based on how many messages were published by another module, it will be an underestimate if messages are sent over rabbitmq.
19+
- `pending_for`: how long has this module been waiting to read a message on this topic.
20+
21+
We track this information for writes:
22+
- `written`: how many messages this module has written to this topic.
23+
- `pending_for`: how long has this module been waiting to write a message to this topic. If this is set, the topic is congested; some module is subscribed to it but not reading from it.

process/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
# Caraytid process - loads and runs modules
22
[package]
33
name = "caryatid_process"
4-
version = "0.12.2"
4+
version = "0.13.0"
55
edition = "2021"
66
authors = ["Paul Clark <[email protected]>"]
77
description = "Library for building a Caryatid process"
88
license = "Apache-2.0"
99

1010
[dependencies]
11-
caryatid_sdk = { version="0.12", path = "../sdk" }
11+
caryatid_sdk = { version="0.13", path = "../sdk" }
1212
futures = "0.3"
1313
anyhow = "1.0"
1414
tokio = { version = "1", features = ["full"] }
1515
config = "0.15.11"
1616
minicbor-serde = { version = "0.6", features = ["alloc"] }
17+
dashmap = "6"
1718
tracing = "0.1.40"
1819
serde = "1.0.210"
1920
serde_json = "1.0"

0 commit comments

Comments
 (0)