diff --git a/Cargo.lock b/Cargo.lock index ea89f8f3623167..8ebde8a37a91e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12855,6 +12855,23 @@ dependencies = [ "zlog", ] +[[package]] +name = "project_benchmarks" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "client", + "futures 0.3.31", + "gpui", + "http_client", + "language", + "node_runtime", + "project", + "settings", + "watch", +] + [[package]] name = "project_panel" version = "0.1.0" @@ -20555,6 +20572,16 @@ dependencies = [ "zlog", ] +[[package]] +name = "worktree_benchmarks" +version = "0.1.0" +dependencies = [ + "fs", + "gpui", + "settings", + "worktree", +] + [[package]] name = "writeable" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index 5f4a72a5a6f62b..c3ddd0f42fc779 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,6 +126,7 @@ members = [ "crates/picker", "crates/prettier", "crates/project", + "crates/project_benchmarks", "crates/project_panel", "crates/project_symbols", "crates/prompt_store", @@ -194,6 +195,7 @@ members = [ "crates/web_search_providers", "crates/workspace", "crates/worktree", + "crates/worktree_benchmarks", "crates/x_ai", "crates/zed", "crates/zed_actions", diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index 9c7caa6280c650..b36b44cfd4d0b3 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -1,14 +1,12 @@ use crate::{ - ProjectItem as _, ProjectPath, + ProjectPath, lsp_store::OpenLspBufferHandle, - search::SearchQuery, worktree_store::{WorktreeStore, WorktreeStoreEvent}, }; use anyhow::{Context as _, Result, anyhow}; use client::Client; use collections::{HashMap, HashSet, hash_map}; -use fs::Fs; -use futures::{Future, FutureExt as _, StreamExt, channel::oneshot, future::Shared}; +use futures::{Future, FutureExt as _, channel::oneshot, future::Shared}; use gpui::{ App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, }; @@ -23,8 +21,8 @@ use rpc::{ AnyProtoClient, ErrorCode, ErrorExt as _, TypedEnvelope, proto::{self}, }; -use smol::channel::Receiver; -use std::{io, pin::pin, sync::Arc, time::Instant}; + +use std::{io, sync::Arc, time::Instant}; use text::{BufferId, ReplicaId}; use util::{ResultExt as _, TryFutureExt, debug_panic, maybe, rel_path::RelPath}; use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId}; @@ -972,6 +970,10 @@ impl BufferStore { .filter_map(|buffer| buffer.upgrade()) } + pub(crate) fn is_searchable(&self, id: &BufferId) -> bool { + !self.non_searchable_buffers.contains(&id) + } + pub fn loading_buffers( &self, ) -> impl Iterator>>)> { @@ -1096,63 +1098,6 @@ impl BufferStore { Some(()) } - pub fn find_search_candidates( - &mut self, - query: &SearchQuery, - mut limit: usize, - fs: Arc, - cx: &mut Context, - ) -> Receiver> { - let (tx, rx) = smol::channel::unbounded(); - let mut open_buffers = HashSet::default(); - let mut unnamed_buffers = Vec::new(); - for handle in self.buffers() { - let buffer = handle.read(cx); - if self.non_searchable_buffers.contains(&buffer.remote_id()) { - continue; - } else if let Some(entry_id) = buffer.entry_id(cx) { - open_buffers.insert(entry_id); - } else { - limit = limit.saturating_sub(1); - unnamed_buffers.push(handle) - }; - } - - const MAX_CONCURRENT_BUFFER_OPENS: usize = 64; - let project_paths_rx = self - .worktree_store - .update(cx, |worktree_store, cx| { - worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx) - }) - .chunks(MAX_CONCURRENT_BUFFER_OPENS); - - 
cx.spawn(async move |this, cx| { - for buffer in unnamed_buffers { - tx.send(buffer).await.ok(); - } - - let mut project_paths_rx = pin!(project_paths_rx); - while let Some(project_paths) = project_paths_rx.next().await { - let buffers = this.update(cx, |this, cx| { - project_paths - .into_iter() - .map(|project_path| this.open_buffer(project_path, cx)) - .collect::>() - })?; - for buffer_task in buffers { - if let Some(buffer) = buffer_task.await.log_err() - && tx.send(buffer).await.is_err() - { - return anyhow::Ok(()); - } - } - } - anyhow::Ok(()) - }) - .detach(); - rx - } - fn on_buffer_event( &mut self, buffer: Entity, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 4a7ac5fa50fac0..678607b5321999 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -11,6 +11,7 @@ pub mod lsp_command; pub mod lsp_store; mod manifest_tree; pub mod prettier_store; +mod project_search; pub mod project_settings; pub mod search; mod task_inventory; @@ -39,6 +40,7 @@ use crate::{ agent_server_store::AllAgentServersSettings, git_store::GitStore, lsp_store::{SymbolLocation, log_store::LogKind}, + project_search::SearchResultsHandle, }; pub use agent_server_store::{AgentServerStore, AgentServersUpdated}; pub use git_store::{ @@ -46,6 +48,7 @@ pub use git_store::{ git_traversal::{ChildEntriesGitIter, GitEntry, GitEntryRef, GitTraversal}, }; pub use manifest_tree::ManifestTree; +pub use project_search::Search; use anyhow::{Context as _, Result, anyhow}; use buffer_store::{BufferStore, BufferStoreEvent}; @@ -109,7 +112,7 @@ use snippet_provider::SnippetProvider; use std::{ borrow::Cow, collections::BTreeMap, - ops::Range, + ops::{Not as _, Range}, path::{Path, PathBuf}, pin::pin, str, @@ -123,7 +126,7 @@ use text::{Anchor, BufferId, OffsetRangeExt, Point, Rope}; use toolchain_store::EmptyToolchainStore; use util::{ ResultExt as _, maybe, - paths::{PathStyle, SanitizedPath, compare_paths, is_absolute}, + paths::{PathStyle, SanitizedPath, is_absolute}, rel_path::RelPath, }; use worktree::{CreatedEntry, Snapshot, Traversal}; @@ -150,8 +153,6 @@ pub use lsp_store::{ }; pub use toolchain_store::{ToolchainStore, Toolchains}; const MAX_PROJECT_SEARCH_HISTORY_SIZE: usize = 500; -const MAX_SEARCH_RESULT_FILES: usize = 5_000; -const MAX_SEARCH_RESULT_RANGES: usize = 10_000; pub trait ProjectItem: 'static { fn try_open( @@ -3935,179 +3936,44 @@ impl Project { }) } - pub fn search(&mut self, query: SearchQuery, cx: &mut Context) -> Receiver { - let (result_tx, result_rx) = smol::channel::unbounded(); - - let matching_buffers_rx = if query.is_opened_only() { - self.sort_search_candidates(&query, cx) - } else { - self.find_search_candidate_buffers(&query, MAX_SEARCH_RESULT_FILES + 1, cx) - }; - - cx.spawn(async move |_, cx| { - let mut range_count = 0; - let mut buffer_count = 0; - let mut limit_reached = false; - let query = Arc::new(query); - let chunks = matching_buffers_rx.ready_chunks(64); - - // Now that we know what paths match the query, we will load at most - // 64 buffers at a time to avoid overwhelming the main thread. For each - // opened buffer, we will spawn a background task that retrieves all the - // ranges in the buffer matched by the query. 
- let mut chunks = pin!(chunks); - 'outer: while let Some(matching_buffer_chunk) = chunks.next().await { - let mut chunk_results = Vec::with_capacity(matching_buffer_chunk.len()); - for buffer in matching_buffer_chunk { - let query = query.clone(); - let snapshot = buffer.read_with(cx, |buffer, _| buffer.snapshot())?; - chunk_results.push(cx.background_spawn(async move { - let ranges = query - .search(&snapshot, None) - .await - .iter() - .map(|range| { - snapshot.anchor_before(range.start) - ..snapshot.anchor_after(range.end) - }) - .collect::>(); - anyhow::Ok((buffer, ranges)) - })); - } - - let chunk_results = futures::future::join_all(chunk_results).await; - for result in chunk_results { - if let Some((buffer, ranges)) = result.log_err() { - range_count += ranges.len(); - buffer_count += 1; - result_tx - .send(SearchResult::Buffer { buffer, ranges }) - .await?; - if buffer_count > MAX_SEARCH_RESULT_FILES - || range_count > MAX_SEARCH_RESULT_RANGES - { - limit_reached = true; - break 'outer; - } - } - } - } - - if limit_reached { - result_tx.send(SearchResult::LimitReached).await?; - } - - anyhow::Ok(()) - }) - .detach(); - - result_rx - } - - fn find_search_candidate_buffers( - &mut self, - query: &SearchQuery, - limit: usize, - cx: &mut Context, - ) -> Receiver> { - if self.is_local() { - let fs = self.fs.clone(); - self.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.find_search_candidates(query, limit, fs, cx) - }) - } else { - self.find_search_candidates_remote(query, limit, cx) - } - } - - fn sort_search_candidates( - &mut self, - search_query: &SearchQuery, - cx: &mut Context, - ) -> Receiver> { - let worktree_store = self.worktree_store.read(cx); - let mut buffers = search_query - .buffers() - .into_iter() - .flatten() - .filter(|buffer| { - let b = buffer.read(cx); - if let Some(file) = b.file() { - if !search_query.match_path(file.path().as_std_path()) { - return false; - } - if let Some(entry) = b - .entry_id(cx) - .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx)) - && entry.is_ignored - && !search_query.include_ignored() - { - return false; - } - } - true - }) - .collect::>(); - let (tx, rx) = smol::channel::unbounded(); - buffers.sort_by(|a, b| match (a.read(cx).file(), b.read(cx).file()) { - (None, None) => a.read(cx).remote_id().cmp(&b.read(cx).remote_id()), - (None, Some(_)) => std::cmp::Ordering::Less, - (Some(_), None) => std::cmp::Ordering::Greater, - (Some(a), Some(b)) => compare_paths( - (a.path().as_std_path(), true), - (b.path().as_std_path(), true), - ), - }); - for buffer in buffers { - tx.send_blocking(buffer.clone()).unwrap() - } - - rx - } - - fn find_search_candidates_remote( - &mut self, - query: &SearchQuery, - limit: usize, - cx: &mut Context, - ) -> Receiver> { - let (tx, rx) = smol::channel::unbounded(); - - let (client, remote_id): (AnyProtoClient, _) = if let Some(ssh_client) = &self.remote_client - { - (ssh_client.read(cx).proto_client(), 0) + fn search_impl(&mut self, query: SearchQuery, cx: &mut Context) -> SearchResultsHandle { + let client: Option<(AnyProtoClient, _)> = if let Some(ssh_client) = &self.remote_client { + Some((ssh_client.read(cx).proto_client(), 0)) } else if let Some(remote_id) = self.remote_id() { - (self.collab_client.clone().into(), remote_id) + self.is_local() + .not() + .then(|| (self.collab_client.clone().into(), remote_id)) } else { - return rx; + None }; - - let request = client.request(proto::FindSearchCandidates { - project_id: remote_id, - query: Some(query.to_proto()), - limit: limit as _, - 
}); - let guard = self.retain_remotely_created_models(cx); - - cx.spawn(async move |project, cx| { - let response = request.await?; - for buffer_id in response.buffer_ids { - let buffer_id = BufferId::new(buffer_id)?; - let buffer = project - .update(cx, |project, cx| { - project.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.wait_for_remote_buffer(buffer_id, cx) - }) - })? - .await?; - let _ = tx.send(buffer).await; + let searcher = if query.is_opened_only() { + project_search::Search::open_buffers_only( + self.buffer_store.clone(), + self.worktree_store.clone(), + project_search::Search::MAX_SEARCH_RESULT_FILES + 1, + ) + } else { + match client { + Some((client, remote_id)) => project_search::Search::remote( + self.buffer_store.clone(), + self.worktree_store.clone(), + project_search::Search::MAX_SEARCH_RESULT_FILES + 1, + (client, remote_id, self.remotely_created_models.clone()), + ), + None => project_search::Search::local( + self.fs.clone(), + self.buffer_store.clone(), + self.worktree_store.clone(), + project_search::Search::MAX_SEARCH_RESULT_FILES + 1, + cx, + ), } + }; + searcher.into_handle(query, cx) + } - drop(guard); - anyhow::Ok(()) - }) - .detach_and_log_err(cx); - rx + pub fn search(&mut self, query: SearchQuery, cx: &mut Context) -> Receiver { + self.search_impl(query, cx).results(cx) } pub fn request_lsp( @@ -4832,18 +4698,31 @@ impl Project { fn retain_remotely_created_models( &mut self, cx: &mut Context, + ) -> RemotelyCreatedModelGuard { + Self::retain_remotely_created_models_impl( + &self.remotely_created_models, + &self.buffer_store, + &self.worktree_store, + cx, + ) + } + + fn retain_remotely_created_models_impl( + models: &Arc>, + buffer_store: &Entity, + worktree_store: &Entity, + cx: &mut App, ) -> RemotelyCreatedModelGuard { { - let mut remotely_create_models = self.remotely_created_models.lock(); + let mut remotely_create_models = models.lock(); if remotely_create_models.retain_count == 0 { - remotely_create_models.buffers = self.buffer_store.read(cx).buffers().collect(); - remotely_create_models.worktrees = - self.worktree_store.read(cx).worktrees().collect(); + remotely_create_models.buffers = buffer_store.read(cx).buffers().collect(); + remotely_create_models.worktrees = worktree_store.read(cx).worktrees().collect(); } remotely_create_models.retain_count += 1; } RemotelyCreatedModelGuard { - remote_models: Arc::downgrade(&self.remotely_created_models), + remote_models: Arc::downgrade(&models), } } @@ -4913,7 +4792,7 @@ impl Project { let query = SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?; let results = this.update(&mut cx, |this, cx| { - this.find_search_candidate_buffers(&query, message.limit as _, cx) + this.search_impl(query, cx).matching_buffers(cx) })?; let mut response = proto::FindSearchCandidatesResponse { diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs new file mode 100644 index 00000000000000..25fe578bd7dc23 --- /dev/null +++ b/crates/project/src/project_search.rs @@ -0,0 +1,754 @@ +use std::{ + io::{BufRead, BufReader}, + path::Path, + pin::pin, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, +}; + +use anyhow::Context; +use collections::HashSet; +use fs::Fs; +use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered}; +use gpui::{App, AppContext, AsyncApp, Entity, Task}; +use language::{Buffer, BufferSnapshot}; +use parking_lot::Mutex; +use postage::oneshot; +use rpc::{AnyProtoClient, proto}; +use smol::{ + channel::{Receiver, 
Sender, bounded, unbounded}, + future::FutureExt, +}; + +use text::BufferId; +use util::{ResultExt, maybe, paths::compare_rel_paths}; +use worktree::{Entry, ProjectEntryId, Snapshot, Worktree}; + +use crate::{ + Project, ProjectItem, ProjectPath, RemotelyCreatedModels, + buffer_store::BufferStore, + search::{SearchQuery, SearchResult}, + worktree_store::WorktreeStore, +}; + +pub struct Search { + buffer_store: Entity, + worktree_store: Entity, + limit: usize, + kind: SearchKind, +} + +/// Represents search setup, before it is actually kicked off with Search::into_results +enum SearchKind { + /// Search for candidates by inspecting file contents on file system, avoiding loading the buffer unless we know that a given file contains a match. + Local { + fs: Arc, + worktrees: Vec>, + }, + /// Query remote host for candidates. As of writing, the host runs a local search in "buffers with matches only" mode. + Remote { + client: AnyProtoClient, + remote_id: u64, + models: Arc>, + }, + /// Run search against a known set of candidates. Even when working with a remote host, this won't round-trip to host. + OpenBuffersOnly, +} + +/// Represents results of project search and allows one to either obtain match positions OR +/// just the handles to buffers that may match the search. Grabbing the handles is cheaper than obtaining full match positions, because in that case we'll look for +/// at most one match in each file. +#[must_use] +pub struct SearchResultsHandle { + results: Receiver, + matching_buffers: Receiver>, + trigger_search: Box Task<()> + Send + Sync>, +} + +impl SearchResultsHandle { + pub fn results(self, cx: &mut App) -> Receiver { + (self.trigger_search)(cx).detach(); + self.results + } + pub fn matching_buffers(self, cx: &mut App) -> Receiver> { + (self.trigger_search)(cx).detach(); + self.matching_buffers + } +} + +#[derive(Clone)] +enum FindSearchCandidates { + Local { + fs: Arc, + /// Start off with all paths in project and filter them based on: + /// - Include filters + /// - Exclude filters + /// - Only open buffers + /// - Scan ignored files + /// Put another way: filter out files that can't match (without looking at file contents) + input_paths_rx: Receiver, + /// After that, if the buffer is not yet loaded, we'll figure out if it contains at least one match + /// based on disk contents of a buffer. This step is not performed for buffers we already have in memory. + confirm_contents_will_match_tx: Sender, + confirm_contents_will_match_rx: Receiver, + /// Of those that contain at least one match (or are already in memory), look for rest of matches (and figure out their ranges). + /// But wait - first, we need to go back to the main thread to open a buffer (& create an entity for it). 
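+    /// In short, local candidates flow through: `input_paths_rx` (cheap path filtering) ->
+    /// `confirm_contents_will_match_*` (streaming check of on-disk contents for a first match) ->
+    /// `get_buffer_for_full_scan_tx` (buffer opened on the main thread) -> `find_all_matches_rx`
+    /// (full scan of the buffer snapshot on a background worker).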
+ get_buffer_for_full_scan_tx: Sender, + }, + Remote, + OpenBuffersOnly, +} + +impl Search { + pub fn local( + fs: Arc, + buffer_store: Entity, + worktree_store: Entity, + limit: usize, + cx: &mut App, + ) -> Self { + let worktrees = worktree_store.read(cx).visible_worktrees(cx).collect(); + Self { + kind: SearchKind::Local { fs, worktrees }, + buffer_store, + worktree_store, + limit, + } + } + + pub(crate) fn remote( + buffer_store: Entity, + worktree_store: Entity, + limit: usize, + client_state: (AnyProtoClient, u64, Arc>), + ) -> Self { + Self { + kind: SearchKind::Remote { + client: client_state.0, + remote_id: client_state.1, + models: client_state.2, + }, + buffer_store, + worktree_store, + limit, + } + } + pub(crate) fn open_buffers_only( + buffer_store: Entity, + worktree_store: Entity, + limit: usize, + ) -> Self { + Self { + kind: SearchKind::OpenBuffersOnly, + buffer_store, + worktree_store, + limit, + } + } + + pub(crate) const MAX_SEARCH_RESULT_FILES: usize = 5_000; + pub(crate) const MAX_SEARCH_RESULT_RANGES: usize = 10_000; + /// Prepares a project search run. The resulting [`SearchResultsHandle`] has to be used to specify whether you're interested in matching buffers + /// or full search results. + pub fn into_handle(mut self, query: SearchQuery, cx: &mut App) -> SearchResultsHandle { + let mut open_buffers = HashSet::default(); + let mut unnamed_buffers = Vec::new(); + const MAX_CONCURRENT_BUFFER_OPENS: usize = 64; + let buffers = self.buffer_store.read(cx); + for handle in buffers.buffers() { + let buffer = handle.read(cx); + if !buffers.is_searchable(&buffer.remote_id()) { + continue; + } else if let Some(entry_id) = buffer.entry_id(cx) { + open_buffers.insert(entry_id); + } else { + self.limit -= self.limit.saturating_sub(1); + unnamed_buffers.push(handle) + }; + } + let executor = cx.background_executor().clone(); + let (tx, rx) = unbounded(); + let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded(); + let matching_buffers = grab_buffer_snapshot_rx.clone(); + let trigger_search = Box::new(move |cx: &mut App| { + cx.spawn(async move |cx| { + for buffer in unnamed_buffers { + _ = grab_buffer_snapshot_tx.send(buffer).await; + } + + let (find_all_matches_tx, find_all_matches_rx) = + bounded(MAX_CONCURRENT_BUFFER_OPENS); + + let (candidate_searcher, tasks) = match self.kind { + SearchKind::OpenBuffersOnly => { + let Ok(open_buffers) = cx.update(|cx| self.all_loaded_buffers(&query, cx)) + else { + return; + }; + let fill_requests = cx + .background_spawn(async move { + for buffer in open_buffers { + if let Err(_) = grab_buffer_snapshot_tx.send(buffer).await { + return; + } + } + }) + .boxed_local(); + (FindSearchCandidates::OpenBuffersOnly, vec![fill_requests]) + } + SearchKind::Local { + fs, + ref mut worktrees, + } => { + let (get_buffer_for_full_scan_tx, get_buffer_for_full_scan_rx) = + unbounded(); + let (confirm_contents_will_match_tx, confirm_contents_will_match_rx) = + bounded(64); + let (sorted_search_results_tx, sorted_search_results_rx) = unbounded(); + + let (input_paths_tx, input_paths_rx) = unbounded(); + + let tasks = vec![ + cx.spawn(Self::provide_search_paths( + std::mem::take(worktrees), + query.include_ignored(), + input_paths_tx, + sorted_search_results_tx, + )) + .boxed_local(), + Self::open_buffers( + &self.buffer_store, + get_buffer_for_full_scan_rx, + grab_buffer_snapshot_tx, + cx.clone(), + ) + .boxed_local(), + cx.background_spawn(Self::maintain_sorted_search_results( + sorted_search_results_rx, + get_buffer_for_full_scan_tx.clone(), + 
self.limit, + )) + .boxed_local(), + ]; + ( + FindSearchCandidates::Local { + fs, + get_buffer_for_full_scan_tx, + confirm_contents_will_match_tx, + confirm_contents_will_match_rx, + input_paths_rx, + }, + tasks, + ) + } + SearchKind::Remote { + client, + remote_id, + models, + } => { + let request = client.request(proto::FindSearchCandidates { + project_id: remote_id, + query: Some(query.to_proto()), + limit: self.limit as _, + }); + let Ok(guard) = cx.update(|cx| { + Project::retain_remotely_created_models_impl( + &models, + &self.buffer_store, + &self.worktree_store, + cx, + ) + }) else { + return; + }; + let buffer_store = self.buffer_store.downgrade(); + let issue_remote_buffers_request = cx + .spawn(async move |cx| { + let _ = maybe!(async move { + let response = request.await?; + + for buffer_id in response.buffer_ids { + let buffer_id = BufferId::new(buffer_id)?; + let buffer = buffer_store + .update(cx, |buffer_store, cx| { + buffer_store.wait_for_remote_buffer(buffer_id, cx) + })? + .await?; + let _ = grab_buffer_snapshot_tx.send(buffer).await; + } + + drop(guard); + anyhow::Ok(()) + }) + .await + .log_err(); + }) + .boxed_local(); + ( + FindSearchCandidates::Remote, + vec![issue_remote_buffers_request], + ) + } + }; + + let matches_count = AtomicUsize::new(0); + let matched_buffer_count = AtomicUsize::new(0); + + let worker_pool = executor.scoped(|scope| { + let num_cpus = executor.num_cpus(); + + assert!(num_cpus > 0); + for _ in 0..executor.num_cpus() - 1 { + let worker = Worker { + query: &query, + open_buffers: &open_buffers, + matched_buffer_count: &matched_buffer_count, + matches_count: &matches_count, + candidates: candidate_searcher.clone(), + find_all_matches_rx: find_all_matches_rx.clone(), + publish_matches: tx.clone(), + }; + scope.spawn(worker.run()); + } + drop(tx); + drop(find_all_matches_rx); + drop(candidate_searcher); + }); + + let buffer_snapshots = Self::grab_buffer_snapshots( + grab_buffer_snapshot_rx, + find_all_matches_tx, + cx.clone(), + ); + futures::future::join_all( + [worker_pool.boxed_local(), buffer_snapshots.boxed_local()] + .into_iter() + .chain(tasks), + ) + .await; + }) + }); + + SearchResultsHandle { + results: rx, + matching_buffers, + trigger_search, + } + } + + fn provide_search_paths( + worktrees: Vec>, + include_ignored: bool, + tx: Sender, + results: Sender>, + ) -> impl AsyncFnOnce(&mut AsyncApp) { + async move |cx| { + _ = maybe!(async move { + for worktree in worktrees { + let (mut snapshot, worktree_settings) = worktree + .read_with(cx, |this, _| { + Some((this.snapshot(), this.as_local()?.settings())) + })? + .context("The worktree is not local")?; + if include_ignored { + // Pre-fetch all of the ignored directories as they're going to be searched. 
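+                    // Ignored directories are left unloaded by the background scanner, so force a scan
+                    // of them here and re-grab the snapshot before iterating `snapshot.files(true, 0)` below.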
+ let mut entries_to_refresh = vec![]; + for entry in snapshot.entries(include_ignored, 0) { + if entry.is_ignored && entry.kind.is_unloaded() { + if !worktree_settings.is_path_excluded(&entry.path) { + entries_to_refresh.push(entry.path.clone()); + } + } + } + let barrier = worktree.update(cx, |this, _| { + let local = this.as_local_mut()?; + let barrier = entries_to_refresh + .into_iter() + .map(|path| local.add_path_prefix_to_scan(path).into_future()) + .collect::>(); + Some(barrier) + })?; + if let Some(barriers) = barrier { + futures::future::join_all(barriers).await; + } + snapshot = worktree.read_with(cx, |this, _| this.snapshot())?; + } + cx.background_executor() + .scoped(|scope| { + scope.spawn(async { + for entry in snapshot.files(include_ignored, 0) { + let (should_scan_tx, should_scan_rx) = oneshot::channel(); + let Ok(_) = tx + .send(InputPath { + entry: entry.clone(), + snapshot: snapshot.clone(), + should_scan_tx, + }) + .await + else { + return; + }; + if results.send(should_scan_rx).await.is_err() { + return; + }; + } + }) + }) + .await; + } + anyhow::Ok(()) + }) + .await; + } + } + + async fn maintain_sorted_search_results( + rx: Receiver>, + paths_for_full_scan: Sender, + limit: usize, + ) { + let mut rx = pin!(rx); + let mut matched = 0; + while let Some(mut next_path_result) = rx.next().await { + let Some(successful_path) = next_path_result.next().await else { + // This math did not produce a match, hence skip it. + continue; + }; + if paths_for_full_scan.send(successful_path).await.is_err() { + return; + }; + matched += 1; + if matched >= limit { + break; + } + } + } + + /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf. + async fn open_buffers( + buffer_store: &Entity, + rx: Receiver, + find_all_matches_tx: Sender>, + mut cx: AsyncApp, + ) { + let mut rx = pin!(rx.ready_chunks(64)); + _ = maybe!(async move { + while let Some(requested_paths) = rx.next().await { + let mut buffers = buffer_store.update(&mut cx, |this, cx| { + requested_paths + .into_iter() + .map(|path| this.open_buffer(path, cx)) + .collect::>() + })?; + + while let Some(buffer) = buffers.next().await { + if let Some(buffer) = buffer.log_err() { + find_all_matches_tx.send(buffer).await?; + } + } + } + Result::<_, anyhow::Error>::Ok(()) + }) + .await; + } + + async fn grab_buffer_snapshots( + rx: Receiver>, + find_all_matches_tx: Sender<(Entity, BufferSnapshot)>, + mut cx: AsyncApp, + ) { + _ = maybe!(async move { + while let Ok(buffer) = rx.recv().await { + let snapshot = buffer.read_with(&mut cx, |this, _| this.snapshot())?; + find_all_matches_tx.send((buffer, snapshot)).await?; + } + Result::<_, anyhow::Error>::Ok(()) + }) + .await; + } + + fn all_loaded_buffers(&self, search_query: &SearchQuery, cx: &App) -> Vec> { + let worktree_store = self.worktree_store.read(cx); + let mut buffers = search_query + .buffers() + .into_iter() + .flatten() + .filter(|buffer| { + let b = buffer.read(cx); + if let Some(file) = b.file() { + if !search_query.match_path(file.path().as_std_path()) { + return false; + } + if !search_query.include_ignored() + && let Some(entry) = b + .entry_id(cx) + .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx)) + && entry.is_ignored + { + return false; + } + } + true + }) + .cloned() + .collect::>(); + buffers.sort_by(|a, b| { + let a = a.read(cx); + let b = b.read(cx); + match (a.file(), b.file()) { + (None, None) => a.remote_id().cmp(&b.remote_id()), + (None, Some(_)) => std::cmp::Ordering::Less, + (Some(_), None) => 
std::cmp::Ordering::Greater, + (Some(a), Some(b)) => compare_rel_paths((a.path(), true), (b.path(), true)), + } + }); + + buffers + } +} + +struct Worker<'search> { + query: &'search SearchQuery, + matched_buffer_count: &'search AtomicUsize, + matches_count: &'search AtomicUsize, + open_buffers: &'search HashSet, + candidates: FindSearchCandidates, + /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot. + find_all_matches_rx: Receiver<(Entity, BufferSnapshot)>, + /// Cool, we have results; let's share them with the world. + publish_matches: Sender, +} + +impl Worker<'_> { + async fn run(mut self) { + let ( + input_paths_rx, + confirm_contents_will_match_rx, + mut confirm_contents_will_match_tx, + mut get_buffer_for_full_scan_tx, + fs, + ) = match self.candidates { + FindSearchCandidates::Local { + fs, + input_paths_rx, + confirm_contents_will_match_rx, + confirm_contents_will_match_tx, + get_buffer_for_full_scan_tx, + } => ( + input_paths_rx, + confirm_contents_will_match_rx, + confirm_contents_will_match_tx, + get_buffer_for_full_scan_tx, + Some(fs), + ), + FindSearchCandidates::Remote | FindSearchCandidates::OpenBuffersOnly => ( + unbounded().1, + unbounded().1, + unbounded().0, + unbounded().0, + None, + ), + }; + let mut find_all_matches = pin!(self.find_all_matches_rx.fuse()); + let mut find_first_match = pin!(confirm_contents_will_match_rx.fuse()); + let mut scan_path = pin!(input_paths_rx.fuse()); + + loop { + let handler = RequestHandler { + query: self.query, + open_entries: &self.open_buffers, + fs: fs.as_deref(), + matched_buffer_count: self.matched_buffer_count, + matches_count: self.matches_count, + confirm_contents_will_match_tx: &confirm_contents_will_match_tx, + get_buffer_for_full_scan_tx: &get_buffer_for_full_scan_tx, + publish_matches: &self.publish_matches, + }; + // Whenever we notice that some step of a pipeline is closed, we don't want to close subsequent + // steps straight away. Another worker might be about to produce a value that will + // be pushed there, thus we'll replace current worker's pipe with a dummy one. + // That way, we'll only ever close a next-stage channel when ALL workers do so. + select_biased! { + find_all_matches = find_all_matches.next() => { + + if self.publish_matches.is_closed() { + break; + } + let Some(matches) = find_all_matches else { + self.publish_matches = bounded(1).0; + continue; + }; + let result = handler.handle_find_all_matches(matches).await; + if let Some(_should_bail) = result { + + self.publish_matches = bounded(1).0; + continue; + } + }, + find_first_match = find_first_match.next() => { + if let Some(buffer_with_at_least_one_match) = find_first_match { + handler.handle_find_first_match(buffer_with_at_least_one_match).await; + } else { + get_buffer_for_full_scan_tx = bounded(1).0; + } + + }, + scan_path = scan_path.next() => { + if let Some(path_to_scan) = scan_path { + handler.handle_scan_path(path_to_scan).await; + } else { + // If we're the last worker to notice that this is not producing values, close the upstream. 
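+                        // (`bounded(1).0` is a sender whose receiver is dropped right away, so this only
+                        // releases our clone; the real channel closes once every worker has done the same.)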
+ confirm_contents_will_match_tx = bounded(1).0; + } + + } + complete => { + break + }, + + } + } + } +} + +struct RequestHandler<'worker> { + query: &'worker SearchQuery, + fs: Option<&'worker dyn Fs>, + open_entries: &'worker HashSet, + matched_buffer_count: &'worker AtomicUsize, + matches_count: &'worker AtomicUsize, + + confirm_contents_will_match_tx: &'worker Sender, + get_buffer_for_full_scan_tx: &'worker Sender, + publish_matches: &'worker Sender, +} + +struct LimitReached; + +impl RequestHandler<'_> { + async fn handle_find_all_matches( + &self, + (buffer, snapshot): (Entity, BufferSnapshot), + ) -> Option { + let ranges = self + .query + .search(&snapshot, None) + .await + .iter() + .map(|range| snapshot.anchor_before(range.start)..snapshot.anchor_after(range.end)) + .collect::>(); + + let matched_ranges = ranges.len(); + if self.matched_buffer_count.fetch_add(1, Ordering::Release) + > Search::MAX_SEARCH_RESULT_FILES + || self + .matches_count + .fetch_add(matched_ranges, Ordering::Release) + > Search::MAX_SEARCH_RESULT_RANGES + { + _ = self.publish_matches.send(SearchResult::LimitReached).await; + Some(LimitReached) + } else { + _ = self + .publish_matches + .send(SearchResult::Buffer { buffer, ranges }) + .await; + None + } + } + async fn handle_find_first_match(&self, mut entry: MatchingEntry) { + _=maybe!(async move { + let abs_path = entry.worktree_root.join(entry.path.path.as_std_path()); + let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else { + return anyhow::Ok(()); + }; + + let mut file = BufReader::new(file); + let file_start = file.fill_buf()?; + + if let Err(Some(starting_position)) = + std::str::from_utf8(file_start).map_err(|e| e.error_len()) + { + // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on; + // That way we can still match files in a streaming fashion without having look at "obviously binary" files. + log::debug!( + "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}" + ); + return Ok(()); + } + + if self.query.detect(file).unwrap_or(false) { + // Yes, we should scan the whole file. + entry.should_scan_tx.send(entry.path).await?; + } + Ok(()) + }).await; + } + + async fn handle_scan_path(&self, req: InputPath) { + _ = maybe!(async move { + let InputPath { + entry, + + snapshot, + should_scan_tx, + } = req; + + if entry.is_fifo || !entry.is_file() { + return Ok(()); + } + + if self.query.filters_path() { + let matched_path = if self.query.match_full_paths() { + let mut full_path = snapshot.root_name().as_std_path().to_owned(); + full_path.push(entry.path.as_std_path()); + self.query.match_path(&full_path) + } else { + self.query.match_path(entry.path.as_std_path()) + }; + if !matched_path { + return Ok(()); + } + } + + if self.open_entries.contains(&entry.id) { + // The buffer is already in memory and that's the version we want to scan; + // hence skip the dilly-dally and look for all matches straight away. 
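+            // (We still go through `get_buffer_for_full_scan_tx` so the main thread hands back the
+            // already-open `Entity<Buffer>` and a snapshot of its in-memory contents for the full scan.)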
+ self.get_buffer_for_full_scan_tx + .send(ProjectPath { + worktree_id: snapshot.id(), + path: entry.path.clone(), + }) + .await?; + } else { + self.confirm_contents_will_match_tx + .send(MatchingEntry { + should_scan_tx: should_scan_tx, + worktree_root: snapshot.abs_path().clone(), + path: ProjectPath { + worktree_id: snapshot.id(), + path: entry.path.clone(), + }, + }) + .await?; + } + + anyhow::Ok(()) + }) + .await; + } +} + +struct InputPath { + entry: Entry, + snapshot: Snapshot, + should_scan_tx: oneshot::Sender, +} + +struct MatchingEntry { + worktree_root: Arc, + path: ProjectPath, + should_scan_tx: oneshot::Sender, +} diff --git a/crates/project/src/worktree_store.rs b/crates/project/src/worktree_store.rs index e6da207dadbde3..676c96f4331d73 100644 --- a/crates/project/src/worktree_store.rs +++ b/crates/project/src/worktree_store.rs @@ -8,10 +8,7 @@ use std::{ use anyhow::{Context as _, Result, anyhow, bail}; use collections::{HashMap, HashSet}; use fs::{Fs, copy_recursive}; -use futures::{ - FutureExt, SinkExt, - future::{BoxFuture, Shared}, -}; +use futures::{FutureExt, SinkExt, future::Shared}; use gpui::{ App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity, }; @@ -999,148 +996,14 @@ impl WorktreeStore { matching_paths_rx } - fn scan_ignored_dir<'a>( - fs: &'a Arc, - snapshot: &'a worktree::Snapshot, - path: &'a RelPath, - query: &'a SearchQuery, - filter_tx: &'a Sender, - output_tx: &'a Sender>, - ) -> BoxFuture<'a, Result<()>> { - async move { - let abs_path = snapshot.absolutize(path); - let Some(mut files) = fs - .read_dir(&abs_path) - .await - .with_context(|| format!("listing ignored path {abs_path:?}")) - .log_err() - else { - return Ok(()); - }; - - let mut results = Vec::new(); - - while let Some(Ok(file)) = files.next().await { - let Some(metadata) = fs - .metadata(&file) - .await - .with_context(|| format!("fetching fs metadata for {abs_path:?}")) - .log_err() - .flatten() - else { - continue; - }; - if metadata.is_symlink || metadata.is_fifo { - continue; - } - let relative_path = file.strip_prefix(snapshot.abs_path())?; - let relative_path = RelPath::new(&relative_path, snapshot.path_style()) - .context("getting relative path")?; - results.push((relative_path.into_arc(), !metadata.is_dir)) - } - results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path)); - for (path, is_file) in results { - if is_file { - if query.filters_path() { - let matched_path = if query.match_full_paths() { - let mut full_path = snapshot.root_name().as_std_path().to_owned(); - full_path.push(path.as_std_path()); - query.match_path(&full_path) - } else { - query.match_path(&path.as_std_path()) - }; - if !matched_path { - continue; - } - } - let (tx, rx) = oneshot::channel(); - output_tx.send(rx).await?; - filter_tx - .send(MatchingEntry { - respond: tx, - worktree_root: snapshot.abs_path().clone(), - path: ProjectPath { - worktree_id: snapshot.id(), - path: path.into_arc(), - }, - }) - .await?; - } else { - Self::scan_ignored_dir(fs, snapshot, &path, query, filter_tx, output_tx) - .await?; - } - } - Ok(()) - } - .boxed() - } - async fn find_candidate_paths( - fs: Arc, - snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>, - open_entries: HashSet, - query: SearchQuery, - filter_tx: Sender, - output_tx: Sender>, + _: Arc, + _: Vec<(worktree::Snapshot, WorktreeSettings)>, + _: HashSet, + _: SearchQuery, + _: Sender, + _: Sender>, ) -> Result<()> { - for (snapshot, settings) in snapshots { - for entry in snapshot.entries(query.include_ignored(), 0) { - 
if entry.is_dir() && entry.is_ignored { - if !settings.is_path_excluded(&entry.path) { - Self::scan_ignored_dir( - &fs, - &snapshot, - &entry.path, - &query, - &filter_tx, - &output_tx, - ) - .await?; - } - continue; - } - - if entry.is_fifo || !entry.is_file() { - continue; - } - - if query.filters_path() { - let matched_path = if query.match_full_paths() { - let mut full_path = snapshot.root_name().as_std_path().to_owned(); - full_path.push(entry.path.as_std_path()); - query.match_path(&full_path) - } else { - query.match_path(entry.path.as_std_path()) - }; - if !matched_path { - continue; - } - } - - let (mut tx, rx) = oneshot::channel(); - - if open_entries.contains(&entry.id) { - tx.send(ProjectPath { - worktree_id: snapshot.id(), - path: entry.path.clone(), - }) - .await?; - } else { - filter_tx - .send(MatchingEntry { - respond: tx, - worktree_root: snapshot.abs_path().clone(), - path: ProjectPath { - worktree_id: snapshot.id(), - path: entry.path.clone(), - }, - }) - .await?; - } - - output_tx.send(rx).await?; - } - } Ok(()) } diff --git a/crates/project_benchmarks/Cargo.toml b/crates/project_benchmarks/Cargo.toml new file mode 100644 index 00000000000000..1171d468c649bd --- /dev/null +++ b/crates/project_benchmarks/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "project_benchmarks" +version = "0.1.0" +publish.workspace = true +edition.workspace = true + +[dependencies] +anyhow.workspace = true +clap.workspace = true +client.workspace = true +futures.workspace = true +gpui = { workspace = true, features = ["windows-manifest"] } +http_client = { workspace = true, features = ["test-support"]} +language.workspace = true +node_runtime.workspace = true +project.workspace = true +settings.workspace = true +watch.workspace = true + +[lints] +workspace = true diff --git a/crates/project_benchmarks/LICENSE-GPL b/crates/project_benchmarks/LICENSE-GPL new file mode 120000 index 00000000000000..89e542f750cd38 --- /dev/null +++ b/crates/project_benchmarks/LICENSE-GPL @@ -0,0 +1 @@ +../../LICENSE-GPL \ No newline at end of file diff --git a/crates/project_benchmarks/src/main.rs b/crates/project_benchmarks/src/main.rs new file mode 100644 index 00000000000000..5075016665a072 --- /dev/null +++ b/crates/project_benchmarks/src/main.rs @@ -0,0 +1,136 @@ +use std::sync::Arc; + +use clap::Parser; +use client::{Client, UserStore}; +use gpui::{AppContext as _, Application}; +use http_client::FakeHttpClient; +use language::LanguageRegistry; +use node_runtime::NodeRuntime; +use project::{ + Project, RealFs, + search::{SearchQuery, SearchResult}, +}; + +#[derive(Parser)] +struct Args { + /// List of worktrees to run the search against. + worktrees: Vec, + #[clap(short)] + query: String, + /// Treat query as a regex. + #[clap(short, long)] + regex: bool, + /// Matches have to be standalone words. + #[clap(long)] + whole_word: bool, + /// Make matching case-sensitive. + #[clap(long, default_value_t = true)] + case_sensitive: bool, + /// Include gitignored files in the search. 
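+    /// Off by default; when set, ignored directories are scanned on demand before the search runs.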
+ #[clap(long)] + include_ignored: bool, +} + +fn main() -> Result<(), anyhow::Error> { + let args = Args::parse(); + let query = if args.regex { + SearchQuery::regex( + args.query, + args.whole_word, + args.case_sensitive, + args.include_ignored, + false, + Default::default(), + Default::default(), + false, + None, + ) + } else { + SearchQuery::text( + args.query, + args.whole_word, + args.case_sensitive, + args.include_ignored, + Default::default(), + Default::default(), + false, + None, + ) + }?; + Application::headless().run(|cx| { + settings::init(cx); + client::init_settings(cx); + language::init(cx); + Project::init_settings(cx); + let client = Client::production(cx); + let http_client = FakeHttpClient::with_200_response(); + let (_, rx) = watch::channel(None); + let node = NodeRuntime::new(http_client, None, rx); + let user_store = cx.new(|cx| UserStore::new(client.clone(), cx)); + let registry = Arc::new(LanguageRegistry::new(cx.background_executor().clone())); + let fs = Arc::new(RealFs::new(None, cx.background_executor().clone())); + let project = Project::local( + client, + node, + user_store, + registry, + fs, + Some(Default::default()), + cx, + ); + + project.clone().update(cx, move |_, cx| { + cx.spawn(async move |_, cx| { + println!("Loading worktrees"); + let worktrees = project.update(cx, |this, cx| { + args.worktrees + .into_iter() + .map(|worktree| this.find_or_create_worktree(worktree, true, cx)) + .collect::>() + })?; + + let worktrees = futures::future::join_all(worktrees) + .await + .into_iter() + .collect::, anyhow::Error>>()?; + + for (worktree, _) in &worktrees { + worktree + .update(cx, |this, _| this.as_local().unwrap().scan_complete())? + .await; + } + println!("Worktrees loaded"); + + println!("Starting a project search"); + let timer = std::time::Instant::now(); + let mut first_match = None; + let matches = project + .update(cx, |this, cx| this.search(query, cx)) + .unwrap(); + let mut matched_files = 0; + let mut matched_chunks = 0; + while let Ok(match_result) = matches.recv().await { + if first_match.is_none() { + let time = timer.elapsed(); + first_match = Some(time); + println!("First match found after {time:?}"); + } + if let SearchResult::Buffer { ranges, .. } = match_result { + matched_files += 1; + matched_chunks += ranges.len(); + } + } + let elapsed = timer.elapsed(); + println!( + "Finished project search after {elapsed:?}. 
Matched {matched_files} files and {matched_chunks} excerpts" + ); + drop(project); + cx.update(|cx| cx.quit())?; + + anyhow::Ok(()) + }) + .detach(); + }); + }); + Ok(()) +} diff --git a/crates/remote_server/Cargo.toml b/crates/remote_server/Cargo.toml index 3d28f6ba565330..b702c75119af9f 100644 --- a/crates/remote_server/Cargo.toml +++ b/crates/remote_server/Cargo.toml @@ -75,7 +75,7 @@ minidumper.workspace = true [dev-dependencies] action_log.workspace = true -agent.workspace = true +agent = { workspace = true, features = ["test-support"] } client = { workspace = true, features = ["test-support"] } clock = { workspace = true, features = ["test-support"] } collections.workspace = true diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index 1c42f1995daa03..2f429ed80aa4bf 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -639,9 +639,15 @@ impl HeadlessProject { PathStyle::local(), )?; let results = this.update(&mut cx, |this, cx| { - this.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx) - }) + project::Search::local( + this.fs.clone(), + this.buffer_store.clone(), + this.worktree_store.clone(), + message.limit as _, + cx, + ) + .into_handle(query, cx) + .matching_buffers(cx) })?; let mut response = proto::FindSearchCandidatesResponse {