diff --git a/k8s/k8s_stateful_set.yaml b/k8s/k8s_stateful_set.yaml
index 363519d7..83d7207d 100644
--- a/k8s/k8s_stateful_set.yaml
+++ b/k8s/k8s_stateful_set.yaml
@@ -35,7 +35,7 @@ spec:
       terminationGracePeriodSeconds: 10
       containers:
       - name: taskbroker
-        image: us-central1-docker.pkg.dev/sentryio/taskbroker/image:ab1f14a7701cddf3113bab70d4367f1172be68cd
+        image: us-central1-docker.pkg.dev/sentryio/taskbroker/image:8ff9bdee7be16ecf73bc9a20a7db6f2d9dda2e95
         env:
         - name: TASKBROKER_KAFKA_CLUSTER
           value: "kafka-001:9092"
diff --git a/k8s/k8s_taskworker.yaml b/k8s/k8s_taskworker.yaml
new file mode 100644
index 00000000..7a17e473
--- /dev/null
+++ b/k8s/k8s_taskworker.yaml
@@ -0,0 +1,32 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: taskworker
+  labels:
+    app: taskworker
+spec:
+  replicas: 8
+  selector:
+    matchLabels:
+      app: taskworker
+  template:
+    metadata:
+      labels:
+        app: taskworker
+    spec:
+      containers:
+      - name: taskworker-container
+        image: us-central1-docker.pkg.dev/sentryio/taskworker/image@sha256:3ba1c93419eb1cb2a6fc40028f6fac1085b7d5d9956aaae1d9f7b827a5de9291
+        env:
+        - name: SENTRY_TASKWORKER_GRPC_HOST
+          value: "taskbroker-service"
+        - name: SENTRY_TASKWORKER_GRPC_PORT
+          value: "50051"
+        - name: SENTRY_TASKWORKER_NAMESPACE
+          value: "test"
+        - name: SENTRY_TASKWORKER_FAILURE_RATE
+          value: "0.0"
+        - name: SENTRY_TASKWORKER_TIMEOUT_RATE
+          value: "0.0"
+        ports:
+        - containerPort: 8686
diff --git a/python/integration_tests/runner.py b/python/integration_tests/runner.py
index ffab4e4b..30cce86d 100644
--- a/python/integration_tests/runner.py
+++ b/python/integration_tests/runner.py
@@ -18,12 +18,16 @@ def main() -> None:
         namespace=namespace,
         failure_rate=failure_rate,
         timeout_rate=timeout_rate,
+        enable_backoff=True,
     )
 
+    task = None
     while True:
-        task = worker.fetch_task()
+        if task is None:
+            task = worker.fetch_task()
+
         if task:
-            worker.process_task(task)
+            task = worker.process_task(task)
 
 
 if __name__ == "__main__":
diff --git a/python/integration_tests/test_task_worker_processing.py b/python/integration_tests/test_task_worker_processing.py
index 7afd96b6..747219dd 100644
--- a/python/integration_tests/test_task_worker_processing.py
+++ b/python/integration_tests/test_task_worker_processing.py
@@ -8,7 +8,6 @@ import yaml
 
 from collections import defaultdict
-from pathlib import Path
 
 from python.integration_tests.helpers import (
     TASKBROKER_BIN,
     TESTS_OUTPUT_ROOT,
@@ -16,7 +15,7 @@
     create_topic,
 )
 
-from python.integration_tests.worker import SimpleTaskWorker, TaskWorkerClient
+from python.integration_tests.worker import ConfigurableTaskWorker, TaskWorkerClient
 
 
 TEST_OUTPUT_PATH = TESTS_OUTPUT_ROOT / "test_task_worker_processing"
@@ -32,7 +31,7 @@ def manage_taskworker(
     shutdown_event: threading.Event,
 ) -> None:
     print(f"[taskworker_{worker_id}] Starting taskworker_{worker_id}")
-    worker = SimpleTaskWorker(
+    worker = ConfigurableTaskWorker(
         TaskWorkerClient(f"127.0.0.1:{consumer_config['grpc_port']}")
     )
     fetched_tasks = 0
diff --git a/python/integration_tests/worker.py b/python/integration_tests/worker.py
index 69c7c1dd..6628b285 100644
--- a/python/integration_tests/worker.py
+++ b/python/integration_tests/worker.py
@@ -59,67 +59,42 @@ def update_task(
         return None
 
 
-class SimpleTaskWorker:
-    """
-    A simple TaskWorker that is used for integration tests. This taskworker does not
-    actually execute tasks, it simply fetches tasks from the taskworker gRPC server
-    and updates their status depending on the test scenario.
-    """
-
-    def __init__(self, client: TaskWorkerClient, namespace: str | None = None) -> None:
-        self.client = client
-        self._namespace: str | None = namespace
-
-    def fetch_task(self) -> TaskActivation | None:
-        try:
-            activation = self.client.get_task(self._namespace)
-        except grpc.RpcError:
-            print("get_task failed. Retrying in 1 second")
-            return None
-
-        if not activation:
-            print("No task fetched")
-            return None
-
-        return activation
-
-    def process_task(self, activation: TaskActivation) -> TaskActivation | None:
-        return self.client.update_task(
-            task_id=activation.id,
-            status=TASK_ACTIVATION_STATUS_COMPLETE,
-            fetch_next_task=FetchNextTask(namespace=self._namespace),
-        )
-
-
 class ConfigurableTaskWorker:
     """
     A taskworker that can be configured to fail/timeout while processing tasks.
     """
 
-    def __init__(self, client: TaskWorkerClient, namespace: str | None = None, failure_rate: float = 0.0, timeout_rate: float = 0.0) -> None:
+    def __init__(self, client: TaskWorkerClient, namespace: str | None = None, failure_rate: float = 0.0, timeout_rate: float = 0.0, enable_backoff: bool = False) -> None:
         self.client = client
         self._namespace: str | None = namespace
         self._failure_rate: float = failure_rate
         self._timeout_rate: float = timeout_rate
+        self._backoff_wait_time: float | None = 0.0 if enable_backoff else None
 
     def fetch_task(self) -> TaskActivation | None:
         try:
             activation = self.client.get_task(self._namespace)
-        except grpc.RpcError:
-            print("get_task failed. Retrying in 1 second")
+        except grpc.RpcError as err:
+            logging.error(f"get_task failed. Retrying in 1 second: {err}")
             time.sleep(1)
             return None
 
         if not activation:
-            print("No task fetched")
+            logging.debug("No task fetched")
+            if self._backoff_wait_time is not None:
+                logging.debug(f"Backing off for {self._backoff_wait_time} seconds")
+                time.sleep(self._backoff_wait_time)
+                self._backoff_wait_time = min(self._backoff_wait_time + 1, 10)
             return None
 
+        self._backoff_wait_time = 0.0
         return activation
 
     def process_task(self, activation: TaskActivation) -> TaskActivation | None:
+        logging.debug(f"Processing task {activation.id}")
         if self._timeout_rate and random.random() < self._timeout_rate:
             return None  # Pretend that the task was dropped
-
+
         if self._failure_rate and random.random() < self._failure_rate:
             update_status = TASK_ACTIVATION_STATUS_FAILURE
         else: