Graceful shutdown #66

Open · wants to merge 21 commits into trunk
Commits (21)
8a43508
Added initial code for graceful shutdown (Issue #45)
jo-asplin-met-no Feb 6, 2025
ffabee8
Typo (a bit pedantic, but still)
jo-asplin-met-no Feb 6, 2025
cb0d6cc
Refactored shutdown_signal to common package
jo-asplin-met-no Feb 10, 2025
df49f79
Moved shutdown_signal from common to util
jo-asplin-met-no Feb 11, 2025
8b01706
Removed superfluous signals
jo-asplin-met-no Feb 11, 2025
bb46d2f
Assumed Unix + added comments
jo-asplin-met-no Feb 11, 2025
3eff24a
Gracefully shut down kvkafka reader
jo-asplin-met-no Feb 12, 2025
d8ca2b6
Improved signal catching and task joining
jo-asplin-met-no Feb 12, 2025
dfa7bfb
Detected shutdown signal during end-to-end testing
jo-asplin-met-no Feb 12, 2025
ad49595
Don't generally await/join signal catcher task
jo-asplin-met-no Feb 12, 2025
6d050b4
Another go at cancelling kvkafka reader from caught signal
jo-asplin-met-no Feb 12, 2025
239b7c1
Remove `zero_to_none` function
Lun4m Feb 12, 2025
dc1a4c8
Create timeseries if label does not exist instead of returning Err
Lun4m Feb 12, 2025
7b46af6
Merge branch 'trunk' into graceful_shutdown
jo-asplin-met-no Feb 12, 2025
a63842e
Fixed lint issue
jo-asplin-met-no Feb 12, 2025
d4ef671
Fixed comment
jo-asplin-met-no Feb 13, 2025
eb9bd05
Used block to have attribute apply to multiple statements
jo-asplin-met-no Feb 13, 2025
ce4fee5
Fixed typos
jo-asplin-met-no Feb 13, 2025
a42a035
Yet another go at cancelling kafka reader, but essentially back to or…
jo-asplin-met-no Feb 13, 2025
decdd6b
Added debug printout
jo-asplin-met-no Feb 13, 2025
0c21b6c
Yet another go at cancelling kafka reader
jo-asplin-met-no Feb 13, 2025
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
@@ -4,7 +4,8 @@ members = [
"api",
"ingestion",
"integration_tests",
"rove_connector", "util",
"rove_connector",
"util",
]
resolver = "2"

@@ -32,4 +33,5 @@ serde = { version = "1.0.217", features = ["derive"] }
thiserror = "1.0.69"
tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros"] }
tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] }
tokio-util = { version = "0.7.13", features = ["rt"] }
toml = "0.8.19"
2 changes: 2 additions & 0 deletions api/Cargo.toml
@@ -8,7 +8,9 @@ axum.workspace = true
bb8.workspace = true
bb8-postgres.workspace = true
chrono.workspace = true
util = { path = "../util" }
postgres-types.workspace = true
serde.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true
tokio-util.workspace = true
8 changes: 6 additions & 2 deletions api/src/lib.rs
@@ -13,6 +13,7 @@ use timeseries::{
};
use timeslice::{get_timeslice, Timeslice};
use tokio_postgres::NoTls;
use tokio_util::sync::CancellationToken;

pub mod latest;
pub mod timeseries;
@@ -119,7 +120,7 @@ async fn latest_handler(
Ok(Json(LatestResp { data }))
}

pub async fn run(pool: PgConnectionPool) {
pub async fn run(pool: PgConnectionPool, cancel_token: CancellationToken) {
// build our application with routes
let app = Router::new()
.route(
@@ -135,5 +136,8 @@ pub async fn run(pool: PgConnectionPool) {

// run it with hyper on localhost:3000
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(::util::await_cancellation(cancel_token))
.await
.unwrap();
}
7 changes: 6 additions & 1 deletion api/src/main.rs
@@ -1,5 +1,6 @@
use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::NoTls;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
@@ -19,5 +20,9 @@ async fn main() {
let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls).unwrap();
let pool = bb8::Pool::builder().build(manager).await.unwrap();

lard_api::run(pool).await;
// set up cancellation token and signal catcher for graceful shutdown
let cancel_token = CancellationToken::new();
tokio::spawn(util::signal_catcher(cancel_token.clone()));

tokio::spawn(lard_api::run(pool, cancel_token.clone()));
}
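
Neither `util::signal_catcher` nor `util::await_cancellation` appears in this diff, so here is a minimal sketch of what they could look like, assuming a Unix-only target (per the "Assumed Unix" commit) and tokio's `signal` feature; the actual implementation in the util crate may differ:

```rust
use tokio::signal::unix::{signal, SignalKind};
use tokio_util::sync::CancellationToken;

/// Wait for SIGINT or SIGTERM and cancel the shared token so every task can
/// start its graceful shutdown.
pub async fn signal_catcher(cancel_token: CancellationToken) {
    let mut sigint = signal(SignalKind::interrupt()).expect("failed to install SIGINT handler");
    let mut sigterm = signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");

    tokio::select! {
        _ = sigint.recv() => {},
        _ = sigterm.recv() => {},
    }
    cancel_token.cancel();
}

/// Adapter for APIs like axum's `with_graceful_shutdown`, which expect a plain
/// `Future<Output = ()>` rather than a token.
pub async fn await_cancellation(cancel_token: CancellationToken) {
    cancel_token.cancelled().await
}
```

With helpers shaped like this, cancelling the token from the signal catcher propagates both to axum via `with_graceful_shutdown` and to any task that selects on `cancelled()`.
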
2 changes: 2 additions & 0 deletions ingestion/Cargo.toml
@@ -16,6 +16,7 @@ bb8-postgres.workspace = true
bytes.workspace = true
chrono.workspace = true
chronoutil.workspace = true
util = { path = "../util" }
csv.workspace = true
futures.workspace = true
kafka.workspace = true
@@ -27,4 +28,5 @@ serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true
tokio-util.workspace = true
toml.workspace = true
53 changes: 33 additions & 20 deletions ingestion/src/kvkafka.rs
@@ -3,6 +3,7 @@ use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use serde::{Deserialize, Deserializer};
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::PgConnectionPool;

@@ -127,11 +128,15 @@ pub struct Msg {
kvdata: Kvdata,
}

pub async fn read_and_insert(pool: PgConnectionPool, group_string: String) {
pub async fn read_and_insert(
pool: PgConnectionPool,
group_string: String,
cancel_token: CancellationToken,
) {
let (tx, mut rx) = mpsc::channel(10);

tokio::spawn(async move {
read_kafka(group_string, tx).await;
read_kafka(group_string, tx, cancel_token).await;
});

let mut client = pool.get().await.expect("couldn't connect to database");
@@ -212,7 +217,7 @@ pub async fn parse_message(message: &[u8], tx: &mpsc::Sender<Msg>) -> Result<(),
Ok(())
}

async fn read_kafka(group_name: String, tx: mpsc::Sender<Msg>) {
async fn read_kafka(group_name: String, tx: mpsc::Sender<Msg>, cancel_token: CancellationToken) {
// NOTE: reading from the 4 redundant kafka queues, but only reading the checked data (other topics exists)
let mut consumer = Consumer::from_hosts(vec![
"kafka2-a1.met.no:9092".to_owned(),
@@ -229,27 +234,35 @@ async fn read_kafka(group_name: String, tx: mpsc::Sender<Msg>, cancel_token: CancellationToken) {

// Consume the kafka queue infinitely
loop {
// https://docs.rs/kafka/latest/src/kafka/consumer/mod.rs.html#155
// poll asks for next available chunk of data as a MessageSet
match consumer.poll() {
Ok(sets) => {
for msgset in sets.iter() {
for msg in msgset.messages() {
if let Err(e) = parse_message(msg.value, &tx).await {
eprintln!("{}", e);
tokio::select! {
_ = cancel_token.cancelled() => {
eprintln!("cancellation token triggered");
break;
}
// https://docs.rs/kafka/latest/src/kafka/consumer/mod.rs.html#155
// poll asks for next available chunk of data as a MessageSet
poll_result = async { consumer.poll() } => {
match poll_result {
Ok(sets) => {
for msgset in sets.iter() {
for msg in msgset.messages() {
if let Err(e) = parse_message(msg.value, &tx).await {
eprintln!("{}", e);
}
}
if let Err(e) = consumer.consume_messageset(msgset) {
eprintln!("{}", e);
}
Comment on lines +237 to +255
@Lun4m (Collaborator) · Feb 13, 2025
Now it looks good! Just wanted to mention that another option with cancellation tokens (which I've never used before by the way) is to skip the select altogether.
I guess it's a matter of preference and it doesn't do exactly what the select code does (in particular, it doesn't play well with the Err branch of the match statement), but I find it easier to reason about.

Suggested change

Removed:

    tokio::select! {
        _ = cancel_token.cancelled() => {
            eprintln!("cancellation token triggered");
            break;
        }
        // https://docs.rs/kafka/latest/src/kafka/consumer/mod.rs.html#155
        // poll asks for next available chunk of data as a MessageSet
        poll_result = async { consumer.poll() } => {
            match poll_result {
                Ok(sets) => {
                    for msgset in sets.iter() {
                        for msg in msgset.messages() {
                            if let Err(e) = parse_message(msg.value, &tx).await {
                                eprintln!("{}", e);
                            }
                        }
                        if let Err(e) = consumer.consume_messageset(msgset) {
                            eprintln!("{}", e);
                        }

Suggested:

    if cancel_token.is_cancelled() {
        eprintln!("cancellation token triggered");
        break;
    }
    // https://docs.rs/kafka/latest/src/kafka/consumer/mod.rs.html#155
    // poll asks for next available chunk of data as a MessageSet
    match consumer.poll() {
        // process message sets
        ...
    }

Member
Yes, I forgot that our kafka library is sync. In that case I think this is equivalent to the select, and probably the way to go. It has another problem though: in the pathological case where the kafka queue is empty when we cancel, this will hang indefinitely (or at least the 90 seconds until systemd sends SIGKILL...). It's also a problem that we're doing blocking IO on a non-blocking task.

We can find a way around this, but I'm starting to think it would make more sense to switch to rdkafka which supports async.
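
For comparison, a rough sketch of what this loop might look like with rdkafka's async `StreamConsumer`, as floated above; this is not part of the PR, and the broker address, topic name, and config keys below are placeholders:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;
use tokio_util::sync::CancellationToken;

async fn read_kafka_async(group_name: String, cancel_token: CancellationToken) {
    // Placeholder broker and topic; the real values would come from config.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", group_name.as_str())
        .set("bootstrap.servers", "kafka2-a1.met.no:9092")
        .create()
        .expect("failed to create kafka consumer");
    consumer
        .subscribe(&["checked"])
        .expect("failed to subscribe to topic");

    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => break,
            // recv() is a real await point, so cancellation is noticed promptly
            // even when the queue is empty.
            result = consumer.recv() => match result {
                Ok(msg) => {
                    if let Some(payload) = msg.payload() {
                        // hand the payload to parse_message / the insert task here
                        let _ = payload;
                    }
                    if let Err(e) = consumer.commit_message(&msg, CommitMode::Async) {
                        eprintln!("{}", e);
                    }
                }
                Err(e) => eprintln!("{}", e),
            },
        }
    }
}
```
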

Collaborator
Why indefinitely? If the queue is empty I'd expect consumer.poll() to return an error (?), after which we wait 5 seconds and check if the token was cancelled before polling again.
But maybe it's not so simple and I agree we should probably switch to rdkafka.

> It's also a problem that we're doing blocking IO on a non-blocking task.

This is the bug you mentioned in the other comment? That we are simply using spawn instead of spawn_blocking?

Member
> Why indefinitely? If the queue is empty I'd expect consumer.poll() to return an error (?), after which we wait 5 seconds and check if the token was cancelled before polling again.

I looked at the source and we're actually both wrong: it does block, but not indefinitely. The internal kafka client lets you set a fetch_max_wait_time; the default isn't listed in the docs, but it's in the source as 100 ms. There's no error returned from fetch_messages, so I assume it just returns an empty vector if it times out.

100 ms is short on a human scale, so it's not a problem in terms of hanging indefinitely. It is an issue in terms of tokio though, because the guideline from the tokio devs is that you shouldn't go more than 10–100 µs between await points, and this is 1000× more than that.

> This is the bug you mentioned in the other comment?

Nope. What I was talking about is that at the moment we commit offsets without waiting for the associated DB inserts to complete. Related to that, the DB inserts being on a separate task means they aren't covered by the graceful shutdown, but a solution to the first problem will probably also solve the second.
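
One possible shape for keeping the blocking poll off the async runtime without switching libraries is to run the whole sync loop on a blocking thread and check the token between polls. A sketch under the assumption that the consumer is handed in and raw payloads are forwarded, with parsing moved to the async insert task (not code from this PR):

```rust
use kafka::consumer::Consumer;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

/// Run the sync consumer loop on a dedicated blocking thread so the ~100 ms
/// poll never stalls the async runtime; check the token between polls.
fn spawn_blocking_reader(
    mut consumer: Consumer,
    tx: mpsc::Sender<Vec<u8>>, // raw payloads; parsing moves to the async insert task
    cancel_token: CancellationToken,
) -> tokio::task::JoinHandle<()> {
    tokio::task::spawn_blocking(move || {
        while !cancel_token.is_cancelled() {
            match consumer.poll() {
                Ok(sets) => {
                    for msgset in sets.iter() {
                        for msg in msgset.messages() {
                            // blocking_send is the sync counterpart of send().await
                            if let Err(e) = tx.blocking_send(msg.value.to_vec()) {
                                eprintln!("{}", e);
                            }
                        }
                        if let Err(e) = consumer.consume_messageset(msgset) {
                            eprintln!("{}", e);
                        }
                    }
                    consumer
                        .commit_consumed()
                        .expect("could not commit offset in consumer");
                }
                Err(e) => {
                    eprintln!("{}\nRetrying in 5 seconds...", e);
                    std::thread::sleep(std::time::Duration::from_secs(5));
                }
            }
        }
    })
}
```
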

Collaborator
> Nope. What I was talking about is that at the moment we commit offsets without waiting for the associated DB inserts to complete. Related to that, the DB inserts being on a separate task means they aren't covered by the graceful shutdown, but a solution to the first problem will probably also solve the second.

I think it would be covered indirectly by the sender being dropped? But oof, good catch: it probably makes more sense to parallelize over message sets instead of single messages.
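
A sketch of what committing only after the inserts complete could look like: batch the messages per message set and attach a oneshot acknowledgement that the insert task fires once the batch has landed. The names and types here are illustrative, not from the PR:

```rust
use tokio::sync::{mpsc, oneshot};

/// One batch of parsed messages plus a channel to report that they were inserted.
struct Batch<T> {
    msgs: Vec<T>,
    inserted: oneshot::Sender<()>,
}

/// Insert task: drain batches, write them to the database, then acknowledge.
async fn insert_loop<T>(mut rx: mpsc::Receiver<Batch<T>>) {
    while let Some(batch) = rx.recv().await {
        // ... insert batch.msgs into the database here ...
        let _ = batch.inserted.send(()); // reader may now commit offsets for this set
    }
    // rx.recv() returning None means the reader dropped the sender, i.e. shutdown.
}

// Reader side, per message set (sketch):
//   let (ack_tx, ack_rx) = oneshot::channel();
//   tx.send(Batch { msgs, inserted: ack_tx }).await?;
//   ack_rx.await?;                 // wait for the inserts to land
//   consumer.commit_consumed()?;   // only now is the offset advanced
```
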

}
consumer
.commit_consumed()
.expect("could not commit offset in consumer"); // ensure we keep offset
}
if let Err(e) = consumer.consume_messageset(msgset) {
eprintln!("{}", e);
Err(e) => {
eprintln!("{}\nRetrying in 5 seconds...", Error::Kafka(e));
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
consumer
.commit_consumed()
.expect("could not commit offset in consumer"); // ensure we keep offset
}
Err(e) => {
eprintln!("{}\nRetrying in 5 seconds...", Error::Kafka(e));
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
}
6 changes: 5 additions & 1 deletion ingestion/src/lib.rs
@@ -17,6 +17,7 @@ use std::{
};
use thiserror::Error;
use tokio_postgres::NoTls;
use tokio_util::sync::CancellationToken;

#[cfg(feature = "kafka")]
pub mod kvkafka;
@@ -430,6 +431,7 @@ pub async fn run(
permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
rove_connector: rove_connector::Connector,
qc_pipelines: HashMap<(i32, RelativeDuration), rove::Pipeline>,
cancel_token: CancellationToken,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// set up param conversion map
let param_conversions = get_conversions(param_conversion_path)?;
@@ -452,7 +454,9 @@

// run our app with hyper, listening globally on port 3001
let listener = tokio::net::TcpListener::bind("0.0.0.0:3001").await?;
axum::serve(listener, app).await?;
axum::serve(listener, app)
.with_graceful_shutdown(util::await_cancellation(cancel_token))
.await?;

Ok(())
}
45 changes: 31 additions & 14 deletions ingestion/src/main.rs
@@ -3,6 +3,7 @@ use lard_ingestion::qc_pipelines::load_pipelines;
use rove_connector::Connector;
use std::sync::{Arc, RwLock};
use tokio_postgres::NoTls;
use tokio_util::sync::CancellationToken;

use lard_ingestion::{getenv, permissions};

@@ -47,7 +48,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

let qc_pipelines = load_pipelines("qc_pipelines/fresh")?;

println!("Spawing task to fetch permissions from StInfoSys...");
println!("Spawning task to fetch permissions from StInfoSys...");
// background task to refresh permit tables every 30 mins
tokio::task::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30 * 60));
@@ -66,25 +67,41 @@
}
});

// Spawn kvkafka reader
#[cfg(feature = "kafka_prod")]
{
let kafka_group = args[1].to_string();
println!("Spawing kvkafka reader...");
tokio::spawn(lard_ingestion::kvkafka::read_and_insert(
db_pool.clone(),
kafka_group,
));
}
// set up cancellation token and signal catcher for graceful shutdown
let cancel_token = CancellationToken::new();
tokio::spawn(util::signal_catcher(cancel_token.clone()));

// Set up and run our server + database
println!("Ingestion server started!");
lard_ingestion::run(
let ingestor = tokio::spawn(lard_ingestion::run(
db_pool,
PARAMCONV,
permit_tables,
rove_connector,
qc_pipelines,
)
.await
cancel_token,
));

#[cfg(feature = "kafka_prod")]
// Spawn kvkafka reader
{
let kafka_group = args[1].to_string();
println!("Spawning kvkafka reader...");
let kvkafka_reader = tokio::spawn(lard_ingestion::kvkafka::read_and_insert(
db_pool.clone(),
kafka_group,
cancel_token.clone(),
));

let (ingestor_res, kvkafka_reader_res) = tokio::join!(ingestor, kvkafka_reader);
(_, _) = (ingestor_res, kvkafka_reader_res); // ignore for now
}

#[cfg(not(feature = "kafka_prod"))]
{
let ingestor_res = tokio::join!(ingestor);
_ = ingestor_res; // ignore for now
}

Ok(())
}
4 changes: 3 additions & 1 deletion integration_tests/Cargo.toml
@@ -11,10 +11,12 @@ lard_api = { path = "../api" }
lard_ingestion = { path = "../ingestion", features = ["integration_tests", "kafka"] }
chrono.workspace = true
chronoutil.workspace = true
util = { path = "../util" }
tokio.workspace = true
tokio-postgres.workspace = true
tokio-util.workspace = true
bb8.workspace = true
bb8-postgres.workspace = true
bb8-postgres.workspace = true
rove.workspace = true
rove_connector = { path = "../rove_connector" }
serde.workspace = true
15 changes: 12 additions & 3 deletions integration_tests/tests/end_to_end.rs
@@ -12,6 +12,7 @@ use futures::{Future, FutureExt};
use rove::data_switch::{DataConnector, SpaceSpec, TimeSpec, Timestamp};
use tokio::sync::mpsc;
use tokio_postgres::NoTls;
use tokio_util::sync::CancellationToken;

use lard_api::{timeseries::Timeseries, LatestResp, TimeseriesResp, TimesliceResp};
use lard_ingestion::{
@@ -251,15 +252,21 @@ async fn e2e_test_wrapper<T: Future<Output = ()>>(test: T) {
};
let qc_pipelines = load_pipelines("mock_qc_pipelines/fresh").expect("failed to load pipelines");

// set up cancellation token and signal catcher to detect premature shutdown
let cancel_token = CancellationToken::new();
let sig_catcher = tokio::spawn(util::signal_catcher(cancel_token.clone()));

let cancel_token2 = cancel_token.clone();
let api_server = tokio::spawn(async move {
tokio::select! {
output = lard_api::run(api_pool) => output,
output = lard_api::run(api_pool, cancel_token2) => output,
_ = init_shutdown_rx1.recv() => {
api_shutdown_tx.send(()).unwrap();
()
api_shutdown_tx.send(()).unwrap()
},
}
});

let cancel_token2 = cancel_token.clone();
let ingestor = tokio::spawn(async move {
tokio::select! {
output = lard_ingestion::run(
@@ -268,6 +275,7 @@
mock_permit_tables(),
rove_connector,
qc_pipelines,
cancel_token2,
) => output,
_ = init_shutdown_rx2.recv() => {
ingestor_shutdown_tx.send(()).unwrap();
@@ -279,6 +287,7 @@
tokio::select! {
_ = api_server => panic!("API server task terminated first"),
_ = ingestor => panic!("Ingestor server task terminated first"),
_ = sig_catcher => panic!("Signal catcher caught a shutdown signal"),
// Clean up database even if test panics, to avoid test poisoning
test_result = AssertUnwindSafe(test).catch_unwind() => {
// For debugging a specific test, it might be useful to skip the cleanup process