Skip to content

Commit 66e4df8

Browse files
committed
add more debugging to batch writer
1 parent 515dd98 commit 66e4df8

2 files changed

Lines changed: 54 additions & 11 deletions

File tree

src/batch_writer.rs

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,10 @@ async fn batch_writer_loop(
142142
let mut node_updates: Vec<(String, Option<NodeInformation>)> = Vec::new();
143143

144144
info!("Batch writer started");
145+
eprintln!("DEBUG: Batch writer loop started");
145146

146-
// Small delay to allow all initial connections to queue
147-
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
147+
// Small delay to allow initialization
148+
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
148149

149150
// Try to receive initial messages immediately with timeout
150151
let mut initial_count = 0;
@@ -166,13 +167,22 @@ async fn batch_writer_loop(
166167
);
167168
match command {
168169
WriterCommand::NodeConnected { node_id, info } => {
170+
eprintln!("DEBUG: Processing NodeConnected for {}", node_id);
171+
let node_id_clone = node_id.clone();
169172
info!("Received NodeConnected command for {}", node_id);
170173
node_updates.push((node_id, Some(info)));
171174
// Force flush node updates immediately to avoid race conditions
172-
if let Err(e) =
173-
flush_batch(&store, &mut event_batch, &mut node_updates).await
174-
{
175-
error!("Flush batch error: {}", e);
175+
match flush_batch(&store, &mut event_batch, &mut node_updates).await {
176+
Ok(_) => {
177+
eprintln!("DEBUG: NodeConnected flush succeeded for {}", node_id_clone)
178+
}
179+
Err(e) => {
180+
eprintln!(
181+
"ERROR: NodeConnected flush failed for {}: {}",
182+
node_id_clone, e
183+
);
184+
error!("Flush batch error: {}", e);
185+
}
176186
}
177187
}
178188
WriterCommand::Event {
@@ -193,11 +203,21 @@ async fn batch_writer_loop(
193203
node_updates.push((node_id, None));
194204
}
195205
WriterCommand::Flush { response } => {
206+
eprintln!(
207+
"DEBUG: Processing Flush command (batch: {}, nodes: {})",
208+
event_batch.len(),
209+
node_updates.len()
210+
);
196211
let result = flush_batch(&store, &mut event_batch, &mut node_updates).await;
197-
if let Err(ref e) = result {
198-
error!("Flush batch error: {}", e);
212+
match &result {
213+
Ok(_) => eprintln!("DEBUG: Flush completed successfully"),
214+
Err(e) => {
215+
eprintln!("ERROR: Flush failed: {}", e);
216+
error!("Flush batch error: {}", e);
217+
}
199218
}
200219
let _ = response.send(result);
220+
eprintln!("DEBUG: Sent flush response");
201221
}
202222
WriterCommand::Shutdown => {
203223
info!("Batch writer shutting down");
@@ -259,11 +279,17 @@ async fn batch_writer_loop(
259279
node_updates.push((node_id, None));
260280
}
261281
WriterCommand::Flush { response } => {
282+
eprintln!("DEBUG: Processing Flush command (batch: {}, nodes: {})", event_batch.len(), node_updates.len());
262283
let result = flush_batch(&store, &mut event_batch, &mut node_updates).await;
263-
if let Err(ref e) = result {
264-
error!("Flush batch error: {}", e);
284+
match &result {
285+
Ok(_) => eprintln!("DEBUG: Flush completed successfully"),
286+
Err(e) => {
287+
eprintln!("ERROR: Flush failed: {}", e);
288+
error!("Flush batch error: {}", e);
289+
}
265290
}
266291
let _ = response.send(result);
292+
eprintln!("DEBUG: Sent flush response");
267293
}
268294
WriterCommand::Shutdown => {
269295
info!("Batch writer shutting down");
@@ -297,11 +323,16 @@ async fn flush_batch(
297323
"Flushing batch: {} events, {} node updates",
298324
event_count, node_count
299325
);
326+
eprintln!(
327+
"DEBUG: flush_batch called with {} events, {} nodes",
328+
event_count, node_count
329+
);
300330

301331
// Process node updates first (fail fast on errors)
302332
for (node_id, info) in node_updates.drain(..) {
303333
match info {
304334
Some(info) => {
335+
eprintln!("DEBUG: Storing node connection for {}", node_id);
305336
info!("Storing node connection for {}", node_id);
306337
store
307338
.store_node_connected(&node_id, &info)
@@ -310,6 +341,7 @@ async fn flush_batch(
310341
error!("Failed to store node connection {}: {}", node_id, e);
311342
anyhow::anyhow!("Node connection storage failed: {}", e)
312343
})?;
344+
eprintln!("DEBUG: Successfully stored node connection for {}", node_id);
313345
info!("Successfully stored node connection for {}", node_id);
314346
}
315347
None => {
@@ -335,5 +367,9 @@ async fn flush_batch(
335367
metrics::counter!("telemetry_events_flushed").increment(event_count as u64);
336368
metrics::counter!("telemetry_node_updates_flushed").increment(node_count as u64);
337369

370+
eprintln!(
371+
"DEBUG: flush_batch completed - wrote {} events, {} nodes",
372+
event_count, node_count
373+
);
338374
Ok(())
339375
}

tests/api_tests.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,14 @@ async fn flush_and_wait(telemetry_server: &Arc<TelemetryServer>) {
9797
// Flush batch writer and wait for completion
9898
// This ensures all queued writes have been processed and written to PostgreSQL
9999
// before we query the database in tests
100-
telemetry_server.flush_writes().await.expect("Flush failed");
100+
eprintln!("DEBUG: Calling flush_writes()");
101+
match telemetry_server.flush_writes().await {
102+
Ok(_) => eprintln!("DEBUG: Flush completed successfully"),
103+
Err(e) => {
104+
eprintln!("ERROR: Flush failed: {}", e);
105+
panic!("Flush failed: {}", e);
106+
}
107+
}
101108
// Small delay to ensure PostgreSQL commit completes
102109
sleep(Duration::from_millis(50)).await;
103110
}

0 commit comments

Comments
 (0)