
Graceful shutdown #66 (Open)

wants to merge 21 commits into base: trunk
Conversation

jo-asplin-met-no (Collaborator)

No description provided.

This change adds the initial code for supporting graceful
shutdown of the services. It is still incomplete. Eventually
the shutdown_signal() function will be moved to a separate file
so it can be used for all services (currently api and ingestor).
Moreover, we need to understand exactly what shutdown means for
web services in this context so that catching a signal actually
results in desirable shutdown actions.
This change demonstrates how the shutdown_signal function
can be used for both the ingestion and api services.
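For reference, a minimal sketch of what such a shared shutdown_signal() helper could look like, assuming the usual tokio signal-handling pattern; the PR's actual helper may differ:

    // Sketch only: resolves when either Ctrl+C (SIGINT) or SIGTERM is received.
    pub async fn shutdown_signal() {
        let ctrl_c = async {
            tokio::signal::ctrl_c()
                .await
                .expect("failed to install Ctrl+C handler");
        };

        #[cfg(unix)]
        let terminate = async {
            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                .expect("failed to install SIGTERM handler")
                .recv()
                .await;
        };

        #[cfg(not(unix))]
        let terminate = std::future::pending::<()>();

        // Whichever signal arrives first completes the future.
        tokio::select! {
            _ = ctrl_c => {},
            _ = terminate => {},
        }
    }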
Cargo.toml (outdated, resolved)
common/src/lib.rs (outdated, resolved)

api/src/lib.rs (outdated)
    @@ -135,5 +135,8 @@ pub async fn run(pool: PgConnectionPool) {

         // run it with hyper on localhost:3000
         let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    -    axum::serve(listener, app).await.unwrap();
    +    axum::serve(listener, app)
    +        .with_graceful_shutdown(::util::shutdown_signal())
Member

The leading scoping operator before util looks a little odd here, is it correct?

Collaborator (Author)

Well, it compiles. But I'll take a look ...

jo-asplin-met-no (Collaborator, Author), Feb 11, 2025

The leading scoping operator needs to be there for the code to compile. But maybe there's a better idiom to use in this case?

Member

The problem is a namespace conflict with the util module in this crate. We should maybe consider renaming one of them to avoid confusion. Or perhaps we can just move the contents of the util module into the util crate instead.
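For illustration, a minimal sketch of the ambiguity (the items here are hypothetical): a bare util:: path resolves to the module declared in this crate, while a leading :: forces resolution from the extern prelude, i.e. the util crate listed in Cargo.toml.

    // Hypothetical sketch, not the PR's code: both a local module and an
    // external dependency are called `util`.
    mod util {
        pub fn shutdown_signal() {
            // local module version
        }
    }

    fn main() {
        // Resolves to the module declared above.
        util::shutdown_signal();
        // The leading `::` forces resolution to the external `util` crate
        // declared in Cargo.toml, not the local module.
        ::util::shutdown_signal();
    }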

Lun4m linked an issue Feb 11, 2025 that may be closed by this pull request
util/src/lib.rs (outdated, resolved)
intarga (Member) commented Feb 11, 2025

It's just occurred to me that we should probably handle the kafka task spawned at ingestion/src/main.rs:69

The stinfo task spawned just before it should be fine to leave unhandled though.

jo-asplin-met-no (Collaborator, Author) commented Feb 11, 2025

It's just occurred to me that we should probably handle the kafka task spawned at ingestion/src/main.rs:69

The stinfo task spawned just before it should be fine to leave unhandled though.

I'll have a go at implementing graceful shutdown for that task then (in this PR if that's ok; @intarga @Lun4m: shout out if you think it should be in a separate issue/branch/PR). I think I can apply the more low-level idiom I tested last week.

jo-asplin-met-no self-assigned this Feb 11, 2025
intarga (Member) commented Feb 11, 2025

in this PR if that's ok

I agree it belongs in this PR, good luck with it!

jo-asplin-met-no (Collaborator, Author)
@intarga @Lun4m: I'm not at all sure that my solution for gracefully shutting down the kvkafka reader is the best one, so looking forward to reviews!

ingestion/src/main.rs (outdated, resolved)
Comment on lines 82 to 110

    -    lard_ingestion::run(
    +    match lard_ingestion::run(
             db_pool,
             PARAMCONV,
             permit_tables,
             rove_connector,
             qc_pipelines,
         )
         .await
    +    {
    +        Ok(_result) => {}
    +        Err(_error) => {}
    +    };
    +
    +    // At this point the ingestion server has finished (possibly due to a shutdown signal).
    +    // Ask other tasks to clean up and finish too:
    +    cancel_token.cancel();
    +    tracker.wait().await;

         Ok(())
Member

You're shutting down Axum and the Kafka consumer sequentially, when you could do them in parallel.

Instead of receiving the shutdown signal inside the function you give to Axum's with_graceful_shutdown, you should use the cancellation token in that function, and receive the shutdown signal on its own task, where you use it to trigger the cancellation token. Then here, you would be able to just join the run future and the Kafka task's JoinHandle.
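A rough sketch of that shape, using tokio_util's CancellationToken; the names and details here are illustrative assumptions, not the PR's actual code:

    use tokio_util::sync::CancellationToken;

    async fn run_services() -> std::io::Result<()> {
        let cancel_token = CancellationToken::new();

        // Signal listener on its own task: translate an OS signal into a cancellation.
        let signal_token = cancel_token.clone();
        tokio::spawn(async move {
            let _ = tokio::signal::ctrl_c().await;
            signal_token.cancel();
        });

        // Kafka consumer task, assumed to watch the same token internally.
        let kafka_token = cancel_token.clone();
        let kafka_handle = tokio::spawn(async move {
            // placeholder for the real consumer loop
            kafka_token.cancelled().await;
        });

        // Axum server shutting down on the same token.
        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
        let app = axum::Router::new();
        let server = axum::serve(listener, app)
            .with_graceful_shutdown(async move { cancel_token.cancelled().await });

        // Both shut down in parallel rather than sequentially.
        let (server_res, kafka_res) = tokio::join!(async { server.await }, kafka_handle);
        server_res?;
        kafka_res.expect("kafka task panicked");
        Ok(())
    }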

Collaborator (Author)

Should the "signal catcher" task be joined alongside the others? Or wouldn't that be necessary if it just returns after it has caught the signal and triggered the cancellation token?

Member

What do you think?

jo-asplin-met-no (Collaborator, Author), Feb 12, 2025

Well, if the other tasks (ingestor and kvkafka reader) all terminated for some other reason than a signal being caught, then the signal catcher will never terminate, and it shouldn't be included in the join since the join itself would then never terminate! And vice versa: if the other tasks did terminate as a result of the cancellation token being triggered by the signal catcher, then we know that the latter has done its job and we don't really care about it anymore, so there's no need to join it in that case either. So no, I don't think the signal catcher should be joined. Am I right?

Collaborator (Author)

I guess I was wrong. I forgot that the signal catcher needs to be collected like any other task, so it must be included in the join. The body of the task (the one that listens for signals) doesn't start until an await is called on the return value from spawn or that return value is passed to join (which does the equivalent of await behind the scenes I guess).

Member

Your first answer was right

I forgot that the signal catcher needs to be collected like any other task

It doesn't need to be collected if we don't care about its state, and if the other tasks have terminated, we no longer have a use for it.

The body of the task (the one that listens for signals) doesn't start until an await is called on the return value from spawn or that return value is passed to join

This is true of futures, not tasks
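A tiny standalone illustration of that distinction (the sleep is only there to keep the runtime alive long enough):

    #[tokio::main]
    async fn main() {
        // A bare future is inert: this never prints because it is never awaited.
        let _future = async { println!("future body") };

        // A spawned task starts running right away, even if its JoinHandle
        // is never awaited.
        tokio::spawn(async { println!("task body") });

        // Give the spawned task a moment to run before main returns.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    }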

Collaborator (Author)

Ah, of course, I was mixing up the async keyword (which creates a future) and the spawn function!

Collaborator (Author)

(thanks for putting up with all my questions btw - but rest assured that this type of feedback is really helpful for me!)

Member

No worries! My framing of "What do you think?" was inviting a more interactive discussion 🙂

Comment on lines 253 to 281

    tokio::select! {
        _ = cancel_token.cancelled() => {
            eprintln!("cancel_token.cancelled()");
            break;
        }
        _ = async {
            // https://docs.rs/kafka/latest/src/kafka/consumer/mod.rs.html#155
            // poll asks for next available chunk of data as a MessageSet
            match consumer.poll() {
                Ok(sets) => {
                    for msgset in sets.iter() {
                        for msg in msgset.messages() {
                            if let Err(e) = parse_message(msg.value, &tx).await {
                                eprintln!("{}", e);
                            }
                        }
                        if let Err(e) = consumer.consume_messageset(msgset) {
                            eprintln!("{}", e);
                        }
                    }
                    consumer
                        .commit_consumed()
                        .expect("could not commit offset in consumer"); // ensure we keep offset
                }
                Err(e) => {
                    eprintln!("{}\nRetrying in 5 seconds...", Error::Kafka(e));
                    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
                }
            }
Member

This select does not do what you think it does. Instead of telling you why, I think you will learn more if you figure it out yourself, so I will ask a leading question instead: can this loop break while only halfway through processing a message?

Feel free to ask follow-up questions if you're stuck.

Collaborator (Author)

FYI: I still haven't dealt with this. Will take a look tonight. Got to rush home now.

Collaborator (Author)

Pushed another attempt now.

jo-asplin-met-no (Collaborator, Author), Feb 13, 2025

I guess the above loop cannot break in the middle of processing a message, so there's no point in checking the cancellation token inside it (hence the second attempt at a solution, which reverted the loop to its original form). I guess the essential point is that there are two kinds of signals: 1) SIGKILL and 2) the rest. SIGKILL can happen absolutely anywhere and at any time, but it also cannot be caught, so there's no point in trying to handle it (it's like pulling the power cord of a traditional (non-battery) computer). All the other signals can be caught, but only in the task where you have explicitly registered a handler for them, i.e. other tasks (like the one running the above loop) can safely assume the signal doesn't pop up there as well. That's my current view of the world.

intarga (Member), Feb 13, 2025

I guess the above loop cannot break in the middle of processing a message

Wrong. Maybe it will be easier to spot why in a golang equivalent:

msgset_processed := make(chan bool, 1)

go func() {
    for {
        msgset := consumer.poll()

        handle_msgset(msgset) // parsing + database transactions + marking offset

        msgset_processed <- true
    }
}()

loop: // labeled so the break exits the for loop, not just the select
for {
    select {
    case <-cancellation_channel:
        break loop
    case <-msgset_processed:
        continue
    }
}

Can this break partway through processing a message?

In chat my suggestion to you was:

loop {
    select! {
        _ = shutdown_channel.recv() => { break; }
        poll_result = consumer.poll() => {
            // rest of the body here ...
        }
    }
}

Which is equivalent to this golang:

msgsets := make(chan Msgset, 1)

go func() {
    for {
        msgset := consumer.poll()

        msgsets <- msgset
    }
}()

loop:
for {
    select {
    case <-cancellation_channel:
        break loop
    case msgset := <-msgsets:
        handle_msgset(msgset)
    }
}

Do you understand why this is different?

Collaborator (Author)

Yet another attempt now, this time more aligned with @intarga's suggestion. I did need to wrap consumer.poll() in an async block. Will that work?

Member

This has resolved the race condition, yes. The important thing here though is that you understand why there was a race condition here, and feel confident you can spot race conditions in the future.

Every project you work on at met is highly concurrent. Race conditions are a constant danger in such an environment, and they often come intertwined with complicated code they're hard to untangle from. If you can't see a race condition in a minimal example, you will be cursed to constantly write spooky bugs in your concurrent code. I would not be happy leaving it there if I were you.

Member

There's another concurrency bug (not introduced by you) in the kafka code that I've noticed in this review, but I don't mind if you want to leave that to be fixed in a later PR, as this one has already been quite a lot.

jo-asplin-met-no (Collaborator, Author), Feb 14, 2025

This has resolved the race condition, yes. The important thing here though is that you understand why there was a race condition here, and feel confident you can spot race conditions in the future.

So to summarize a bit (so that I can actually learn from this exercise and be able to spot possible race conditions in the future (which I of course really do appreciate the importance of)), the difference between my original proposal and the one I ended up with seems to be subtle but important.

The original/erroneous proposal was essentially this:

    // Consume the kafka queue infinitely
    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => { break }
            _ = async { match consumer.poll() { /* body */ } } => {}
        }
    }

while the final/correct proposal is essentially this:

    // Consume the kafka queue infinitely
    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => { break }
            poll_result = async { consumer.poll() } => { match poll_result { /* body */ }}
        }
    }

And I think I now (finally) see the essence of the problem:

  • The select! statement starts multiple computations concurrently.
  • A computation here is the statement(s) before the '=>'. (I missed this point in my first proposal!)
  • When the first computation completes, three things happen:
    1: all the other computations are dropped immediately.
    2: the statement(s) that follow(s) the '=>' of the completed computation is/are executed.
    3: the select! statement as such completes

So having both the consumer.poll() and the message processing body before the '=>' was really the mistake, since the body could be dropped partway through if the other computation that listened for a cancellation suddenly completed!

The assumption here is of course that the computation inside consumer.poll() is not sensitive to being dropped at any time (at least not at a level that matters to us).
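A minimal standalone demonstration of that dropping behaviour, with made-up timings rather than the real consumer code:

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_millis(10)) => {
                println!("first branch won; the other branch was dropped");
            }
            _ = async {
                println!("step 1");
                tokio::time::sleep(Duration::from_millis(100)).await;
                println!("step 2"); // never reached: the whole block is dropped partway through
            } => {}
        }
    }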

And I now realize that you actually said exactly this 18 hours ago, @intarga :

In my version I only select against the future returned by consumer.poll(). In your version, you create a future for the whole loop body and select against that.

So please forgive me for being a slow learner, and I do appreciate your patience :)

Member

Yes! Nicely expressed

jo-asplin-met-no and others added 11 commits February 12, 2025 15:15
This change:
- gets rid of the task tracker (which seemed an overkill)
- uses a cancellation token in a better way
- awaits task termination concurrently in a join
The task that catches signals and triggers a cancellation token
doesn't generally need to be awaited for termination.
One exception is in the end-to-end tests where it can be useful to
report if it was an unexpected signal that caused testing to
terminate prematurely.
Comment on lines +237 to +255

    tokio::select! {
        _ = cancel_token.cancelled() => {
            eprintln!("cancellation token triggered");
            break;
        }
        // https://docs.rs/kafka/latest/src/kafka/consumer/mod.rs.html#155
        // poll asks for next available chunk of data as a MessageSet
        poll_result = async { consumer.poll() } => {
            match poll_result {
                Ok(sets) => {
                    for msgset in sets.iter() {
                        for msg in msgset.messages() {
                            if let Err(e) = parse_message(msg.value, &tx).await {
                                eprintln!("{}", e);
                            }
                        }
                        if let Err(e) = consumer.consume_messageset(msgset) {
                            eprintln!("{}", e);
                        }
Lun4m (Collaborator), Feb 13, 2025

Now it looks good! Just wanted to mention that another option with cancellation tokens (which I've never used before by the way) is to skip the select altogether.
I guess it's a matter of preference and it doesn't do exactly what the select code does (in particular, it doesn't play well with the Err branch of the match statement), but I find it easier to reason about.

Suggested change

    -    tokio::select! {
    -        _ = cancel_token.cancelled() => {
    -            eprintln!("cancellation token triggered");
    -            break;
    -        }
    -        // https://docs.rs/kafka/latest/src/kafka/consumer/mod.rs.html#155
    -        // poll asks for next available chunk of data as a MessageSet
    -        poll_result = async { consumer.poll() } => {
    -            match poll_result {
    -                Ok(sets) => {
    -                    for msgset in sets.iter() {
    -                        for msg in msgset.messages() {
    -                            if let Err(e) = parse_message(msg.value, &tx).await {
    -                                eprintln!("{}", e);
    -                            }
    -                        }
    -                        if let Err(e) = consumer.consume_messageset(msgset) {
    -                            eprintln!("{}", e);
    -                        }
    +    if cancel_token.is_cancelled() {
    +        eprintln!("cancellation token triggered");
    +        break;
    +    }
    +    // https://docs.rs/kafka/latest/src/kafka/consumer/mod.rs.html#155
    +    // poll asks for next available chunk of data as a MessageSet
    +    match consumer.poll() {
    +        // process message sets
    +        ...
    +    }

Member

Yes, I forgot that our kafka library is sync. In that case I think this is equivalent to the select, and probably the way to go. It has another problem though: in the pathological case where the kafka queue is empty when we cancel, this will hang indefinitely (or at least for the 90 seconds until systemd sends SIGKILL...). It's also a problem that we're doing blocking IO on a non-blocking task.

We can find a way around this, but I'm starting to think it would make more sense to switch to rdkafka, which supports async.

Collaborator

Why indefinitely? If the queue is empty I'd expect consumer.poll() to return an error (?), after which we wait 5 seconds and check if the token was cancelled before polling again.
But maybe it's not so simple and I agree we should probably switch to rdkafka.

It's also a problem that we're doing blocking IO on a non-blocking task.

This is the bug you mentioned in the other comment? That we are simply using spawn instead of spawn_blocking?

Member

Why indefinitely? If the queue is empty I'd expect consumer.poll() to return an error (?), after which we wait 5 seconds and check if the token was cancelled before polling again.

I looked at the source and we're actually both wrong: it does block, but not indefinitely. The internal kafka client lets you set a fetch_max_wait_time; the default isn't listed in the docs, but it's in the source as 100ms. There's no error returned from fetch_messages, so I assume it just returns an empty vector if it times out.

100ms is short on a human scale so it's not a problem in terms of hanging indefinitely. It is an issue in terms of tokio though, because the guideline from tokio devs is that you shouldn't go more than 10-100us between await points, and this is 1000x more than that.

This is the bug you mentioned in the other comment?

Nope. What I was talking about is that at the moment we commit offsets without waiting for the associated DB inserts to complete. Related to that, the DB inserts being on a separate task means they aren't covered by the graceful shutdown, but a solution to the first problem will probably also solve the second.
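For later reference, one possible way to push the blocking poll onto tokio's blocking pool while staying on the sync kafka crate; this is only a sketch, and the type paths (kafka::consumer::{Consumer, MessageSets}, kafka::error::Result) are assumptions to be checked against the crate:

    use kafka::consumer::{Consumer, MessageSets};
    use kafka::error::Result as KafkaResult;

    // Hand the consumer to the blocking pool for the synchronous poll,
    // then take it back along with the poll result.
    async fn poll_once(mut consumer: Consumer) -> (Consumer, KafkaResult<MessageSets>) {
        tokio::task::spawn_blocking(move || {
            let result = consumer.poll();
            (consumer, result)
        })
        .await
        .expect("blocking poll task panicked")
    }

Note that if a select! drops this future, the spawn_blocking closure still runs to completion in the background, so the consumer handed to it would need to be recovered or recreated.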

Successfully merging this pull request may close these issues: Graceful shutdown

3 participants