Skip to content

Commit

Permalink
Merge pull request #78 from nickmahilani/extra-kafka-model-args
Browse files Browse the repository at this point in the history
add some kafka model constraint args
  • Loading branch information
abersnaze authored Oct 24, 2023
2 parents 8b6bef1 + 79fe445 commit fe726e7
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 1 deletion.
51 changes: 50 additions & 1 deletion service_capacity_modeling/models/org/netflix/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,31 @@ def _estimate_kafka_cluster_zonal(
hot_retention_seconds,
zones_per_region: int = 3,
copies_per_region: int = 2,
require_local_disks: bool = False,
require_attached_disks: bool = False,
required_zone_size: Optional[int] = None,
max_regional_size: int = 150,
max_local_disk_gib: int = 1024 * 5,
min_instance_cpu: int = 2,
min_instance_memory_gib: int = 12,
) -> Optional[CapacityPlan]:

# Kafka doesn't like to deploy on single CPU instances
if instance.cpu < 2:
if instance.cpu < min_instance_cpu:
return None

# Kafka doesn't like to deploy to instances with < 7 GiB of ram
if instance.ram_gib < min_instance_memory_gib:
return None

# if we're not allowed to use attached disks, skip EBS only types
if instance.drive is None and require_local_disks:
return None

# if we're not allowed to use local disks, skip ephems
if instance.drive is not None and require_attached_disks:
return None

# Kafka only deploys on gp3 drives right now
if instance.drive is None and drive.name != "gp3":
return None
Expand Down Expand Up @@ -247,6 +259,11 @@ def _estimate_kafka_cluster_zonal(
params = {"kafka.copies": copies_per_region}
_upsert_params(cluster, params)

# Sometimes we don't want to modify cluster topology, so only allow
# topologies that match the desired zone size
if required_zone_size is not None and cluster.count != required_zone_size:
return None

# Kafka clusters generally should try to stay under some total number
# of nodes. Orgs do this for all kinds of reasons such as
# * Security group limits. Since you must have < 500 rules if you're
Expand Down Expand Up @@ -301,6 +318,18 @@ class NflxKafkaArguments(BaseModel):
"Typically consumers lag under 10s, but ensure up to 10M can be handled"
),
)
require_local_disks: bool = Field(
default=False,
description="If local (ephemeral) drives are required",
)
require_attached_disks: bool = Field(
default=False,
description="If attached (ebs) drives are required",
)
required_zonal_size: Optional[int] = Field(
default=None,
description="Require zonal clusters to be this size (force vertical scaling)",
)
max_regional_size: int = Field(
default=150,
description="What is the maximum size of a cluster in this region",
Expand All @@ -312,6 +341,10 @@ class NflxKafkaArguments(BaseModel):
"recovery duration on failure."
),
)
min_instance_cpu: int = Field(
default=2,
description="The minimum number of instance CPU to allow",
)
min_instance_memory_gib: int = Field(
default=12,
description="The minimum amount of instance memory to allow",
Expand Down Expand Up @@ -344,21 +377,37 @@ def capacity_plan(
max_local_disk_gib: int = extra_model_arguments.get(
"max_local_disk_gib", 1024 * 5
)
min_instance_cpu: int = extra_model_arguments.get(
"min_instance_cpu", 2
)
min_instance_memory_gib: int = extra_model_arguments.get(
"min_instance_memory_gib", 12
)
hot_retention_seconds: float = iso_to_seconds(
extra_model_arguments.get("hot_retention", "PT10M")
)
require_local_disks: bool = extra_model_arguments.get(
"require_local_disks", False
)
require_attached_disks: bool = extra_model_arguments.get(
"require_attached_disks", False
)
required_zone_size: Optional[int] = extra_model_arguments.get(
"required_zone_size", None
)

return _estimate_kafka_cluster_zonal(
instance=instance,
drive=drive,
desires=desires,
zones_per_region=context.zones_in_region,
copies_per_region=copies_per_region,
require_local_disks=require_local_disks,
require_attached_disks=require_attached_disks,
required_zone_size=required_zone_size,
max_regional_size=max_regional_size,
max_local_disk_gib=max_local_disk_gib,
min_instance_cpu=min_instance_cpu,
min_instance_memory_gib=min_instance_memory_gib,
hot_retention_seconds=hot_retention_seconds,
)
Expand Down
64 changes: 64 additions & 0 deletions tests/netflix/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,67 @@ def test_kafka_high_throughput_ebs():
assert clstr.attached_drives[0].name == "gp3"
disk = clstr.attached_drives[0].size_gib * clstr.count
assert expected_disk[0] < disk < expected_disk[1] * 2.5

def test_kafka_model_constraints():
# 2.8 GiB / second
throughput = 2.8 * 1024 * 1024 * 1024
desires = CapacityDesires(
service_tier=1,
query_pattern=QueryPattern(
# 2 consumers
estimated_read_per_second=Interval(low=1, mid=2, high=2, confidence=0.98),
# 1 producer
estimated_write_per_second=Interval(low=1, mid=1, high=1, confidence=0.98),
# Write throughput of 100 MiB/s
estimated_mean_write_size_bytes=Interval(
low=throughput, mid=throughput, high=throughput * 2, confidence=0.98
),
),
)

required_zone_size = 10
min_instance_cpu = 16
# Force to attached drives
require_attached_disks = True
plan = planner.plan(
model_name="org.netflix.kafka",
region="us-east-1",
desires=desires,
extra_model_arguments={
"cluster_type": "ha",
"retention": "PT3H",
"require_attached_disks": require_attached_disks,
"min_instance_cpu": min_instance_cpu,
"required_zone_size": required_zone_size
},
num_results=3,
)
expected_min_zonal_cpu = required_zone_size * min_instance_cpu

for lr in plan.least_regret:
for z in range(3):
clstr = lr.candidate_clusters.zonal[z]
assert clstr.instance.drive is None
assert (clstr.instance.cpu * clstr.count) >= expected_min_zonal_cpu

# force to local disks
plan = planner.plan(
model_name="org.netflix.kafka",
region="us-east-1",
desires=desires,
extra_model_arguments={
"cluster_type": "ha",
"retention": "PT3H",
"require_local_disks": True,
"min_instance_cpu": min_instance_cpu,
"required_zone_size": required_zone_size
},
num_results=3,
)
expected_min_zonal_cpu = required_zone_size * min_instance_cpu

for lr in plan.least_regret:
for z in range(3):
clstr = lr.candidate_clusters.zonal[z]
assert clstr.instance.drive is not None
assert (clstr.instance.cpu * clstr.count) >= expected_min_zonal_cpu

0 comments on commit fe726e7

Please sign in to comment.