We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent b19090e commit b8c8f52Copy full SHA for b8c8f52
process/src/monitor.rs
@@ -172,9 +172,11 @@ impl<M: MessageBounds> MessageBus<M> for MonitorBus<M> {
172
.pending_since = Some(Instant::now());
173
let res = self.inner.publish(topic, message).await;
174
let mut writes = self.state.writes.entry(topic.to_string()).or_default();
175
- writes.written += 1;
176
writes.pending_since = None;
177
- *self.stream_writes.entry(topic.to_string()).or_default() += 1;
+ if res.is_ok() {
+ writes.written += 1;
178
+ *self.stream_writes.entry(topic.to_string()).or_default() += 1;
179
+ }
180
res
181
}
182
0 commit comments