Skip to content

Commit aebacdd

Browse files
authored
feat(conduit): Add conduit demo task (#102403)
Adds a demo task to asynchronously generate data and send it to the Conduit Publish API. This serves as a reference implementation for publishing data to Conduit and enables our demo to actually work. I also updated some of the setting names to be more precise to avoid confusion. The next step is adding a frontend demo page that consumes the stream via the conduit client.
1 parent 3af5f79 commit aebacdd

File tree

9 files changed

+548
-26
lines changed

9 files changed

+548
-26
lines changed

src/sentry/conduit/auth.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,13 @@ def generate_conduit_token(
3636
JWT token string
3737
"""
3838
if issuer is None:
39-
issuer = settings.CONDUIT_JWT_ISSUER
39+
issuer = settings.CONDUIT_GATEWAY_JWT_ISSUER
4040
if audience is None:
41-
audience = settings.CONDUIT_JWT_AUDIENCE
41+
audience = settings.CONDUIT_GATEWAY_JWT_AUDIENCE
4242
if conduit_private_key is None:
43-
conduit_private_key = settings.CONDUIT_PRIVATE_KEY
43+
conduit_private_key = settings.CONDUIT_GATEWAY_PRIVATE_KEY
4444
if conduit_private_key is None:
45-
raise ValueError("CONDUIT_PRIVATE_KEY not configured")
45+
raise ValueError("CONDUIT_GATEWAY_PRIVATE_KEY not configured")
4646

4747
now = int(time.time())
4848
exp = now + TOKEN_TTL_SEC

src/sentry/conduit/endpoints/organization_conduit_demo.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
from rest_framework.request import Request
44
from rest_framework.response import Response
55

6+
from sentry import features
67
from sentry.api.api_owners import ApiOwner
78
from sentry.api.api_publish_status import ApiPublishStatus
89
from sentry.api.base import region_silo_endpoint
910
from sentry.api.bases import OrganizationEndpoint
1011
from sentry.conduit.auth import get_conduit_credentials
12+
from sentry.conduit.tasks import stream_demo_data
1113
from sentry.models.organization import Organization
1214

1315

@@ -29,6 +31,8 @@ class OrganizationConduitDemoEndpoint(OrganizationEndpoint):
2931
owner = ApiOwner.INFRA_ENG
3032

3133
def post(self, request: Request, organization: Organization) -> Response:
34+
if not features.has("organizations:conduit-demo", organization, actor=request.user):
35+
return Response(status=404)
3236
try:
3337
conduit_credentials = get_conduit_credentials(
3438
organization.id,
@@ -39,9 +43,12 @@ def post(self, request: Request, organization: Organization) -> Response:
3943
{"error": "Conduit is not configured properly"},
4044
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
4145
)
46+
# Kick off a task to stream data to Conduit
47+
stream_demo_data.delay(organization.id, conduit_credentials.channel_id)
4248
serializer = ConduitCredentialsResponseSerializer(
4349
{
4450
"conduit": conduit_credentials._asdict(),
4551
}
4652
)
53+
# Respond back to the user with the credentials needed to connect to Conduit
4754
return Response(serializer.data, status=status.HTTP_201_CREATED)

src/sentry/conduit/tasks.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import datetime
2+
import logging
3+
import time
4+
from functools import partial
5+
from uuid import uuid4
6+
7+
import requests
8+
from django.conf import settings
9+
from google.protobuf.struct_pb2 import Struct
10+
from google.protobuf.timestamp_pb2 import Timestamp
11+
from requests import Response
12+
from requests.exceptions import RequestException
13+
from sentry_protos.conduit.v1alpha.publish_pb2 import Phase, PublishRequest
14+
15+
from sentry.silo.base import SiloMode
16+
from sentry.tasks.base import instrumented_task
17+
from sentry.taskworker.namespaces import conduit_tasks
18+
from sentry.utils import jwt
19+
from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
20+
21+
logger = logging.getLogger(__name__)
22+
23+
PUBLISH_REQUEST_TIMEOUT_SECONDS = 5
24+
PUBLISH_REQUEST_MAX_RETRIES = 5
25+
NUM_DELTAS = 100
26+
SEND_INTERVAL_SECONDS = 0.15
27+
JWT_EXPIRATION_SECONDS = 300 # 5 minutes
28+
TASK_PROCESSING_DEADLINE_SECONDS = 60 * 3 # 3 minutes
29+
30+
31+
@instrumented_task(
32+
name="sentry.conduit.tasks.stream_demo_data",
33+
namespace=conduit_tasks,
34+
processing_deadline_duration=TASK_PROCESSING_DEADLINE_SECONDS,
35+
silo_mode=SiloMode.REGION,
36+
)
37+
def stream_demo_data(org_id: int, channel_id: str) -> None:
38+
"""Asynchronously stream data to Conduit."""
39+
token = generate_jwt(subject="demo")
40+
logger.info(
41+
"conduit.stream_demo_data.started", extra={"org_id": org_id, "channel_id": channel_id}
42+
)
43+
sequence = 0
44+
start_publish_request = PublishRequest(
45+
channel_id=channel_id,
46+
message_id=str(uuid4()),
47+
sequence=sequence,
48+
client_timestamp=get_timestamp(),
49+
phase=Phase.PHASE_START,
50+
)
51+
publish_data(org_id, start_publish_request, token)
52+
sequence += 1
53+
54+
for i in range(NUM_DELTAS):
55+
payload = Struct()
56+
payload.update({"value": str(i)})
57+
delta_publish_request = PublishRequest(
58+
channel_id=channel_id,
59+
message_id=str(uuid4()),
60+
sequence=sequence,
61+
client_timestamp=get_timestamp(),
62+
phase=Phase.PHASE_DELTA,
63+
payload=payload,
64+
)
65+
publish_data(org_id, delta_publish_request, token)
66+
sequence += 1
67+
time.sleep(SEND_INTERVAL_SECONDS)
68+
69+
end_publish_request = PublishRequest(
70+
channel_id=channel_id,
71+
message_id=str(uuid4()),
72+
sequence=sequence,
73+
client_timestamp=get_timestamp(),
74+
phase=Phase.PHASE_END,
75+
)
76+
publish_data(org_id, end_publish_request, token)
77+
logger.info(
78+
"conduit.stream_demo_data.ended", extra={"org_id": org_id, "channel_id": channel_id}
79+
)
80+
81+
82+
def generate_jwt(
83+
subject: str, issuer: str | None = None, audience: str | None = None, secret: str | None = None
84+
) -> str:
85+
"""
86+
Generate a JWT token for the Conduit publish API.
87+
88+
Uses HS256 algorithm with a 5 minute expiration.
89+
"""
90+
if issuer is None:
91+
issuer = settings.CONDUIT_PUBLISH_JWT_ISSUER
92+
if audience is None:
93+
audience = settings.CONDUIT_PUBLISH_JWT_AUDIENCE
94+
if secret is None:
95+
secret = settings.CONDUIT_PUBLISH_SECRET
96+
if secret is None:
97+
raise ValueError("CONDUIT_PUBLISH_SECRET not configured")
98+
claims = {
99+
"sub": subject,
100+
"iss": issuer,
101+
"aud": audience,
102+
"exp": int(time.time()) + JWT_EXPIRATION_SECONDS,
103+
}
104+
return jwt.encode(claims, secret, algorithm="HS256")
105+
106+
107+
def should_retry_publish(attempt: int, exception: Exception) -> bool:
108+
return attempt < PUBLISH_REQUEST_MAX_RETRIES and isinstance(exception, RequestException)
109+
110+
111+
publish_retry_policy = ConditionalRetryPolicy(
112+
should_retry_publish,
113+
exponential_delay(0.5),
114+
)
115+
116+
117+
def publish_data(
118+
org_id: int,
119+
publish_request: PublishRequest,
120+
token: str,
121+
publish_url: str | None = None,
122+
) -> Response:
123+
"""
124+
Publish a protobuf message to Conduit with retries.
125+
126+
Retries up to 5 times with exponential backoff.
127+
"""
128+
if publish_url is None:
129+
publish_url = settings.CONDUIT_PUBLISH_URL
130+
return publish_retry_policy(
131+
partial(
132+
_do_publish,
133+
org_id=org_id,
134+
publish_request=publish_request,
135+
token=token,
136+
publish_url=publish_url,
137+
)
138+
)
139+
140+
141+
def _do_publish(
142+
org_id: int,
143+
publish_request: PublishRequest,
144+
token: str,
145+
publish_url: str,
146+
) -> Response:
147+
response = requests.post(
148+
url=f"{publish_url}/publish/{org_id}/{publish_request.channel_id}",
149+
headers={
150+
"Authorization": f"Bearer {token}",
151+
"Content-Type": "application/x-protobuf",
152+
},
153+
data=publish_request.SerializeToString(),
154+
timeout=PUBLISH_REQUEST_TIMEOUT_SECONDS,
155+
)
156+
response.raise_for_status()
157+
return response
158+
159+
160+
def get_timestamp(dt: datetime.datetime | None = None) -> Timestamp:
161+
if dt is None:
162+
dt = datetime.datetime.now(datetime.UTC)
163+
timestamp = Timestamp()
164+
timestamp.FromDatetime(dt)
165+
return timestamp

src/sentry/conf/server.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -828,6 +828,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
828828
# Taskworkers need to import task modules to make tasks
829829
# accessible to the worker.
830830
TASKWORKER_IMPORTS: tuple[str, ...] = (
831+
"sentry.conduit.tasks",
831832
"sentry.data_export.tasks",
832833
"sentry.debug_files.tasks",
833834
"sentry.deletions.tasks.hybrid_cloud",
@@ -3186,7 +3187,12 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
31863187
SENTRY_OPTIONS["system.region-api-url-template"] = f"https://{{region}}.{ngrok_host}"
31873188
SENTRY_FEATURES["system:multi-region"] = True
31883189

3189-
CONDUIT_PRIVATE_KEY: str | None = os.getenv("CONDUIT_PRIVATE_KEY")
3190+
CONDUIT_GATEWAY_PRIVATE_KEY: str | None = os.getenv("CONDUIT_GATEWAY_PRIVATE_KEY")
31903191
CONDUIT_GATEWAY_URL: str = os.getenv("CONDUIT_GATEWAY_URL", "https://conduit.sentry.io")
3191-
CONDUIT_JWT_ISSUER: str = os.getenv("CONDUIT_JWT_ISSUER", "sentry")
3192-
CONDUIT_JWT_AUDIENCE: str = os.getenv("CONDUIT_JWT_AUDIENCE", "conduit")
3192+
CONDUIT_GATEWAY_JWT_ISSUER: str = os.getenv("CONDUIT_GATEWAY_JWT_ISSUER", "sentry.io")
3193+
CONDUIT_GATEWAY_JWT_AUDIENCE: str = os.getenv("CONDUIT_GATEWAY_JWT_AUDIENCE", "conduit")
3194+
3195+
CONDUIT_PUBLISH_SECRET: str | None = os.getenv("CONDUIT_PUBLISH_SECRET")
3196+
CONDUIT_PUBLISH_URL: str = os.getenv("CONDUIT_PUBLISH_URL", "http://127.0.0.1:9097")
3197+
CONDUIT_PUBLISH_JWT_ISSUER: str = os.getenv("CONDUIT_PUBLISH_JWT_ISSUER", "sentry.io")
3198+
CONDUIT_PUBLISH_JWT_AUDIENCE: str = os.getenv("CONDUIT_PUBLISH_JWT_AUDIENCE", "conduit")

src/sentry/features/temporary.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,8 @@ def register_temporary_features(manager: FeatureManager) -> None:
605605
manager.add("organizations:notification-platform", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
606606
manager.add("organizations:on-demand-gen-metrics-deprecation-prefill", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
607607
manager.add("organizations:on-demand-gen-metrics-deprecation-query-prefill", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
608+
# Enables Conduit demo endpoint and UI
609+
manager.add("organizations:conduit-demo", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
608610

609611
# NOTE: Don't add features down here! Add them to their specific group and sort
610612
# them alphabetically! The order features are registered is not important.

src/sentry/taskworker/namespaces.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
app_feature="errors",
2727
)
2828

29+
conduit_tasks = app.taskregistry.create_namespace(
30+
"conduit",
31+
app_feature="conduit",
32+
)
33+
2934
crons_tasks = app.taskregistry.create_namespace(
3035
"crons",
3136
app_feature="crons",

tests/sentry/conduit/endpoints/test_organization_conduit_demo.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
from unittest.mock import MagicMock, patch
2+
13
from django.test.utils import override_settings
24

35
from sentry.testutils.cases import APITestCase
6+
from sentry.testutils.helpers import with_feature
47
from sentry.testutils.silo import region_silo_test
58
from tests.sentry.utils.test_jwt import RS256_KEY
69

@@ -14,11 +17,12 @@ def setUp(self) -> None:
1417
self.login_as(user=self.user)
1518

1619
@override_settings(
17-
CONDUIT_PRIVATE_KEY=RS256_KEY,
18-
CONDUIT_JWT_ISSUER="sentry",
19-
CONDUIT_JWT_AUDIENCE="conduit",
20+
CONDUIT_GATEWAY_PRIVATE_KEY=RS256_KEY,
21+
CONDUIT_GATEWAY_JWT_ISSUER="sentry",
22+
CONDUIT_GATEWAY_JWT_AUDIENCE="conduit",
2023
CONDUIT_GATEWAY_URL="https://conduit.example.com",
2124
)
25+
@with_feature("organizations:conduit-demo")
2226
def test_post_generate_credentials(self) -> None:
2327
"""Test that POST generates valid credentials."""
2428

@@ -34,6 +38,7 @@ def test_post_generate_credentials(self) -> None:
3438
assert "url" in response.data["conduit"]
3539
assert str(self.organization.id) in response.data["conduit"]["url"]
3640

41+
@with_feature("organizations:conduit-demo")
3742
def test_post_without_org_access(self) -> None:
3843
"""Test that users without org access cannot generate credentials."""
3944
other_org = self.create_organization()
@@ -44,6 +49,7 @@ def test_post_without_org_access(self) -> None:
4449
status_code=403,
4550
)
4651

52+
@with_feature("organizations:conduit-demo")
4753
def test_post_missing_conduit_config(self) -> None:
4854
"""Test graceful failure when CONDUIT_PRIVATE_KEY is not configured."""
4955
response = self.get_error_response(
@@ -55,11 +61,12 @@ def test_post_missing_conduit_config(self) -> None:
5561
assert response.data == {"error": "Conduit is not configured properly"}
5662

5763
@override_settings(
58-
CONDUIT_PRIVATE_KEY=RS256_KEY,
59-
CONDUIT_JWT_ISSUER="sentry",
60-
CONDUIT_JWT_AUDIENCE="conduit",
64+
CONDUIT_GATEWAY_PRIVATE_KEY=RS256_KEY,
65+
CONDUIT_GATEWAY_JWT_ISSUER="sentry",
66+
CONDUIT_GATEWAY_JWT_AUDIENCE="conduit",
6167
CONDUIT_GATEWAY_URL="https://conduit.example.com",
6268
)
69+
@with_feature("organizations:conduit-demo")
6370
def test_credentials_are_unique(self) -> None:
6471
"""Test that multiple calls generate different credentials."""
6572
response1 = self.get_success_response(
@@ -76,3 +83,26 @@ def test_credentials_are_unique(self) -> None:
7683

7784
assert response1.data["conduit"]["token"] != response2.data["conduit"]["token"]
7885
assert response1.data["conduit"]["channel_id"] != response2.data["conduit"]["channel_id"]
86+
87+
@override_settings(
88+
CONDUIT_GATEWAY_PRIVATE_KEY=RS256_KEY,
89+
CONDUIT_GATEWAY_JWT_ISSUER="sentry",
90+
CONDUIT_GATEWAY_JWT_AUDIENCE="conduit",
91+
CONDUIT_GATEWAY_URL="https://conduit.example.com",
92+
)
93+
@patch("sentry.conduit.endpoints.organization_conduit_demo.stream_demo_data")
94+
@with_feature("organizations:conduit-demo")
95+
def test_post_queues_task(self, mock_task: MagicMock):
96+
self.get_success_response(
97+
self.organization.slug,
98+
method="POST",
99+
status_code=201,
100+
)
101+
mock_task.delay.assert_called_once()
102+
103+
def test_post_without_feature_flag(self) -> None:
104+
self.get_error_response(
105+
self.organization.slug,
106+
method="POST",
107+
status_code=404,
108+
)

0 commit comments

Comments
 (0)