Skip to content

Commit 612dd59

Browse files
authored
worker/jobs/downloads/update_metadata: Remove spawn_blocking() call (#10030)
1 parent 86286ba commit 612dd59

File tree

1 file changed

+111
-66
lines changed

1 file changed

+111
-66
lines changed

src/worker/jobs/downloads/update_metadata.rs

Lines changed: 111 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
use crate::schema::version_downloads;
2-
use crate::tasks::spawn_blocking;
3-
use crate::util::diesel::Conn;
42
use crate::worker::Environment;
53
use crates_io_worker::BackgroundJob;
64
use diesel::prelude::*;
75
use diesel::sql_types::BigInt;
8-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
6+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
97
use std::sync::Arc;
108
use std::time::{Duration, Instant};
119

@@ -19,16 +17,12 @@ impl BackgroundJob for UpdateDownloads {
1917
type Context = Arc<Environment>;
2018

2119
async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
22-
let conn = env.deadpool.get().await?;
23-
spawn_blocking(move || {
24-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
25-
Ok(update(conn)?)
26-
})
27-
.await?
20+
let mut conn = env.deadpool.get().await?;
21+
Ok(update(&mut conn).await?)
2822
}
2923
}
3024

31-
fn update(conn: &mut impl Conn) -> QueryResult<()> {
25+
async fn update(conn: &mut AsyncPgConnection) -> QueryResult<()> {
3226
use diesel::dsl::now;
3327
use diesel::select;
3428

@@ -46,7 +40,7 @@ fn update(conn: &mut impl Conn) -> QueryResult<()> {
4640

4741
let start_time = Instant::now();
4842
loop {
49-
let update_count = batch_update(BATCH_SIZE, conn)?;
43+
let update_count = batch_update(BATCH_SIZE, conn).await?;
5044
info!("Updated {update_count} versions");
5145
if update_count < BATCH_SIZE {
5246
break;
@@ -67,18 +61,23 @@ fn update(conn: &mut impl Conn) -> QueryResult<()> {
6761
.filter(version_downloads::date.lt(diesel::dsl::date(now)))
6862
.filter(version_downloads::downloads.eq(version_downloads::counted))
6963
.filter(version_downloads::processed.eq(false))
70-
.execute(conn)?;
64+
.execute(conn)
65+
.await?;
7166
info!("Finished freezing old version_downloads");
7267

7368
define_sql_function!(fn refresh_recent_crate_downloads());
74-
select(refresh_recent_crate_downloads()).execute(conn)?;
69+
70+
select(refresh_recent_crate_downloads())
71+
.execute(conn)
72+
.await?;
73+
7574
info!("Finished running refresh_recent_crate_downloads");
7675

7776
Ok(())
7877
}
7978

8079
#[instrument(skip_all)]
81-
fn batch_update(batch_size: i64, conn: &mut impl Conn) -> QueryResult<i64> {
80+
async fn batch_update(batch_size: i64, conn: &mut AsyncPgConnection) -> QueryResult<i64> {
8281
table! {
8382
/// Imaginary table to make Diesel happy when using the `sql_query` function.
8483
sql_query_results (count) {
@@ -97,7 +96,8 @@ fn batch_update(batch_size: i64, conn: &mut impl Conn) -> QueryResult<i64> {
9796

9897
let result = diesel::sql_query(include_str!("update_metadata.sql"))
9998
.bind::<BigInt, _>(batch_size)
100-
.get_result::<SqlQueryResult>(conn)?;
99+
.get_result::<SqlQueryResult>(conn)
100+
.await?;
101101

102102
Ok(result.count)
103103
}
@@ -107,13 +107,16 @@ mod tests {
107107
use super::*;
108108
use crate::models::{Crate, NewCrate, NewUser, NewVersion, User, Version};
109109
use crate::schema::{crate_downloads, crates, users, versions};
110+
use crate::util::diesel::Conn;
110111
use crates_io_test_db::TestDatabase;
112+
use diesel_async::AsyncConnection;
111113

112-
fn user(conn: &mut impl Conn) -> User {
114+
async fn user(conn: &mut AsyncPgConnection) -> User {
113115
let user = NewUser::new(2, "login", None, None, "access_token");
114116
diesel::insert_into(users::table)
115117
.values(user)
116118
.get_result(conn)
119+
.await
117120
.unwrap()
118121
}
119122

@@ -134,57 +137,66 @@ mod tests {
134137
(krate, version)
135138
}
136139

137-
#[test]
138-
fn increment() {
140+
#[tokio::test]
141+
async fn increment() {
139142
use diesel::dsl::*;
140143

141144
let test_db = TestDatabase::new();
142145
let conn = &mut test_db.connect();
143-
let user = user(conn);
146+
let mut async_conn = test_db.async_connect().await;
147+
148+
let user = user(&mut async_conn).await;
144149
let (krate, version) = crate_and_version(conn, user.id);
145150
insert_into(version_downloads::table)
146151
.values(version_downloads::version_id.eq(version.id))
147-
.execute(conn)
152+
.execute(&mut async_conn)
153+
.await
148154
.unwrap();
149155
insert_into(version_downloads::table)
150156
.values((
151157
version_downloads::version_id.eq(version.id),
152158
version_downloads::date.eq(date(now - 1.day())),
153159
version_downloads::processed.eq(true),
154160
))
155-
.execute(conn)
161+
.execute(&mut async_conn)
162+
.await
156163
.unwrap();
157164

158-
super::update(conn).unwrap();
165+
super::update(&mut async_conn).await.unwrap();
159166

160167
let version_downloads = versions::table
161168
.find(version.id)
162169
.select(versions::downloads)
163-
.first(conn);
170+
.first(&mut async_conn)
171+
.await;
164172
assert_eq!(version_downloads, Ok(1));
165173

166174
let crate_downloads = crate_downloads::table
167175
.find(krate.id)
168176
.select(crate_downloads::downloads)
169-
.first(conn);
177+
.first(&mut async_conn)
178+
.await;
170179
assert_eq!(crate_downloads, Ok(1));
171180

172-
super::update(conn).unwrap();
181+
super::update(&mut async_conn).await.unwrap();
173182

174183
let version_downloads = versions::table
175184
.find(version.id)
176185
.select(versions::downloads)
177-
.first(conn);
186+
.first(&mut async_conn)
187+
.await;
178188
assert_eq!(version_downloads, Ok(1));
179189
}
180190

181-
#[test]
182-
fn set_processed_true() {
191+
#[tokio::test]
192+
async fn set_processed_true() {
183193
use diesel::dsl::*;
184194

185195
let test_db = TestDatabase::new();
186196
let conn = &mut test_db.connect();
187-
let user = user(conn);
197+
let mut async_conn = test_db.async_connect().await;
198+
199+
let user = user(&mut async_conn).await;
188200
let (_, version) = crate_and_version(conn, user.id);
189201
insert_into(version_downloads::table)
190202
.values((
@@ -194,22 +206,26 @@ mod tests {
194206
version_downloads::date.eq(date(now - 2.days())),
195207
version_downloads::processed.eq(false),
196208
))
197-
.execute(conn)
209+
.execute(&mut async_conn)
210+
.await
198211
.unwrap();
199-
super::update(conn).unwrap();
212+
super::update(&mut async_conn).await.unwrap();
200213
let processed = version_downloads::table
201214
.filter(version_downloads::version_id.eq(version.id))
202215
.select(version_downloads::processed)
203-
.first(conn);
216+
.first(&mut async_conn)
217+
.await;
204218
assert_eq!(processed, Ok(true));
205219
}
206220

207-
#[test]
208-
fn dont_process_recent_row() {
221+
#[tokio::test]
222+
async fn dont_process_recent_row() {
209223
use diesel::dsl::*;
210224
let test_db = TestDatabase::new();
211225
let conn = &mut test_db.connect();
212-
let user = user(conn);
226+
let mut async_conn = test_db.async_connect().await;
227+
228+
let user = user(&mut async_conn).await;
213229
let (_, version) = crate_and_version(conn, user.id);
214230
insert_into(version_downloads::table)
215231
.values((
@@ -219,32 +235,38 @@ mod tests {
219235
version_downloads::date.eq(date(now)),
220236
version_downloads::processed.eq(false),
221237
))
222-
.execute(conn)
238+
.execute(&mut async_conn)
239+
.await
223240
.unwrap();
224-
super::update(conn).unwrap();
241+
super::update(&mut async_conn).await.unwrap();
225242
let processed = version_downloads::table
226243
.filter(version_downloads::version_id.eq(version.id))
227244
.select(version_downloads::processed)
228-
.first(conn);
245+
.first(&mut async_conn)
246+
.await;
229247
assert_eq!(processed, Ok(false));
230248
}
231249

232-
#[test]
233-
fn increment_a_little() {
250+
#[tokio::test]
251+
async fn increment_a_little() {
234252
use diesel::dsl::*;
235253
use diesel::update;
236254

237255
let test_db = TestDatabase::new();
238256
let conn = &mut test_db.connect();
239-
let user = user(conn);
257+
let mut async_conn = test_db.async_connect().await;
258+
259+
let user = user(&mut async_conn).await;
240260
let (krate, version) = crate_and_version(conn, user.id);
241261
update(versions::table)
242262
.set(versions::updated_at.eq(now - 2.hours()))
243-
.execute(conn)
263+
.execute(&mut async_conn)
264+
.await
244265
.unwrap();
245266
update(crates::table)
246267
.set(crates::updated_at.eq(now - 2.hours()))
247-
.execute(conn)
268+
.execute(&mut async_conn)
269+
.await
248270
.unwrap();
249271
insert_into(version_downloads::table)
250272
.values((
@@ -254,70 +276,89 @@ mod tests {
254276
version_downloads::date.eq(date(now)),
255277
version_downloads::processed.eq(false),
256278
))
257-
.execute(conn)
279+
.execute(&mut async_conn)
280+
.await
258281
.unwrap();
259282
insert_into(version_downloads::table)
260283
.values((
261284
version_downloads::version_id.eq(version.id),
262285
version_downloads::date.eq(date(now - 1.day())),
263286
))
264-
.execute(conn)
287+
.execute(&mut async_conn)
288+
.await
265289
.unwrap();
266290

267-
let version_before: Version = versions::table.find(version.id).first(conn).unwrap();
291+
let version_before: Version = versions::table
292+
.find(version.id)
293+
.first(&mut async_conn)
294+
.await
295+
.unwrap();
268296
let krate_before: Crate = Crate::all()
269297
.filter(crates::id.eq(krate.id))
270-
.first(conn)
298+
.first(&mut async_conn)
299+
.await
271300
.unwrap();
272301

273-
super::update(conn).unwrap();
302+
super::update(&mut async_conn).await.unwrap();
274303

275-
let version2: Version = versions::table.find(version.id).first(conn).unwrap();
304+
let version2: Version = versions::table
305+
.find(version.id)
306+
.first(&mut async_conn)
307+
.await
308+
.unwrap();
276309
assert_eq!(version2.downloads, 2);
277310
assert_eq!(version2.updated_at, version_before.updated_at);
278311

279312
let krate2: Crate = Crate::all()
280313
.filter(crates::id.eq(krate.id))
281-
.first(conn)
314+
.first(&mut async_conn)
315+
.await
282316
.unwrap();
283317
assert_eq!(krate2.updated_at, krate_before.updated_at);
284318

285319
let krate2_downloads: i64 = crate_downloads::table
286320
.find(krate.id)
287321
.select(crate_downloads::downloads)
288-
.first(conn)
322+
.first(&mut async_conn)
323+
.await
289324
.unwrap();
290325
assert_eq!(krate2_downloads, 2);
291326

292-
super::update(conn).unwrap();
327+
super::update(&mut async_conn).await.unwrap();
293328

294-
let version3: Version = versions::table.find(version.id).first(conn).unwrap();
329+
let version3: Version = versions::table
330+
.find(version.id)
331+
.first(&mut async_conn)
332+
.await
333+
.unwrap();
295334
assert_eq!(version3.downloads, 2);
296335
}
297336

298-
#[test]
299-
fn set_processed_no_set_updated_at() {
337+
#[tokio::test]
338+
async fn set_processed_no_set_updated_at() {
300339
use diesel::dsl::*;
301340
use diesel::update;
302341

303342
let test_db = TestDatabase::new();
304343
let mut conn = test_db.connect();
344+
let mut async_conn = test_db.async_connect().await;
345+
346+
let user = user(&mut async_conn).await;
347+
let (_, version) = crate_and_version(&mut conn, user.id);
305348

306349
// This test is using a transaction to ensure `now` is the same for all
307350
// queries within this test.
308-
conn.begin_test_transaction().unwrap();
309-
310-
let conn = &mut conn;
351+
async_conn.begin_test_transaction().await.unwrap();
311352

312-
let user = user(conn);
313-
let (_, version) = crate_and_version(conn, user.id);
314353
update(versions::table)
315354
.set(versions::updated_at.eq(now - 2.days()))
316-
.execute(conn)
355+
.execute(&mut async_conn)
356+
.await
317357
.unwrap();
318358
update(crates::table)
319359
.set(crates::updated_at.eq(now - 2.days()))
320-
.execute(conn)
360+
.execute(&mut async_conn)
361+
.await
321362
.unwrap();
322363
insert_into(version_downloads::table)
323364
.values((
@@ -327,16 +368,20 @@ mod tests {
327368
version_downloads::date.eq(date(now - 2.days())),
328369
version_downloads::processed.eq(false),
329370
))
330-
.execute(conn)
371+
.execute(&mut async_conn)
372+
.await
331373
.unwrap();
332374

333-
super::update(conn).unwrap();
375+
super::update(&mut async_conn).await.unwrap();
376+
334377
let versions_changed = versions::table
335378
.select(versions::updated_at.ne(now - 2.days()))
336-
.get_result(conn);
379+
.get_result(&mut async_conn)
380+
.await;
337381
let crates_changed = crates::table
338382
.select(crates::updated_at.ne(now - 2.days()))
339-
.get_result(conn);
383+
.get_result(&mut async_conn)
384+
.await;
340385
assert_eq!(versions_changed, Ok(false));
341386
assert_eq!(crates_changed, Ok(false));
342387
}

0 commit comments

Comments
 (0)