feat(ISV-6425): extend base consumer with retry mechanism #5
Changes from 6 commits
@@ -6,12 +6,43 @@ additional functionalities to these selected parts.

## Features
As this library is currently under development, it should not be used.

The aim is to provide a fault-tolerant platform for parallel message
processing.
### Parallel processing

Each consumer requires an executor pool, which is used for message
processing. A consumer can consume from multiple topics and processes
the messages from all of them with a single callable. The callable must be
supplied by the user of this library.
The library also ensures exactly-once processing when used correctly.
To ensure this, tasks should be short enough that all of them finish
before the cluster forces a rebalance. The library tries to finish
everything before the rebalance while cancelling tasks that have not yet
started. The default timeout for each task to finish is 30 seconds, but
it can be changed. Longer tasks may also require changes to Kafka cluster
behavior. This behavior only appears during rebalancing and graceful
stopping of the consumer.
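The library's own API is not shown in this diff; as a rough, standalone sketch of the pattern described above (a worker pool running a user-supplied callable, with a bounded wait and cancellation of not-yet-started tasks), here is an illustration using Python's `concurrent.futures`. The names and message values are illustrative, not the library's API:

```python
from concurrent.futures import ThreadPoolExecutor, wait

def process(message: str) -> str:
    # stand-in for the user-supplied callable
    return f"processed:{message}"

messages = ["a", "b", "c"]

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(process, m) for m in messages]
    # mirror the consumer's graceful stop: give tasks up to the
    # documented default of 30 seconds to finish
    done, not_done = wait(futures, timeout=30)
    for f in not_done:
        f.cancel()  # stop tasks that have not started yet
    results = [f.result() for f in done]

print(sorted(results))
```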
|
|
### Fault-tolerance

Each consumer accepts a configuration with retry topics. A retry topic is
> Review comment: kinda new to Kafka -- is this retry topic pattern a
> widely-used thing for Kafka as a platform, or is it our own thing? First
> time seeing something like this in the wild, so I'm just trying to get more
> info. The closest established generic pattern I know of is a dead letter
> channel, but that isn't really meant to indicate when/where to retry, just
> a place to put dropped messages. Otherwise the best I can find for this is
> a Medium article and some Java/Spring-related material on Baeldung.

> Author reply: It is basically a lightweight reimplementation of the
> functionality present in the Spring Kafka library. This solution was
> suggested by Tim Waugh.
a Kafka topic used for asynchronous retrying of message processing. If the
specified target callable fails, the consumer commits the original message
and resends it to a retry topic with special headers. The headers include
the next timestamp at which the message should be processed again (to give
the error some time to disappear if processing depends on outside
infrastructure).
The retry topic is polled alongside the original topic. If a message
contains the special timestamp header, its Kafka partition of origin is
paused and the message is stored locally. Processing resumes only after the
specified timestamp passes. The message will not be processed before the
timestamp; it can only gather additional delay (depending on how busy the
worker pool is). Once the message is sent to the pool for re-processing,
consumption of the blocked partition is resumed.
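The pause-and-park decision above can be sketched as a small pure-Python function. The header name `retry-at` and the message shape are hypothetical, not the library's actual wire format:

```python
import time

RETRY_TS_HEADER = "retry-at"  # hypothetical header name

def schedule(message: dict, now: float) -> tuple[str, float]:
    """Mimic the described flow: process immediately unless the retry
    timestamp header points to the future, in which case the message is
    parked (and its partition would be paused) until the timestamp passes."""
    retry_at = message.get("headers", {}).get(RETRY_TS_HEADER)
    if retry_at is None or retry_at <= now:
        return ("process", 0.0)
    # the real consumer pauses the partition and stores the message here
    return ("park", retry_at - now)

now = time.time()
print(schedule({"headers": {}}, now))
print(schedule({"headers": {RETRY_TS_HEADER: now + 15}}, now))
```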
|
|
## Local testing

This project uses [`uv`][1]. To set up the project locally, use
@@ -33,7 +64,7 @@ For integration tests you also need [`podman`][3] or [`docker`][4] with
`compose`. Run:

```bash
-docker compose up -d
+podman compose up -d
```

Wait a while and then run:
@@ -1,7 +1,16 @@
"""Retriable Kafka client module"""
 from .consumer import BaseConsumer
-from .types import ConsumerConfig
-from .orchestrate import consume_topics
+from .config import ConsumerConfig, ProducerConfig, ConsumeTopicConfig
+from .orchestrate import consume_topics, ConsumerThread
+from .producer import BaseProducer

-__all__ = ("BaseConsumer", "ConsumerConfig", "consume_topics")
+__all__ = (
+    "BaseConsumer",
+    "BaseProducer",
+    "consume_topics",
+    "ConsumerConfig",
+    "ConsumerThread",
+    "ProducerConfig",
+    "ConsumeTopicConfig",
+)
@@ -4,21 +4,21 @@
from typing import Callable, Any
-@dataclass
+@dataclass(kw_only=True)
class _CommonConfig:
    """
    Topic configuration common for consumers and producers.

    Attributes:
        kafka_hosts: list of Kafka node URLs to connect to
        topics: list of topic names to connect to
        username: Kafka username
        password: Kafka password
        additional_settings: additional settings to pass directly to Kafka
    """

    kafka_hosts: list[str]
    topics: list[str]
    username: str
    password: str
    additional_settings: dict[str, Any] = field(default_factory=dict)
@dataclass
@@ -30,28 +30,57 @@ class ProducerConfig(_CommonConfig):
        topics: list of topic names to publish to
        username: producer username
        password: producer password
        additional_settings: additional settings to pass directly to the Kafka producer
        retries: number of attempts to publish the message
        fallback_factor: how many times longer each successive backoff should take
        fallback_base: the starting backoff in seconds
    """

    topics: list[str]
    retries: int = field(default=3)
    fallback_factor: float = field(default=2.0)
    fallback_base: float = field(default=5.0)
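Assuming the usual exponential-backoff reading of these producer fields (the first wait is `fallback_base` seconds and each subsequent wait is `fallback_factor` times longer; this interpretation is an assumption, not stated in the diff), the publish delays would be:

```python
def backoff_delays(retries: int, fallback_base: float, fallback_factor: float) -> list[float]:
    # assumed interpretation: attempt i waits fallback_base * fallback_factor**i seconds
    return [fallback_base * fallback_factor ** attempt for attempt in range(retries)]

print(backoff_delays(3, 5.0, 2.0))  # with the defaults above: [5.0, 10.0, 20.0]
```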
@dataclass
class ConsumeTopicConfig:
    """
    Configuration for the retry mechanism of a consumer.
    Must be used from within ConsumerConfig.

    Attributes:
        base_topic: topic that this consumer subscribes to
        retry_topic: topic used for resending failed messages
        retries: maximal number of attempts to re-process a
            message originating from base_topic
        fallback_delay: number of seconds to wait before a message
            should be re-processed. This is a non-blocking delay.
    """

    base_topic: str
    retry_topic: str | None = field(default=None)
    retries: int = field(default=5)
    fallback_delay: float = field(default=15.0)
@dataclass
class ConsumerConfig(_CommonConfig):
    """
    Topic configuration for each consumer.

    Attributes:
        kafka_hosts: list of Kafka node URLs to connect to
-        topics: list of topic names to connect to
+        topics: list of configurations for topics and their
+            retry policies
        cancel_future_wait_time: Maximal time to wait fot a task
Suggested change:
-        cancel_future_wait_time: Maximal time to wait fot a task
+        cancel_future_wait_time: Maximal time to wait for a task
> Review comment: thanks for the docs here, this is a lot of code to review
> in one go & I definitely needed the primer.