Skip to content

Commit 019f1c1

Browse files
bmckerryshashjar
authored andcommitted
feat(consumers): add ProcessingStrategy for profiling join (#102140)
This PR adds a StrategyFactory which adds a strategy to a consumer that starts/stops profiling during calls to `join()`. The goal of profiling `join()` is to troubleshoot why some of our consumers are hanging when `join()` is called during partition revocation.
1 parent f656b2d commit 019f1c1

File tree

6 files changed

+72
-1
lines changed

6 files changed

+72
-1
lines changed

src/sentry/consumers/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
validate_consumer_definition,
2424
)
2525
from sentry.consumers.dlq import DlqStaleMessagesStrategyFactoryWrapper, maybe_build_dlq_producer
26+
from sentry.consumers.profiler import JoinProfiler
2627
from sentry.consumers.validate_schema import ValidateSchema
2728
from sentry.eventstream.types import EventStreamEventType
2829
from sentry.ingest.types import ConsumerType
@@ -466,6 +467,7 @@ def get_stream_processor(
466467
kafka_slice_id: int | None = None,
467468
shutdown_strategy_before_consumer: bool = False,
468469
add_global_tags: bool = False,
470+
profile_consumer_join: bool = False,
469471
) -> StreamProcessor:
470472
from sentry.utils import kafka_config
471473

@@ -594,6 +596,9 @@ def build_consumer_config(group_id: str):
594596
healthcheck_file_path, strategy_factory
595597
)
596598

599+
if profile_consumer_join:
600+
strategy_factory = JoinProfilerStrategyFactoryWrapper(strategy_factory)
601+
597602
if enable_dlq and consumer_definition.get("dlq_topic"):
598603
dlq_topic = consumer_definition["dlq_topic"]
599604
else:
@@ -674,3 +679,12 @@ def __init__(self, healthcheck_file_path: str, inner: ProcessingStrategyFactory)
674679
def create_with_partitions(self, commit, partitions):
675680
rv = self.inner.create_with_partitions(commit, partitions)
676681
return Healthcheck(self.healthcheck_file_path, rv)
682+
683+
684+
class JoinProfilerStrategyFactoryWrapper(ProcessingStrategyFactory):
685+
def __init__(self, inner: ProcessingStrategyFactory):
686+
self.inner = inner
687+
688+
def create_with_partitions(self, commit, partitions):
689+
rv = self.inner.create_with_partitions(commit, partitions)
690+
return JoinProfiler(rv)

src/sentry/consumers/profiler.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from __future__ import annotations
2+
3+
import sentry_sdk
4+
from arroyo.processing.strategies.abstract import ProcessingStrategy
5+
from arroyo.types import Message, TStrategyPayload
6+
7+
8+
class JoinProfiler(ProcessingStrategy[TStrategyPayload]):
9+
"""
10+
Strategy which passes through all ProcessingStrategy method calls,
11+
but runs Sentry's continuous profiler for `join()` calls.
12+
13+
This startegy is being used to troubleshoot our consumers hanging during `join()` occasionally.
14+
"""
15+
16+
def __init__(self, next_step: ProcessingStrategy[TStrategyPayload]) -> None:
17+
self.__next_step = next_step
18+
19+
def join(self, timeout: float | None = None):
20+
with sentry_sdk.start_transaction(
21+
op="consumer_join", name="consumer.join", custom_sampling_context={"sample_rate": 1.0}
22+
):
23+
self.__next_step.join(timeout)
24+
25+
def submit(self, message: Message[TStrategyPayload]) -> None:
26+
self.__next_step.submit(message)
27+
28+
def poll(self) -> None:
29+
self.__next_step.poll()
30+
31+
def close(self) -> None:
32+
self.__next_step.close()
33+
34+
def terminate(self) -> None:
35+
self.__next_step.terminate()

src/sentry/options/defaults.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3633,7 +3633,6 @@
36333633
)
36343634

36353635
# Sets the sample rate for profiles collected via the JoinProfiler arroyo strategy
3636-
# TODO: add the JoinProfiler arroyo strategy
36373636
register(
36383637
"consumer.join.profiling.rate",
36393638
type=Float,

src/sentry/runner/commands/run.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,12 @@ def taskbroker_send_tasks(
544544
default=False,
545545
help="A potential workaround for Broker Handle Destroyed during shutdown (see arroyo option).",
546546
)
547+
@click.option(
548+
"--profile-consumer-join",
549+
is_flag=True,
550+
default=False,
551+
help="Adds a ProcessingStrategy to the start of a consumer that records a transaction of the consumer's join() method.",
552+
)
547553
@configuration
548554
def basic_consumer(
549555
consumer_name: str,
@@ -633,6 +639,7 @@ def dev_consumer(consumer_names: tuple[str, ...]) -> None:
633639
stale_threshold_sec=None,
634640
healthcheck_file_path=None,
635641
enforce_schema=True,
642+
profile_consumer_join=False,
636643
)
637644
for consumer_name in consumer_names
638645
]

src/sentry/utils/sdk.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ def traces_sampler(sampling_context):
204204

205205
def profiles_sampler(sampling_context):
206206
PROFILES_SAMPLING_RATE = {
207+
"consumer.join": options.get("consumer.join.profiling.rate"),
207208
"spans.process.process_message": options.get("spans.process-spans.profiling.rate"),
208209
}
209210
if "transaction_context" in sampling_context:
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from unittest.mock import Mock
2+
3+
from sentry.consumers import JoinProfilerStrategyFactoryWrapper
4+
5+
6+
def test_join_profiler() -> None:
7+
inner_factory_mock = Mock()
8+
inner_strategy_mock = Mock()
9+
inner_factory_mock.create_with_partitions = Mock(return_value=inner_strategy_mock)
10+
11+
factory = JoinProfilerStrategyFactoryWrapper(inner_factory_mock)
12+
strategy = factory.create_with_partitions(Mock(), Mock())
13+
14+
strategy.join()
15+
inner_strategy_mock.join.assert_called_once()

0 commit comments

Comments
 (0)