Skip to content

Commit 6f0406b

Browse files
committed
Add mergeable queue shutdown message type
1 parent 1b7eaf1 commit 6f0406b

File tree

3 files changed

+53
-19
lines changed

3 files changed

+53
-19
lines changed

src/bors/mergeable_queue.rs

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,16 @@ pub struct MergeableQueueItem {
3636
}
3737

3838
#[derive(Debug, Clone)]
39+
enum QueueMessage {
40+
Item(MergeableQueueItem),
41+
Shutdown,
42+
}
43+
3944
struct Item {
4045
/// When to process item (None = immediate).
4146
/// Reversed to create min-heap for expirations.
4247
expiration: Reverse<Option<Instant>>,
43-
inner: MergeableQueueItem,
48+
inner: QueueMessage,
4449
}
4550

4651
impl PartialEq for Item {
@@ -94,6 +99,23 @@ pub fn create_mergeable_queue() -> (MergeableQueueSender, MergeableQueueReceiver
9499
}
95100

96101
impl MergeableQueueSender {
102+
pub fn is_closed(&self) -> bool {
103+
self.inner.closed.load(Ordering::Relaxed)
104+
}
105+
106+
pub fn shutdown(&self) {
107+
let mut queue = self.inner.queue.lock().unwrap();
108+
109+
// Send shutdown message
110+
queue.push(Item {
111+
expiration: Reverse(None),
112+
inner: QueueMessage::Shutdown,
113+
});
114+
115+
// and wake receiver for immediate processing.
116+
self.inner.notify.notify_one();
117+
}
118+
97119
pub fn enqueue(&self, repo: GithubRepoName, pr_number: PullRequestNumber) {
98120
let expiration = Some(Instant::now() + BASE_DELAY);
99121

@@ -148,7 +170,7 @@ impl MergeableQueueSender {
148170

149171
queue.push(Item {
150172
expiration: Reverse(expiration),
151-
inner: item,
173+
inner: QueueMessage::Item(item),
152174
});
153175

154176
if should_notify {
@@ -157,19 +179,12 @@ impl MergeableQueueSender {
157179
}
158180
}
159181

160-
impl Drop for MergeableQueueSender {
161-
fn drop(&mut self) {
162-
// Count is 2 if this is the last sender (this sender + receiver)
163-
if Arc::strong_count(&self.inner) == 2 {
164-
self.inner.closed.store(true, Ordering::Relaxed);
165-
// Notify the receiver in case it's waiting.
166-
self.inner.notify.notify_one();
167-
}
182+
impl MergeableQueueReceiver {
183+
fn is_closed(&self) -> bool {
184+
self.inner.closed.load(Ordering::Relaxed)
168185
}
169-
}
170186

171-
impl MergeableQueueReceiver {
172-
fn peek_inner(&self) -> Result<MergeableQueueItem, Option<Duration>> {
187+
fn peek_inner(&self) -> Result<QueueMessage, Option<Duration>> {
173188
let now = Instant::now();
174189
let mut queue = self.inner.queue.lock().unwrap();
175190

@@ -206,30 +221,32 @@ impl MergeableQueueReceiver {
206221
pub async fn dequeue(&self) -> Option<(MergeableQueueItem, MergeableQueueSender)> {
207222
loop {
208223
// Closed and empty queue, we're done.
209-
if self.inner.closed.load(Ordering::Relaxed)
210-
&& self.inner.queue.lock().unwrap().is_empty()
211-
{
224+
if self.is_closed() && self.inner.queue.lock().unwrap().is_empty() {
212225
return None;
213226
}
214227

215228
match self.peek_inner() {
216229
// Item is ready.
217-
Ok(item) => {
230+
Ok(QueueMessage::Item(item)) => {
218231
break Some((
219232
item,
220233
MergeableQueueSender {
221234
inner: self.inner.clone(),
222235
},
223236
));
224237
}
238+
Ok(QueueMessage::Shutdown) => {
239+
self.inner.closed.store(true, Ordering::Relaxed);
240+
break None;
241+
}
225242
// Item exists but not ready, wait until then or until notified of a higher priority item.
226243
Err(Some(duration)) => {
227244
let _ = timeout(duration, self.inner.notify.notified()).await;
228245
}
229246
// Queue is empty, wait until notified of a new item.
230247
Err(None) => {
231248
// If closed, we're done
232-
if self.inner.closed.load(Ordering::Relaxed) {
249+
if self.is_closed() {
233250
return None;
234251
}
235252
// else, wait until next item.
@@ -309,7 +326,15 @@ pub async fn handle_mergeable_queue_item(
309326
tracing::info!(
310327
"Retrying mergeable state check for PR: {pull_request} ({attempt}/{MAX_RETRIES})",
311328
);
312-
mq_tx.enqueue_retry(mq_item);
329+
330+
if mq_tx.is_closed() {
331+
tracing::error!(
332+
"Attempted to enqueue retry for PR {pull_request} but mergeable queue is shutdown"
333+
);
334+
} else {
335+
mq_tx.enqueue_retry(mq_item);
336+
}
337+
313338
return Ok(());
314339
}
315340

src/github/server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ pub async fn github_webhook_handler(
8787
pub struct BorsProcess {
8888
pub repository_tx: mpsc::Sender<BorsRepositoryEvent>,
8989
pub global_tx: mpsc::Sender<BorsGlobalEvent>,
90+
pub mergeable_queue_tx: MergeableQueueSender,
9091
pub bors_process: Pin<Box<dyn Future<Output = ()> + Send>>,
9192
}
9293

@@ -101,6 +102,8 @@ pub fn create_bors_process(
101102
let (global_tx, global_rx) = mpsc::channel::<BorsGlobalEvent>(1024);
102103
let (mergeable_queue_tx, mergeable_queue_rx) = create_mergeable_queue();
103104

105+
let mq_tx = mergeable_queue_tx.clone();
106+
104107
let service = async move {
105108
let ctx = Arc::new(ctx);
106109

@@ -138,6 +141,7 @@ pub fn create_bors_process(
138141
BorsProcess {
139142
repository_tx,
140143
global_tx,
144+
mergeable_queue_tx: mq_tx,
141145
bors_process: Box::pin(service),
142146
}
143147
}

src/tests/mocks/bors.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tokio::sync::mpsc::Sender;
1212
use tokio::task::JoinHandle;
1313
use tower::Service;
1414

15+
use crate::bors::mergeable_queue::MergeableQueueSender;
1516
use crate::bors::{RollupMode, WAIT_FOR_REFRESH};
1617
use crate::database::{BuildStatus, DelegatedPermission, PullRequestModel};
1718
use crate::github::api::load_repositories;
@@ -98,6 +99,7 @@ pub struct BorsTester {
9899
http_mock: ExternalHttpMock,
99100
github: GitHubState,
100101
db: Arc<PgDbClient>,
102+
mergeable_queue_tx: MergeableQueueSender,
101103
// Sender for bors global events
102104
global_tx: Sender<BorsGlobalEvent>,
103105
}
@@ -121,6 +123,7 @@ impl BorsTester {
121123
let BorsProcess {
122124
repository_tx,
123125
global_tx,
126+
mergeable_queue_tx,
124127
bors_process,
125128
} = create_bors_process(ctx, mock.github_client(), mock.team_api_client());
126129

@@ -137,6 +140,7 @@ impl BorsTester {
137140
http_mock: mock,
138141
github,
139142
db,
143+
mergeable_queue_tx,
140144
global_tx,
141145
},
142146
bors,
@@ -628,6 +632,7 @@ impl BorsTester {
628632
// Make sure that the event channel senders are closed
629633
drop(self.app);
630634
drop(self.global_tx);
635+
self.mergeable_queue_tx.shutdown();
631636
// Wait until all events are handled in the bors service
632637
bors.await.unwrap();
633638
// Flush any local queues

0 commit comments

Comments
 (0)