Skip to content

Commit

Permalink
feat(background-job): add sync commit history
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Zhang <[email protected]>
  • Loading branch information
zwpaper committed Feb 15, 2025
1 parent b3fd382 commit cb641c7
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/tabby-common/src/index/commit.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod fields {
// === Doc level fields ===
pub const GIT_URL: &str = "chunk_git_url";
pub const GIT_URL: &str = "git_url";
pub const SHA: &str = "sha";
pub const MESSAGE: &str = "message";
pub const AUTHOR_EMAIL: &str = "author_email";
Expand Down
1 change: 1 addition & 0 deletions crates/tabby-git/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tokio.workspace = true
tracing.workspace = true
ignore.workspace = true
grep = "0.3.1"
chrono.workspace = true

[dev-dependencies]
assert_matches.workspace = true
114 changes: 114 additions & 0 deletions crates/tabby-git/src/commit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use anyhow::Result;
use async_stream::stream;
use chrono::{DateTime, TimeZone, Utc};
use futures::stream::{BoxStream, StreamExt};
use git2::{Repository, Sort};
use tokio::sync::{mpsc, oneshot};

#[derive(Debug, Clone)]
pub struct Commit {
pub id: String,
pub message: String,
pub author_name: String,
pub author_email: String,
pub author_at: DateTime<Utc>,
pub committer_name: String,
pub committer_email: String,
pub commit_at: DateTime<Utc>,
}

impl From<git2::Commit<'_>> for Commit {
fn from(commit: git2::Commit) -> Self {
let author = commit.author();
let committer = commit.committer();

Self {
id: commit.id().to_string(),
message: commit.message().unwrap_or("").to_string(),
author_name: author.name().unwrap_or("").to_string(),
author_email: author.email().unwrap_or("").to_string(),
author_at: Utc
.timestamp_opt(author.when().seconds(), 0)
.single()
.unwrap_or_default(),
committer_name: committer.name().unwrap_or("").to_string(),
committer_email: committer.email().unwrap_or("").to_string(),
commit_at: Utc
.timestamp_opt(committer.when().seconds(), 0)
.single()
.unwrap_or_default(),
}
}
}

pub fn stream_commits(
repo_path: String,
) -> (BoxStream<'static, Result<Commit>>, oneshot::Sender<()>) {
let (stop_tx, stop_rx) = oneshot::channel();
let (tx, mut rx) = mpsc::channel(16);

// Spawn git operations in a tokio task
tokio::spawn({
let mut stop_rx = stop_rx;
let tx_data = tx.clone();
async move {
// Keep all git operations inside spawn_blocking

let result = tokio::task::spawn_blocking(move || {
let repo = match Repository::open(&repo_path) {
Ok(repo) => repo,
Err(e) => return Err(anyhow::anyhow!("Failed to open repository: {}", e)),
};

let mut revwalk = match repo.revwalk() {
Ok(walk) => walk,
Err(e) => return Err(anyhow::anyhow!("Failed to create revwalk: {}", e)),
};

revwalk.push_head().ok();
revwalk.set_sorting(Sort::TIME).ok();

// Process commits inside the blocking task
for oid in revwalk {
if stop_rx.try_recv().is_ok() {
break;
}

match oid.and_then(|oid| repo.find_commit(oid)) {
Ok(commit) => {
let commit: Commit = commit.into();
if tx_data.blocking_send(Ok(commit)).is_err() {
break;
}
}
Err(e) => {
if tx_data
.blocking_send(Err(anyhow::anyhow!("Failed to get commit: {}", e)))
.is_err()
{
break;
}
}
}
}
Ok(())
})
.await;

if let Err(e) = result {
tx.send(Err(anyhow::anyhow!("Task failed: {}", e)))
.await
.ok();
}
}
});

let s = stream! {
while let Some(result) = rx.recv().await {
yield result;
}
}
.boxed();

(s, stop_tx)
}
3 changes: 3 additions & 0 deletions crates/tabby-git/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod commit;
mod file_search;
mod serve_git;

Expand All @@ -10,6 +11,8 @@ use axum::{
};
use file_search::GitFileSearch;
use futures::Stream;

pub use commit::{stream_commits, Commit};
pub use grep::{GrepFile, GrepLine, GrepSubMatch, GrepTextOrBase64};

pub async fn search_files(
Expand Down
61 changes: 61 additions & 0 deletions crates/tabby-index/src/commit/indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::sync::Arc;

use async_stream::stream;
use futures::StreamExt;
use tabby_common::index::corpus;
use tabby_inference::Embedding;

use super::types::CommitHistory;
use crate::{
commit::CommitHistoryBuilder,
indexer::{Indexer, TantivyDocBuilder},
};

pub struct CommitHistoryIndexer {
builder: TantivyDocBuilder<CommitHistory>,
indexer: Indexer,
}

impl CommitHistoryIndexer {
pub fn new(embedding: Arc<dyn Embedding>) -> Self {
let builder = CommitHistoryBuilder::new(embedding);
let builder = TantivyDocBuilder::new(corpus::STRUCTURED_DOC, builder);
let indexer = Indexer::new(corpus::COMMIT_HISTORY);
Self { indexer, builder }
}

pub async fn sync(&self, document: CommitHistory) -> bool {
if !self.require_updates(&document) {
return false;
}

stream! {
let (id, s) = self.builder.build(document).await;
self.indexer.delete(&id);

for await doc in s.buffer_unordered(std::cmp::max(std::thread::available_parallelism().unwrap().get() * 2, 32)) {
if let Ok(Some(doc)) = doc {
self.indexer.add(doc).await;
}
}
}.count().await;
true
}

pub async fn delete(&self, id: &str) -> bool {
if self.indexer.is_indexed(id) {
self.indexer.delete(id);
true
} else {
false
}
}

pub fn commit(self) {
self.indexer.commit();
}

fn require_updates(&self, document: &CommitHistory) -> bool {
true
}
}
57 changes: 43 additions & 14 deletions crates/tabby-index/src/commit/mod.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
mod types;
pub mod indexer;
pub mod types;

use std::{sync::Arc, vec};
use std::{pin::pin, sync::Arc, vec};

use anyhow::{bail, Result};
use async_stream::stream;
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{stream::BoxStream, StreamExt};
use git2::{Repository, Sort};
use serde_json::json;
use tabby_common::index::{commit::fields, corpus};
use tabby_common::{
config::CodeRepository,
index::{commit::fields, corpus},
};
use tabby_inference::Embedding;
use tokio::task::JoinHandle;
use tokio::{sync::oneshot, task::JoinHandle};
use tracing::warn;
use tracing::{info_span, Instrument};
use types::CommitHistory;

use crate::{indexer::TantivyDocBuilder, IndexAttributeBuilder};

fn create_commit_history_builder(
embedding: Arc<dyn Embedding>,
) -> TantivyDocBuilder<CommitHistory> {
let builder = CommitHistoryBuilder::new(embedding);
TantivyDocBuilder::new(corpus::COMMIT_HISTORY, builder)
}
use crate::{
indexer::{Indexer, TantivyDocBuilder},
IndexAttributeBuilder,
};

pub struct CommitHistoryBuilder {
embedding: Arc<dyn Embedding>,
Expand All @@ -30,6 +32,33 @@ impl CommitHistoryBuilder {
pub fn new(embedding: Arc<dyn Embedding>) -> Self {
Self { embedding }
}

pub async fn garbage_collection(&self) {
let index = Indexer::new(corpus::COMMIT_HISTORY);
stream! {
let mut num_to_keep = 0;
let mut num_to_delete = 0;

// for await id in index.iter_ids() {
// let Some(source_file_id) = SourceCode::source_file_id_from_id(&id) else {
// warn!("Failed to extract source file id from index id: {id}");
// num_to_delete += 1;
// index.delete(&id);
// continue;
// };

// if CodeIntelligence::check_source_file_id_matched(source_file_id) {
// num_to_keep += 1;
// } else {
// num_to_delete += 1;
// index.delete(&id);
// }
// }

logkit::info!("Finished garbage collection for code index: {num_to_keep} items kept, {num_to_delete} items removed");
index.commit();
}.collect::<()>().await;
}
}

#[async_trait]
Expand All @@ -41,7 +70,7 @@ impl IndexAttributeBuilder<CommitHistory> for CommitHistoryBuilder {
fields::MESSAGE: document.message,
fields::AUTHOR_EMAIL: document.author_email,
fields::AUTHOR_AT: document.author_at,
fields::COMMITTER: document.committer,
fields::COMMITTER: document.committer_email,
fields::COMMIT_AT: document.commit_at,
})
}
Expand Down
9 changes: 2 additions & 7 deletions crates/tabby-index/src/commit/types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt};
use serde_json::json;

use tabby_common::index::commit::fields;
use chrono::{DateTime, TimeZone, Utc};

use crate::indexer::{IndexId, ToIndexId};

Expand All @@ -15,7 +10,7 @@ pub struct CommitHistory {
pub message: String,
pub author_email: String,
pub author_at: DateTime<Utc>,
pub committer: String,
pub committer_email: String,
pub commit_at: DateTime<Utc>,

pub diff: Option<String>,
Expand Down
1 change: 1 addition & 0 deletions crates/tabby-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod public {
use super::*;
pub use super::{
code::CodeIndexer,
commit::{indexer::CommitHistoryIndexer, types::CommitHistory, CommitHistoryBuilder},
structured_doc::public::{
StructuredDoc, StructuredDocFields, StructuredDocIndexer, StructuredDocIssueFields,
StructuredDocPullDocumentFields, StructuredDocState, StructuredDocWebFields,
Expand Down
2 changes: 1 addition & 1 deletion crates/tabby/src/services/commit/tantivy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl CommitHistorySearch for CommitHistorySearchImpl {
content: &str,
params: &CommitHistorySearchParams,
) -> Result<CommitHistorySearchResponse> {
//FIXME(kweizh)
//TODO(kweizh)
Err(SearchError::NotReady)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tracing::debug;

use super::{helper::Job, BackgroundJobEvent};

mod commits;
mod error;
mod issues;
mod pulls;
Expand Down Expand Up @@ -106,12 +107,25 @@ impl SchedulerGithubGitlabJob {
"Pulling source code for repository {}",
repository.display_name
);
let code_repository = &CodeRepository::new(&authenticated_url, &repository.source_id());
let mut code = CodeIndexer::default();
code.refresh(
embedding.clone(),
&CodeRepository::new(&authenticated_url, &repository.source_id()),
)
.await?;
code.refresh(embedding.clone(), code_repository).await?;

logkit::info!(
"Indexing recent commits for repository {}",
repository.display_name
);

if let Err(err) = self
.sync_commit_history(code_repository, embedding.clone())
.await
{
integration_service
.update_integration_sync_status(&integration.id, Some(err.to_string()))
.await?;
logkit::error!("Failed to sync commit history: {}", err);
return Err(err);
};

logkit::info!(
"Indexing documents for repository {}",
Expand All @@ -132,6 +146,14 @@ impl SchedulerGithubGitlabJob {
Ok(())
}

async fn sync_commit_history(
&self,
repository: &CodeRepository,
embedding: Arc<dyn Embedding>,
) -> tabby_schema::Result<()> {
commits::refresh(embedding.clone(), repository).await
}

async fn sync_pulls(
&self,
integration: &Integration,
Expand Down
Loading

0 comments on commit cb641c7

Please sign in to comment.