Skip to content

Implement refresh mechanism for unknown mergeable PRs #247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/bors/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ pub async fn handle_bors_global_event(
ctx: Arc<BorsContext>,
gh_client: &Octocrab,
team_api_client: &TeamApiClient,
mergeable_queue_tx: MergeableQueueSender,
) -> anyhow::Result<()> {
let db = Arc::clone(&ctx.db);
match event {
Expand All @@ -234,9 +235,10 @@ pub async fn handle_bors_global_event(
ctx.repositories.read().unwrap().values().cloned().collect();
futures::future::join_all(repos.into_iter().map(|repo| {
let repo = Arc::clone(&repo);
let mergeable_queue_tx = mergeable_queue_tx.clone();
async {
let subspan = tracing::info_span!("Repo", repo = repo.repository().to_string());
refresh_repository(repo, Arc::clone(&db), team_api_client)
refresh_repository(repo, Arc::clone(&db), team_api_client, mergeable_queue_tx)
.instrument(subspan)
.await
}
Expand Down
1 change: 0 additions & 1 deletion src/bors/handlers/pr_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ mod tests {
.await;
}

#[tracing_test::traced_test]
#[sqlx::test]
async fn open_and_merge_pr(pool: sqlx::PgPool) {
run_test(pool, |mut tester| async {
Expand Down
88 changes: 82 additions & 6 deletions src/bors/handlers/refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,32 @@ use std::time::Duration;

use anyhow::Context;
use chrono::{DateTime, Utc};
use futures::FutureExt;
use futures::future::join_all;

use crate::bors::Comment;
use crate::bors::RepositoryState;
use crate::bors::handlers::trybuild::cancel_build_workflows;
use crate::bors::mergeable_queue::MergeableQueueSender;
use crate::database::BuildStatus;
use crate::{PgDbClient, TeamApiClient};

pub async fn refresh_repository(
repo: Arc<RepositoryState>,
db: Arc<PgDbClient>,
team_api_client: &TeamApiClient,
mergeable_queue_tx: MergeableQueueSender,
) -> anyhow::Result<()> {
let repo = repo.as_ref();
if let (Ok(_), _, Ok(_)) = tokio::join!(
cancel_timed_out_builds(repo, db.as_ref()),
reload_permission(repo, team_api_client),
reload_config(repo)
) {
let results = join_all([
cancel_timed_out_builds(repo, db.as_ref()).boxed(),
reload_permission(repo, team_api_client).boxed(),
reload_config(repo).boxed(),
reload_unknown_mergeable_prs(repo, db.as_ref(), mergeable_queue_tx).boxed(),
])
.await;

if results.iter().all(|result| result.is_ok()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're swallowing errors here, we should log them instead. It was pre-existing though, so let's fix it in a separate PR.

Ok(())
} else {
tracing::error!("Failed to refresh repository");
Expand Down Expand Up @@ -79,6 +87,27 @@ async fn reload_permission(
Ok(())
}

async fn reload_unknown_mergeable_prs(
repo: &RepositoryState,
db: &PgDbClient,
mergeable_queue: MergeableQueueSender,
) -> anyhow::Result<()> {
let prs = db
.get_prs_with_unknown_mergeable_state(repo.repository())
.await?;

tracing::info!(
"Refreshing {} PR(s) with unknown mergeable state",
prs.len()
);

for pr in prs {
mergeable_queue.enqueue(repo.repository().clone(), pr.number);
}

Ok(())
}

async fn reload_config(repo: &RepositoryState) -> anyhow::Result<()> {
let config = repo.client.load_config().await?;
repo.config.store(Arc::new(config));
Expand Down Expand Up @@ -109,8 +138,9 @@ fn elapsed_time(date: DateTime<Utc>) -> Duration {
mod tests {
use crate::bors::handlers::WAIT_FOR_WORKFLOW_STARTED;
use crate::bors::handlers::refresh::MOCK_TIME;
use crate::database::{MergeableState, OctocrabMergeableState};
use crate::tests::mocks::{
BorsBuilder, GitHubState, WorkflowEvent, default_repo_name, run_test,
BorsBuilder, GitHubState, WorkflowEvent, default_pr_number, default_repo_name, run_test,
};
use chrono::Utc;
use std::future::Future;
Expand All @@ -133,6 +163,7 @@ timeout = 3600
"#,
)
}

#[sqlx::test]
async fn refresh_do_nothing_before_timeout(pool: sqlx::PgPool) {
BorsBuilder::new(pool)
Expand Down Expand Up @@ -207,6 +238,51 @@ timeout = 3600
gh.check_cancelled_workflows(default_repo_name(), &[1]);
}

#[sqlx::test]
async fn refresh_enqueues_unknown_mergeable_prs(pool: sqlx::PgPool) {
run_test(pool, |mut tester| async {
tester
.edit_pr(default_repo_name(), default_pr_number(), |pr| {
pr.mergeable_state = OctocrabMergeableState::Unknown
})
.await?;
tester
.wait_for_default_pr(|pr| pr.mergeable_state == MergeableState::Unknown)
.await?;
tester
.default_repo()
.lock()
.get_pr_mut(default_pr_number())
.mergeable_state = OctocrabMergeableState::Dirty;
tester.refresh().await;
tester
.wait_for_default_pr(|pr| pr.mergeable_state == MergeableState::HasConflicts)
.await?;
Ok(tester)
})
.await;
}

#[sqlx::test]
async fn refresh_enqueues_no_known_mergeable_prs(pool: sqlx::PgPool) {
run_test(pool, |mut tester| async {
tester
.edit_pr(default_repo_name(), default_pr_number(), |pr| {
pr.mergeable_state = OctocrabMergeableState::Clean
})
.await?;
tester
.wait_for_default_pr(|pr| pr.mergeable_state == MergeableState::Mergeable)
.await?;
tester.refresh().await;
tester
.wait_for_default_pr(|pr| pr.mergeable_state == MergeableState::Mergeable)
.await?;
Ok(tester)
})
.await;
}

async fn with_mocked_time<Fut: Future<Output = ()>>(in_future: Duration, future: Fut) {
// It is important to use this function only with a single threaded runtime,
// otherwise the `MOCK_TIME` variable might get mixed up between different threads.
Expand Down
18 changes: 13 additions & 5 deletions src/database/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ use crate::github::{CommitSha, GithubRepoName};
use super::operations::{
approve_pull_request, create_build, create_pull_request, create_workflow,
delegate_pull_request, find_build, find_pr_by_build,
get_nonclosed_pull_requests_by_base_branch, get_pull_request, get_repository,
get_running_builds, get_workflow_urls_for_build, get_workflows_for_build, set_pr_priority,
set_pr_rollup, set_pr_status, unapprove_pull_request, undelegate_pull_request,
update_build_status, update_mergeable_states_by_base_branch, update_pr_build_id,
update_pr_mergeable_state, update_workflow_status, upsert_pull_request, upsert_repository,
get_nonclosed_pull_requests_by_base_branch, get_prs_with_unknown_mergeable_state,
get_pull_request, get_repository, get_running_builds, get_workflow_urls_for_build,
get_workflows_for_build, set_pr_priority, set_pr_rollup, set_pr_status, unapprove_pull_request,
undelegate_pull_request, update_build_status, update_mergeable_states_by_base_branch,
update_pr_build_id, update_pr_mergeable_state, update_workflow_status, upsert_pull_request,
upsert_repository,
};
use super::{ApprovalInfo, DelegatedPermission, MergeableState, RunId};

Expand Down Expand Up @@ -121,6 +122,13 @@ impl PgDbClient {
get_nonclosed_pull_requests_by_base_branch(&self.pool, repo, base_branch).await
}

pub async fn get_prs_with_unknown_mergeable_state(
&self,
repo: &GithubRepoName,
) -> anyhow::Result<Vec<PullRequestModel>> {
get_prs_with_unknown_mergeable_state(&self.pool, repo).await
}

pub async fn create_pull_request(
&self,
repo: &GithubRepoName,
Expand Down
40 changes: 40 additions & 0 deletions src/database/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,46 @@ pub(crate) async fn update_pr_mergeable_state(
.await
}

pub(crate) async fn get_prs_with_unknown_mergeable_state(
executor: impl PgExecutor<'_>,
repo: &GithubRepoName,
) -> anyhow::Result<Vec<PullRequestModel>> {
measure_db_query("get_prs_with_unknown_mergeable_state", || async {
let records = sqlx::query_as!(
PullRequestModel,
r#"
SELECT
pr.id,
pr.repository as "repository: GithubRepoName",
pr.number as "number!: i64",
(
pr.approved_by,
pr.approved_sha
) AS "approval_status!: ApprovalStatus",
pr.status as "pr_status: PullRequestStatus",
pr.priority,
pr.rollup as "rollup: RollupMode",
pr.delegated_permission as "delegated_permission: DelegatedPermission",
pr.base_branch,
pr.mergeable_state as "mergeable_state: MergeableState",
pr.created_at as "created_at: DateTime<Utc>",
build AS "try_build: BuildModel"
FROM pull_request as pr
LEFT JOIN build ON pr.build_id = build.id
WHERE pr.repository = $1
AND pr.mergeable_state = 'unknown'
AND pr.status IN ('open', 'draft')
"#,
repo as &GithubRepoName
)
.fetch_all(executor)
.await?;

Ok(records)
})
.await
}

pub(crate) async fn update_mergeable_states_by_base_branch(
executor: impl PgExecutor<'_>,
repo: &GithubRepoName,
Expand Down
Loading