Skip to content

Commit f0169ed

Browse files
committed
attempt at reducing memory leak
1 parent d26b677 commit f0169ed

File tree

1 file changed

+21
-9
lines changed

1 file changed

+21
-9
lines changed

src/database.rs

+21-9
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ impl Database {
268268
}
269269
}
270270

271-
return Ok(table.clone());
271+
// Use Arc::clone instead of table.clone() to avoid deep copying
272+
return Ok(Arc::clone(table));
272273
}
273274

274275
// If not found and project_id is not "default", try the default table
@@ -286,7 +287,8 @@ impl Database {
286287
}
287288
}
288289

289-
return Ok(table.clone());
290+
// Use Arc::clone instead of table.clone() to avoid deep copying
291+
return Ok(Arc::clone(table));
290292
}
291293
}
292294

@@ -303,15 +305,25 @@ impl Database {
303305
configs.get("default").ok_or_else(|| anyhow::anyhow!("Project ID '{}' not found", "default"))?.clone()
304306
};
305307

306-
let mut table = table_ref.write().await;
307-
let ops = DeltaOps(table.clone());
308+
// Scope the write lock to minimize lock time
309+
let should_checkpoint = {
310+
let mut table = table_ref.write().await;
308311

309-
let write_op = ops.write(batch).with_partition_columns(OtelLogsAndSpans::partitions());
310-
*table = write_op.await?;
312+
// Create the DeltaOps with a clone of the table
313+
let write_op = DeltaOps(table.clone()).write(batch).with_partition_columns(OtelLogsAndSpans::partitions());
311314

312-
// Checkpoint the table every 10 versions
313-
let version = table.version();
314-
if version > 0 && version % 10 == 0 {
315+
let new_table = write_op.await?;
316+
let version = new_table.version();
317+
*table = new_table;
318+
319+
version > 0 && version % 40 == 0
320+
};
321+
322+
// Checkpoint outside the write lock if needed
323+
if should_checkpoint {
324+
// Take a read lock for checkpointing
325+
let table = table_ref.read().await;
326+
let version = table.version();
315327
info!("Checkpointing Delta table at version {}", version);
316328
checkpoints::create_checkpoint(&table, None).await?;
317329
}

0 commit comments

Comments
 (0)