Skip to content

Commit c4382bb

Browse files
authored
feat(producer): Replace confluent_kafka.Producer with ConfluentProducer (#102432)
Bump `sentry-arroyo` to `2.31.2` and slowly rolling out `ConfluentProducer` to replace `confluent_kafka.Producer` in order to record produce metrics. Refs STREAM-549
1 parent 3e4d286 commit c4382bb

File tree

10 files changed

+108
-11
lines changed

10 files changed

+108
-11
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ dependencies = [
7878
"rfc3339-validator>=0.1.2",
7979
"rfc3986-validator>=0.1.1",
8080
# [end] jsonschema format validators
81-
"sentry-arroyo>=2.30.0",
81+
"sentry-arroyo>=2.31.2",
8282
"sentry-forked-email-reply-parser>=0.5.12.post1",
8383
"sentry-kafka-schemas>=2.1.13",
8484
"sentry-ophio>=1.1.3",

src/sentry/eventstream/kafka/backend.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from sentry.eventstream.types import EventStreamEventType
2121
from sentry.killswitches import killswitch_matches_context
2222
from sentry.utils import json
23+
from sentry.utils.confluent_producer import get_confluent_producer
2324
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
2425

2526
EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS)
@@ -48,7 +49,7 @@ def get_producer(self, topic: Topic) -> Producer:
4849
cluster_options = get_kafka_producer_cluster_options(cluster_name)
4950
cluster_options["client.id"] = "sentry.eventstream.kafka"
5051
# XXX(markus): We should use `sentry.utils.arroyo_producer.get_arroyo_producer`.
51-
self.__producers[topic] = Producer(
52+
self.__producers[topic] = get_confluent_producer(
5253
build_kafka_producer_configuration(default_config=cluster_options)
5354
)
5455

src/sentry/options/defaults.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,15 @@
629629
flags=FLAG_AUTOMATOR_MODIFIABLE,
630630
)
631631

632+
# Rollout configuration for Arroyo ConfluentProducer
633+
# Controls the rollout of individual Kafka producers by name
634+
register(
635+
"arroyo.producer.confluent-producer-rollout",
636+
type=Dict,
637+
default={},
638+
flags=FLAG_AUTOMATOR_MODIFIABLE,
639+
)
640+
632641
# Analytics
633642
register("analytics.backend", default="noop", flags=FLAG_NOSTORE)
634643
register("analytics.options", default={}, flags=FLAG_NOSTORE)

src/sentry/sentry_metrics/consumers/indexer/multiprocess.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010
from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
1111
from arroyo.processing.strategies.commit import CommitOffsets
1212
from arroyo.types import Commit, FilteredPayload, Message, Partition, Value
13-
from confluent_kafka import Producer
1413

1514
from sentry.conf.types.kafka_definition import Topic
1615
from sentry.utils import kafka_config, metrics
16+
from sentry.utils.confluent_producer import get_confluent_producer
1717

1818
logger = logging.getLogger(__name__)
1919

@@ -28,7 +28,7 @@ def __init__(
2828
snuba_metrics = kafka_config.get_topic_definition(output_topic)
2929
producer_config = kafka_config.get_kafka_producer_cluster_options(snuba_metrics["cluster"])
3030
producer_config["client.id"] = "sentry.sentry_metrics.multiprocess"
31-
self.__producer = Producer(producer_config)
31+
self.__producer = get_confluent_producer(producer_config)
3232
self.__producer_topic = snuba_metrics["real_topic_name"]
3333

3434
self.__commit = CommitOffsets(commit_function)

src/sentry/sentry_metrics/consumers/indexer/slicing_router.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
RoutingPayload,
2121
)
2222
from sentry.utils import kafka_config
23+
from sentry.utils.confluent_producer import get_confluent_producer
2324

2425

2526
class SlicingConfigurationException(Exception):
@@ -112,8 +113,9 @@ def __init__(
112113
producer_config["client.id"] = (
113114
f"sentry.sentry_metrics.slicing_router.{current_sliceable}.{current_slice_id}"
114115
)
116+
producer = get_confluent_producer(producer_config)
115117
self.__slice_to_producer[current_slice_id] = MessageRoute(
116-
producer=Producer(producer_config),
118+
producer=producer,
117119
topic=Topic(configuration["topic"]),
118120
)
119121
# All logical partitions should be routed to a slice ID that's present in the slice
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from __future__ import annotations
2+
3+
from typing import Any
4+
5+
from arroyo.backends.kafka import ConfluentProducer
6+
from confluent_kafka import Producer
7+
8+
from sentry import options
9+
10+
11+
def get_confluent_producer(
12+
configuration: dict[str, Any],
13+
) -> Producer:
14+
"""
15+
Get a confluent_kafka Producer for a given configuration.
16+
17+
Args:
18+
configuration: The configuration for the confluent_kafka Producer
19+
20+
Returns:
21+
confluent_kafka Producer
22+
"""
23+
rollout_config = options.get("arroyo.producer.confluent-producer-rollout", {})
24+
name = configuration.get("client.id")
25+
26+
# f"sentry.sentry_metrics.slicing_router.{current_sliceable}.{current_slice_id}"
27+
if name is not None and isinstance(name, str):
28+
if name.startswith("sentry.sentry_metrics.slicing_router"):
29+
name = "sentry.sentry_metrics.slicing_router"
30+
31+
if rollout_config.get(name, False):
32+
return ConfluentProducer(configuration)
33+
return Producer(configuration)

src/sentry/utils/pubsub.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
from typing import Any
22

33
from arroyo.backends.kafka import build_kafka_producer_configuration
4-
from confluent_kafka import Producer
4+
5+
from sentry.utils.confluent_producer import get_confluent_producer
56

67

78
class KafkaPublisher:
89
# XXX(markus): Deprecated. Please use `sentry.utils.arroyo_producer.get_arroyo_producer`.
910
def __init__(self, connection: dict[str, Any], asynchronous: bool = True) -> None:
10-
self.producer = Producer(
11-
build_kafka_producer_configuration(default_config=connection or {})
11+
connection = connection or {}
12+
if "client.id" not in connection:
13+
connection["client.id"] = "sentry.utils.pubsub"
14+
self.producer = get_confluent_producer(
15+
build_kafka_producer_configuration(default_config=connection)
1216
)
1317
self.asynchronous = asynchronous
1418

tests/sentry/sentry_metrics/consumers/test_slicing_router.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
_validate_slicing_config,
2121
_validate_slicing_consumer_config,
2222
)
23+
from sentry.testutils.helpers.options import override_options
2324

2425
KAFKA_SNUBA_GENERIC_METRICS = "snuba-generic-metrics"
2526

@@ -78,6 +79,7 @@ def setup_slicing() -> Generator[None]:
7879

7980

8081
@pytest.mark.parametrize("org_id", [1, 127, 128, 256, 257])
82+
@override_options({"arroyo.producer.confluent-producer-rollout": {}})
8183
def test_with_slicing(metrics_message, setup_slicing) -> None:
8284
"""
8385
With partitioning settings, the SlicingRouter should route to the correct topic
@@ -94,6 +96,7 @@ def test_with_slicing(metrics_message, setup_slicing) -> None:
9496
assert False, "unexpected org_id"
9597

9698

99+
@override_options({"arroyo.producer.confluent-producer-rollout": {}})
97100
def test_with_no_org_in_routing_header(setup_slicing) -> None:
98101
"""
99102
With partitioning settings, the SlicingRouter should route to the correct topic
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from arroyo.backends.kafka import ConfluentProducer
2+
from confluent_kafka import Producer
3+
4+
from sentry.testutils.helpers.options import override_options
5+
from sentry.utils.confluent_producer import get_confluent_producer
6+
7+
8+
@override_options({"arroyo.producer.confluent-producer-rollout": {"test_producer": True}})
9+
def test_get_confluent_producer() -> None:
10+
configuration = {
11+
"bootstrap.servers": "localhost:9092",
12+
"client.id": "test_producer",
13+
}
14+
15+
producer = get_confluent_producer(configuration)
16+
assert producer is not None
17+
18+
assert isinstance(producer, Producer)
19+
assert isinstance(producer, ConfluentProducer)
20+
21+
for attrs in Producer.__dict__.keys():
22+
assert hasattr(producer, attrs)
23+
24+
25+
@override_options({"arroyo.producer.confluent-producer-rollout": {"test_producer": False}})
26+
def test_get_confluent_producer_not_rolled_out() -> None:
27+
configuration = {
28+
"bootstrap.servers": "localhost:9092",
29+
"client.id": "test_producer",
30+
}
31+
32+
producer = get_confluent_producer(configuration)
33+
assert isinstance(producer, Producer)
34+
assert not isinstance(producer, ConfluentProducer)
35+
36+
37+
@override_options({"arroyo.producer.confluent-producer-rollout": {}})
38+
def test_get_confluent_producer_no_client_id() -> None:
39+
configuration = {
40+
"bootstrap.servers": "localhost:9092",
41+
}
42+
43+
producer = get_confluent_producer(configuration)
44+
assert isinstance(producer, Producer)
45+
assert not isinstance(producer, ConfluentProducer)

uv.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)