From dfaf1c196a6b97c5b6566a841067d8ccd63fe305 Mon Sep 17 00:00:00 2001
From: Nick Mahilani
Date: Mon, 23 Oct 2023 17:01:09 -0700
Subject: [PATCH 1/2] add some kafka model constraint args

---
 .../models/org/netflix/kafka.py | 51 ++++++++++++++-
 tests/netflix/test_kafka.py     | 65 +++++++++++++++++++
 2 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/service_capacity_modeling/models/org/netflix/kafka.py b/service_capacity_modeling/models/org/netflix/kafka.py
index 5373db6..c792360 100644
--- a/service_capacity_modeling/models/org/netflix/kafka.py
+++ b/service_capacity_modeling/models/org/netflix/kafka.py
@@ -156,19 +156,31 @@ def _estimate_kafka_cluster_zonal(
     hot_retention_seconds,
     zones_per_region: int = 3,
     copies_per_region: int = 2,
+    require_local_disks: bool = False,
+    require_attached_disks: bool = False,
+    required_zone_size: Optional[int] = None,
     max_regional_size: int = 150,
     max_local_disk_gib: int = 1024 * 5,
+    min_instance_cpu: int = 2,
     min_instance_memory_gib: int = 12,
 ) -> Optional[CapacityPlan]:
     # Kafka doesn't like to deploy on single CPU instances
-    if instance.cpu < 2:
+    if instance.cpu < min_instance_cpu:
         return None
 
     # Kafka doesn't like to deploy to instances with < 7 GiB of ram
     if instance.ram_gib < min_instance_memory_gib:
         return None
 
+    # if we're not allowed to use attached disks, skip EBS only types
+    if instance.drive is None and require_local_disks:
+        return None
+
+    # if we're not allowed to use local disks, skip ephems
+    if instance.drive is not None and require_attached_disks:
+        return None
+
     # Kafka only deploys on gp3 drives right now
     if instance.drive is None and drive.name != "gp3":
         return None
@@ -247,6 +259,11 @@ def _estimate_kafka_cluster_zonal(
     params = {"kafka.copies": copies_per_region}
     _upsert_params(cluster, params)
 
+    # Sometimes we don't want to modify cluster topology, so only allow
+    # topologies that match the desired zone size
+    if required_zone_size is not None and cluster.count != required_zone_size:
+        return None
+
     # Kafka clusters generally should try to stay under some total number
     # of nodes. Orgs do this for all kinds of reasons such as
     #   * Security group limits. Since you must have < 500 rules if you're
@@ -301,6 +318,18 @@ class NflxKafkaArguments(BaseModel):
             "Typically consumers lag under 10s, but ensure up to 10M can be handled"
         ),
     )
+    require_local_disks: bool = Field(
+        default=False,
+        description="If local (ephemeral) drives are required",
+    )
+    require_attached_disks: bool = Field(
+        default=False,
+        description="If attached (ebs) drives are required",
+    )
+    required_zone_size: Optional[int] = Field(
+        default=None,
+        description="Require zonal clusters to be this size (force vertical scaling)",
+    )
     max_regional_size: int = Field(
         default=150,
         description="What is the maximum size of a cluster in this region",
     )
@@ -312,6 +341,10 @@
             "recovery duration on failure."
         ),
     )
+    min_instance_cpu: int = Field(
+        default=2,
+        description="The minimum number of instance CPU to allow",
+    )
     min_instance_memory_gib: int = Field(
         default=12,
         description="The minimum amount of instance memory to allow",
@@ -344,12 +377,24 @@ def capacity_plan(
         max_local_disk_gib: int = extra_model_arguments.get(
             "max_local_disk_gib", 1024 * 5
         )
+        min_instance_cpu: int = extra_model_arguments.get(
+            "min_instance_cpu", 2
+        )
         min_instance_memory_gib: int = extra_model_arguments.get(
             "min_instance_memory_gib", 12
         )
         hot_retention_seconds: float = iso_to_seconds(
             extra_model_arguments.get("hot_retention", "PT10M")
         )
+        require_local_disks: bool = extra_model_arguments.get(
+            "require_local_disks", False
+        )
+        require_attached_disks: bool = extra_model_arguments.get(
+            "require_attached_disks", False
+        )
+        required_zone_size: Optional[int] = extra_model_arguments.get(
+            "required_zone_size", None
+        )
 
         return _estimate_kafka_cluster_zonal(
             instance=instance,
@@ -357,8 +402,12 @@
             desires=desires,
             zones_per_region=context.zones_in_region,
             copies_per_region=copies_per_region,
+            require_local_disks=require_local_disks,
+            require_attached_disks=require_attached_disks,
+            required_zone_size=required_zone_size,
             max_regional_size=max_regional_size,
             max_local_disk_gib=max_local_disk_gib,
+            min_instance_cpu=min_instance_cpu,
             min_instance_memory_gib=min_instance_memory_gib,
             hot_retention_seconds=hot_retention_seconds,
         )
diff --git a/tests/netflix/test_kafka.py b/tests/netflix/test_kafka.py
index bf8593c..0eda790 100644
--- a/tests/netflix/test_kafka.py
+++ b/tests/netflix/test_kafka.py
@@ -208,3 +208,68 @@ def test_kafka_high_throughput_ebs():
         assert clstr.attached_drives[0].name == "gp3"
         disk = clstr.attached_drives[0].size_gib * clstr.count
         assert expected_disk[0] < disk < expected_disk[1] * 2.5
+
+def test_kafka_model_constraints():
+    # 2.8 GiB / second
+    throughput = 2.8 * 1024 * 1024 * 1024
+    desires = CapacityDesires(
+        service_tier=1,
+        query_pattern=QueryPattern(
+            # 2 consumers
+            estimated_read_per_second=Interval(low=1, mid=2, high=2, confidence=0.98),
+            # 1 producer
+            estimated_write_per_second=Interval(low=1, mid=1, high=1, confidence=0.98),
+            # Write throughput of 100 MiB/s
+            estimated_mean_write_size_bytes=Interval(
+                low=throughput, mid=throughput, high=throughput * 2, confidence=0.98
+            ),
+        ),
+    )
+
+    required_zone_size = 10
+    min_instance_cpu = 16
+    # Force to attached drives
+    require_attached_disks = True
+    plan = planner.plan(
+        model_name="org.netflix.kafka",
+        region="us-east-1",
+        desires=desires,
+        extra_model_arguments={
+            "cluster_type": "ha",
+            "retention": "PT3H",
+            "require_attached_disks": require_attached_disks,
+            "min_instance_cpu": min_instance_cpu,
+            "required_zone_size": required_zone_size
+        },
+        num_results=3,
+    )
+    expected_min_zonal_cpu = required_zone_size * min_instance_cpu
+
+    for lr in plan.least_regret:
+        for z in range(3):
+            assert lr.requirements.zonal[z].cpu_cores.low >= expected_min_zonal_cpu
+            clstr = lr.candidate_clusters.zonal[z]
+            assert clstr.instance.drive is None
+
+
+    # force to local disks
+    plan = planner.plan(
+        model_name="org.netflix.kafka",
+        region="us-east-1",
+        desires=desires,
+        extra_model_arguments={
+            "cluster_type": "ha",
+            "retention": "PT3H",
+            "require_local_disks": True,
+            "min_instance_cpu": min_instance_cpu,
+            "required_zone_size": required_zone_size
+        },
+        num_results=3,
+    )
+    expected_min_zonal_cpu = required_zone_size * min_instance_cpu
+
+    for lr in plan.least_regret:
+        for z in range(3):
+            assert lr.requirements.zonal[z].cpu_cores.low >= expected_min_zonal_cpu
+            clstr = lr.candidate_clusters.zonal[z]
+            assert clstr.instance.drive is not None

From 79fe4453d5aed02d41ec33e7f4fe8a13a45757b6 Mon Sep 17 00:00:00 2001
From: Nick Mahilani
Date: Tue, 24 Oct 2023 11:00:59 -0700
Subject: [PATCH 2/2] fix test

---
 tests/netflix/test_kafka.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/tests/netflix/test_kafka.py b/tests/netflix/test_kafka.py
index 0eda790..ce8fee0 100644
--- a/tests/netflix/test_kafka.py
+++ b/tests/netflix/test_kafka.py
@@ -247,10 +247,9 @@ def test_kafka_model_constraints():
 
     for lr in plan.least_regret:
         for z in range(3):
-            assert lr.requirements.zonal[z].cpu_cores.low >= expected_min_zonal_cpu
             clstr = lr.candidate_clusters.zonal[z]
             assert clstr.instance.drive is None
-
+            assert (clstr.instance.cpu * clstr.count) >= expected_min_zonal_cpu
 
     # force to local disks
     plan = planner.plan(
@@ -270,6 +269,6 @@ def test_kafka_model_constraints():
 
     for lr in plan.least_regret:
        for z in range(3):
-            assert lr.requirements.zonal[z].cpu_cores.low >= expected_min_zonal_cpu
             clstr = lr.candidate_clusters.zonal[z]
             assert clstr.instance.drive is not None
+            assert (clstr.instance.cpu * clstr.count) >= expected_min_zonal_cpu