Skip to content

Commit

Permalink
ref(k8s): Add a k8s for the taskworker deployment (#114)
Browse files Browse the repository at this point in the history
This adds a k8s file for deploying the taskworker image. It also makes a few QoL improvements:

- Replace SimpleTaskWorker with ConfigurableTaskWorker
- Add more logging to the worker
- Add a backoff to the worker, so that if there are no messages it doesn't spam logs
- Update the taskbroker image
- Fix a bug in the worker where it wouldn't process tasks fetched in the update
  • Loading branch information
evanh authored Jan 10, 2025
1 parent 8ff9bde commit cc30423
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 43 deletions.
2 changes: 1 addition & 1 deletion k8s/k8s_stateful_set.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ spec:
terminationGracePeriodSeconds: 10
containers:
- name: taskbroker
image: us-central1-docker.pkg.dev/sentryio/taskbroker/image:ab1f14a7701cddf3113bab70d4367f1172be68cd
image: us-central1-docker.pkg.dev/sentryio/taskbroker/image:8ff9bdee7be16ecf73bc9a20a7db6f2d9dda2e95
env:
- name: TASKBROKER_KAFKA_CLUSTER
value: "kafka-001:9092"
Expand Down
32 changes: 32 additions & 0 deletions k8s/k8s_taskworker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: taskworker
labels:
app: taskworker
spec:
replicas: 8
selector:
matchLabels:
app: taskworker
template:
metadata:
labels:
app: taskworker
spec:
containers:
- name: taskworker-container
image: us-central1-docker.pkg.dev/sentryio/taskworker/image@sha256:3ba1c93419eb1cb2a6fc40028f6fac1085b7d5d9956aaae1d9f7b827a5de9291
env:
- name: SENTRY_TASKWORKER_GRPC_HOST
value: "taskbroker-service"
- name: SENTRY_TASKWORKER_GRPC_PORT
value: "50051"
- name: SENTRY_TASKWORKER_NAMESPACE
value: "test"
- name: SENTRY_TASKWORKER_FAILURE_RATE
value: "0.0"
- name: SENTRY_TASKWORKER_TIMEOUT_RATE
value: "0.0"
ports:
- containerPort: 8686
8 changes: 6 additions & 2 deletions python/integration_tests/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ def main() -> None:
namespace=namespace,
failure_rate=failure_rate,
timeout_rate=timeout_rate,
enable_backoff=True,
)

task = None
while True:
task = worker.fetch_task()
if task is None:
task = worker.fetch_task()

if task:
worker.process_task(task)
task = worker.process_task(task)


if __name__ == "__main__":
Expand Down
5 changes: 2 additions & 3 deletions python/integration_tests/test_task_worker_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
import yaml

from collections import defaultdict
from pathlib import Path
from python.integration_tests.helpers import (
TASKBROKER_BIN,
TESTS_OUTPUT_ROOT,
send_messages_to_kafka,
create_topic,
)

from python.integration_tests.worker import SimpleTaskWorker, TaskWorkerClient
from python.integration_tests.worker import ConfigurableTaskWorker, TaskWorkerClient


TEST_OUTPUT_PATH = TESTS_OUTPUT_ROOT / "test_task_worker_processing"
Expand All @@ -32,7 +31,7 @@ def manage_taskworker(
shutdown_event: threading.Event,
) -> None:
print(f"[taskworker_{worker_id}] Starting taskworker_{worker_id}")
worker = SimpleTaskWorker(
worker = ConfigurableTaskWorker(
TaskWorkerClient(f"127.0.0.1:{consumer_config['grpc_port']}")
)
fetched_tasks = 0
Expand Down
49 changes: 12 additions & 37 deletions python/integration_tests/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,67 +59,42 @@ def update_task(
return None


class SimpleTaskWorker:
"""
A simple TaskWorker that is used for integration tests. This taskworker does not
actually execute tasks, it simply fetches tasks from the taskworker gRPC server
and updates their status depending on the test scenario.
"""

def __init__(self, client: TaskWorkerClient, namespace: str | None = None) -> None:
self.client = client
self._namespace: str | None = namespace

def fetch_task(self) -> TaskActivation | None:
try:
activation = self.client.get_task(self._namespace)
except grpc.RpcError:
print("get_task failed. Retrying in 1 second")
return None

if not activation:
print("No task fetched")
return None

return activation

def process_task(self, activation: TaskActivation) -> TaskActivation | None:
return self.client.update_task(
task_id=activation.id,
status=TASK_ACTIVATION_STATUS_COMPLETE,
fetch_next_task=FetchNextTask(namespace=self._namespace),
)


class ConfigurableTaskWorker:
"""
A taskworker that can be configured to fail/timeout while processing tasks.
"""

def __init__(self, client: TaskWorkerClient, namespace: str | None = None, failure_rate: float = 0.0, timeout_rate: float = 0.0) -> None:
def __init__(self, client: TaskWorkerClient, namespace: str | None = None, failure_rate: float = 0.0, timeout_rate: float = 0.0, enable_backoff: bool = False) -> None:
self.client = client
self._namespace: str | None = namespace
self._failure_rate: float = failure_rate
self._timeout_rate: float = timeout_rate
self._backoff_wait_time: float | None = 0.0 if enable_backoff else None

def fetch_task(self) -> TaskActivation | None:
try:
activation = self.client.get_task(self._namespace)
except grpc.RpcError:
print("get_task failed. Retrying in 1 second")
except grpc.RpcError as err:
logging.error(f"get_task failed. Retrying in 1 second: {err}")
time.sleep(1)
return None

if not activation:
print("No task fetched")
logging.debug("No task fetched")
if self._backoff_wait_time is not None:
logging.debug(f"Backing off for {self._backoff_wait_time} seconds")
time.sleep(self._backoff_wait_time)
self._backoff_wait_time = min(self._backoff_wait_time + 1, 10)
return None

self._backoff_wait_time = 0.0
return activation

def process_task(self, activation: TaskActivation) -> TaskActivation | None:
logging.debug(f"Processing task {activation.id}")
if self._timeout_rate and random.random() < self._timeout_rate:
return None # Pretend that the task was dropped

if self._failure_rate and random.random() < self._failure_rate:
update_status = TASK_ACTIVATION_STATUS_FAILURE
else:
Expand Down

0 comments on commit cc30423

Please sign in to comment.