Skip to content

Commit 71f164a

Browse files
expose DatabaseScanner::ingest_batch as public ingestion API (meta-pytorch#3358)
Summary: This diff adds the narrow public ingestion seam needed by the snapshot bridge without changing telemetry ingestion semantics. It exposes DatabaseScanner::ingest_batch as a thin public wrapper over the existing internal named-batch insertion path, keeping ingestion ownership in monarch_distributed_telemetry while avoiding direct construction or manipulation of LiveTableData from the bridge crate. It also adds focused unit tests that pin down the contract of that public seam: the first ingest creates a table from the batch schema, empty batches still register the schema without appending rows, repeated ingests to the same table reuse the existing entry and accumulate rows, and the “empty first, non-empty later” path works correctly. That empty-batch schema-registration behavior is the key guarantee the snapshot ingestion path needs, so that all snapshot tables can be registered in DataFusion even when a given capture has zero rows for some table. Differential Revision: D99477096
1 parent 92d9346 commit 71f164a

1 file changed

Lines changed: 120 additions & 4 deletions

File tree

monarch_distributed_telemetry/src/database_scanner.rs

Lines changed: 120 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,10 +313,9 @@ impl DatabaseScanner {
313313
}
314314

315315
impl DatabaseScanner {
316-
/// Static method to push a batch to the table_data map.
317-
/// This can be used from closures that capture the Arc.
318-
///
319-
/// If the batch is empty, creates the table with the schema but doesn't append data.
316+
/// Push a batch into the named table in `table_data`.
317+
/// See [`ingest_batch`](Self::ingest_batch) for the public API
318+
/// and ingestion invariant registry (ID-*).
320319
fn push_batch_to_tables(
321320
table_data: &Arc<StdMutex<HashMap<String, Arc<LiveTableData>>>>,
322321
table_name: &str,
@@ -344,6 +343,40 @@ impl DatabaseScanner {
344343
Ok(())
345344
}
346345

346+
/// Ingest a `RecordBatch` into a named table.
347+
///
348+
/// Public ingestion surface for external callers (e.g.,
349+
/// `monarch_introspection_snapshot`). Delegates to the internal
350+
/// `push_batch_to_tables`.
351+
///
352+
/// # Ingestion invariants (ID-*)
353+
///
354+
/// - **ID-1 (wrapper parity):** `ingest_batch` is semantically
355+
/// identical to `push_batch_to_tables`.
356+
/// - **ID-2 (create on first batch):** If `table_name` is absent,
357+
/// a new `LiveTableData` is created from `batch.schema()`.
358+
/// - **ID-3 (empty batch registers schema):** An empty batch
359+
/// creates the table entry and preserves the schema —
360+
/// `LiveTableData::push` is a no-op for zero rows, but the
361+
/// `entry().or_insert_with()` in `push_batch_to_tables` runs
362+
/// unconditionally.
363+
/// - **ID-4 (append on existing table):** A non-empty batch for
364+
/// an existing table appends rows.
365+
/// - **ID-5 (ownership boundary):** Callers provide only
366+
/// `(table_data, table_name, batch)`; they do not interact with
367+
/// `LiveTableData` construction logic directly.
368+
/// - **ID-6 (error surface preserved):** `ingest_batch` preserves
369+
/// the existing error behavior of `push_batch_to_tables`,
370+
/// including lock-poisoning failures. It introduces no new
371+
/// ingestion semantics.
372+
pub fn ingest_batch(
373+
table_data: &Arc<StdMutex<HashMap<String, Arc<LiveTableData>>>>,
374+
table_name: &str,
375+
batch: RecordBatch,
376+
) -> anyhow::Result<()> {
377+
Self::push_batch_to_tables(table_data, table_name, batch)
378+
}
379+
347380
/// Create a RecordBatchSink that pushes batches to this scanner's tables.
348381
///
349382
/// The sink can be registered with hyperactor_telemetry::register_sink()
@@ -1163,4 +1196,87 @@ mod tests {
11631196
assert_eq!(frame_thread_ids.value(1), 2);
11641197
assert_eq!(filenames.value(1), "b.py");
11651198
}
1199+
1200+
// --- ingest_batch tests ---
1201+
// These reference the ID-* invariants defined on ingest_batch.
1202+
1203+
/// Helper: count rows in a table_data map entry.
1204+
fn ingest_row_count(
1205+
table_data: &Arc<StdMutex<HashMap<String, Arc<LiveTableData>>>>,
1206+
table_name: &str,
1207+
) -> usize {
1208+
let guard = table_data.lock().unwrap();
1209+
match guard.get(table_name) {
1210+
Some(table) => get_tokio_runtime().block_on(async {
1211+
table.mem_table().batches[0]
1212+
.read()
1213+
.await
1214+
.iter()
1215+
.map(|b| b.num_rows())
1216+
.sum::<usize>()
1217+
}),
1218+
None => 0,
1219+
}
1220+
}
1221+
1222+
// ID-2, ID-3: ingesting an empty batch must still register the table
// (with the batch's schema) while storing 0 rows.
#[test]
fn test_ingest_batch_creates_table_for_empty_batch() {
    let table_data = Arc::new(StdMutex::new(HashMap::new()));
    let empty = make_batch(&[]);

    DatabaseScanner::ingest_batch(&table_data, "t", empty.clone()).unwrap();

    // Scope the guard so the lock is released before `ingest_row_count`
    // locks the map again.
    {
        let tables = table_data.lock().unwrap();
        assert!(tables.contains_key("t"), "ID-2: table should exist");
        let registered = tables.get("t").unwrap();
        assert_eq!(registered.schema(), empty.schema(), "ID-3: schema should match");
    }

    assert_eq!(ingest_row_count(&table_data, "t"), 0, "ID-3: 0 rows");
}
1238+
1239+
// ID-2, ID-4: the first non-empty batch both creates the table and
// stores its rows.
#[test]
fn test_ingest_batch_appends_non_empty_batch() {
    let table_data = Arc::new(StdMutex::new(HashMap::new()));
    let three_rows = make_batch(&[1, 2, 3]);

    DatabaseScanner::ingest_batch(&table_data, "t", three_rows).unwrap();

    let stored = ingest_row_count(&table_data, "t");
    assert_eq!(stored, 3);
}
1248+
1249+
// ID-4: successive batches for one table name share a single map entry
// and their rows add up.
#[test]
fn test_ingest_batch_reuses_existing_table() {
    let table_data = Arc::new(StdMutex::new(HashMap::new()));
    let first = make_batch(&[1, 2]);
    let second = make_batch(&[3, 4, 5]);

    DatabaseScanner::ingest_batch(&table_data, "t", first).unwrap();
    DatabaseScanner::ingest_batch(&table_data, "t", second).unwrap();

    // The guard is a temporary here, so the lock is released as soon as
    // the assertion finishes and before `ingest_row_count` re-locks.
    assert_eq!(
        table_data.lock().unwrap().len(),
        1,
        "ID-4: still one table entry"
    );
    assert_eq!(ingest_row_count(&table_data, "t"), 5);
}
1262+
1263+
// ID-3, ID-4: a table registered via an empty batch accepts rows from a
// later non-empty batch and keeps the originally registered schema.
#[test]
fn test_ingest_batch_empty_then_non_empty() {
    let table_data = Arc::new(StdMutex::new(HashMap::new()));
    let empty = make_batch(&[]);
    let expected_schema = empty.schema();

    // Step 1: the empty batch registers the schema only.
    DatabaseScanner::ingest_batch(&table_data, "t", empty.clone()).unwrap();
    let rows_after_empty = ingest_row_count(&table_data, "t");
    assert_eq!(rows_after_empty, 0);

    // Step 2: the non-empty batch adds rows to the same table.
    DatabaseScanner::ingest_batch(&table_data, "t", make_batch(&[10, 20])).unwrap();
    let rows_after_append = ingest_row_count(&table_data, "t");
    assert_eq!(rows_after_append, 2);

    // Step 3: the schema is still the one from the empty batch.
    let tables = table_data.lock().unwrap();
    let registered = tables.get("t").unwrap();
    assert_eq!(registered.schema(), expected_schema);
}
11661282
}

0 commit comments

Comments
 (0)