Merged
43 changes: 39 additions & 4 deletions README.md

thanks for the docs here, this is a lot of code to review in one go & i definitely needed the primer

@@ -6,12 +6,47 @@ additional functionalities to these selected parts.

## Features

As this library is currently under development, it should not be used yet.

The aim is to provide a fault-tolerant platform for parallel message
processing.

### Parallel processing

Each consumer requires an executor pool, which is used for message
processing. Each consumer can consume from multiple topics and processes
the messages from these topics with a single callable, which must be
supplied by the user of this library.
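The executor-pool model above can be sketched with the standard library alone. This is a conceptual illustration, not this library's API; the names `target` and `messages` are made up for the example:

```python
# Conceptual sketch of one consumer's worker pool: messages from several
# topics are all handed to a single user-supplied callable.
from concurrent.futures import ThreadPoolExecutor

def target(message: dict) -> str:
    # The user-supplied callable applied to every parsed message.
    return f"{message['topic']}:{message['value']}"

# Messages as they might arrive from two subscribed topics.
messages = [
    {"topic": "orders", "value": 1},
    {"topic": "payments", "value": 2},
]

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(target, messages))

print(results)  # ['orders:1', 'payments:2']
```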

The library also ensures exactly-once processing when used correctly.
To achieve this, tasks should be short enough that all of them finish
before the cluster forces a rebalance. The library tries to finish tasks
from revoked partitions before the rebalance while cancelling tasks that
have not yet started. The default timeout for each task to finish is
30 seconds, but it can be changed. Longer tasks may also require changes
to the Kafka cluster configuration. This behavior only applies during
rebalancing and graceful stopping of the consumer.
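The rebalance behavior above can be sketched with `concurrent.futures` from the standard library; this is an illustration of the assumed semantics, not the library's actual shutdown code:

```python
# On rebalance or graceful stop: cancel tasks that never started, then give
# the already-running ones a bounded time to finish (30 s by default).
import time
from concurrent.futures import ThreadPoolExecutor, wait

def process(n: int) -> int:
    time.sleep(0.2)  # stand-in for real message processing
    return n

pool = ThreadPoolExecutor(max_workers=1)
futures = [pool.submit(process, n) for n in range(5)]

cancelled = [f for f in futures if f.cancel()]  # tasks that had not started
done, not_done = wait(set(futures) - set(cancelled), timeout=30)
pool.shutdown()

# Every task was either cancelled before starting or allowed to finish.
print(len(cancelled) + len(done) == 5, len(not_done) == 0)
```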

### Fault-tolerance

Each consumer accepts configuration with retry topics. A retry topic is

kinda new to Kafka-- is this retry topic pattern a widely-used thing for Kafka as a platform, or is it our own thing?

first time seeing something like this in the wild so i'm just trying to get more info. the closest established generic pattern i know of is a dead letter channel, but that isn't really meant to indicate when/where to retry, just a place to put dropped messages. otherwise the best i can find for this is this medium article & some Java/Spring-related stuff on baeldung

Contributor Author

It is basically a lightweight reimplementation of the functionality present in the Spring Kafka library, see here. This solution was suggested by Tim Waugh.

a Kafka topic used for asynchronous retrying of message processing. If the
specified target callable fails, the consumer commits the original message
and resends the same message to a retry topic with special headers. The
headers include the next timestamp at which the message should be processed
again (to give a transient error time to disappear when processing depends
on outside infrastructure).

The retry topic is polled alongside the original topic. If a message
contains the special timestamp header, its Kafka partition of origin is
paused and the message is stored locally. Processing resumes only after
the specified timestamp passes. The message is never processed before the
timestamp; it can only gather additional delay (depending on how busy the
worker pool is). Once the message is sent to the pool for re-processing,
consumption of the blocked partition is resumed.

This whole mechanism **does not ensure message ordering**. While a message
waits to be retried, other messages from the same topic are still processed.

## Local testing

This project uses [`uv`][1]. To set up the project locally, use
@@ -33,7 +68,7 @@ For integration tests you also need [`podman`][3] or [`docker`][4] with
`compose`. Run:

```bash
docker compose up -d
podman compose up -d
```

Wait a while and then run:
15 changes: 12 additions & 3 deletions src/retriable_kafka_client/__init__.py
@@ -1,7 +1,16 @@
"""Retriable Kafka client module"""

from .consumer import BaseConsumer
from .types import ConsumerConfig
from .orchestrate import consume_topics
from .config import ConsumerConfig, ProducerConfig, ConsumeTopicConfig
from .orchestrate import consume_topics, ConsumerThread
from .producer import BaseProducer

__all__ = ("BaseConsumer", "ConsumerConfig", "consume_topics")
__all__ = (
"BaseConsumer",
"BaseProducer",
"consume_topics",
"ConsumerConfig",
"ConsumerThread",
"ProducerConfig",
"ConsumeTopicConfig",
)
@@ -4,21 +4,21 @@
from typing import Callable, Any


@dataclass
@dataclass(kw_only=True)
class _CommonConfig:
"""
Topic configuration common for consumers and producers.
Attributes:
kafka_hosts: list of Kafka node URLs to connect to
topics: list of topic names to connect to
username: Kafka username
password: Kafka password
additional_settings: additional settings to pass directly to Kafka
"""

kafka_hosts: list[str]
topics: list[str]
username: str
password: str
additional_settings: dict[str, Any] = field(default_factory=dict)


@dataclass
@@ -30,28 +30,57 @@ class ProducerConfig(_CommonConfig):
topics: list of topic names to publish to
username: producer username
password: producer password
additional_settings: additional settings to pass directly to Kafka producer
retries: number of attempts to publish the message
fallback_factor: how many times longer should each backoff take
fallback_base: what is the starting backoff in seconds
"""

topics: list[str]
retries: int = field(default=3)
fallback_factor: float = field(default=2.0)
fallback_base: float = field(default=5.0)
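Reading the three fields above together, the delay before each publish re-attempt would presumably grow geometrically. A sketch of that interpretation (an assumption, not the library's actual retry loop):

```python
def backoff_delays(retries: int = 3, factor: float = 2.0, base: float = 5.0) -> list[float]:
    """Seconds to wait before each re-attempt, for `retries` total attempts."""
    # With the defaults: the first backoff is `base`, each later one is
    # `factor` times longer than the previous.
    return [base * factor**attempt for attempt in range(retries - 1)]

print(backoff_delays())  # [5.0, 10.0]: each backoff twice as long as the last
```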


@dataclass
class ConsumeTopicConfig:
"""
Configuration for retry mechanism of a consumer.
Must be used from within ConsumerConfig.
Attributes:
base_topic: Topic that this consumer subscribes to
retry_topic: Topic used for resending failed messages
retries: maximum number of attempts to re-process a
message originating from base_topic
fallback_delay: Number of seconds to wait before a message
should be re-processed. The wait is non-blocking.
"""

base_topic: str
retry_topic: str | None = field(default=None)
retries: int = field(default=5)
fallback_delay: float = field(default=15.0)


@dataclass
class ConsumerConfig(_CommonConfig):
"""
Topic configuration for each consumer.
Attributes:
kafka_hosts: list of Kafka node URLs to connect to
topics: list of topic names to connect to
topics: list of configuration for topics and their
retry policies
cancel_future_wait_time: Maximum time to wait for a task
to finish before discarding it on rebalance or soft shutdown.
Does not affect tasks that run under normal circumstances.
username: consumer username
password: consumer password
additional_settings: additional settings to pass directly to Kafka consumer
group_id: consumer group ID to use when consuming
target: Callable to execute on all parsed messages
"""

group_id: str
target: Callable[[dict[str, Any]], Any]
topics: list[ConsumeTopicConfig] = field(default_factory=list)
cancel_future_wait_time: float = field(default=30.0)