Skip to content

Commit 5c0e253

Browse files
script3rclaudehappy-otter
committed
Replace mutex+Vec with streaming channel pipeline
Eliminate mutex contention and synchronous barrier for large directory scanning: - Replace Arc<Mutex<Vec<PathBuf>>> with crossbeam unbounded channel - Files are sent to channel as discovered, workers process immediately - Use rayon's par_bridge() to parallelize channel consumption - Batch progress updates to every 100 files instead of every file - Remove unnecessary progress updates when skipping oversized files This streaming architecture provides better scalability for large directories (100K+ files, monorepos, node_modules) by overlapping file discovery with scanning and eliminating lock contention. Generated with [Claude Code](https://claude.ai/code) via [Happy](https://happy.engineering) Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: Happy <yesreply@happy.engineering>
1 parent 847d836 commit 5c0e253

File tree

1 file changed

+39
-33
lines changed

1 file changed

+39
-33
lines changed

src/main.rs

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,9 @@ fn main() -> Result<()> {
169169
Ok(())
170170
});
171171

172-
// First pass: collect files to scan
173-
let files_to_scan = Arc::new(std::sync::Mutex::new(Vec::new()));
172+
// Streaming architecture: WalkBuilder sends files to channel, rayon workers process immediately
173+
// This eliminates the mutex contention and synchronous barrier of collect-then-process
174+
let (file_tx, file_rx) = channel::unbounded::<PathBuf>();
174175
let patterns_for_discovery = patterns.clone();
175176
let file_count_discovery = file_count.clone();
176177
let skipped_oversize_discovery = skipped_oversize_count.clone();
@@ -222,6 +223,28 @@ fn main() -> Result<()> {
222223

223224
let max_bytes = cli.max_file_mb.map(|mb| mb.saturating_mul(1024 * 1024));
224225

226+
// Spawn scanner workers that process files as they're discovered
227+
let scan_bar_for_workers = scan_bar.clone();
228+
let scanned_count_for_workers = scanned_count.clone();
229+
let patterns_for_workers = patterns.clone();
230+
let tx_for_workers = tx.clone();
231+
232+
// Use a thread to run the parallel scanner on the receiving end
233+
let scanner_handle = std::thread::spawn(move || {
234+
// Process files as they arrive from the channel
235+
file_rx.into_iter().par_bridge().for_each(|path| {
236+
if let Err(err) = process_file(&path, &patterns_for_workers, &tx_for_workers) {
237+
eprintln!("Error processing {}: {err:#}", path.display());
238+
}
239+
240+
scanned_count_for_workers.fetch_add(1, Ordering::Relaxed);
241+
if let Some(pb) = &scan_bar_for_workers {
242+
pb.inc(1);
243+
}
244+
});
245+
});
246+
247+
// Walk and send files to channel (no mutex contention!)
225248
walk_builder
226249
.hidden(false)
227250
.ignore(cli.gitignore)
@@ -233,7 +256,7 @@ fn main() -> Result<()> {
233256
.build_parallel()
234257
.run(|| {
235258
let patterns = patterns_for_discovery.clone();
236-
let files = files_to_scan.clone();
259+
let file_tx = file_tx.clone();
237260
let file_count = file_count_discovery.clone();
238261
let discovery_bar = discovery_bar.clone();
239262
let skipped_oversize = skipped_oversize_discovery.clone();
@@ -245,24 +268,20 @@ fn main() -> Result<()> {
245268
&& meta.len() > limit
246269
{
247270
skipped_oversize.fetch_add(1, Ordering::Relaxed);
248-
if let Some(pb) = &discovery_bar {
249-
let found = file_count.load(Ordering::Relaxed);
250-
let skipped = skipped_oversize.load(Ordering::Relaxed);
251-
pb.set_message(format!(
252-
"Found {} files to scan (skipped {} oversized)",
253-
found, skipped
254-
));
255-
}
256271
return ignore::WalkState::Continue;
257272
}
258273

259274
let path = e.path().to_path_buf();
260275
if let Some(lang) = scan::language_from_path(&path)
261276
&& patterns.supports_language(lang)
262277
{
263-
files.lock().unwrap().push(path);
278+
// Send to channel instead of pushing to mutex-protected Vec
279+
let _ = file_tx.send(path);
264280
let count = file_count.fetch_add(1, Ordering::Relaxed) + 1;
265-
if let Some(pb) = &discovery_bar {
281+
// Batch progress updates: only update every 100 files
282+
if let Some(pb) = &discovery_bar
283+
&& (count.is_multiple_of(100) || count == 1)
284+
{
266285
let skipped = skipped_oversize.load(Ordering::Relaxed);
267286
pb.set_message(format!(
268287
"Found {} files to scan (skipped {} oversized)",
@@ -278,38 +297,25 @@ fn main() -> Result<()> {
278297
})
279298
});
280299

300+
// Discovery complete - close the channel so scanners know to finish
301+
drop(file_tx);
302+
303+
let total_files = file_count.load(Ordering::Relaxed);
281304
if let Some(pb) = &discovery_bar {
282305
pb.finish_with_message(format!(
283306
"Found {} files to scan (skipped {} oversized)",
284-
file_count.load(Ordering::Relaxed),
307+
total_files,
285308
skipped_oversize_count.load(Ordering::Relaxed)
286309
));
287310
}
288311

289-
// Second pass: scan files in parallel
290-
let files_to_scan = Arc::try_unwrap(files_to_scan)
291-
.unwrap()
292-
.into_inner()
293-
.unwrap();
294-
let total_files = files_to_scan.len();
295-
296312
if let Some(pb) = &scan_bar {
297313
pb.set_length(total_files as u64);
298314
pb.set_message("Scanning files...");
299315
}
300316

301-
// Process files in parallel with rayon
302-
files_to_scan.into_par_iter().for_each(|path| {
303-
// Process the file
304-
if let Err(err) = process_file(&path, &patterns, &tx) {
305-
eprintln!("Error processing {}: {err:#}", path.display());
306-
}
307-
308-
scanned_count.fetch_add(1, Ordering::Relaxed);
309-
if let Some(pb) = &scan_bar {
310-
pb.inc(1);
311-
}
312-
});
317+
// Wait for all scanning to complete
318+
scanner_handle.join().expect("scanner thread panicked");
313319

314320
// All files have been processed
315321
drop(tx);

0 commit comments

Comments
 (0)