From 9c904f31bb591019e2b52282b8ff6f30d6a1b7cc Mon Sep 17 00:00:00 2001 From: Mohsen Azimi Date: Sun, 19 Jan 2025 12:14:23 +0700 Subject: [PATCH 1/8] feat: run serialization in parallel --- src/lib.rs | 337 ++++++++++++++++++++++++--------------------- src/parallel.rs | 325 +++++++++++++++++++++++++++++++++++++++++++ tests/test_perf.rs | 27 ++-- 3 files changed, 517 insertions(+), 172 deletions(-) create mode 100644 src/parallel.rs diff --git a/src/lib.rs b/src/lib.rs index 0b6e5e5..20a8861 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,8 @@ use std::path::{Path, PathBuf}; use std::process::{Command as SysCommand, Stdio}; use tracing::{debug, info}; use walkdir::WalkDir; +mod parallel; +use parallel::process_files_parallel; /// Helper macro to write debug statements both to standard debug log and to debug file if set. #[macro_export] @@ -192,7 +194,11 @@ fn build_final_config(cfg: Option) -> FinalConfig { merged_ignore.push(reg); } } - // Merge or add new priority rules + // Clear default priority rules if user provides their own + if !user_cfg.priority_rules.is_empty() { + merged_priority.clear(); + } + // Add user priority rules for user_rule in user_cfg.priority_rules { if user_rule.patterns.is_empty() { continue; @@ -348,14 +354,15 @@ pub fn get_file_priority( _ignore_pats: &[Regex], prio_list: &[PriorityPattern], ) -> i32 { - for prio in prio_list { + // Loop from highest score → lowest + for prio in prio_list.iter().rev() { for pat in &prio.patterns { if pat.is_match(rel_str) { return prio.score; } } } - 40 // fallback + 0 // fallback if nothing matches - lower than any user-defined priority } /// Get the commit time of the most recent change to each file. 
@@ -531,165 +538,188 @@ pub fn serialize_repo( None }; - // Collect files with their priorities - let mut files: Vec = Vec::new(); - let mut total_size = 0; - let mut current_chunk = 0; - let mut current_chunk_files = Vec::new(); - - // Walk directory tree - for entry in WalkDir::new(base_path) - .follow_links(true) - .into_iter() - .filter_map(|e| e.ok()) - { - let path = entry.path(); - if !path.is_file() { - continue; - } - - // Get path relative to base - let rel_path = path.strip_prefix(base_path).unwrap_or(path); - let rel_str = rel_path.to_string_lossy(); - - // Normalize path separators to forward slashes for consistent pattern matching - #[cfg(windows)] - let rel_str = rel_str.replace('\\', "/"); - - // Skip if matched by gitignore - #[cfg(windows)] - let gitignore_path = rel_path - .to_str() - .map(|s| s.replace('\\', "/")) - .map(PathBuf::from) - .unwrap_or(rel_path.to_path_buf()); - #[cfg(not(windows))] - let gitignore_path = rel_path.to_path_buf(); - - if gitignore.matched(&gitignore_path, false).is_ignore() { - debug!("Skipping {} - matched by gitignore", rel_str); - continue; - } + if stream { + // For streaming, we still use the old single-threaded approach + let mut files: Vec = Vec::new(); + let mut total_size = 0; + let mut current_chunk = 0; + let mut current_chunk_files = Vec::new(); + + // Walk directory tree + for entry in WalkDir::new(base_path) + .follow_links(true) + .into_iter() + .filter_map(|e| e.ok()) + { + let path = entry.path(); + if !path.is_file() { + continue; + } - // Skip if matched by our ignore patterns - let mut skip = false; - #[cfg(windows)] - let pattern_path = rel_str.replace('\\', "/"); - #[cfg(not(windows))] - let pattern_path = rel_str.to_string(); + // Get path relative to base + let rel_path = path.strip_prefix(base_path).unwrap_or(path); + let rel_str = rel_path.to_string_lossy(); + + // Normalize path separators to forward slashes for consistent pattern matching + #[cfg(windows)] + let rel_str = 
rel_str.replace('\\', "/"); + + // Skip if matched by gitignore + #[cfg(windows)] + let gitignore_path = rel_path + .to_str() + .map(|s| s.replace('\\', "/")) + .map(PathBuf::from) + .unwrap_or(rel_path.to_path_buf()); + #[cfg(not(windows))] + let gitignore_path = rel_path.to_path_buf(); + + if gitignore.matched(&gitignore_path, false).is_ignore() { + debug!("Skipping {} - matched by gitignore", rel_str); + continue; + } - for pat in &final_config.ignore_patterns { - if pat.is_match(&pattern_path) { - debug!("Skipping {} - matched ignore pattern", rel_str); - skip = true; - break; + // Skip if matched by our ignore patterns + let mut skip = false; + #[cfg(windows)] + let pattern_path = rel_str.replace('\\', "/"); + #[cfg(not(windows))] + let pattern_path = rel_str.to_string(); + + for pat in &final_config.ignore_patterns { + if pat.is_match(&pattern_path) { + debug!("Skipping {} - matched ignore pattern", rel_str); + skip = true; + break; + } + } + if skip { + continue; } - } - if skip { - continue; - } - // Calculate priority score - let mut priority = get_file_priority( - &pattern_path, - &final_config.ignore_patterns, - &final_config.priority_list, - ); + // Calculate priority score + let mut priority = get_file_priority( + &pattern_path, + &final_config.ignore_patterns, + &final_config.priority_list, + ); - // Apply rank-based boost if available - if let Some(ref boost_map) = recentness_boost { - if let Some(boost) = boost_map.get(&pattern_path) { - priority += *boost; + // Apply rank-based boost if available + if let Some(ref boost_map) = recentness_boost { + if let Some(boost) = boost_map.get(&pattern_path) { + priority += *boost; + } } - } - files.push(FileEntry { - path: path.to_path_buf(), - priority, - }); - } + files.push(FileEntry { + path: path.to_path_buf(), + priority, + }); + } - // Sort files by priority (ascending) so higher priority files come last - files.sort_by(|a, b| a.priority.cmp(&b.priority)); + // Sort files by priority (ascending) so 
higher priority files come last + files.sort_by(|a, b| a.priority.cmp(&b.priority)); - // Process files in sorted order - for file in files { - let path = file.path; - let rel_path = path.strip_prefix(base_path).unwrap_or(&path); - let rel_str = rel_path.to_string_lossy(); + // Process files in sorted order + for file in files { + let path = file.path; + let rel_path = path.strip_prefix(base_path).unwrap_or(&path); + let rel_str = rel_path.to_string_lossy(); - // Skip binary files - if let Some(ref cfg) = config { - if !is_text_file(&path, &cfg.binary_extensions) { + // Skip binary files + if let Some(ref cfg) = config { + if !is_text_file(&path, &cfg.binary_extensions) { + debug!("Skipping binary file: {}", rel_str); + continue; + } + } else if !is_text_file(&path, &[]) { debug!("Skipping binary file: {}", rel_str); continue; } - } else if !is_text_file(&path, &[]) { - debug!("Skipping binary file: {}", rel_str); - continue; - } - // Read file content - let content = match fs::read_to_string(&path) { - Ok(c) => c, - Err(e) => { - debug!("Failed to read {}: {}", rel_str, e); + // Read file content + let content = match fs::read_to_string(&path) { + Ok(c) => c, + Err(e) => { + debug!("Failed to read {}: {}", rel_str, e); + continue; + } + }; + + let size = count_size(&content, count_tokens); + if size == 0 { + debug!("Skipping empty file: {}", rel_str); continue; } - }; - let size = count_size(&content, count_tokens); - if size == 0 { - debug!("Skipping empty file: {}", rel_str); - continue; - } - - // If a single file is larger than max_size, split it into multiple chunks - if size > max_size { - debug_file!("File exceeds chunk size, splitting into multiple chunks"); - let mut remaining = content.as_str(); - let mut part = 0; - - while !remaining.is_empty() { - let mut chunk_size = if count_tokens { - // In token mode, count words until we hit max_size - let mut chars = 0; - for (tokens, word) in remaining.split_whitespace().enumerate() { - if tokens + 1 > max_size 
{ - break; + // If a single file is larger than max_size, split it into multiple chunks + if size > max_size { + debug_file!("File exceeds chunk size, splitting into multiple chunks"); + let mut remaining = content.as_str(); + let mut part = 0; + + while !remaining.is_empty() { + let mut chunk_size = if count_tokens { + // In token mode, count words until we hit max_size + let mut chars = 0; + for (tokens, word) in remaining.split_whitespace().enumerate() { + if tokens + 1 > max_size { + break; + } + chars += word.len() + 1; // +1 for space } - chars += word.len() + 1; // +1 for space + chars + } else { + max_size + }; + + // Ensure we make progress even if no word boundary found + if chunk_size == 0 { + chunk_size = std::cmp::min(max_size, remaining.len()); } - chars - } else { - max_size - }; - - // Ensure we make progress even if no word boundary found - if chunk_size == 0 { - chunk_size = std::cmp::min(max_size, remaining.len()); - } - let (chunk, rest) = remaining.split_at(std::cmp::min(chunk_size, remaining.len())); - remaining = rest.trim_start(); + let (chunk, rest) = + remaining.split_at(std::cmp::min(chunk_size, remaining.len())); + remaining = rest.trim_start(); + + let chunk_files = + vec![(format!("{}:part{}", rel_str, part), chunk.to_string())]; + debug_file!("Written chunk {}", part); + write_chunk( + &chunk_files, + part, + output_dir.as_deref(), + stream, + count_tokens, + )?; + part += 1; + } + continue; + } - let chunk_files = vec![(format!("{}:part{}", rel_str, part), chunk.to_string())]; - debug_file!("Written chunk {}", part); + // Check if adding this file would exceed chunk size + if total_size + size > max_size && !current_chunk_files.is_empty() { + // Write current chunk write_chunk( - &chunk_files, - part, + ¤t_chunk_files, + current_chunk, output_dir.as_deref(), stream, count_tokens, )?; - part += 1; + debug_file!("Written chunk {}", current_chunk); + current_chunk += 1; + current_chunk_files.clear(); + total_size = 0; } - continue; + + 
// Add file to current chunk + current_chunk_files.push((rel_str.to_string(), content)); + total_size += size; } - // Check if adding this file would exceed chunk size - if total_size + size > max_size && !current_chunk_files.is_empty() { - // Write current chunk + // Write final chunk if any files remain + if !current_chunk_files.is_empty() { write_chunk( ¤t_chunk_files, current_chunk, @@ -698,32 +728,23 @@ pub fn serialize_repo( count_tokens, )?; debug_file!("Written chunk {}", current_chunk); - current_chunk += 1; - current_chunk_files.clear(); - total_size = 0; } - // Add file to current chunk - current_chunk_files.push((rel_str.to_string(), content)); - total_size += size; - } - - // Write final chunk if any files remain - if !current_chunk_files.is_empty() { - write_chunk( - ¤t_chunk_files, - current_chunk, - output_dir.as_deref(), - stream, - count_tokens, - )?; - debug_file!("Written chunk {}", current_chunk); - } - - if stream { Ok(None) + } else if let Some(out_dir) = output_dir { + // Use parallel processing for non-streaming mode + process_files_parallel( + base_path, + max_size, + &out_dir, + config.as_ref(), + &final_config.ignore_patterns, + &final_config.priority_list, + recentness_boost.as_ref(), + )?; + Ok(Some(out_dir)) } else { - Ok(output_dir) + Ok(None) } } @@ -802,7 +823,7 @@ fn compute_recentness_boost( return HashMap::new(); } - // Sort by ascending commit time + // Sort by ascending commit time => first is oldest let mut sorted: Vec<(&String, &u64)> = commit_times.iter().collect(); sorted.sort_by_key(|(_, t)| **t); @@ -819,8 +840,8 @@ fn compute_recentness_boost( let mut result = HashMap::new(); for (i, (path, _time)) in sorted.iter().enumerate() { - let rank = i as f64 / last_index; // 0.0..1.0 - let boost = (rank * max_boost as f64).round() as i32; + let rank = i as f64 / last_index; // 0.0..1.0 (older files get lower rank) + let boost = (rank * max_boost as f64).round() as i32; // Newer files get higher boost 
result.insert((*path).clone(), boost); } result diff --git a/src/parallel.rs b/src/parallel.rs new file mode 100644 index 0000000..ac4bbba --- /dev/null +++ b/src/parallel.rs @@ -0,0 +1,325 @@ +use crate::is_text_file; +use crate::{get_file_priority, PriorityPattern, YekConfig}; +use anyhow::Result; +use crossbeam::channel::{bounded, Receiver, Sender}; +use ignore::{gitignore::GitignoreBuilder, WalkBuilder}; +use num_cpus::get; +use regex::Regex; +use std::collections::HashMap; +use std::fs; +use std::io::{BufReader, Read}; +use std::path::{Path, PathBuf}; +use std::thread; +use tracing::{debug, info}; + +/// Represents a chunk of text read from one file +#[derive(Debug)] +pub struct FileChunk { + pub priority: i32, + pub file_index: usize, + pub part_index: usize, + pub rel_path: String, + pub content: String, +} + +/// File entry with priority for sorting +#[derive(Debug, Clone)] +struct FileEntry { + path: PathBuf, + priority: i32, + file_index: usize, +} + +/// Reads a file and determines if it's likely binary by checking for null bytes +fn is_likely_binary(path: &Path) -> Result { + let f = fs::File::open(path)?; + let mut reader = BufReader::new(f); + let mut buf = [0; 4096]; + let n = reader.read(&mut buf)?; + Ok(buf[..n].contains(&0)) +} + +/// Reads and chunks a single file, sending chunks through the channel +fn read_and_send_chunks( + file_entry: FileEntry, + base_path: &Path, + tx: &Sender, + max_size: usize, +) -> Result<()> { + // Skip if binary + if is_likely_binary(&file_entry.path)? 
{ + return Ok(()); + } + + // Read file content + let content = fs::read_to_string(&file_entry.path)?; + if content.is_empty() { + return Ok(()); + } + + // Get relative path for display + let rel_path = file_entry + .path + .strip_prefix(base_path) + .unwrap_or(&file_entry.path) + .to_string_lossy() + .into_owned(); + + // If smaller than max_size, send as single chunk + if content.len() <= max_size { + let chunk = FileChunk { + priority: file_entry.priority, + file_index: file_entry.file_index, + part_index: 0, + rel_path, + content, + }; + tx.send(chunk).ok(); + return Ok(()); + } + + // Otherwise split into chunks + let mut start = 0; + let mut part_index = 0; + let bytes = content.as_bytes(); + + while start < bytes.len() { + let end = (start + max_size).min(bytes.len()); + let slice = &bytes[start..end]; + let chunk_str = String::from_utf8_lossy(slice).into_owned(); + + let chunk = FileChunk { + priority: file_entry.priority, + file_index: file_entry.file_index, + part_index, + rel_path: rel_path.clone(), + content: chunk_str, + }; + + tx.send(chunk).ok(); + start = end; + part_index += 1; + } + + Ok(()) +} + +/// Main parallel processing function that coordinates workers and aggregator +pub fn process_files_parallel( + base_dir: &Path, + max_size: usize, + output_dir: &Path, + config: Option<&YekConfig>, + ignore_patterns: &[Regex], + priority_list: &[PriorityPattern], + recentness_boost: Option<&HashMap>, +) -> Result<()> { + // Create output directory + fs::create_dir_all(output_dir)?; + + // Collect and sort files by priority + let files = collect_files( + base_dir, + config, + ignore_patterns, + priority_list, + recentness_boost, + )?; + if files.is_empty() { + return Ok(()); + } + + // Create channels for worker→aggregator communication + let (tx, rx) = bounded(256); + + // Spawn aggregator thread + let output_dir = output_dir.to_path_buf(); + let aggregator_handle = thread::spawn(move || aggregator_loop(rx, output_dir)); + + // Spawn worker threads + 
let num_threads = get(); + let chunk_size = files.len().div_ceil(num_threads); + let mut handles = Vec::new(); + + for chunk in files.chunks(chunk_size) { + let chunk_files = chunk.to_vec(); + let sender = tx.clone(); + let base_path = base_dir.to_path_buf(); + + let handle = thread::spawn(move || -> Result<()> { + for file_entry in chunk_files { + read_and_send_chunks(file_entry, &base_path, &sender, max_size)?; + } + Ok(()) + }); + handles.push(handle); + } + + // Drop original sender + drop(tx); + + // Wait for workers + for handle in handles { + handle.join().unwrap()?; + } + + // Wait for aggregator + aggregator_handle.join().unwrap()?; + + Ok(()) +} + +/// Collects files from directory respecting .gitignore and sorts by priority +fn collect_files( + base_dir: &Path, + config: Option<&YekConfig>, + ignore_patterns: &[Regex], + priority_list: &[PriorityPattern], + recentness_boost: Option<&HashMap>, +) -> Result> { + // Build gitignore matcher + let mut builder = GitignoreBuilder::new(base_dir); + let gitignore_path = base_dir.join(".gitignore"); + if gitignore_path.exists() { + builder.add(&gitignore_path); + } + let gitignore = builder + .build() + .unwrap_or_else(|_| GitignoreBuilder::new(base_dir).build().unwrap()); + + let mut builder = WalkBuilder::new(base_dir); + builder.follow_links(false).standard_filters(true); + + let mut results = Vec::new(); + let mut file_index = 0; + + for entry in builder.build().flatten() { + if entry.file_type().is_some_and(|ft| ft.is_file()) { + let path = entry.path().to_path_buf(); + let rel_path = path.strip_prefix(base_dir).unwrap_or(&path); + let rel_str = rel_path.to_string_lossy(); + + // Skip if matched by gitignore + #[cfg(windows)] + let gitignore_path = rel_path + .to_str() + .map(|s| s.replace('\\', "/")) + .map(PathBuf::from) + .unwrap_or(rel_path.to_path_buf()); + #[cfg(not(windows))] + let gitignore_path = rel_path.to_path_buf(); + + if gitignore.matched(&gitignore_path, false).is_ignore() { + debug!("Skipping 
{} - matched by gitignore", rel_str); + continue; + } + + // Skip if matched by our ignore patterns + let mut skip = false; + for pat in ignore_patterns { + if pat.is_match(&rel_str) { + debug!("Skipping {} - matched ignore pattern", rel_str); + skip = true; + break; + } + } + if skip { + continue; + } + + // Skip binary files + if let Some(cfg) = config { + if !is_text_file(&path, &cfg.binary_extensions) { + debug!("Skipping binary file: {}", rel_str); + continue; + } + } else if !is_text_file(&path, &[]) { + debug!("Skipping binary file: {}", rel_str); + continue; + } + + // Calculate priority score + let mut priority = get_file_priority(&rel_str, ignore_patterns, priority_list); + + // Apply git recentness boost + if let Some(boost_map) = recentness_boost { + if let Some(boost) = boost_map.get(&rel_str.to_string()) { + priority += *boost; + } + } + + results.push(FileEntry { + path, + priority, + file_index, + }); + file_index += 1; + } + } + + // Sort by priority (ascending) so higher priority files come last + results.sort_by(|a, b| { + // First sort by priority (ascending) + let p = a.priority.cmp(&b.priority); + if p != std::cmp::Ordering::Equal { + return p; + } + // If priorities are equal, sort by Git boost (ascending) + if let Some(boost_map) = recentness_boost { + let a_boost = boost_map + .get(&a.path.to_string_lossy().to_string()) + .unwrap_or(&0); + let b_boost = boost_map + .get(&b.path.to_string_lossy().to_string()) + .unwrap_or(&0); + return a_boost.cmp(b_boost); // Lower boost (older files) come first + } + std::cmp::Ordering::Equal + }); + Ok(results) +} + +/// Receives chunks from workers and writes them to files +fn aggregator_loop(rx: Receiver, output_dir: PathBuf) -> Result<()> { + fs::create_dir_all(&output_dir)?; + + let mut all_chunks = Vec::new(); + while let Ok(chunk) = rx.recv() { + all_chunks.push(chunk); + } + + all_chunks.sort_by(|a, b| { + let p = a.priority.cmp(&b.priority); + if p != std::cmp::Ordering::Equal { + return p; + } + 
let f = a.file_index.cmp(&b.file_index); + if f != std::cmp::Ordering::Equal { + return f; + } + a.part_index.cmp(&b.part_index) + }); + + let mut current_chunk = String::new(); + let current_chunk_index = 0; + + for chunk in all_chunks { + let mut content = String::new(); + content.push_str(&format!(">>>> {}\n", chunk.rel_path)); + content.push_str(&chunk.content); + content.push_str("\n\n"); + current_chunk.push_str(&content); + } + + if !current_chunk.is_empty() { + let out_path = output_dir.join(format!("chunk-{}.txt", current_chunk_index)); + fs::write(&out_path, ¤t_chunk)?; + info!( + "Written chunk {} with {} lines.", + current_chunk_index, + current_chunk.lines().count() + ); + } + + Ok(()) +} diff --git a/tests/test_perf.rs b/tests/test_perf.rs index 14e4d25..4d5f040 100644 --- a/tests/test_perf.rs +++ b/tests/test_perf.rs @@ -1,6 +1,6 @@ use std::fs; +use std::path::Path; use std::time::{Duration, Instant}; -use tempfile::TempDir; use yek::serialize_repo; struct PerfStats { @@ -34,9 +34,9 @@ fn test_serialization_performance() { const WARMUP_RUNS: usize = 2; const BENCH_RUNS: usize = 5; - // Create temporary test directory that will be automatically cleaned up - let test_dir = TempDir::new().unwrap(); - let output_dir = TempDir::new().unwrap(); + // Create test data directory + let test_dir = "test_perf_data"; + fs::create_dir_all(test_dir).unwrap(); // Create test files of different sizes let sizes = vec![1024, 1024 * 1024, 10 * 1024 * 1024]; // 1KB, 1MB, 10MB @@ -45,7 +45,7 @@ fn test_serialization_performance() { println!("------------------------------"); for size in sizes { - let filename = test_dir.path().join(format!("file_{}_bytes.txt", size)); + let filename = format!("{}/file_{}_bytes.txt", test_dir, size); let data = vec![b'a'; size]; fs::write(&filename, &data).unwrap(); @@ -55,16 +55,15 @@ fn test_serialization_performance() { for _ in 0..WARMUP_RUNS { serialize_repo( size, - Some(test_dir.path()), + Some(Path::new(test_dir)), false, false, 
None, - Some(output_dir.path()), + Some(Path::new("perf_output")), None, ) .unwrap(); - fs::remove_dir_all(output_dir.path()).unwrap(); - fs::create_dir_all(output_dir.path()).unwrap(); + fs::remove_dir_all("perf_output").unwrap(); } // Benchmark runs @@ -75,11 +74,11 @@ fn test_serialization_performance() { let start = Instant::now(); serialize_repo( size, - Some(test_dir.path()), + Some(Path::new(test_dir)), false, false, None, - Some(output_dir.path()), + Some(Path::new("perf_output")), None, ) .unwrap(); @@ -87,8 +86,7 @@ fn test_serialization_performance() { stats.update(duration); println!(" Run {}: {:?}", run, duration); - fs::remove_dir_all(output_dir.path()).unwrap(); - fs::create_dir_all(output_dir.path()).unwrap(); + fs::remove_dir_all("perf_output").unwrap(); } println!("\nStats for {}B:", size); @@ -97,5 +95,6 @@ fn test_serialization_performance() { println!(" Avg: {:?}", stats.avg); } - // TempDir will automatically clean up when dropped + // Final cleanup + fs::remove_dir_all(test_dir).unwrap(); } From b523963a031f83f801edae04d756d73863052999 Mon Sep 17 00:00:00 2001 From: Mohsen Azimi Date: Sun, 19 Jan 2025 12:21:05 +0700 Subject: [PATCH 2/8] fix: remove default priority list --- src/lib.rs | 14 ++++------- src/parallel.rs | 60 ++++++++++++++++++++++++++++++++-------------- tests/test_perf.rs | 48 ++++++++++++++++++++++--------------- 3 files changed, 76 insertions(+), 46 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 20a8861..3deb825 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,8 @@ pub struct YekConfig { pub output_dir: Option, #[serde(default)] pub git_boost_max: Option, + #[serde(default)] + pub channel_capacity: Option, } #[derive(Debug, Deserialize, Default, Clone)] @@ -92,6 +94,7 @@ impl Default for YekConfig { binary_extensions: Vec::new(), // User extensions only, we'll combine with BINARY_FILE_EXTENSIONS output_dir: None, git_boost_max: None, + channel_capacity: None, } } } @@ -110,10 +113,7 @@ pub struct PriorityPattern 
{ /// Default sets of priority patterns fn default_priority_list() -> Vec { - vec![PriorityPattern { - score: 50, - patterns: vec![Regex::new(r"^src/").unwrap()], - }] + Vec::new() // Return empty list - no default priorities } /// Default sets of ignore patterns (separate from .gitignore) @@ -194,11 +194,7 @@ fn build_final_config(cfg: Option) -> FinalConfig { merged_ignore.push(reg); } } - // Clear default priority rules if user provides their own - if !user_cfg.priority_rules.is_empty() { - merged_priority.clear(); - } - // Add user priority rules + // Add user priority rules without clearing defaults for user_rule in user_cfg.priority_rules { if user_rule.patterns.is_empty() { continue; diff --git a/src/parallel.rs b/src/parallel.rs index ac4bbba..536b9ea 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -74,7 +74,7 @@ fn read_and_send_chunks( rel_path, content, }; - tx.send(chunk).ok(); + tx.send(chunk)?; return Ok(()); } @@ -96,7 +96,7 @@ fn read_and_send_chunks( content: chunk_str, }; - tx.send(chunk).ok(); + tx.send(chunk)?; start = end; part_index += 1; } @@ -105,6 +105,8 @@ fn read_and_send_chunks( } /// Main parallel processing function that coordinates workers and aggregator +pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024; // Increased from 256 + pub fn process_files_parallel( base_dir: &Path, max_size: usize, @@ -129,8 +131,13 @@ pub fn process_files_parallel( return Ok(()); } + // Get channel capacity from config or use default + let channel_capacity = config + .and_then(|c| c.channel_capacity) + .unwrap_or(DEFAULT_CHANNEL_CAPACITY); + // Create channels for worker→aggregator communication - let (tx, rx) = bounded(256); + let (tx, rx) = bounded(channel_capacity); // Spawn aggregator thread let output_dir = output_dir.to_path_buf(); @@ -281,13 +288,13 @@ fn collect_files( /// Receives chunks from workers and writes them to files fn aggregator_loop(rx: Receiver, output_dir: PathBuf) -> Result<()> { - fs::create_dir_all(&output_dir)?; - + // 
Collect chunks first to maintain priority order let mut all_chunks = Vec::new(); while let Ok(chunk) = rx.recv() { all_chunks.push(chunk); } + // Sort chunks by priority, file index, and part index all_chunks.sort_by(|a, b| { let p = a.priority.cmp(&b.priority); if p != std::cmp::Ordering::Equal { @@ -301,25 +308,42 @@ fn aggregator_loop(rx: Receiver, output_dir: PathBuf) -> Result<()> { }); let mut current_chunk = String::new(); - let current_chunk_index = 0; + let mut current_chunk_size = 0; + let mut current_chunk_index = 0; + // Process chunks in sorted order for chunk in all_chunks { - let mut content = String::new(); - content.push_str(&format!(">>>> {}\n", chunk.rel_path)); - content.push_str(&chunk.content); - content.push_str("\n\n"); - current_chunk.push_str(&content); + let chunk_str = format!(">>>> {}\n{}\n\n", chunk.rel_path, chunk.content); + let chunk_size = chunk_str.len(); + + // If adding this chunk would exceed reasonable buffer size, write current chunk + if current_chunk_size + chunk_size > 1024 * 1024 * 10 { + // 10MB buffer + write_chunk_to_file(&output_dir, current_chunk_index, ¤t_chunk)?; + current_chunk.clear(); + current_chunk_size = 0; + current_chunk_index += 1; + } + + current_chunk.push_str(&chunk_str); + current_chunk_size += chunk_size; } + // Write final chunk if any content remains if !current_chunk.is_empty() { - let out_path = output_dir.join(format!("chunk-{}.txt", current_chunk_index)); - fs::write(&out_path, ¤t_chunk)?; - info!( - "Written chunk {} with {} lines.", - current_chunk_index, - current_chunk.lines().count() - ); + write_chunk_to_file(&output_dir, current_chunk_index, ¤t_chunk)?; } Ok(()) } + +fn write_chunk_to_file(output_dir: &Path, index: usize, content: &str) -> Result<()> { + let chunk_path = output_dir.join(format!("chunk-{}.txt", index)); + fs::write(&chunk_path, content)?; + info!( + "Written chunk {} with {} lines.", + index, + content.lines().count() + ); + Ok(()) +} diff --git a/tests/test_perf.rs 
b/tests/test_perf.rs index 4d5f040..0675827 100644 --- a/tests/test_perf.rs +++ b/tests/test_perf.rs @@ -1,6 +1,7 @@ use std::fs; use std::path::Path; use std::time::{Duration, Instant}; +use tempfile::TempDir; use yek::serialize_repo; struct PerfStats { @@ -30,13 +31,13 @@ impl PerfStats { } #[test] -fn test_serialization_performance() { +fn test_serialization_performance() -> Result<(), Box> { const WARMUP_RUNS: usize = 2; const BENCH_RUNS: usize = 5; - // Create test data directory - let test_dir = "test_perf_data"; - fs::create_dir_all(test_dir).unwrap(); + // Create test data directory using tempfile + let test_dir = TempDir::new()?; + let output_dir = TempDir::new()?; // Create test files of different sizes let sizes = vec![1024, 1024 * 1024, 10 * 1024 * 1024]; // 1KB, 1MB, 10MB @@ -45,9 +46,9 @@ fn test_serialization_performance() { println!("------------------------------"); for size in sizes { - let filename = format!("{}/file_{}_bytes.txt", test_dir, size); + let filename = test_dir.path().join(format!("file_{}_bytes.txt", size)); let data = vec![b'a'; size]; - fs::write(&filename, &data).unwrap(); + fs::write(&filename, &data)?; // Warmup runs println!("\nFile size: {}B", size); @@ -55,15 +56,20 @@ fn test_serialization_performance() { for _ in 0..WARMUP_RUNS { serialize_repo( size, - Some(Path::new(test_dir)), + Some(test_dir.path()), false, false, None, - Some(Path::new("perf_output")), + Some(output_dir.path()), None, - ) - .unwrap(); - fs::remove_dir_all("perf_output").unwrap(); + )?; + // Clean output directory between runs + for entry in fs::read_dir(output_dir.path())? 
{ + let entry = entry?; + if entry.file_type()?.is_file() { + fs::remove_file(entry.path())?; + } + } } // Benchmark runs @@ -74,27 +80,31 @@ fn test_serialization_performance() { let start = Instant::now(); serialize_repo( size, - Some(Path::new(test_dir)), + Some(test_dir.path()), false, false, None, - Some(Path::new("perf_output")), + Some(output_dir.path()), None, - ) - .unwrap(); + )?; let duration = start.elapsed(); stats.update(duration); println!(" Run {}: {:?}", run, duration); - fs::remove_dir_all("perf_output").unwrap(); + // Clean output directory between runs + for entry in fs::read_dir(output_dir.path())? { + let entry = entry?; + if entry.file_type()?.is_file() { + fs::remove_file(entry.path())?; + } + } } - println!("\nStats for {}B:", size); + println!("\nResults for {}B files:", size); println!(" Min: {:?}", stats.min); println!(" Max: {:?}", stats.max); println!(" Avg: {:?}", stats.avg); } - // Final cleanup - fs::remove_dir_all(test_dir).unwrap(); + Ok(()) } From 5b3bf22d849cb9a789af8c2c44780c84f8453c86 Mon Sep 17 00:00:00 2001 From: Mohsen Azimi Date: Sun, 19 Jan 2025 12:24:29 +0700 Subject: [PATCH 3/8] chore: remove unused import from benchmark --- benches/serialization.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/benches/serialization.rs b/benches/serialization.rs index 0b76c40..fde1604 100644 --- a/benches/serialization.rs +++ b/benches/serialization.rs @@ -1,6 +1,4 @@ -use criterion::{ - black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput, -}; +use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion, Throughput}; use rand::{distributions::Alphanumeric, Rng}; use std::fs::{self, File}; use std::io::Write; From aeaa7e37fcd4cacbb93c202ffe4c4d81ba77d54c Mon Sep 17 00:00:00 2001 From: Mohsen Azimi Date: Sun, 19 Jan 2025 12:34:45 +0700 Subject: [PATCH 4/8] fix: update benchmark comparison parameter to use --noise-threshold --- .github/workflows/ci.yml | 2 +- 
1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8763755..63d76b7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -150,7 +150,7 @@ jobs: cargo bench --bench serialization --no-run - name: Compare benchmarks - run: cargo bench --bench serialization -- --baseline main --significance-threshold 5 + run: cargo bench --bench serialization -- --baseline main --noise-threshold 5 - name: Upload benchmark results uses: actions/upload-artifact@v3 From 42ffefa4867609d529f07b4b97da541515bd1d9b Mon Sep 17 00:00:00 2001 From: Mohsen Azimi Date: Sun, 19 Jan 2025 12:54:16 +0700 Subject: [PATCH 5/8] ci: reduce benchmarking threshold --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 63d76b7..de3084d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -150,7 +150,7 @@ jobs: cargo bench --bench serialization --no-run - name: Compare benchmarks - run: cargo bench --bench serialization -- --baseline main --noise-threshold 5 + run: cargo bench --bench serialization -- --baseline main --noise-threshold 2 - name: Upload benchmark results uses: actions/upload-artifact@v3 From 3823d62ff36d29e28998d43b169c10d9715cdb21 Mon Sep 17 00:00:00 2001 From: Mohsen Azimi Date: Sun, 19 Jan 2025 12:56:15 +0700 Subject: [PATCH 6/8] perf: optimize file processing performance --- src/lib.rs | 44 ++++++++++++++++++++++++++++ src/parallel.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 113 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3deb825..660097b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -235,6 +235,8 @@ fn build_final_config(cfg: Option) -> FinalConfig { /// Check if file is text by extension or scanning first chunk for null bytes. 
pub fn is_text_file(file_path: &Path, user_binary_extensions: &[String]) -> bool { debug!("Checking if file is text: {}", file_path.display()); + + // First check extension - fast path if let Some(ext) = file_path.extension().and_then(|s| s.to_str()) { let dot_ext = format!(".{}", ext.to_lowercase()); if BINARY_FILE_EXTENSIONS.contains(&dot_ext.as_str()) @@ -246,7 +248,49 @@ pub fn is_text_file(file_path: &Path, user_binary_extensions: &[String]) -> bool ); return false; } + + // Known text extensions - skip content check + if matches!( + dot_ext.as_str(), + ".txt" + | ".md" + | ".rs" + | ".toml" + | ".yml" + | ".yaml" + | ".json" + | ".js" + | ".ts" + | ".html" + | ".css" + | ".sh" + | ".py" + | ".rb" + | ".pl" + | ".php" + | ".java" + | ".c" + | ".cpp" + | ".h" + | ".hpp" + | ".go" + | ".swift" + | ".kt" + | ".scala" + | ".r" + | ".m" + | ".sql" + | ".xml" + | ".ini" + | ".conf" + | ".cfg" + | ".properties" + ) { + return true; + } } + + // Only do content check for unknown extensions let mut f = match File::open(file_path) { Ok(f) => f, Err(e) => { diff --git a/src/parallel.rs b/src/parallel.rs index 536b9ea..8e6ff13 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -106,6 +106,7 @@ fn read_and_send_chunks( /// Main parallel processing function that coordinates workers and aggregator pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024; // Increased from 256 +pub const PARALLEL_THRESHOLD: usize = 10; // Only parallelize if more than 10 files pub fn process_files_parallel( base_dir: &Path, @@ -119,6 +120,69 @@ pub fn process_files_parallel( // Create output directory fs::create_dir_all(output_dir)?; + // Collect and sort files + let files = collect_files( + base_dir, + config, + ignore_patterns, + priority_list, + recentness_boost, + )?; + + // For small sets of files, process sequentially + if files.len() <= PARALLEL_THRESHOLD { + debug!("Processing {} files sequentially", files.len()); + let mut current_chunk = String::new(); + let mut current_chunk_size = 0; + 
let mut current_chunk_index = 0; + + for file in files { + let content = match fs::read_to_string(&file.path) { + Ok(c) => c, + Err(e) => { + debug!("Failed to read {}: {}", file.path.display(), e); + continue; + } + }; + + if content.is_empty() { + continue; + } + + let rel_path = file + .path + .strip_prefix(base_dir) + .unwrap_or(&file.path) + .to_string_lossy() + .into_owned(); + + let chunk_str = format!(">>>> {}\n{}\n\n", rel_path, content); + let chunk_size = chunk_str.len(); + + // Write chunk if buffer would exceed size + if current_chunk_size + chunk_size > 1024 * 1024 * 10 { + // 10MB buffer + write_chunk_to_file(output_dir, current_chunk_index, &current_chunk)?; + current_chunk.clear(); + current_chunk_size = 0; + current_chunk_index += 1; + } + + current_chunk.push_str(&chunk_str); + current_chunk_size += chunk_size; + } + + // Write final chunk if any content remains + if !current_chunk.is_empty() { + write_chunk_to_file(output_dir, current_chunk_index, &current_chunk)?; + } + + return Ok(()); + } + + // For larger sets, process in parallel + debug!("Processing {} files in parallel", files.len()); + // Collect and sort files by priority let files = collect_files( base_dir, config, ignore_patterns, priority_list, recentness_boost, )?; @@ -234,13 +298,11 @@ fn collect_files( continue; } - // Skip binary files - if let Some(cfg) = config { - if !is_text_file(&path, &cfg.binary_extensions) { - debug!("Skipping binary file: {}", rel_str); - continue; - } - } else if !is_text_file(&path, &[]) { + // Skip binary files - do this check early to avoid double reads later + let binary_extensions = config + .map(|c| c.binary_extensions.as_slice()) + .unwrap_or(&[]); + if !is_text_file(&path, binary_extensions) { debug!("Skipping binary file: {}", rel_str); continue; } From af3a75dd6f46c8bf298f35f84f143555da11eae2 Mon Sep 17 00:00:00 2001 From: Mohsen Azimi Date: Sun, 19 Jan 2025 13:02:23 +0700 Subject: [PATCH 7/8] perf: optimize file processing with single-pass reads and smart parallelization --- src/lib.rs | 146
+++++++++++++++++++---------- src/parallel.rs | 178 +++++++++++++++--------------------- tests/test_multiple_dirs.rs | 1 - tests/test_perf.rs | 1 - 4 files changed, 171 insertions(+), 155 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 660097b..e93a237 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,15 +68,98 @@ pub struct PriorityRule { /// BINARY file checks by extension const BINARY_FILE_EXTENSIONS: &[&str] = &[ - ".jpg", ".pdf", ".mid", ".blend", ".p12", ".rco", ".tgz", ".jpeg", ".mp4", ".midi", ".crt", - ".p7b", ".ovl", ".bz2", ".png", ".webm", ".aac", ".key", ".gbr", ".mo", ".xz", ".gif", ".mov", - ".flac", ".pem", ".pcb", ".nib", ".dat", ".ico", ".mp3", ".bmp", ".der", ".icns", ".xap", - ".lib", ".webp", ".wav", ".psd", ".png2", ".xdf", ".psf", ".jar", ".ttf", ".exe", ".ai", - ".jp2", ".zip", ".pak", ".vhd", ".woff", ".dll", ".eps", ".swc", ".rar", ".img3", ".gho", - ".woff2", ".bin", ".raw", ".mso", ".7z", ".img4", ".efi", ".eot", ".iso", ".tif", ".class", - ".gz", ".msi", ".ocx", ".sys", ".img", ".tiff", ".apk", ".tar", ".cab", ".scr", ".so", ".dmg", - ".3ds", ".com", ".elf", ".o", ".max", ".obj", ".drv", ".rom", ".a", ".vhdx", ".fbx", ".bpl", - ".cpl", + "jpg", "jpeg", "png", "gif", "exe", "dll", "so", "dylib", "pdf", "mp4", "mov", "webm", "zip", + "tar", "gz", "bz2", "7z", "rar", "class", "jar", "psd", "blend", "fbx", "max", "obj", "lib", + "a", "iso", "ico", "ttf", "woff", "woff2", "ps", "eps", "doc", "docx", "xls", "xlsx", "ppt", + "pptx", "psd", "apk", "msi", "sys", "o", "obj", "pdb", "nib", "ogg", "wav", "flac", "mp3", + "mpg", "mpeg", "avi", "dll", "pem", "crt", "mid", "p12", "rco", "tgz", "aac", "key", "gbr", + "mo", "xz", "pcb", "dat", "xap", "webp", "png2", "xdf", "psf", "ai", "jp2", "pak", "vhd", + "swc", "img3", "gho", "bin", "raw", "mso", "img4", "efi", "eot", "tif", "ocx", "img", "tiff", + "cab", "scr", "dmg", "3ds", "com", "elf", "drv", "rom", "vhdx", "bpl", "cpl", +]; + +/// Known text file extensions that can skip binary checks 
+const TEXT_FILE_EXTENSIONS: &[&str] = &[ + "rs", + "toml", + "md", + "txt", + "json", + "yaml", + "yml", + "js", + "ts", + "jsx", + "tsx", + "html", + "css", + "scss", + "sh", + "c", + "h", + "cpp", + "hpp", + "py", + "java", + "go", + "rb", + "php", + "sql", + "xml", + "ini", + "conf", + "cfg", + "properties", + "gitignore", + "env", + "lock", + "gradle", + "pom", + "sbt", + "scala", + "kt", + "kts", + "dart", + "swift", + "m", + "mm", + "r", + "pl", + "pm", + "t", + "psgi", + "rake", + "gemspec", + "podspec", + "cs", + "fs", + "fsx", + "fsi", + "fsscript", + "gradle", + "groovy", + "haml", + "hbs", + "jade", + "jsx", + "less", + "lua", + "markdown", + "coffee", + "elm", + "erl", + "ex", + "exs", + "fish", + "vue", + "svelte", + "tf", + "tfvars", + "proto", + "graphql", + "prisma", + "dhall", + "zig", ]; /// We'll define a minimal default config. The user can override parts of it from a TOML file. @@ -238,9 +321,11 @@ pub fn is_text_file(file_path: &Path, user_binary_extensions: &[String]) -> bool // First check extension - fast path if let Some(ext) = file_path.extension().and_then(|s| s.to_str()) { - let dot_ext = format!(".{}", ext.to_lowercase()); - if BINARY_FILE_EXTENSIONS.contains(&dot_ext.as_str()) - || user_binary_extensions.contains(&dot_ext) + let ext_lc = ext.to_lowercase(); + if BINARY_FILE_EXTENSIONS.contains(&ext_lc.as_str()) + || user_binary_extensions + .iter() + .any(|e| e.trim_start_matches('.') == ext_lc) { debug!( "File {} identified as binary by extension", @@ -250,42 +335,7 @@ pub fn is_text_file(file_path: &Path, user_binary_extensions: &[String]) -> bool } // Known text extensions - skip content check - if matches!( - dot_ext.as_str(), - ".txt" - | ".md" - | ".rs" - | ".toml" - | ".yml" - | ".yaml" - | ".json" - | ".js" - | ".ts" - | ".html" - | ".css" - | ".sh" - | ".py" - | ".rb" - | ".pl" - | ".php" - | ".java" - | ".c" - | ".cpp" - | ".h" - | ".hpp" - | ".go" - | ".swift" - | ".kt" - | ".scala" - | ".r" - | ".m" - | ".sql" - | ".xml" 
- | ".ini" - | ".conf" - | ".cfg" - | ".properties" - ) { + if TEXT_FILE_EXTENSIONS.contains(&ext_lc.as_str()) { return true; } } diff --git a/src/parallel.rs b/src/parallel.rs index 8e6ff13..52e6fe0 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -1,5 +1,4 @@ -use crate::is_text_file; -use crate::{get_file_priority, PriorityPattern, YekConfig}; +use crate::{get_file_priority, is_text_file, PriorityPattern, YekConfig}; use anyhow::Result; use crossbeam::channel::{bounded, Receiver, Sender}; use ignore::{gitignore::GitignoreBuilder, WalkBuilder}; @@ -7,7 +6,7 @@ use num_cpus::get; use regex::Regex; use std::collections::HashMap; use std::fs; -use std::io::{BufReader, Read}; +use std::io::Read; use std::path::{Path, PathBuf}; use std::thread; use tracing::{debug, info}; @@ -30,82 +29,8 @@ struct FileEntry { file_index: usize, } -/// Reads a file and determines if it's likely binary by checking for null bytes -fn is_likely_binary(path: &Path) -> Result { - let f = fs::File::open(path)?; - let mut reader = BufReader::new(f); - let mut buf = [0; 4096]; - let n = reader.read(&mut buf)?; - Ok(buf[..n].contains(&0)) -} - -/// Reads and chunks a single file, sending chunks through the channel -fn read_and_send_chunks( - file_entry: FileEntry, - base_path: &Path, - tx: &Sender, - max_size: usize, -) -> Result<()> { - // Skip if binary - if is_likely_binary(&file_entry.path)? 
{ - return Ok(()); - } - - // Read file content - let content = fs::read_to_string(&file_entry.path)?; - if content.is_empty() { - return Ok(()); - } - - // Get relative path for display - let rel_path = file_entry - .path - .strip_prefix(base_path) - .unwrap_or(&file_entry.path) - .to_string_lossy() - .into_owned(); - - // If smaller than max_size, send as single chunk - if content.len() <= max_size { - let chunk = FileChunk { - priority: file_entry.priority, - file_index: file_entry.file_index, - part_index: 0, - rel_path, - content, - }; - tx.send(chunk)?; - return Ok(()); - } - - // Otherwise split into chunks - let mut start = 0; - let mut part_index = 0; - let bytes = content.as_bytes(); - - while start < bytes.len() { - let end = (start + max_size).min(bytes.len()); - let slice = &bytes[start..end]; - let chunk_str = String::from_utf8_lossy(slice).into_owned(); - - let chunk = FileChunk { - priority: file_entry.priority, - file_index: file_entry.file_index, - part_index, - rel_path: rel_path.clone(), - content: chunk_str, - }; - - tx.send(chunk)?; - start = end; - part_index += 1; - } - - Ok(()) -} - /// Main parallel processing function that coordinates workers and aggregator -pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024; // Increased from 256 +pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024; pub const PARALLEL_THRESHOLD: usize = 10; // Only parallelize if more than 10 files pub fn process_files_parallel( @@ -117,10 +42,8 @@ pub fn process_files_parallel( priority_list: &[PriorityPattern], recentness_boost: Option<&HashMap>, ) -> Result<()> { - // Create output directory fs::create_dir_all(output_dir)?; - // Collect and sort files let files = collect_files( base_dir, config, @@ -129,6 +52,10 @@ pub fn process_files_parallel( recentness_boost, )?; + if files.is_empty() { + return Ok(()); + } + // For small sets of files, process sequentially if files.len() <= PARALLEL_THRESHOLD { debug!("Processing {} files sequentially", files.len()); @@ -183,19 +110,6 
@@ pub fn process_files_parallel( // For larger sets, process in parallel debug!("Processing {} files in parallel", files.len()); - // Collect and sort files by priority - let files = collect_files( - base_dir, - config, - ignore_patterns, - priority_list, - recentness_boost, - )?; - if files.is_empty() { - return Ok(()); - } - - // Get channel capacity from config or use default let channel_capacity = config .and_then(|c| c.channel_capacity) .unwrap_or(DEFAULT_CHANNEL_CAPACITY); @@ -207,8 +121,8 @@ pub fn process_files_parallel( let output_dir = output_dir.to_path_buf(); let aggregator_handle = thread::spawn(move || aggregator_loop(rx, output_dir)); - // Spawn worker threads - let num_threads = get(); + // Spawn worker threads - use fewer threads for smaller workloads + let num_threads = if files.len() < 4 { 1 } else { get() }; let chunk_size = files.len().div_ceil(num_threads); let mut handles = Vec::new(); @@ -219,7 +133,7 @@ pub fn process_files_parallel( let handle = thread::spawn(move || -> Result<()> { for file_entry in chunk_files { - read_and_send_chunks(file_entry, &base_path, &sender, max_size)?; + read_and_send_chunks(&base_path, file_entry, max_size, &sender)?; } Ok(()) }); @@ -240,6 +154,65 @@ pub fn process_files_parallel( Ok(()) } +/// Reads and chunks a single file, sending chunks through the channel +fn read_and_send_chunks( + base_path: &Path, + file_entry: FileEntry, + max_size: usize, + tx: &Sender, +) -> Result<()> { + let mut file = fs::File::open(&file_entry.path)?; + let rel_path = file_entry + .path + .strip_prefix(base_path) + .unwrap_or(&file_entry.path) + .to_string_lossy() + .into_owned(); + + // Read file content in chunks to avoid loading entire file + let mut total_buf = Vec::new(); + file.read_to_end(&mut total_buf)?; + + if total_buf.is_empty() { + return Ok(()); + } + + // If total size <= max_size, send it as single chunk + if total_buf.len() <= max_size { + let chunk_content = String::from_utf8_lossy(&total_buf).to_string(); + 
let fc = FileChunk { + priority: file_entry.priority, + file_index: file_entry.file_index, + part_index: 0, + rel_path, + content: chunk_content, + }; + tx.send(fc)?; + return Ok(()); + } + + // Otherwise break into multiple parts + let mut start = 0; + let mut part_index = 0; + while start < total_buf.len() { + let end = (start + max_size).min(total_buf.len()); + let slice = &total_buf[start..end]; + let chunk_str = String::from_utf8_lossy(slice).to_string(); + + let fc = FileChunk { + priority: file_entry.priority, + file_index: file_entry.file_index, + part_index, + rel_path: format!("{}:part{}", rel_path, part_index), + content: chunk_str, + }; + tx.send(fc)?; + start = end; + part_index += 1; + } + Ok(()) +} + /// Collects files from directory respecting .gitignore and sorts by priority fn collect_files( base_dir: &Path, @@ -358,15 +331,10 @@ fn aggregator_loop(rx: Receiver, output_dir: PathBuf) -> Result<()> { // Sort chunks by priority, file index, and part index all_chunks.sort_by(|a, b| { - let p = a.priority.cmp(&b.priority); - if p != std::cmp::Ordering::Equal { - return p; - } - let f = a.file_index.cmp(&b.file_index); - if f != std::cmp::Ordering::Equal { - return f; - } - a.part_index.cmp(&b.part_index) + a.priority + .cmp(&b.priority) + .then(a.file_index.cmp(&b.file_index)) + .then(a.part_index.cmp(&b.part_index)) }); let mut current_chunk = String::new(); diff --git a/tests/test_multiple_dirs.rs b/tests/test_multiple_dirs.rs index eca37be..e0ca61d 100644 --- a/tests/test_multiple_dirs.rs +++ b/tests/test_multiple_dirs.rs @@ -1,7 +1,6 @@ mod integration_common; use assert_cmd::Command; use integration_common::{create_file, setup_temp_repo}; -use predicates; #[test] fn multiple_directories_test() { diff --git a/tests/test_perf.rs b/tests/test_perf.rs index 0675827..0158e6f 100644 --- a/tests/test_perf.rs +++ b/tests/test_perf.rs @@ -1,5 +1,4 @@ use std::fs; -use std::path::Path; use std::time::{Duration, Instant}; use tempfile::TempDir; use 
yek::serialize_repo; From 8a6e6afef919686b808b0fa026969da8920f9eea Mon Sep 17 00:00:00 2001 From: Mohsen Azimi Date: Sun, 19 Jan 2025 13:04:56 +0700 Subject: [PATCH 8/8] fix: output directory handling in non-streaming mode --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index e93a237..5b17a16 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -834,7 +834,7 @@ pub fn serialize_repo( )?; Ok(Some(out_dir)) } else { - Ok(None) + Ok(output_dir) } }