diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a4bdc0..c1f9dde 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,13 +60,30 @@ jobs: with: key: ${{ github.job }} - - name: Run Cargo Tests - uses: actions-rs/cargo@844f36862e911db73fe0815f00a4a2602c279505 # pin@v1 + - name: Run unit tests + run: | + make unit-test + + rebalance-integration-test: + name: Rebalance integration test + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - name: Install protoc + uses: arduino/setup-protoc@v3 + + - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # pin@v1 with: - command: test - args: --all + toolchain: stable + profile: minimal + override: true + + - uses: swatinem/rust-cache@81d053bdb0871dcd3f10763c8cc60d0adc41762b # pin@v1 + with: + key: ${{ github.job }} - - name: Run Python Integration Tests + - name: Run Rebalance Integration Test run: | - export PYTEST_ADDOPTS="" - python -m pytest python/integration_tests -s -vv + make test-rebalance diff --git a/Makefile b/Makefile index 8e56471..38a1808 100644 --- a/Makefile +++ b/Makefile @@ -23,36 +23,44 @@ release: ## Build a release profile with all features style: ## Run style checking tools (cargo-fmt) @rustup component add rustfmt 2> /dev/null cargo fmt --all --check -.PHONY: format +.PHONY: style lint: ## Run linting tools (cargo-clippy) @rustup component add clippy 2> /dev/null cargo clippy --workspace --all-targets --all-features --no-deps -- -D warnings -.PHONY: format +.PHONY: lint format: ## Run autofix mode for formatting and lint @rustup component add clippy 2> /dev/null @rustup component add rustfmt 2> /dev/null cargo fmt --all cargo clippy --workspace --all-targets --all-features --no-deps --fix --allow-dirty --allow-staged -- -D warnings +.PHONY: format # Tests -unit-test: +unit-test: ## Run unit tests cargo test -.PHONY: test +.PHONY: unit-test -install-py-dev: +install-py-dev: ## Install python dependencies python -m venv 
python/.venv . python/.venv/bin/activate pip install -r python/requirements-dev.txt .PHONY: install-py-dev -integration-test: - cargo build - python -m venv python/.venv - . python/.venv/bin/activate - python -m pytest python/integration_tests -s -vv +reset-kafka: install-py-dev ## Reset kafka + devservices down + -docker volume rm kafka_kafka-data + devservices up +.PHONY: reset-kafka + +test-rebalance: build reset-kafka ## Run the rebalance integration tests + python -m pytest python/integration_tests/test_consumer_rebalancing.py -s + rm -r python/integration_tests/.tests_output/test_consumer_rebalancing +.PHONY: test-rebalance + +integration-test: test-rebalance ## Run all integration tests .PHONY: integration-test # Help diff --git a/python/integration_tests/helpers.py b/python/integration_tests/helpers.py index 6712978..1e99be8 100644 --- a/python/integration_tests/helpers.py +++ b/python/integration_tests/helpers.py @@ -10,69 +10,29 @@ TASKBROKER_ROOT = Path(__file__).parent.parent.parent TASKBROKER_BIN = TASKBROKER_ROOT / "target/debug/taskbroker" -TESTS_OUTPUT_PATH = Path(__file__).parent / ".tests_output" - - -def check_topic_exists(topic_name: str) -> bool: - try: - check_topic_cmd = [ - "docker", - "exec", - "kafka-kafka-1", - "kafka-topics", - "--bootstrap-server", - "localhost:9092", - "--list", - ] - result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True) - topics = result.stdout.strip().split("\n") - - return topic_name in topics - except Exception as e: - raise Exception(f"Failed to check if topic exists: {e}") +TESTS_OUTPUT_ROOT = Path(__file__).parent / ".tests_output" def create_topic(topic_name: str, num_partitions: int) -> None: - try: - create_topic_cmd = [ - "docker", - "exec", - "kafka-kafka-1", - "kafka-topics", - "--bootstrap-server", - "localhost:9092", - "--create", - "--topic", - topic_name, - "--partitions", - str(num_partitions), - "--replication-factor", - "1", - ] - subprocess.run(create_topic_cmd, 
check=True) - except Exception as e: - raise Exception(f"Failed to create topic: {e}") - - -def update_topic_partitions(topic_name: str, num_partitions: int) -> None: - try: - create_topic_cmd = [ - "docker", - "exec", - "kafka-kafka-1", - "kafka-topics", - "--bootstrap-server", - "localhost:9092", - "--alter", - "--topic", - topic_name, - "--partitions", - str(num_partitions), - ] - subprocess.run(create_topic_cmd, check=True) - except Exception: - # Command fails topic already has the correct number of partitions. Try to continue. - pass + print(f"Creating topic: {topic_name}, with {num_partitions} partitions") + create_topic_cmd = [ + "docker", + "exec", + "kafka-kafka-1", + "kafka-topics", + "--bootstrap-server", + "localhost:9092", + "--create", + "--topic", + topic_name, + "--partitions", + str(num_partitions), + ] + res = subprocess.run(create_topic_cmd, capture_output=True, text=True) + if res.returncode != 0: + print(f"Got return code: {res.returncode}, when creating topic") + print(f"Stdout: {res.stdout}") + print(f"Stderr: {res.stderr}") def serialize_task_activation(args: list, kwargs: dict) -> bytes: @@ -88,6 +48,7 @@ def serialize_task_activation(args: list, kwargs: dict) -> bytes: taskname="integration_tests.say_hello", parameters=orjson.dumps({"args": args, "kwargs": kwargs}), retry_state=retry_state, + processing_deadline_duration=3000, received_at=Timestamp(seconds=int(time.time())), ).SerializeToString() @@ -96,10 +57,12 @@ def serialize_task_activation(args: list, kwargs: dict) -> bytes: def send_messages_to_kafka(topic_name: str, num_messages: int) -> None: try: - producer = Producer({ - 'bootstrap.servers': 'localhost:9092', - 'broker.address.family': 'v4' - }) + producer = Producer( + { + "bootstrap.servers": "127.0.0.1:9092", + "broker.address.family": "v4", + } + ) for _ in range(num_messages): task_message = serialize_task_activation(["foobar"], {}) diff --git a/python/integration_tests/test_consumer_rebalancing.py 
b/python/integration_tests/test_consumer_rebalancing.py index 0a7a95c..14d130f 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -6,19 +6,20 @@ import threading import time +from pathlib import Path from threading import Thread import yaml from python.integration_tests.helpers import ( TASKBROKER_BIN, - TESTS_OUTPUT_PATH, - check_topic_exists, + TESTS_OUTPUT_ROOT, create_topic, - update_topic_partitions, send_messages_to_kafka, ) +TEST_OUTPUT_PATH = TESTS_OUTPUT_ROOT / "test_consumer_rebalancing" + def manage_consumer( consumer_index: int, @@ -58,7 +59,7 @@ def test_tasks_written_once_during_rebalancing() -> None: num_messages = 100_000 num_restarts = 16 num_partitions = 32 - min_restart_duration = 1 + min_restart_duration = 4 max_restart_duration = 30 max_pending_count = 15_000 topic_name = "task-worker" @@ -79,27 +80,18 @@ def test_tasks_written_once_during_rebalancing() -> None: ) random.seed(42) - # Ensure topic has correct number of partitions - if not check_topic_exists(topic_name): - print( - f"{topic_name} topic does not exist, creating it with {num_partitions} partitions" - ) - create_topic(topic_name, num_partitions) - else: - print( - f"{topic_name} topic already exists, making sure it has {num_partitions} partitions" - ) - update_topic_partitions(topic_name, num_partitions) + # Ensure topic exists and has correct number of partitions + create_topic(topic_name, num_partitions) # Create config files for consumers - print("\nCreating config files for consumers") - TESTS_OUTPUT_PATH.mkdir(exist_ok=True) + print("Creating config files for consumers") + TEST_OUTPUT_PATH.mkdir(parents=True, exist_ok=True) consumer_configs = {} for i in range(num_consumers): db_name = f"db_{i}_{curr_time}" consumer_configs[f"config_{i}.yml"] = { "db_name": db_name, - "db_path": str(TESTS_OUTPUT_PATH / f"{db_name}.sqlite"), + "db_path": str(TEST_OUTPUT_PATH / f"{db_name}.sqlite"), 
"max_pending_count": max_pending_count, "kafka_topic": topic_name, "kafka_consumer_group": topic_name, @@ -108,7 +100,7 @@ def test_tasks_written_once_during_rebalancing() -> None: } for filename, config in consumer_configs.items(): - with open(str(TESTS_OUTPUT_PATH / filename), "w") as f: + with open(str(TEST_OUTPUT_PATH / filename), "w") as f: yaml.safe_dump(config, f) try: @@ -120,11 +112,11 @@ def test_tasks_written_once_during_rebalancing() -> None: args=( i, consumer_path, - str(TESTS_OUTPUT_PATH / f"config_{i}.yml"), + str(TEST_OUTPUT_PATH / f"config_{i}.yml"), num_restarts, min_restart_duration, max_restart_duration, - str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), + str(TEST_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), ), ) thread.start() @@ -205,13 +197,18 @@ def test_tasks_written_once_during_rebalancing() -> None: consumer_error_logs = [] for i in range(num_consumers): - with open(str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f: + with open(str(TEST_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f: lines = f.readlines() for log_line_index, line in enumerate(lines): if "[31mERROR" in line: # If there is an error in log file, capture 10 lines before and after the error line - consumer_error_logs.append(f"Error found in consumer_{i}. Logging 10 lines before and after the error line:") - for j in range(max(0, log_line_index - 10), min(len(lines) - 1, log_line_index + 10)): + consumer_error_logs.append( + f"Error found in consumer_{i}. Logging 10 lines before and after the error line:" + ) + for j in range( + max(0, log_line_index - 10), + min(len(lines), log_line_index + 11), + ): consumer_error_logs.append(lines[j].strip()) consumer_error_logs.append("")