Skip to content

Commit 68d818c

Browse files
committed
add db flushing capability for testing
1 parent 43cfb99 commit 68d818c

5 files changed

Lines changed: 364 additions & 21 deletions

File tree

src/batch_writer.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ enum WriterCommand {
3535
event_id: u64,
3636
event: Event,
3737
},
38+
Flush {
39+
response: tokio::sync::oneshot::Sender<()>,
40+
},
3841
Shutdown,
3942
}
4043

@@ -96,6 +99,32 @@ impl BatchWriter {
9699
pub async fn shutdown(&self) {
97100
let _ = self.sender.send(WriterCommand::Shutdown).await;
98101
}
102+
103+
/// Flush all pending writes to database
104+
///
105+
/// **For testing only**: This method forces immediate flush of all
106+
/// batched events and node updates to the database. It's necessary
107+
/// in tests to ensure data is written before queries, since the
108+
/// batch writer runs asynchronously in the background.
109+
///
110+
/// In production, the batch writer automatically flushes based on:
111+
/// - Time-based intervals (20ms)
112+
/// - Batch size limits (1000 events)
113+
/// - Node connection events (immediate flush)
114+
///
115+
/// This method is public (not #[cfg(test)]) because it's useful
116+
/// for graceful shutdown and debugging, but should NOT be called
117+
/// in normal operation as it defeats the purpose of batching.
118+
pub async fn flush(&self) -> Result<()> {
119+
let (tx, rx) = tokio::sync::oneshot::channel();
120+
self.sender
121+
.send(WriterCommand::Flush { response: tx })
122+
.await
123+
.map_err(|e| anyhow::anyhow!("Failed to send flush command: {}", e))?;
124+
rx.await
125+
.map_err(|e| anyhow::anyhow!("Flush response error: {}", e))?;
126+
Ok(())
127+
}
99128
}
100129

101130
async fn batch_writer_loop(
@@ -125,6 +154,7 @@ async fn batch_writer_loop(
125154
WriterCommand::NodeDisconnected { node_id } =>
126155
format!("NodeDisconnected({})", node_id),
127156
WriterCommand::Event { node_id, .. } => format!("Event({})", node_id),
157+
WriterCommand::Flush { .. } => "Flush".to_string(),
128158
WriterCommand::Shutdown => "Shutdown".to_string(),
129159
}
130160
);
@@ -148,6 +178,10 @@ async fn batch_writer_loop(
148178
WriterCommand::NodeDisconnected { node_id } => {
149179
node_updates.push((node_id, None));
150180
}
181+
WriterCommand::Flush { response } => {
182+
flush_batch(&store, &mut event_batch, &mut node_updates).await;
183+
let _ = response.send(());
184+
}
151185
WriterCommand::Shutdown => {
152186
info!("Batch writer shutting down");
153187
flush_batch(&store, &mut event_batch, &mut node_updates).await;
@@ -197,6 +231,10 @@ async fn batch_writer_loop(
197231
WriterCommand::NodeDisconnected { node_id } => {
198232
node_updates.push((node_id, None));
199233
}
234+
WriterCommand::Flush { response } => {
235+
flush_batch(&store, &mut event_batch, &mut node_updates).await;
236+
let _ = response.send(());
237+
}
200238
WriterCommand::Shutdown => {
201239
info!("Batch writer shutting down");
202240
// Final flush

src/server.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,18 @@ impl TelemetryServer {
230230
pub fn get_broadcaster(&self) -> Arc<EventBroadcaster> {
231231
Arc::clone(&self.broadcaster)
232232
}
233+
234+
/// Flush all pending batch writes to database
235+
///
236+
/// **For testing only**: Forces immediate flush of all buffered
237+
/// data to PostgreSQL. Necessary in tests to ensure data is written
238+
/// before queries execute.
239+
///
240+
/// In production, this can be used for graceful shutdown but should
241+
/// NOT be called during normal operation.
242+
pub async fn flush_writes(&self) -> anyhow::Result<()> {
243+
self.batch_writer.flush().await
244+
}
233245
}
234246

235247
async fn handle_connection_optimized(

src/store.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,26 @@ impl EventStore {
557557
Ok(metrics)
558558
}
559559

560-
/// Cleanup test data by truncating all tables (for testing only)
560+
/// Cleanup test data by truncating all tables
561+
///
562+
/// # Safety
563+
///
564+
/// **DANGER**: This method **DELETES ALL DATA** from the database by
565+
/// truncating all tables. It should **ONLY** be used in:
566+
/// - Test setup functions with isolated test databases
567+
/// - Development environments that are okay with data loss
568+
///
569+
/// **NEVER call this in production!**
570+
///
571+
/// # Example
572+
/// ```no_run
573+
/// # use tart_backend::EventStore;
574+
/// # async fn example() {
575+
/// let store = EventStore::new("postgres://localhost/tart_TEST").await.unwrap();
576+
/// // Only safe with dedicated test database!
577+
/// store.cleanup_test_data().await.unwrap();
578+
/// # }
579+
/// ```
561580
pub async fn cleanup_test_data(&self) -> Result<(), sqlx::Error> {
562581
sqlx::query("TRUNCATE TABLE events, nodes, node_status, blocks, stats_cache CASCADE")
563582
.execute(&self.pool)

tests/README.md

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
# TART Backend Test Suite
2+
3+
This document explains the testing architecture and best practices for the TART telemetry backend.
4+
5+
## Test Organization
6+
7+
### Unit Tests (No Database Required)
8+
These tests run in parallel and don't require external services:
9+
- `types_tests.rs` - Type encoding/decoding (12 tests)
10+
- `events_tests.rs` - Event serialization (18 tests)
11+
- `error_tests.rs` - Error handling and edge cases (15 tests)
12+
- `encoding_tests.rs` - Binary protocol encoding (16 tests)
13+
- Library tests in `src/` - Core logic (10 tests)
14+
15+
**Total: 71 unit tests**
16+
17+
### Integration Tests (Require PostgreSQL)
18+
These tests use a real PostgreSQL database and MUST run serially:
19+
- `api_tests.rs` - REST API endpoints (10 tests)
20+
- `integration_tests.rs` - End-to-end telemetry flow (8 tests)
21+
- `optimized_server_tests.rs` - Performance and concurrency (6 tests)
22+
23+
**Total: 24 integration tests**
24+
25+
## Running Tests Locally
26+
27+
```bash
28+
# Unit tests only (fast, no setup needed)
29+
cargo test --lib --test types_tests --test events_tests --test error_tests --test encoding_tests
30+
31+
# Integration tests (requires PostgreSQL)
32+
export TEST_DATABASE_URL="postgres://tart:tart_password@localhost:5432/tart_test"
33+
34+
# Start PostgreSQL (using docker-compose)
35+
docker-compose up -d postgres
36+
37+
# Create test database and run migrations
38+
cargo sqlx database create
39+
cargo sqlx migrate run
40+
41+
# Run integration tests SERIALLY
42+
cargo test --test api_tests --test integration_tests --test optimized_server_tests -- --test-threads=1
43+
```
44+
45+
## Why Tests Must Run Serially (`--test-threads=1`)
46+
47+
**Problem**: Integration tests share the same PostgreSQL database `tart_test`.
48+
49+
**Without serial execution:**
50+
- Test A connects 2 nodes → expects 2 in database
51+
- Test B connects 2 nodes → expects 2 in database
52+
- Tests run in parallel → both see 4 nodes → BOTH FAIL
53+
54+
**Solution**: Run with `--test-threads=1` to execute one test at a time.
55+
56+
Each test:
57+
1. Cleans the database (TRUNCATE all tables)
58+
2. Runs its scenario
59+
3. Next test cleans and runs
60+
61+
## The Flush Pattern - Why It's Necessary
62+
63+
### The Problem: Asynchronous Background Writer
64+
65+
TART uses a `BatchWriter` that runs in a background task for performance:
66+
67+
```
68+
Test sends data → Queues in channel → Background task → Batches → PostgreSQL
69+
70+
Test continues immediately!
71+
72+
Test queries database... but data might not be written yet!
73+
```
74+
75+
Even though:
76+
- Node connections flush immediately (line 151, 214 in batch_writer.rs)
77+
- Events batch every 20ms or 1000 events
78+
79+
The `node_connected()` method returns as soon as it QUEUES the message, not when it's written.
80+
81+
### The Solution: Explicit Flush with Synchronization
82+
83+
```rust
84+
// Test helper that WAITS for flush to complete
85+
async fn flush_and_wait(telemetry_server: &Arc<TelemetryServer>) {
86+
telemetry_server.flush_writes().await.expect("Flush failed");
87+
sleep(Duration::from_millis(50)).await; // PostgreSQL commit margin
88+
}
89+
90+
// In tests:
91+
connect_test_node(port, 1).await;
92+
flush_and_wait(&server).await; // ← BLOCKS until database write completes
93+
let response = get("/api/nodes").await; // Now data is guaranteed to be there
94+
```
95+
96+
### Why Not Just Sleep Longer?
97+
98+
**Bad approach:**
99+
```rust
100+
connect_test_node(port, 1).await;
101+
sleep(Duration::from_millis(5000)).await; // Hope this is enough?
102+
let response = get("/api/nodes").await;
103+
```
104+
105+
Problems:
106+
- Non-deterministic: Might work locally, fail in CI
107+
- Slow: Wastes time waiting
108+
- Brittle: Breaks if server is under load
109+
110+
**Good approach (current):**
111+
```rust
112+
connect_test_node(port, 1).await;
113+
flush_and_wait(&server).await; // Deterministic, fast, reliable
114+
let response = get("/api/nodes").await;
115+
```
116+
117+
## Test Isolation Pattern
118+
119+
### Database Cleanup (documentation-guarded, NOT `#[cfg(test)]`)
120+
121+
```rust
122+
#[cfg(test)]
123+
impl EventStore {
124+
pub async fn cleanup_test_data(&self) -> Result<(), sqlx::Error> {
125+
sqlx::query("TRUNCATE TABLE events, nodes, ...").execute(&self.pool).await?;
126+
}
127+
}
128+
```
129+
130+
**Safety features:**
131+
- Public method (not `#[cfg(test)]`), so it is also usable for graceful shutdown/debugging — safety relies on the prominent doc warnings and on pointing it only at dedicated test databases
132+
- Used in `setup_test_api()` before each test
133+
- Ensures clean state for every test
134+
135+
### Common Test Fixtures
136+
137+
Located in `tests/common/mod.rs`:
138+
- `test_protocol_params()` - Creates valid ProtocolParameters
139+
- `test_node_info(peer_id)` - Creates valid NodeInformation
140+
- Reduces duplication across test files
141+
142+
## Best Practices We Follow
143+
144+
**Test Isolation**: Each test starts with clean database
145+
**Deterministic**: flush() instead of arbitrary sleeps
146+
**Safety**: Dangerous methods carry prominent "testing only / never in production" doc warnings (they remain compiled in release builds, so discipline is required at call sites)
147+
**Clear Intent**: Well-documented test helpers
148+
**Fast Unit Tests**: No database for 71 tests
149+
**Realistic Integration Tests**: Real PostgreSQL for 24 tests
150+
**CI Optimized**: Parallel unit tests, serial integration tests
151+
152+
## Alternative Approaches Considered
153+
154+
### 1. Separate Database Per Test
155+
```rust
156+
let db_name = format!("tart_test_{}", uuid::Uuid::new_v4());
157+
// Create database, run test, drop database
158+
```
159+
- ✅ Perfect isolation
160+
- ❌ Very slow (create/drop overhead)
161+
- ❌ CI complexity
162+
163+
### 2. In-Memory Mock Database
164+
```rust
165+
let store = Arc::new(MockEventStore::new());
166+
```
167+
- ✅ Fast tests
168+
- ❌ Doesn't test real PostgreSQL behavior
169+
- ❌ Can miss query bugs, index issues, etc.
170+
171+
### 3. Transaction Rollback Pattern
172+
```rust
173+
BEGIN TRANSACTION;
174+
// Run test
175+
ROLLBACK;
176+
```
177+
- ✅ Good isolation
178+
- ❌ Can't use with async background writers
179+
- ❌ Doesn't work with multiple connections
180+
181+
### 4. Separate Writer for Tests
182+
```rust
183+
#[cfg(test)]
184+
struct SyncWriter { ... } // No batching
185+
#[cfg(not(test))]
186+
struct BatchWriter { ... } // Batching
187+
```
188+
- ✅ Tests are simple
189+
- ❌ Tests don't match production behavior
190+
- ❌ Large code duplication
191+
192+
**Our chosen approach (`flush()` + serial execution) balances all concerns.**
193+
194+
## Common Pitfalls
195+
196+
### ❌ Running Integration Tests in Parallel
197+
```bash
198+
cargo test # BAD: Tests conflict in shared database
199+
```
200+
201+
### ✅ Correct Way
202+
```bash
203+
cargo test --test api_tests -- --test-threads=1
204+
```
205+
206+
### ❌ Forgetting to Flush
207+
```rust
208+
connect_test_node(port, 1).await;
209+
// Immediately query - DATA MIGHT NOT BE THERE YET
210+
let response = get("/api/nodes").await;
211+
```
212+
213+
### ✅ Correct Pattern
214+
```rust
215+
connect_test_node(port, 1).await;
216+
flush_and_wait(&server).await; // Ensure data is written
217+
let response = get("/api/nodes").await;
218+
```
219+
220+
## CI Configuration
221+
222+
GitHub Actions workflow (`.github/workflows/ci.yml`):
223+
224+
```yaml
225+
# Unit tests in parallel (fast)
226+
cargo test --lib --test types_tests --test events_tests --test error_tests --test encoding_tests
227+
228+
# Integration tests serially (safe)
229+
cargo test --test api_tests --test integration_tests --test optimized_server_tests -- --test-threads=1
230+
```
231+
232+
This ensures:
233+
- Fast feedback for unit tests
234+
- Reliable integration tests
235+
- No database conflicts
236+
237+
## Future Improvements
238+
239+
Potential enhancements for the test suite:
240+
241+
1. **Test fixtures with realistic data** - Pre-populate database with sample nodes/events
242+
2. **Property-based testing** - Use proptest for fuzz testing encoders
243+
3. **Load testing** - Verify 1024 concurrent connections
244+
4. **Chaos testing** - Simulate network failures, database outages
245+
5. **Benchmark suite** - Track performance regressions
246+
247+
## Summary
248+
249+
Our testing approach follows industry best practices:
250+
- Separate unit and integration tests
251+
- Explicit synchronization instead of sleeps
252+
- Test isolation through database cleanup
253+
- Safety through explicit doc warnings on destructive helpers (they are public, not `#[cfg(test)]`-gated, so callers must heed the warnings)
254+
- Clear documentation of patterns
255+
256+
The flush pattern is used by many production systems (Kafka, async loggers, batch processors) and is the correct solution for testing asynchronous background workers.

0 commit comments

Comments
 (0)