
feat(ISV-6425): extend base consumer with retry mechanism#5

Merged
BorekZnovustvoritel merged 9 commits into main from ISV-6425
Jan 13, 2026

Conversation

@BorekZnovustvoritel (Contributor) commented Dec 18, 2025

  • Covered with unit and integration tests
  • Extended documentation

Assisted-by: Claude-4.5-opus-high (Cursor), mainly in the test section

How to review

The majority of this PR is tests. The best place to verify the functionality is the integration tests. There are many unit tests, but they are not that interesting, to be honest. The integration tests, though, do sum up the parts that are important in my opinion.

What changed

  • Added retry utils for retry mechanism
  • Changed offset_cache to consumer_tracking, which also tracks tasks (futures) to gracefully stop them
  • The retry mechanism submits failed messages to a retry topic and adds headers with the attempt number and the next timestamp at which reprocessing should be attempted
  • If a message is received (from the retry topic) and contains a timestamp in the future, the message is temporarily cached and its partition of origin is paused. This partition is resumed when the message can be processed again (its timestamp has passed).
  • There is a whole new integration test framework with quite extensive testing. Its creation was assisted by Claude, but only a small part of it was actually generated (and updated afterwards).
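The two behaviours above (retry headers on re-publish, and deferring early-arriving retry messages) can be sketched roughly like this. All names here, the header keys, and the exponential backoff policy are illustrative assumptions, not this library's actual API:

```python
import time

# Hypothetical sketch only: the header names ("retry-attempt",
# "retry-not-before") and the backoff formula are assumptions.
def next_retry_headers(attempt: int, base_delay_s: float = 1.0) -> dict:
    """Headers attached when a failed message is re-published to a retry topic."""
    return {
        "retry-attempt": str(attempt + 1).encode(),
        # earliest wall-clock time at which reprocessing may be attempted
        "retry-not-before": str(time.time() + base_delay_s * 2 ** attempt).encode(),
    }

def should_defer(headers: dict, now: float) -> bool:
    """True if a retry-topic message arrived before its scheduled time, i.e.
    it should be cached and its partition of origin paused until then."""
    return now < float(headers.get("retry-not-before", b"0"))
```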

* covered with unit tests

Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
Assisted-by: Claude-4.5-opus-high (Cursor)
* create various integration tests

Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
Assisted-by: Claude-4.5-opus-high (Cursor)
Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
@BorekZnovustvoritel (Contributor, Author):

@mavaras PTAL (I cannot assign you in UI, I guess you have to accept the invitation to the organization) 🙏

Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
Assisted-by: Claude-4.5-Sonnet (Cursor)
@BorekZnovustvoritel changed the title from "feat(ISV-64245): extend base consumer with retry mechanism" to "feat(ISV-6425): extend base consumer with retry mechanism" on Jan 5, 2026
Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
Assisted-by: Claude-4.5-Sonnet (Cursor)
Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
@BorekZnovustvoritel (Contributor, Author):

@ezopezo @bclindner please also take a look. I am very much willing to schedule a meeting to try to explain this. I definitely do not expect your review too fast; it's a lot of lines, which I am sorry for.

@mavaras left a comment:


For me this is a LGTM, but I suggest that others take an additional look at this 👍

@bclindner left a comment:


sorry for the wait - hoping i didn't miss much, but with all the dry test code you have to write for these kinds of changes i'm sure i missed something or other. i might recommend a third reviewer on this, it's a tough one.

i think this all makes sense to me, but i could use some clarification on a couple things. i also see some questionable bits, particularly in the tests. see comments.


> ### Fault-tolerance
>
> Each consumer accepts configuration with retry topics. A retry topic is


kinda new to Kafka-- is this retry topic pattern a widely-used thing for Kafka as a platform, or is it our own thing?

first time seeing something like this in the wild so i'm just trying to get more info. the closest established generic pattern i know of is a dead letter channel, but that isn't really meant to indicate when/where to retry, just a place to put dropped messages. otherwise the best i can find for this is this medium article & some Java/Spring-related stuff on baeldung

@BorekZnovustvoritel (Contributor, Author) replied:


It is basically a lightweight reimplementation of the functionality present in Spring Kafka library, see here. This solution was suggested by Tim Waugh
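For context, the Spring Kafka pattern being referenced typically fans retries out over a chain of per-delay topics ending in a dead-letter topic. A hedged sketch of such a naming scheme (the "-retry-&lt;delay&gt;ms" / "-dlt" convention mirrors Spring Kafka and is not necessarily what this library uses):

```python
# Illustrative only: names follow Spring Kafka's retryable-topics convention,
# not necessarily this library's configuration.
def retry_topic_chain(topic: str, delays_ms: list) -> list:
    """One retry topic per backoff level, terminated by a dead-letter topic."""
    chain = [f"{topic}-retry-{d}ms" for d in delays_ms]
    return chain + [f"{topic}-dlt"]  # messages that exhaust all retries land here
```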


thanks for the docs here, this is a lot of code to review in one go & i definitely needed the primer


```diff
 [testenv:integration]
-commands = pytest -vv -s ./tests/integration/ {posargs}
+commands = pytest -vv -s --log-cli-level=DEBUG ./tests/integration/ {posargs}
```


is this debug addition still necessary/worth keeping in?

@BorekZnovustvoritel (Contributor, Author) replied:


It makes the integration tests transparent. We don't need this for functionality, but I think it makes sense that we want to be able to debug integration tests. I am open to suggestions on this one, though.


```python
from retriable_kafka_client import ConsumeTopicConfig
from tests.integration.integration_utils import IntegrationTestScaffold, ScaffoldConfig
```


nitpick: we should change this to .integration_utils to make it consistent with how we're actually using it



```python
@dataclass
class RandomDelay:
```


...can't we just simplify this to

```python
def get_random_delay(min_delay: float, max_delay: float):
    return random.uniform(min_delay, max_delay)
```

instead of making a whole thing out of it with a class? we could also just call random.uniform directly since we only use this once.

@BorekZnovustvoritel (Contributor, Author) commented Jan 8, 2026:


I mean, we can. But this way we are able to reduce the number of arguments passed to the scaffolding's start_consumer method. Let me know if you really want me to change this.

```python
            ValueError: if simulated failure occurs
        """
        if self.delay:
            if isinstance(self.delay, RandomDelay):
```
@bclindner commented Jan 7, 2026:


could easily simplify this by removing RandomDelay as mentioned elsewhere in the review

@BorekZnovustvoritel (Contributor, Author) replied:


With the current implementation, we are able to specify a static delay (float) or a random delay. If I were to ditch the RandomDelay class, then I think the best solution to pass the parameters to the MockTarget would be just two floats max_delay and min_delay. If a static delay was wanted in some tests, these would have to be specified as the same value.

I don't know, is that cleaner? I am not opposed, I am just not sure if I see the benefit really.

@bclindner replied Jan 8, 2026:


Ahhh I missed what was going on here before - this is randomizing at runtime rather than instantiation time so we're randomizing individual message delays. That's my bad, this makes sense now.
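A minimal sketch of the point being made, with assumed names (`resolve` is an illustrative method name, not necessarily the real one): because the uniform draw happens when the delay is resolved rather than when the object is constructed, every processed message gets its own random delay:

```python
import random
from dataclasses import dataclass

# Assumed shape of the class under discussion, for illustration only.
@dataclass
class RandomDelay:
    min_delay: float
    max_delay: float

    def resolve(self) -> float:
        # drawn per call, so each processed message gets a fresh delay
        return random.uniform(self.min_delay, self.max_delay)
```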

```python
        metadata = admin_client.list_topics(timeout=5)
        if metadata.brokers:
            return
    except KafkaException:
```


nitpick: doesn't KafkaException extend Exception? if so, we can probably just remove this block for it, right?

and if not, we could also just do this to clean it up:

```python
except (Exception, KafkaException):
```

@BorekZnovustvoritel (Contributor, Author) replied:


You are right, it's unnecessary. I guess I left that behind by accident. I will update this.
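The simplification could look roughly like this; a sketch with an assumed `probe` callable rather than the PR's exact helper. Since KafkaException subclasses Exception, the single bare except clause covers it:

```python
import time

def wait_for_broker(probe, timeout_s: float = 5.0, interval_s: float = 0.1) -> bool:
    """Poll `probe` (e.g. lambda: admin_client.list_topics(timeout=5).brokers)
    until it returns something truthy or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            if probe():
                return True
        except Exception:  # KafkaException subclasses Exception, so it's covered
            pass  # broker not ready yet; keep polling
        time.sleep(interval_s)
    return False
```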

```python
    """Test that RetryManager accepts valid configurations."""
    config = _make_config(topics)
    manager = RetryManager(config)
    assert manager is not None
```


i don't see a code path where constructing a RetryManager would ever return None. is this assert a formality/pylint situation, or is there maybe a better way we can validate this?

@BorekZnovustvoritel (Contributor, Author) replied:


Yes, that's nonsense. Just calling the RetryManager constructor and not getting any assertion error is enough here.
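In other words, construction itself is the assertion for the valid case, and invalid configurations are better covered with an explicit expected exception. A sketch with a stand-in class (the real RetryManager's validation rules are not shown in this excerpt):

```python
# Stand-in with assumed validation behaviour, for illustration only.
class FakeRetryManager:
    def __init__(self, config: dict):
        if not config.get("topics"):
            raise ValueError("at least one topic is required")
        self.config = config
```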

```python
    # Should have retried twice (2 failures + 1 success = 3 calls)
    assert mock_kafka_producer.produce.call_count == 3
    # Should have waited approximately: 0.01 + 0.011 = 0.021 seconds
    assert elapsed_time >= 0.02  # Allow some timing flexibility
```


this is a strange one - if the testing hardware is slow, this will pass when it shouldn't, rather than failing if the hardware is slow as these testing race conditions usually go. should we bother checking for this given it only semi-reliably tests the existence of a wait?

@BorekZnovustvoritel (Contributor, Author) replied:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I missed this when culling nonsense by Claude in the tests.
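One deterministic alternative to the elapsed-time assertion is to patch `asyncio.sleep` and assert on the delays that were requested, independent of hardware speed. A sketch with a stand-in retry coroutine (not the PR's actual producer code):

```python
import asyncio
from unittest import mock

async def retry_with_backoff(operation, delays):
    """Stand-in retry loop: wait delays[i] after the i-th failure."""
    for delay in delays:
        try:
            return operation()
        except ValueError:
            await asyncio.sleep(delay)
    return operation()

def test_waits_are_requested():
    attempts = {"n": 0}

    def flaky():
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise ValueError("simulated failure")
        return "ok"

    # Patching asyncio.sleep makes this an assertion about *requested* delays
    # rather than wall-clock time, so slow test hardware cannot skew it.
    with mock.patch("asyncio.sleep", new=mock.AsyncMock()) as sleep_mock:
        result = asyncio.run(retry_with_backoff(flaky, [0.01, 0.011]))
    assert result == "ok"
    assert [c.args[0] for c in sleep_mock.await_args_list] == [0.01, 0.011]
```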

```python
@pytest.mark.asyncio
async def test_producer_send_success_first_attempt(base_producer: BaseProducer) -> None:
    """Test successful message send on first attempt."""

def _run_send_method(
```


smart way to tackle this, i think! i guess you could also just run both at once here to avoid having to do parametrize, but i'm guessing that causes problems with things like patching

@BorekZnovustvoritel (Contributor, Author) replied:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to be honest, this is not my idea; it was created by Claude. But it makes sense to me, so I kept it.

I think parametrization makes sense so that we can see which method failed in the test output. If we called both at once, a test would fail and we wouldn't know at first sight whether it failed during the sync or the async execution.
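The dispatcher pattern under discussion can be sketched like this; `Sender` and `run_send_method` are stand-ins, not the PR's classes. A parametrized test (e.g. `@pytest.mark.parametrize("mode", ["sync", "async"])`) then calls the dispatcher once per mode, so the failing mode shows up directly in the test id:

```python
import asyncio

class Sender:
    """Stand-in for the producer under test."""
    def send(self, msg: str) -> str:
        return f"sent:{msg}"

    async def send_async(self, msg: str) -> str:
        return f"sent:{msg}"

def run_send_method(sender: Sender, mode: str, msg: str) -> str:
    """Dispatch to the sync or async path based on the parametrized mode."""
    if mode == "async":
        return asyncio.run(sender.send_async(msg))
    return sender.send(msg)
```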

Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
@bclindner left a comment:


Changes look good, this all seems clear to me now. Thanks!

Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
```diff
-    topics: list of topic names to connect to
+    topics: list of configuration for topics and their
+        retry policies
     cancel_future_wait_time: Maximal time to wait fot a task
```


Suggested change:

```diff
-cancel_future_wait_time: Maximal time to wait fot a task
+cancel_future_wait_time: Maximal time to wait for a task
```

README.md (outdated), comment on lines +21 to +23:

> of them finish before the cluster forces rebalancing. The library tries to
> finish everything before the rebalance while stopping additional non-started
> tasks. The default timeout for each task to finish is 30 seconds, but can be


Just for papertrail: We agreed in DMs on moving from eager rebalancing towards cooperative rebalancing. Are those lines still applicable even after changing strategy?

@BorekZnovustvoritel (Contributor, Author) replied:


Well, it should be re-worded, good point. It does stop tasks and waits for others to finish, it just does it only for affected partitions. Example: If a single consumer is in a group, it consumes all partitions. Then another consumer joins the group, the cluster has to revoke some of the partitions from the first consumer to supply them to the second consumer. I will try to update this text a little.
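For reference, switching to cooperative rebalancing with confluent-kafka/librdkafka is a consumer-configuration change: with the `cooperative-sticky` assignor, only the revoked partitions are reassigned, so tasks on untouched partitions keep running. The broker address and group id below are placeholders:

```python
# Illustrative consumer configuration; the assignment-strategy line is the
# point here, the rest are placeholder values.
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "partition.assignment.strategy": "cooperative-sticky",
    "enable.auto.commit": False,
}
```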

@ezopezo commented Jan 13, 2026

Also adding a digest of our personal discussion: I expressed my concerns about reordering of messages during an outage when they are moved to the retry topic. To my knowledge, messages are loaded into this topic and then popped from it, but when processing is still blocked, the message is added back at the beginning of the queue. When processing starts again, this might cause message reordering, which might cause inconsistencies in workflows. Nevertheless, we agreed that this implementation is sufficient, thorough UAT will be performed, logs will be analyzed afterwards, and further improvements might be addressed in future tasks.

Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
@BorekZnovustvoritel merged commit 4678e05 into main on Jan 13, 2026
2 checks passed