Skip to content

Commit

Permalink
test(kafka): Refactor integration tests (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
john-z-yang authored Dec 24, 2024
1 parent 91f3c3b commit 8c591c8
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 105 deletions.
31 changes: 24 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,30 @@ jobs:
with:
key: ${{ github.job }}

- name: Run Cargo Tests
uses: actions-rs/cargo@844f36862e911db73fe0815f00a4a2602c279505 # pin@v1
- name: Run unit tests
run: |
make unit-test
rebalance-integration-test:
name: Rebalance integration test
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2

- name: Install protoc
uses: arduino/setup-protoc@v3

- uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # pin@v1
with:
command: test
args: --all
toolchain: stable
profile: minimal
override: true

- uses: swatinem/rust-cache@81d053bdb0871dcd3f10763c8cc60d0adc41762b # pin@v1
with:
key: ${{ github.job }}

- name: Run Python Integration Tests
- name: Run Rebalance Integration Test
run: |
export PYTEST_ADDOPTS=""
python -m pytest python/integration_tests -s -vv
make test-rebalance
28 changes: 18 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,44 @@ release: ## Build a release profile with all features
style: ## Run style checking tools (cargo-fmt)
@rustup component add rustfmt 2> /dev/null
cargo fmt --all --check
.PHONY: format
.PHONY: style

lint: ## Run linting tools (cargo-clippy)
@rustup component add clippy 2> /dev/null
cargo clippy --workspace --all-targets --all-features --no-deps -- -D warnings
.PHONY: format
.PHONY: lint

format: ## Run autofix mode for formatting and lint
@rustup component add clippy 2> /dev/null
@rustup component add rustfmt 2> /dev/null
cargo fmt --all
cargo clippy --workspace --all-targets --all-features --no-deps --fix --allow-dirty --allow-staged -- -D warnings
.PHONY: format

# Tests

unit-test:
unit-test: ## Run unit tests
cargo test
.PHONY: test
.PHONY: unit-test

install-py-dev:
install-py-dev: ## Install python dependencies
python -m venv python/.venv
. python/.venv/bin/activate
pip install -r python/requirements-dev.txt
.PHONY: install-py-dev

integration-test:
cargo build
python -m venv python/.venv
. python/.venv/bin/activate
python -m pytest python/integration_tests -s -vv
reset-kafka: install-py-dev ## Reset kafka
devservices down
-docker volume rm kafka_kafka-data
devservices up
.PHONY: reset-kafka

test-rebalance: build reset-kafka ## Run the rebalance integration tests
python -m pytest python/integration_tests/test_consumer_rebalancing.py -s
rm -r python/integration_tests/.tests_output/test_consumer_rebalancing
.PHONY: test-rebalance

integration-test: test-rebalance ## Run all integration tests
.PHONY: integration-test

# Help
Expand Down
91 changes: 27 additions & 64 deletions python/integration_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,69 +10,29 @@

TASKBROKER_ROOT = Path(__file__).parent.parent.parent
TASKBROKER_BIN = TASKBROKER_ROOT / "target/debug/taskbroker"
TESTS_OUTPUT_PATH = Path(__file__).parent / ".tests_output"


def check_topic_exists(topic_name: str) -> bool:
    """Return True if ``topic_name`` exists on the local Kafka broker.

    Lists all topics by exec-ing ``kafka-topics --list`` inside the
    ``kafka-kafka-1`` docker container and checking for an exact match.

    Raises:
        Exception: if the docker/kafka-topics command fails; the original
            error is chained as the cause.
    """
    check_topic_cmd = [
        "docker",
        "exec",
        "kafka-kafka-1",
        "kafka-topics",
        "--bootstrap-server",
        "localhost:9092",
        "--list",
    ]
    try:
        # Keep the try body minimal: only the subprocess call can fail here.
        result = subprocess.run(
            check_topic_cmd, check=True, capture_output=True, text=True
        )
    except Exception as e:
        # Chain the cause so the underlying traceback is preserved.
        raise Exception(f"Failed to check if topic exists: {e}") from e
    topics = result.stdout.strip().split("\n")
    return topic_name in topics
TESTS_OUTPUT_ROOT = Path(__file__).parent / ".tests_output"


def create_topic(topic_name: str, num_partitions: int) -> None:
    """Create Kafka topic ``topic_name`` with ``num_partitions`` partitions.

    Runs ``kafka-topics --create`` inside the ``kafka-kafka-1`` docker
    container with replication factor 1 (single-broker dev setup).

    Raises:
        Exception: if the docker/kafka-topics command fails (including when
            the topic already exists); the original error is chained.
    """
    create_topic_cmd = [
        "docker",
        "exec",
        "kafka-kafka-1",
        "kafka-topics",
        "--bootstrap-server",
        "localhost:9092",
        "--create",
        "--topic",
        topic_name,
        "--partitions",
        str(num_partitions),
        "--replication-factor",
        "1",
    ]
    try:
        # Only the subprocess call belongs in the try body.
        subprocess.run(create_topic_cmd, check=True)
    except Exception as e:
        # Chain the cause so the underlying traceback is preserved.
        raise Exception(f"Failed to create topic: {e}") from e


def update_topic_partitions(topic_name: str, num_partitions: int) -> None:
    """Grow ``topic_name`` to ``num_partitions`` partitions (best effort).

    Runs ``kafka-topics --alter`` inside the ``kafka-kafka-1`` docker
    container. Kafka rejects the alter when the topic already has at least
    ``num_partitions`` partitions; that expected failure is deliberately
    ignored so callers can treat this as idempotent.
    """
    # Renamed from the copy-pasted `create_topic_cmd`: this is an alter.
    alter_topic_cmd = [
        "docker",
        "exec",
        "kafka-kafka-1",
        "kafka-topics",
        "--bootstrap-server",
        "localhost:9092",
        "--alter",
        "--topic",
        topic_name,
        "--partitions",
        str(num_partitions),
    ]
    try:
        subprocess.run(alter_topic_cmd, check=True)
    except subprocess.CalledProcessError:
        # Command fails if the topic already has the correct number of
        # partitions. Try to continue. Catching only CalledProcessError
        # keeps genuine environment errors (e.g. docker missing) visible.
        pass
print(f"Creating topic: {topic_name}, with {num_partitions} partitions")
create_topic_cmd = [
"docker",
"exec",
"kafka-kafka-1",
"kafka-topics",
"--bootstrap-server",
"localhost:9092",
"--create",
"--topic",
topic_name,
"--partitions",
str(num_partitions),
]
res = subprocess.run(create_topic_cmd, capture_output=True, text=True)
if res.returncode != 0:
print(f"Got return code: {res.returncode}, when creating topic")
print(f"Stdout: {res.stdout}")
print(f"Stderr: {res.stderr}")


def serialize_task_activation(args: list, kwargs: dict) -> bytes:
Expand All @@ -88,6 +48,7 @@ def serialize_task_activation(args: list, kwargs: dict) -> bytes:
taskname="integration_tests.say_hello",
parameters=orjson.dumps({"args": args, "kwargs": kwargs}),
retry_state=retry_state,
processing_deadline_duration=3000,
received_at=Timestamp(seconds=int(time.time())),
).SerializeToString()

Expand All @@ -96,10 +57,12 @@ def serialize_task_activation(args: list, kwargs: dict) -> bytes:

def send_messages_to_kafka(topic_name: str, num_messages: int) -> None:
try:
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'broker.address.family': 'v4'
})
producer = Producer(
{
"bootstrap.servers": "127.0.0.1:9092",
"broker.address.family": "v4",
}
)

for _ in range(num_messages):
task_message = serialize_task_activation(["foobar"], {})
Expand Down
45 changes: 21 additions & 24 deletions python/integration_tests/test_consumer_rebalancing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
import threading
import time

from pathlib import Path
from threading import Thread

import yaml

from python.integration_tests.helpers import (
TASKBROKER_BIN,
TESTS_OUTPUT_PATH,
check_topic_exists,
TESTS_OUTPUT_ROOT,
create_topic,
update_topic_partitions,
send_messages_to_kafka,
)

TEST_OUTPUT_PATH = TESTS_OUTPUT_ROOT / "test_consumer_rebalancing"


def manage_consumer(
consumer_index: int,
Expand Down Expand Up @@ -58,7 +59,7 @@ def test_tasks_written_once_during_rebalancing() -> None:
num_messages = 100_000
num_restarts = 16
num_partitions = 32
min_restart_duration = 1
min_restart_duration = 4
max_restart_duration = 30
max_pending_count = 15_000
topic_name = "task-worker"
Expand All @@ -79,27 +80,18 @@ def test_tasks_written_once_during_rebalancing() -> None:
)
random.seed(42)

# Ensure topic has correct number of partitions
if not check_topic_exists(topic_name):
print(
f"{topic_name} topic does not exist, creating it with {num_partitions} partitions"
)
create_topic(topic_name, num_partitions)
else:
print(
f"{topic_name} topic already exists, making sure it has {num_partitions} partitions"
)
update_topic_partitions(topic_name, num_partitions)
# Ensure topic exists and has correct number of partitions
create_topic(topic_name, num_partitions)

# Create config files for consumers
print("\nCreating config files for consumers")
TESTS_OUTPUT_PATH.mkdir(exist_ok=True)
print("Creating config files for consumers")
TEST_OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
consumer_configs = {}
for i in range(num_consumers):
db_name = f"db_{i}_{curr_time}"
consumer_configs[f"config_{i}.yml"] = {
"db_name": db_name,
"db_path": str(TESTS_OUTPUT_PATH / f"{db_name}.sqlite"),
"db_path": str(TEST_OUTPUT_PATH / f"{db_name}.sqlite"),
"max_pending_count": max_pending_count,
"kafka_topic": topic_name,
"kafka_consumer_group": topic_name,
Expand All @@ -108,7 +100,7 @@ def test_tasks_written_once_during_rebalancing() -> None:
}

for filename, config in consumer_configs.items():
with open(str(TESTS_OUTPUT_PATH / filename), "w") as f:
with open(str(TEST_OUTPUT_PATH / filename), "w") as f:
yaml.safe_dump(config, f)

try:
Expand All @@ -120,11 +112,11 @@ def test_tasks_written_once_during_rebalancing() -> None:
args=(
i,
consumer_path,
str(TESTS_OUTPUT_PATH / f"config_{i}.yml"),
str(TEST_OUTPUT_PATH / f"config_{i}.yml"),
num_restarts,
min_restart_duration,
max_restart_duration,
str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"),
str(TEST_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"),
),
)
thread.start()
Expand Down Expand Up @@ -205,13 +197,18 @@ def test_tasks_written_once_during_rebalancing() -> None:

consumer_error_logs = []
for i in range(num_consumers):
with open(str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f:
with open(str(TEST_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f:
lines = f.readlines()
for log_line_index, line in enumerate(lines):
if "[31mERROR" in line:
# If there is an error in log file, capture 10 lines before and after the error line
consumer_error_logs.append(f"Error found in consumer_{i}. Logging 10 lines before and after the error line:")
for j in range(max(0, log_line_index - 10), min(len(lines) - 1, log_line_index + 10)):
consumer_error_logs.append(
f"Error found in consumer_{i}. Logging 10 lines before and after the error line:"
)
for j in range(
max(0, log_line_index - 10),
min(len(lines) - 1, log_line_index + 10),
):
consumer_error_logs.append(lines[j].strip())
consumer_error_logs.append("")

Expand Down

0 comments on commit 8c591c8

Please sign in to comment.