Merged
Changes from 7 commits
39 changes: 35 additions & 4 deletions README.md

thanks for the docs here, this is a lot of code to review in one go & i definitely needed the primer

@@ -6,12 +6,43 @@ additional functionalities to these selected parts.

## Features

As this library is currently under development, it should not be used yet.

The aim is to provide a fault-tolerant platform for parallel message
processing.

### Parallel processing

Each consumer requires an executor pool, which is used for message
processing. A consumer can consume from multiple topics and processes the
messages from those topics with a single callable, which must be supplied
by the user of this library.
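The flow above can be sketched with the standard library alone (the topic names, message shape, and `handle` callable here are illustrative, not part of this library's API):

```python
from concurrent.futures import ThreadPoolExecutor

# Illustrative stand-in for the user-supplied processing callable.
def handle(message: dict) -> str:
    return f"processed {message['topic']}:{message['value']}"

# Messages polled from several topics are funnelled through one callable.
messages = [
    {"topic": "orders", "value": 1},
    {"topic": "invoices", "value": 2},
]

# The consumer's executor pool; the pool size is an illustrative choice.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(handle, messages))

print(results)  # ['processed orders:1', 'processed invoices:2']
```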

The library also ensures exactly-once processing when used correctly.
To achieve this, tasks should be short enough that they all finish before
the cluster forces a rebalance. The library tries to finish everything
before the rebalance while cancelling tasks that have not yet started.
The default timeout for each task to finish is 30 seconds, but it can be

Just for papertrail: We agreed in DMs on moving from eager rebalancing towards cooperative rebalancing. Are those lines still applicable even after changing strategy?

Contributor Author

Well, it should be re-worded, good point. It does stop tasks and waits for others to finish, it just does it only for affected partitions. Example: If a single consumer is in a group, it consumes all partitions. Then another consumer joins the group, the cluster has to revoke some of the partitions from the first consumer to supply them to the second consumer. I will try to update this text a little.

changed. Longer tasks may also require changes to the Kafka cluster
configuration. This behavior only appears during rebalancing and during a
graceful stop of the consumer.
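Roughly, the stop-and-wait behavior described above corresponds to this pattern over `concurrent.futures` (a sketch of the idea, not the library's actual implementation; the `drain` helper and constant name are made up):

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

DEFAULT_CANCEL_FUTURE_WAIT_TIME = 30.0  # the library default, per this README

def drain(futures, timeout=DEFAULT_CANCEL_FUTURE_WAIT_TIME):
    """On rebalance or graceful stop: cancel queued tasks, wait for running ones."""
    for f in futures:
        f.cancel()  # only succeeds for tasks that have not started yet
    # Give tasks that are already running up to `timeout` seconds to finish.
    return wait(futures, timeout=timeout)

pool = ThreadPoolExecutor(max_workers=1)
futures = [pool.submit(time.sleep, 0.01) for _ in range(3)]
done, not_done = drain(futures, timeout=5.0)
pool.shutdown()
```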

### Fault-tolerance

Each consumer accepts configuration with retry topics. A retry topic is

kinda new to Kafka-- is this retry topic pattern a widely-used thing for Kafka as a platform, or is it our own thing?

first time seeing something like this in the wild so i'm just trying to get more info. the closest established generic pattern i know of is a dead letter channel, but that isn't really meant to indicate when/where to retry, just a place to put dropped messages. otherwise the best i can find for this is this medium article & some Java/Spring-related stuff on baeldung

Contributor Author

It is basically a lightweight reimplementation of the functionality present in Spring Kafka library, see here. This solution was suggested by Tim Waugh

a Kafka topic used for asynchronous retrying of message processing. If the
specified target callable fails, the consumer commits the original message
and resends it to a retry topic with special headers. The headers include
the next timestamp at which the message should be processed again (to give
a transient error time to disappear when processing depends on outside
infrastructure).
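A sketch of how such headers might be computed (the header names here are hypothetical; the diff does not show the ones the library actually uses):

```python
import time

def retry_headers(attempt: int, fallback_delay: float = 15.0) -> dict[str, str]:
    # Hypothetical header layout; `fallback_delay` mirrors
    # ConsumeTopicConfig.fallback_delay from this PR.
    not_before = time.time() + fallback_delay
    return {
        "retry-attempt": str(attempt),          # how many attempts so far
        "retry-not-before": str(not_before),    # earliest re-processing time
    }
```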

The retry topic is polled alongside the original topic. If a message carries
the special timestamp header, its Kafka partition of origin is paused and
the message is stored locally. Processing resumes only after the specified
timestamp passes. The message will never be processed before the timestamp;
it can only accumulate additional delay (depending on how busy the worker
pool is). Once the message is handed to the pool for re-processing,
consumption of the paused partition is resumed.
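The pause/park/resume bookkeeping can be modelled like this (a toy model of the described behavior, not the library's code):

```python
class PartitionGate:
    """Toy model of the pause/park/resume flow described above."""

    def __init__(self):
        self.paused = set()   # partitions currently paused
        self.parked = []      # (resume_at, partition, message) tuples

    def park(self, partition, message, resume_at):
        # Pause the partition of origin and store the message locally.
        self.paused.add(partition)
        self.parked.append((resume_at, partition, message))

    def due(self, now):
        # Release messages whose timestamp has passed and resume partitions.
        ready, still_parked = [], []
        for resume_at, partition, message in self.parked:
            if now >= resume_at:
                self.paused.discard(partition)
                ready.append(message)
            else:
                still_parked.append((resume_at, partition, message))
        self.parked = still_parked
        return ready
```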

## Local testing

This project uses [`uv`][1]. To set up the project locally, use
@@ -33,7 +64,7 @@ For integration tests you also need [`podman`][3] or [`docker`][4] with
`compose`. Run:

```diff
-docker compose up -d
+podman compose up -d
```

Wait a while and then run:
15 changes: 12 additions & 3 deletions src/retriable_kafka_client/__init__.py
@@ -1,7 +1,16 @@
"""Retriable Kafka client module"""

 from .consumer import BaseConsumer
-from .types import ConsumerConfig
-from .orchestrate import consume_topics
+from .config import ConsumerConfig, ProducerConfig, ConsumeTopicConfig
+from .orchestrate import consume_topics, ConsumerThread
+from .producer import BaseProducer

-__all__ = ("BaseConsumer", "ConsumerConfig", "consume_topics")
+__all__ = (
+    "BaseConsumer",
+    "BaseProducer",
+    "consume_topics",
+    "ConsumerConfig",
+    "ConsumerThread",
+    "ProducerConfig",
+    "ConsumeTopicConfig",
+)
Original file line number Diff line number Diff line change
@@ -4,21 +4,21 @@
from typing import Callable, Any


-@dataclass
+@dataclass(kw_only=True)
class _CommonConfig:
"""
Topic configuration common for consumers and producers.
Attributes:
kafka_hosts: list of Kafka node URLs to connect to
topics: list of topic names to connect to
username: Kafka username
password: Kafka password
additional_settings: additional settings to pass directly to Kafka
"""

kafka_hosts: list[str]
topics: list[str]
username: str
password: str
additional_settings: dict[str, Any] = field(default_factory=dict)


@dataclass
@@ -30,28 +30,57 @@ class ProducerConfig(_CommonConfig):
topics: list of topic names to publish to
username: producer username
password: producer password
additional_settings: additional settings to pass directly to Kafka producer
retries: number of attempts to publish the message
fallback_factor: how many times longer should each backoff take
fallback_base: what is the starting backoff in seconds
"""

topics: list[str]
retries: int = field(default=3)
fallback_factor: float = field(default=2.0)
fallback_base: float = field(default=5.0)
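Assuming the natural exponential reading of these fields (each backoff is `fallback_factor` times the previous one, starting at `fallback_base` seconds), the default schedule works out to 5, 10, and 20 seconds:

```python
def backoffs(retries: int = 3, factor: float = 2.0, base: float = 5.0) -> list[float]:
    # Assumed interpretation of ProducerConfig's retry fields; the diff
    # does not show the actual backoff computation.
    return [base * factor**attempt for attempt in range(retries)]

print(backoffs())  # [5.0, 10.0, 20.0]
```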


@dataclass
class ConsumeTopicConfig:
"""
Configuration for retry mechanism of a consumer.
Must be used from within ConsumerConfig.
Attributes:
base_topic: Topic that this consumer subscribes to
retry_topic: Topic used for resending failed messages
retries: maximal number of attempts to re-process a
message originating from base_topic
fallback_delay: Number of seconds to wait before a message
is re-processed. The wait does not block other messages.
"""

base_topic: str
retry_topic: str | None = field(default=None)
retries: int = field(default=5)
fallback_delay: float = field(default=15.0)


@dataclass
class ConsumerConfig(_CommonConfig):
"""
Topic configuration for each consumer.
Attributes:
kafka_hosts: list of Kafka node URLs to connect to
topics: list of topic names to connect to
topics: list of per-topic configurations with their
retry policies
cancel_future_wait_time: Maximal time to wait for a task

Suggested change
cancel_future_wait_time: Maximal time to wait fot a task
cancel_future_wait_time: Maximal time to wait for a task

to finish before discarding it on rebalance or soft shutdown.
Doesn't affect tasks that run under normal circumstances.
username: consumer username
password: consumer password
additional_settings: additional settings to pass directly to Kafka consumer
group_id: consumer group ID to use when consuming
target: Callable to execute on all parsed messages
"""

group_id: str
target: Callable[[dict[str, Any]], Any]
topics: list[ConsumeTopicConfig] = field(default_factory=list)
cancel_future_wait_time: float = field(default=30.0)