Skip to content

Commit f2f4562

Browse files
committed
batch handler flush fixes
1 parent 68d818c commit f2f4562

1 file changed

Lines changed: 61 additions & 26 deletions

File tree

src/batch_writer.rs

Lines changed: 61 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ enum WriterCommand {
3636
event: Event,
3737
},
3838
Flush {
39-
response: tokio::sync::oneshot::Sender<()>,
39+
response: tokio::sync::oneshot::Sender<Result<()>>,
4040
},
4141
Shutdown,
4242
}
@@ -121,9 +121,10 @@ impl BatchWriter {
121121
.send(WriterCommand::Flush { response: tx })
122122
.await
123123
.map_err(|e| anyhow::anyhow!("Failed to send flush command: {}", e))?;
124-
rx.await
125-
.map_err(|e| anyhow::anyhow!("Flush response error: {}", e))?;
126-
Ok(())
124+
let result = rx
125+
.await
126+
.map_err(|e| anyhow::anyhow!("Flush response channel closed: {}", e))?;
127+
result.map_err(|e| anyhow::anyhow!("Flush failed: {}", e))
127128
}
128129
}
129130

@@ -163,7 +164,11 @@ async fn batch_writer_loop(
163164
info!("Received NodeConnected command for {}", node_id);
164165
node_updates.push((node_id, Some(info)));
165166
// Force flush node updates immediately to avoid race conditions
166-
flush_batch(&store, &mut event_batch, &mut node_updates).await;
167+
if let Err(e) =
168+
flush_batch(&store, &mut event_batch, &mut node_updates).await
169+
{
170+
error!("Flush batch error: {}", e);
171+
}
167172
}
168173
WriterCommand::Event {
169174
node_id,
@@ -172,19 +177,30 @@ async fn batch_writer_loop(
172177
} => {
173178
event_batch.push((node_id, event_id, event));
174179
if event_batch.len() >= MAX_BATCH_SIZE {
175-
flush_batch(&store, &mut event_batch, &mut node_updates).await;
180+
if let Err(e) =
181+
flush_batch(&store, &mut event_batch, &mut node_updates).await
182+
{
183+
error!("Flush batch error: {}", e);
184+
}
176185
}
177186
}
178187
WriterCommand::NodeDisconnected { node_id } => {
179188
node_updates.push((node_id, None));
180189
}
181190
WriterCommand::Flush { response } => {
182-
flush_batch(&store, &mut event_batch, &mut node_updates).await;
183-
let _ = response.send(());
191+
let result = flush_batch(&store, &mut event_batch, &mut node_updates).await;
192+
if let Err(ref e) = result {
193+
error!("Flush batch error: {}", e);
194+
}
195+
let _ = response.send(result);
184196
}
185197
WriterCommand::Shutdown => {
186198
info!("Batch writer shutting down");
187-
flush_batch(&store, &mut event_batch, &mut node_updates).await;
199+
if let Err(e) =
200+
flush_batch(&store, &mut event_batch, &mut node_updates).await
201+
{
202+
error!("Flush batch error: {}", e);
203+
}
188204
return Ok(());
189205
}
190206
}
@@ -209,7 +225,9 @@ async fn batch_writer_loop(
209225
_ = interval.tick() => {
210226
// Timeout reached, flush any pending events
211227
if !event_batch.is_empty() || !node_updates.is_empty() {
212-
flush_batch(&store, &mut event_batch, &mut node_updates).await;
228+
if let Err(e) = flush_batch(&store, &mut event_batch, &mut node_updates).await {
229+
error!("Periodic flush error: {}", e);
230+
}
213231
}
214232
}
215233
Some(command) = receiver.recv() => {
@@ -219,26 +237,35 @@ async fn batch_writer_loop(
219237

220238
// Flush if batch is full
221239
if event_batch.len() >= MAX_BATCH_SIZE {
222-
flush_batch(&store, &mut event_batch, &mut node_updates).await;
240+
if let Err(e) = flush_batch(&store, &mut event_batch, &mut node_updates).await {
241+
error!("Flush batch error: {}", e);
242+
}
223243
}
224244
}
225245
WriterCommand::NodeConnected { node_id, info } => {
226246
info!("Received NodeConnected command for {}", node_id);
227247
node_updates.push((node_id, Some(info)));
228248
// Force flush node updates immediately to avoid race conditions
229-
flush_batch(&store, &mut event_batch, &mut node_updates).await;
249+
if let Err(e) = flush_batch(&store, &mut event_batch, &mut node_updates).await {
250+
error!("Flush batch error: {}", e);
251+
}
230252
}
231253
WriterCommand::NodeDisconnected { node_id } => {
232254
node_updates.push((node_id, None));
233255
}
234256
WriterCommand::Flush { response } => {
235-
flush_batch(&store, &mut event_batch, &mut node_updates).await;
236-
let _ = response.send(());
257+
let result = flush_batch(&store, &mut event_batch, &mut node_updates).await;
258+
if let Err(ref e) = result {
259+
error!("Flush batch error: {}", e);
260+
}
261+
let _ = response.send(result);
237262
}
238263
WriterCommand::Shutdown => {
239264
info!("Batch writer shutting down");
240265
// Final flush
241-
flush_batch(&store, &mut event_batch, &mut node_updates).await;
266+
if let Err(e) = flush_batch(&store, &mut event_batch, &mut node_updates).await {
267+
error!("Flush batch error: {}", e);
268+
}
242269
break;
243270
}
244271
}
@@ -253,47 +280,55 @@ async fn flush_batch(
253280
store: &EventStore,
254281
event_batch: &mut Vec<(String, u64, Event)>,
255282
node_updates: &mut Vec<(String, Option<NodeInformation>)>,
256-
) {
283+
) -> Result<()> {
257284
let event_count = event_batch.len();
258285
let node_count = node_updates.len();
259286

260287
if event_count == 0 && node_count == 0 {
261-
return;
288+
return Ok(());
262289
}
263290

264291
info!(
265292
"Flushing batch: {} events, {} node updates",
266293
event_count, node_count
267294
);
268295

269-
// Process node updates first
296+
// Process node updates first (fail fast on errors)
270297
for (node_id, info) in node_updates.drain(..) {
271298
match info {
272299
Some(info) => {
273300
info!("Storing node connection for {}", node_id);
274-
match store.store_node_connected(&node_id, &info).await {
275-
Ok(_) => info!("Successfully stored node connection for {}", node_id),
276-
Err(e) => error!("Failed to store node connection {}: {}", node_id, e),
277-
}
301+
store
302+
.store_node_connected(&node_id, &info)
303+
.await
304+
.map_err(|e| {
305+
error!("Failed to store node connection {}: {}", node_id, e);
306+
anyhow::anyhow!("Node connection storage failed: {}", e)
307+
})?;
308+
info!("Successfully stored node connection for {}", node_id);
278309
}
279310
None => {
280311
debug!("Storing node disconnection for {}", node_id);
281-
if let Err(e) = store.store_node_disconnected(&node_id).await {
312+
store.store_node_disconnected(&node_id).await.map_err(|e| {
282313
error!("Failed to store node disconnection {}: {}", node_id, e);
283-
}
314+
anyhow::anyhow!("Node disconnection storage failed: {}", e)
315+
})?;
284316
}
285317
}
286318
}
287319

288320
// Process events using batch insert for optimal performance
289321
if !event_batch.is_empty() {
290322
let batch: Vec<(String, u64, Event)> = std::mem::take(event_batch);
291-
if let Err(e) = store.store_events_batch(batch).await {
323+
store.store_events_batch(batch).await.map_err(|e| {
292324
error!("Failed to store event batch: {}", e);
293-
}
325+
anyhow::anyhow!("Event batch storage failed: {}", e)
326+
})?;
294327
}
295328

296329
// Update metrics
297330
metrics::counter!("telemetry_events_flushed").increment(event_count as u64);
298331
metrics::counter!("telemetry_node_updates_flushed").increment(node_count as u64);
332+
333+
Ok(())
299334
}

0 commit comments

Comments
 (0)