Skip to content

Use the read-only replica for some queries in production #2073

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ use scheduled_thread_pool::ScheduledThreadPool;
// The db, oauth, and git2 types don't implement `Debug`.
#[allow(missing_debug_implementations)]
pub struct App {
/// The database connection pool
pub diesel_database: db::DieselPool,
/// The primary database connection pool
pub primary_database: db::DieselPool,

/// The read-only replica database connection pool
pub read_only_replica_database: Option<db::DieselPool>,

/// The GitHub OAuth2 configuration
pub github: BasicClient,
Expand Down Expand Up @@ -78,29 +81,53 @@ impl App {
_ => 1,
};

// Used as the connection and statement timeout value for the database pool(s)
let db_connection_timeout = match (dotenv::var("DB_TIMEOUT"), config.env) {
(Ok(num), _) => num.parse().expect("couldn't parse DB_TIMEOUT"),
(_, Env::Production) => 10,
(_, Env::Test) => 1,
_ => 30,
};

// Determine if the primary pool is also read-only
let read_only_mode = dotenv::var("READ_ONLY_MODE").is_ok();
let connection_config = db::ConnectionConfig {
let primary_db_connection_config = db::ConnectionConfig {
statement_timeout: db_connection_timeout,
read_only: read_only_mode,
};

let thread_pool = Arc::new(ScheduledThreadPool::new(db_helper_threads));

let diesel_db_config = r2d2::Pool::builder()
let primary_db_config = r2d2::Pool::builder()
.max_size(db_pool_size)
.min_idle(db_min_idle)
.connection_timeout(Duration::from_secs(db_connection_timeout))
.connection_customizer(Box::new(connection_config))
.thread_pool(thread_pool);
.connection_customizer(Box::new(primary_db_connection_config))
.thread_pool(thread_pool.clone());

let primary_database = db::diesel_pool(&config.db_url, config.env, primary_db_config);

let read_only_replica_database = if let Some(url) = &config.replica_db_url {
let replica_db_connection_config = db::ConnectionConfig {
statement_timeout: db_connection_timeout,
read_only: true,
};

let replica_db_config = r2d2::Pool::builder()
.max_size(db_pool_size)
.min_idle(db_min_idle)
.connection_timeout(Duration::from_secs(db_connection_timeout))
.connection_customizer(Box::new(replica_db_connection_config))
.thread_pool(thread_pool);

Some(db::diesel_pool(&url, config.env, replica_db_config))
} else {
None
};

App {
diesel_database: db::diesel_pool(&config.db_url, config.env, diesel_db_config),
primary_database,
read_only_replica_database,
github,
session_key: config.session_key.clone(),
git_repo_checkout: config.git_repo_checkout.clone(),
Expand Down
5 changes: 4 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct Config {
pub gh_client_id: String,
pub gh_client_secret: String,
pub db_url: String,
pub replica_db_url: Option<String>,
pub env: Env,
pub max_upload_size: u64,
pub max_unpack_size: u64,
Expand All @@ -33,14 +34,15 @@ impl Default for Config {
/// - `MIRROR`: Is this instance of cargo_registry a mirror of crates.io.
/// - `HEROKU`: Is this instance of cargo_registry currently running on Heroku.
/// - `S3_BUCKET`: The S3 bucket used to store crate files. If not present during development,
/// cargo_registry will fall back to a local uploader.
/// cargo_registry will fall back to a local uploader.
/// - `S3_REGION`: The region in which the bucket was created. Optional if US standard.
/// - `S3_ACCESS_KEY`: The access key to interact with S3. Optional if running a mirror.
/// - `S3_SECRET_KEY`: The secret key to interact with S3. Optional if running a mirror.
/// - `SESSION_KEY`: The key used to sign and encrypt session cookies.
/// - `GH_CLIENT_ID`: The client ID of the associated GitHub application.
/// - `GH_CLIENT_SECRET`: The client secret of the associated GitHub application.
/// - `DATABASE_URL`: The URL of the postgres database to use.
/// - `READ_ONLY_REPLICA_URL`: The URL of an optional postgres read-only replica database.
/// - `BLOCKED_TRAFFIC`: A list of headers and environment variables to use for blocking
/// traffic. See the `block_traffic` module for more documentation.
fn default() -> Config {
Expand Down Expand Up @@ -129,6 +131,7 @@ impl Default for Config {
gh_client_id: env("GH_CLIENT_ID"),
gh_client_secret: env("GH_CLIENT_SECRET"),
db_url: env("DATABASE_URL"),
replica_db_url: dotenv::var("READ_ONLY_REPLICA_URL").ok(),
env: cargo_env,
max_upload_size: 10 * 1024 * 1024, // 10 MB default file upload size limit
max_unpack_size: 512 * 1024 * 1024, // 512 MB max when decompressed
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/krate/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn downloads(req: &mut dyn Request) -> AppResult<Response> {
use diesel::sql_types::BigInt;

let crate_name = &req.params()["crate_id"];
let conn = req.db_conn()?;
let conn = req.db_read_only()?;
let krate = Crate::by_name(crate_name).first::<Crate>(&*conn)?;

let mut versions = krate.all_versions().load::<Version>(&*conn)?;
Expand Down
8 changes: 4 additions & 4 deletions src/controllers/krate/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::models::krate::ALL_COLUMNS;
pub fn summary(req: &mut dyn Request) -> AppResult<Response> {
use crate::schema::crates::dsl::*;

let conn = req.db_conn()?;
let conn = req.db_read_only()?;
let num_crates = crates.count().get_result(&*conn)?;
let num_downloads = metadata::table
.select(metadata::total_downloads)
Expand Down Expand Up @@ -103,7 +103,7 @@ pub fn summary(req: &mut dyn Request) -> AppResult<Response> {
/// Handles the `GET /crates/:crate_id` route.
pub fn show(req: &mut dyn Request) -> AppResult<Response> {
let name = &req.params()["crate_id"];
let conn = req.db_conn()?;
let conn = req.db_read_only()?;
let krate = Crate::by_name(name).first::<Crate>(&*conn)?;

let mut versions_and_publishers = krate
Expand Down Expand Up @@ -199,7 +199,7 @@ pub fn readme(req: &mut dyn Request) -> AppResult<Response> {
// this information already, but ember is definitely requesting it
pub fn versions(req: &mut dyn Request) -> AppResult<Response> {
let crate_name = &req.params()["crate_id"];
let conn = req.db_conn()?;
let conn = req.db_read_only()?;
let krate = Crate::by_name(crate_name).first::<Crate>(&*conn)?;
let mut versions_and_publishers: Vec<(Version, Option<User>)> = krate
.all_versions()
Expand Down Expand Up @@ -230,7 +230,7 @@ pub fn reverse_dependencies(req: &mut dyn Request) -> AppResult<Response> {
use diesel::dsl::any;

let name = &req.params()["crate_id"];
let conn = req.db_conn()?;
let conn = req.db_read_only()?;
let krate = Crate::by_name(name).first::<Crate>(&*conn)?;
let (rev_deps, total) = krate.reverse_dependencies(&*conn, &req.query())?;
let rev_deps: Vec<_> = rev_deps
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/krate/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub fn publish(req: &mut dyn Request) -> AppResult<Response> {

let new_crate = parse_new_headers(req)?;

let conn = app.diesel_database.get()?;
let conn = app.primary_database.get()?;
let ids = req.authenticate(&conn)?;
let user = ids.find_user(&conn)?;

Expand Down
2 changes: 1 addition & 1 deletion src/controllers/krate/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::models::krate::{canon_crate_name, ALL_COLUMNS};
pub fn search(req: &mut dyn Request) -> AppResult<Response> {
use diesel::sql_types::{Bool, Text};

let conn = req.db_conn()?;
let conn = req.db_read_only()?;
let params = req.query();
let sort = params.get("sort").map(|s| &**s);
let include_yanked = params
Expand Down
33 changes: 16 additions & 17 deletions src/controllers/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,27 @@ pub mod yank;
use super::prelude::*;

use crate::db::DieselPooledConn;
use crate::models::{Crate, CrateVersions, Version};
use crate::schema::versions;
use crate::models::{Crate, Version};

fn version_and_crate(req: &dyn Request) -> AppResult<(DieselPooledConn<'_>, Version, Crate)> {
let crate_name = &req.params()["crate_id"];
let semver = &req.params()["version"];
if semver::Version::parse(semver).is_err() {
return Err(cargo_err(&format_args!("invalid semver: {}", semver)));
};
let crate_name = extract_crate_name(req);
let semver = extract_semver(req)?;

let conn = req.db_conn()?;
let krate = Crate::by_name(crate_name).first::<Crate>(&*conn)?;
let version = krate
.all_versions()
.filter(versions::num.eq(semver))
.first(&*conn)
.map_err(|_| {
cargo_err(&format_args!(
"crate `{}` does not have a version `{}`",
crate_name, semver
))
})?;
let version = krate.find_version(&conn, semver)?;

Ok((conn, version, krate))
}

/// Returns the `crate_id` route parameter of `req` as a string slice.
///
/// The parameter is read by indexing into `req.params()`.
// NOTE(review): presumably the router always supplies `crate_id` for the
// routes that call this; confirm how the params type reacts to a missing key.
fn extract_crate_name(req: &dyn Request) -> &str {
    let crate_name: &str = &req.params()["crate_id"];
    crate_name
}

/// Returns the `version` route parameter of `req` after checking that it
/// parses as a semantic version.
///
/// The raw string from the request is returned, not the parsed
/// `semver::Version`; parsing is used purely as validation.
///
/// # Errors
///
/// Returns a `cargo_err` with the message `invalid semver: <value>` when
/// `semver::Version::parse` rejects the parameter.
fn extract_semver(req: &dyn Request) -> AppResult<&str> {
    let requested = &req.params()["version"];

    if semver::Version::parse(requested).is_ok() {
        Ok(requested)
    } else {
        Err(cargo_err(&format_args!("invalid semver: {}", requested)))
    }
}
11 changes: 9 additions & 2 deletions src/controllers/version/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::models::{Crate, VersionDownload};
use crate::schema::*;
use crate::views::EncodableVersionDownload;

use super::version_and_crate;
use super::{extract_crate_name, extract_semver};

/// Handles the `GET /crates/:crate_id/:version/download` route.
/// This returns a URL to the location where the crate is stored.
Expand Down Expand Up @@ -69,7 +69,14 @@ fn increment_download_counts(

/// Handles the `GET /crates/:crate_id/:version/downloads` route.
pub fn downloads(req: &mut dyn Request) -> AppResult<Response> {
let (conn, version, _) = version_and_crate(req)?;
let crate_name = extract_crate_name(req);
let semver = extract_semver(req)?;

let conn = req.db_read_only()?;
let version = Crate::by_name(crate_name)
.first::<Crate>(&*conn)?
.find_version(&conn, semver)?;

let cutoff_end_date = req
.query()
.get("before_date")
Expand Down
19 changes: 14 additions & 5 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,25 @@ pub fn diesel_pool(
}

pub trait RequestTransaction {
/// Return the lazily initialized postgres connection for this request.
///
/// The connection will live for the lifetime of the request.
// FIXME: This description does not match the implementation below.
/// Obtain a read/write database connection from the primary pool
fn db_conn(&self) -> Result<DieselPooledConn<'_>, r2d2::PoolError>;

/// Obtain a read-only database connection from the replica pool
///
/// If there is no replica pool, the primary pool is used instead.
fn db_read_only(&self) -> Result<DieselPooledConn<'_>, r2d2::PoolError>;
}

impl<T: Request + ?Sized> RequestTransaction for T {
fn db_conn(&self) -> Result<DieselPooledConn<'_>, r2d2::PoolError> {
self.app().diesel_database.get()
self.app().primary_database.get().map_err(Into::into)
}

fn db_read_only(&self) -> Result<DieselPooledConn<'_>, r2d2::PoolError> {
match &self.app().read_only_replica_database {
Some(pool) => pool.get().map_err(Into::into),
None => self.app().primary_database.get().map_err(Into::into),
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/middleware/log_connection_pool_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ impl Middleware for LogConnectionPoolStatus {
if last_log_time.elapsed() >= Duration::from_secs(60) {
*last_log_time = Instant::now();
println!(
"connection_pool_status=\"{:?}\" in_flight_requests={}",
self.app.diesel_database.state(),
"primary_pool_status=\"{:?}\" read_only_pool_status=\"{:?}\" in_flight_requests={}",
self.app.primary_database.state(),
self.app
.read_only_replica_database
.as_ref()
.map(|pool| pool.state()),
in_flight_requests
);
}
Expand Down
12 changes: 12 additions & 0 deletions src/models/krate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,18 @@ impl Crate {
crates::table.select(ALL_COLUMNS)
}

/// Finds the version of this crate whose `num` column equals `version`.
///
/// The query starts from `self.all_versions()` and takes the first match
/// using the supplied connection.
///
/// # Errors
///
/// Any error returned by the query (not only a missing row) is discarded and
/// replaced with a `cargo_err` stating that the crate does not have the
/// requested version.
pub fn find_version(&self, conn: &PgConnection, version: &str) -> AppResult<Version> {
    let lookup = self
        .all_versions()
        .filter(versions::num.eq(version))
        .first::<Version>(conn);

    match lookup {
        Ok(found) => Ok(found),
        // The underlying error is intentionally dropped, as in the original.
        Err(_) => Err(cargo_err(&format_args!(
            "crate `{}` does not have a version `{}`",
            self.name, version
        ))),
    }
}

pub fn valid_name(name: &str) -> bool {
let under_max_length = name.chars().take(MAX_NAME_LENGTH + 1).count() <= MAX_NAME_LENGTH;
Crate::valid_ident(name) && under_max_length
Expand Down
7 changes: 4 additions & 3 deletions src/tests/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ fn simple_config() -> Config {
gh_client_id: dotenv::var("GH_CLIENT_ID").unwrap_or_default(),
gh_client_secret: dotenv::var("GH_CLIENT_SECRET").unwrap_or_default(),
db_url: env("TEST_DATABASE_URL"),
replica_db_url: None,
env: Env::Test,
max_upload_size: 3000,
max_unpack_size: 2000,
Expand All @@ -159,7 +160,7 @@ fn build_app(
};

let app = App::new(&config, client);
t!(t!(app.diesel_database.get()).begin_test_transaction());
t!(t!(app.primary_database.get()).begin_test_transaction());
let app = Arc::new(app);
let handler = cargo_registry::build_handler(Arc::clone(&app));
(app, handler)
Expand Down Expand Up @@ -278,8 +279,8 @@ fn multiple_live_references_to_the_same_connection_can_be_checked_out() {
use std::ptr;

let (app, _) = app();
let conn1 = app.diesel_database.get().unwrap();
let conn2 = app.diesel_database.get().unwrap();
let conn1 = app.primary_database.get().unwrap();
let conn2 = app.primary_database.get().unwrap();
let conn1_ref: &PgConnection = &conn1;
let conn2_ref: &PgConnection = &conn2;

Expand Down
6 changes: 3 additions & 3 deletions src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Drop for TestAppInner {

// Manually verify that all jobs have completed successfully
// This will catch any tests that enqueued a job but forgot to initialize the runner
let conn = self.app.diesel_database.get().unwrap();
let conn = self.app.primary_database.get().unwrap();
let job_count: i64 = background_jobs.count().get_result(&*conn).unwrap();
assert_eq!(
0, job_count,
Expand Down Expand Up @@ -122,7 +122,7 @@ impl TestApp {
/// connection before making any API calls. Once the closure returns, the connection is
/// dropped, ensuring it is returned to the pool and available for any future API calls.
pub fn db<T, F: FnOnce(&PgConnection) -> T>(&self, f: F) -> T {
let conn = self.0.app.diesel_database.get().unwrap();
let conn = self.0.app.primary_database.get().unwrap();
f(&conn)
}

Expand Down Expand Up @@ -220,7 +220,7 @@ impl TestAppBuilder {
let (app, middle) = crate::build_app(self.config, self.proxy);

let runner = if self.build_job_runner {
let connection_pool = app.diesel_database.clone();
let connection_pool = app.primary_database.clone();
let repository_config = RepositoryConfig {
index_location: Url::from_file_path(&git::bare()).unwrap(),
credentials: Credentials::Missing,
Expand Down