|
1 |
| -use std::collections::HashSet; |
| 1 | +use std::{collections::HashSet, pin::Pin, sync::Arc}; |
2 | 2 |
|
3 | 3 | use bao_tree::ChunkRanges;
|
4 | 4 | use genawaiter::sync::{Co, Gen};
|
5 | 5 | use n0_future::{Stream, StreamExt};
|
6 |
| -use tracing::{debug, error, warn}; |
| 6 | +use tracing::{debug, error, info, warn}; |
7 | 7 |
|
8 | 8 | use crate::{api::Store, Hash, HashAndFormat};
|
9 | 9 |
|
@@ -130,14 +130,31 @@ fn gc_sweep<'a>(
|
130 | 130 | })
|
131 | 131 | }
|
132 | 132 |
|
133 |
| -#[derive(Debug, Clone)] |
| 133 | +#[derive(derive_more::Debug, Clone)] |
134 | 134 | pub struct GcConfig {
|
135 | 135 | pub interval: std::time::Duration,
|
| 136 | + #[debug("ProtectCallback")] |
| 137 | + pub add_protected: Option<ProtectCb>, |
136 | 138 | }
|
137 | 139 |
|
| 140 | +#[derive(Debug)] |
| 141 | +pub enum ProtectOutcome { |
| 142 | + Continue, |
| 143 | + Skip, |
| 144 | +} |
| 145 | + |
| 146 | +pub type ProtectCb = Arc< |
| 147 | + dyn for<'a> Fn( |
| 148 | + &'a mut HashSet<Hash>, |
| 149 | + ) |
| 150 | + -> Pin<Box<dyn std::future::Future<Output = ProtectOutcome> + Send + Sync + 'a>> |
| 151 | + + Send |
| 152 | + + Sync |
| 153 | + + 'static, |
| 154 | +>; |
| 155 | + |
138 | 156 | pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api::Result<()> {
|
139 | 157 | {
|
140 |
| - live.clear(); |
141 | 158 | store.clear_protected().await?;
|
142 | 159 | let mut stream = gc_mark(store, live);
|
143 | 160 | while let Some(ev) = stream.next().await {
|
@@ -179,7 +196,17 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api:
|
179 | 196 | pub async fn run_gc(store: Store, config: GcConfig) {
|
180 | 197 | let mut live = HashSet::new();
|
181 | 198 | loop {
|
| 199 | + live.clear(); |
182 | 200 | tokio::time::sleep(config.interval).await;
|
| 201 | + if let Some(ref cb) = config.add_protected { |
| 202 | + match (cb)(&mut live).await { |
| 203 | + ProtectOutcome::Continue => {} |
| 204 | + ProtectOutcome::Skip => { |
| 205 | + info!("Skip gc run: protect callback indicated skip"); |
| 206 | + continue; |
| 207 | + } |
| 208 | + } |
| 209 | + } |
183 | 210 | if let Err(e) = gc_run_once(&store, &mut live).await {
|
184 | 211 | error!("error during gc run: {e}");
|
185 | 212 | break;
|
@@ -284,6 +311,7 @@ mod tests {
|
284 | 311 | assert!(!data_path.exists());
|
285 | 312 | assert!(!outboard_path.exists());
|
286 | 313 | }
|
| 314 | + live.clear(); |
287 | 315 | // create a large partial file and check that the data and outboard file as well as
|
288 | 316 | // the sizes and bitfield files are deleted by gc
|
289 | 317 | {
|
|
0 commit comments