Skip to content

Commit 296b048

Browse files
Arushi-07 authored and copybara-github committed
Cleanup cassandra stress benchmark by running cassandra stress on fixed thread count only
PiperOrigin-RevId: 889335056
1 parent 9087064 commit 296b048

File tree

1 file changed

+67
-225
lines changed

1 file changed

+67
-225
lines changed

perfkitbenchmarker/linux_benchmarks/cassandra_stress_benchmark.py

Lines changed: 67 additions & 225 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
NUM_KEYS_PER_CORE = 2000000
5656
# Adding wait between prefill and the test workload to give some time
5757
# for the data to propagate and for the cluster to stabilize.
58-
PROPAGATION_WAIT_TIME = 720
58+
PROPAGATION_WAIT_TIME = 300
5959
WAIT_BETWEEN_COMPACTION_TASKS_CHECK = 300
6060

6161
# cassandra-stress command
@@ -158,7 +158,7 @@
158158
)
159159
CASSANDRA_CPU_UTILIZATION_LIMIT = flags.DEFINE_integer(
160160
'cassandra_cpu_utilization_limit',
161-
80,
161+
None,
162162
'Maximum cpu utilization percentage for the benchmark. ',
163163
)
164164
CASSANDRA_STRESS_MAX_RUNS = flags.DEFINE_integer(
@@ -784,12 +784,19 @@ def RunCassandraStressTestOnClients(
784784
command,
785785
metadata,
786786
is_preload,
787+
CASSANDRA_STRESS_RUN_DURATION.value,
787788
],
788789
{},
789790
))
790791
if not is_preload:
791792
for vm in cassandra_vms:
792-
tasks.append((CPUUtilizationReporting, [vm], {}))
793+
tasks.append(
794+
(
795+
CPUUtilizationReporting,
796+
[vm, CASSANDRA_STRESS_RUN_DURATION.value],
797+
{},
798+
)
799+
)
793800
background_tasks.RunParallelThreads(tasks, max_concurrency=10)
794801
# uptime reports load average for the last 1, 5 and 15 minutes. It is
795802
# important to get the average the moment the test is done to avoid errors.
@@ -806,6 +813,7 @@ def RunCassandraStressOnClient(
806813
command,
807814
metadata,
808815
is_preload,
816+
cassandra_stress_run_duration,
809817
):
810818
"""Run Cassandra-stress test on a client node."""
811819
spec = CassandraStressCommandSpec(
@@ -815,7 +823,7 @@ def RunCassandraStressOnClient(
815823
cassandra_vms=cassandra_vms,
816824
command=command,
817825
metadata=metadata,
818-
duration=CASSANDRA_STRESS_RUN_DURATION.value,
826+
duration=cassandra_stress_run_duration,
819827
consistency_level=FLAGS.cassandra_stress_consistency_level,
820828
retries=FLAGS.cassandra_stress_retries,
821829
mixed_ratio=FLAGS.cassandra_stress_mixed_ratio,
@@ -828,13 +836,13 @@ def RunCassandraStressOnClient(
828836
)
829837

830838

831-
def CPUUtilizationReporting(vm):
839+
def CPUUtilizationReporting(vm, duration):
832840
# command : sar -u <interval> <count>
833841
# we are collecting the data every SAR_CPU_UTILIZATION_INTERVAL seconds for
834842
# the duration of the test
835843
vm.RobustRemoteCommand(
836844
f'sar -u {SAR_CPU_UTILIZATION_INTERVAL}'
837-
f' {CalculateNumberOfSarRequestsFromDuration(CASSANDRA_STRESS_RUN_DURATION.value, SAR_CPU_UTILIZATION_INTERVAL)}'
845+
f' {CalculateNumberOfSarRequestsFromDuration(duration, SAR_CPU_UTILIZATION_INTERVAL)}'
838846
f' > {GenerateCpuUtilizationFileName(vm)}'
839847
)
840848

@@ -855,7 +863,15 @@ def ParseAverageCpuUtilization(output) -> float:
855863

856864

857865
def CalculateNumberOfSarRequestsFromDuration(duration, freq):
858-
"""Calculates the number of sar requests to be sent from the duration of the test."""
866+
"""Calculates the number of sar requests.
867+
868+
Args:
869+
duration: The duration of the test as a string (e.g., '10m', '300s').
870+
freq: The frequency of sar requests in seconds.
871+
872+
Returns:
873+
The number of sar requests to send.
874+
"""
859875
if duration is None:
860876
# If duration is not set, we don't need to send sar requests.
861877
return 0
@@ -1006,136 +1022,47 @@ def Run(benchmark_spec):
10061022
metadata['cassandra_version'] = cassandra.GetCassandraVersion(
10071023
benchmark_spec.vm_groups[CASSANDRA_GROUP][0]
10081024
)
1009-
if CASSANDRA_STRESS_THREADS.value:
1010-
return RunCassandraStressOnFixedThreads(
1011-
client_vms,
1012-
cassandra_vms,
1013-
metadata,
1014-
CASSANDRA_STRESS_THREADS.value,
1015-
)
1016-
return RunTestNTimes(client_vms, cassandra_vms, metadata)
1017-
1018-
1019-
def RunTestNTimes(client_vms, cassandra_vms, metadata):
1020-
"""Run the cassandra stress test max_allowed_runs times.
1021-
1022-
Args:
1023-
client_vms: client vms.
1024-
cassandra_vms: cassandra server vms.
1025-
metadata: dict. Contains metadata for this benchmark.
1025+
threads = CASSANDRA_STRESS_THREADS.value or [1] + list(range(100, 1001, 100))
1026+
samples = RunCassandraStressOnFixedThreads(
1027+
client_vms,
1028+
cassandra_vms,
1029+
metadata,
1030+
threads)
1031+
samples.append(
1032+
GenerateMaxOpRateSample(samples)
1033+
)
1034+
return samples
10261035

1027-
Returns:
1028-
A list of sample.Sample objects.
10291036

1030-
Running cassandra stress test with different thread counts.
1031-
- We increase the thread count gradually by THREAD_INCREMENT_COUNT
1032-
till op rate increases.
1033-
- We decrease the thread count by THREAD_INCREMENT_COUNT/2
1034-
if the operation rate is lower than the previous run.
1035-
"""
1036-
samples = []
1037-
left_thread_count = STARTING_THREAD_COUNT
1038-
# TODO(arushigaur) Remove hardcoded max thread count and just do double and
1039-
# half of the current thread count.
1040-
right_thread_count = 500
1037+
def GenerateMaxOpRateSample(samples):
1038+
"""Parse the max op rate from the thread run data."""
10411039
max_op_rate = 0
10421040
max_op_rate_metadata = None
1043-
thread_data = {}
1044-
while (
1045-
left_thread_count < right_thread_count - 1
1046-
):
1047-
current_thread_count = int((left_thread_count + right_thread_count) / 2)
1048-
# running with both left_left_thread_count and current_thread_count to
1049-
# understand where is the current_thread_count on the curve.
1050-
for threads in [left_thread_count, current_thread_count]:
1051-
# adding nodetool status to check the ownership of each node.
1052-
cassandra.GetNodetoolStatus(cassandra_vms)
1053-
if threads in thread_data:
1054-
logging.info('thread count %s already tested', threads)
1055-
continue
1056-
current_metadata = copy.deepcopy(metadata)
1057-
current_metadata['num_cassandra_stress_threads'] = threads
1058-
logging.info('running thread count: %s', threads)
1059-
RunCassandraStressTestOnClients(
1060-
cassandra_vms,
1061-
client_vms,
1062-
FLAGS.cassandra_stress_command,
1063-
immutabledict.immutabledict(current_metadata),
1064-
is_preload=False,
1065-
)
1066-
current_samples = CollectResults(
1067-
client_vms, current_metadata
1068-
)
1069-
samples.extend(current_samples)
1070-
thread_data[threads] = {
1071-
'operation_rate': GetOperationRate(
1072-
current_samples
1073-
),
1074-
'median_latency': GetMedianLatency(
1075-
current_samples
1076-
),
1077-
'cpu_loads': GetCpuAverageLoad(cassandra_vms),
1078-
'cpu_utilization': GetCpuUtilization(
1079-
cassandra_vms
1080-
),
1081-
'metadata': current_metadata,
1082-
}
1083-
logging.info(
1084-
'details for thread count %s are %s',
1085-
threads,
1086-
thread_data[threads],
1087-
)
1088-
PerformanceReporting(
1089-
cassandra_vms, thread_data[threads], current_metadata
1090-
)
1091-
WaitForCompactionTasks(cassandra_vms)
1092-
left_thread_count, right_thread_count = (
1093-
GetNextThreadCount(
1094-
left_thread_count,
1095-
current_thread_count,
1096-
right_thread_count,
1097-
thread_data,
1098-
MAX_MEDIAN_LATENCY_MS,
1099-
CASSANDRA_CPU_UTILIZATION_LIMIT.value,
1100-
)
1101-
)
1102-
# TODO(arushigaur) Add a stopping condition that if op rate in the last x
1103-
# tests hasn't changed much then exit the while loop.
1104-
# If op rate at the current_thread_count meets latency and cpu utilization
1105-
# criteria, then record it as the max_op_rate.
1106-
if (
1107-
max_op_rate < thread_data[current_thread_count]['operation_rate']
1108-
and int(thread_data[current_thread_count]['median_latency'])
1109-
< MAX_MEDIAN_LATENCY_MS
1110-
and sum(thread_data[current_thread_count]['cpu_utilization'])
1111-
/ len(thread_data[current_thread_count]['cpu_utilization'])
1112-
< CASSANDRA_CPU_UTILIZATION_LIMIT.value * 1.01
1113-
):
1114-
max_op_rate = thread_data[current_thread_count]['operation_rate']
1115-
max_op_rate_metadata = thread_data[current_thread_count]['metadata']
1116-
samples.append(
1117-
sample.Sample(
1118-
'max_op_rate',
1119-
max_op_rate,
1120-
'operations per second',
1121-
max_op_rate_metadata,
1122-
)
1041+
thread_run_data = next(
1042+
(
1043+
s.metadata['thread_data']
1044+
for s in samples
1045+
if s.metric == 'thread_run_data'
1046+
),
1047+
{},
11231048
)
1124-
thread_run_data_metadata = copy.deepcopy(max_op_rate_metadata or {})
1125-
thread_run_data_metadata['thread_data'] = thread_data
1126-
samples.append(
1127-
sample.Sample(
1128-
'thread_run_data',
1129-
-1,
1130-
'',
1131-
thread_run_data_metadata,
1132-
)
1049+
for _, thread_data in thread_run_data.items():
1050+
if max_op_rate < thread_data['operation_rate']:
1051+
max_op_rate = thread_data['operation_rate']
1052+
max_op_rate_metadata = thread_data['metadata']
1053+
return sample.Sample(
1054+
'max_op_rate',
1055+
max_op_rate,
1056+
'operations per second',
1057+
max_op_rate_metadata,
11331058
)
1134-
return samples
11351059

11361060

11371061
def RunCassandraStressOnFixedThreads(
1138-
client_vms, cassandra_vms, metadata, client_threads_array
1062+
client_vms,
1063+
cassandra_vms,
1064+
metadata,
1065+
client_threads_array,
11391066
):
11401067
"""Run the cassandra stress test max_allowed_runs times.
11411068
@@ -1165,20 +1092,18 @@ def RunCassandraStressOnFixedThreads(
11651092
immutabledict.immutabledict(current_metadata),
11661093
is_preload=False,
11671094
)
1168-
current_samples = CollectResults(
1169-
client_vms, current_metadata
1170-
)
1095+
current_samples = CollectResults(client_vms, current_metadata)
11711096
samples.extend(current_samples)
1097+
cpu_utilization = GetCpuUtilization(
1098+
cassandra_vms, CASSANDRA_STRESS_RUN_DURATION.value
1099+
)
11721100
thread_data[threads] = {
1173-
'operation_rate': GetOperationRate(
1174-
current_samples
1175-
),
1176-
'median_latency': GetMedianLatency(
1177-
current_samples
1178-
),
1101+
'operation_rate': GetOperationRate(current_samples),
1102+
'median_latency': GetMedianLatency(current_samples),
11791103
'cpu_loads': GetCpuAverageLoad(cassandra_vms),
1180-
'cpu_utilization': GetCpuUtilization(
1181-
cassandra_vms
1104+
'cpu_utilization': cpu_utilization,
1105+
'cpu_utilization_avg': round(
1106+
sum(cpu_utilization) / len(cpu_utilization), 2
11821107
),
11831108
'metadata': current_metadata,
11841109
}
@@ -1204,82 +1129,6 @@ def RunCassandraStressOnFixedThreads(
12041129
return samples
12051130

12061131

1207-
def GetNextThreadCount(
1208-
left_thread_count,
1209-
current_thread_count,
1210-
right_thread_count,
1211-
thread_data,
1212-
max_median_latency,
1213-
max_cpu_utilization,
1214-
):
1215-
"""Gets the next thread count bounds based on the current metrics.
1216-
1217-
Args:
1218-
left_thread_count: The lower bound of the thread count for the binary
1219-
search.
1220-
current_thread_count: The current thread count used for the test.
1221-
right_thread_count: The upper bound of the thread count for the binary
1222-
search.
1223-
thread_data: A dict of thread data containing operation_rate,
1224-
median_latency and average_cpu_usage.
1225-
max_median_latency: The maximum acceptable median latency.
1226-
max_cpu_utilization: The maximum acceptable cpu utilization.
1227-
1228-
Returns:
1229-
A tuple of the new (left_thread_count, right_thread_count) for the next
1230-
iteration of the binary search.
1231-
1232-
We are trying to achieve a optimal thread count to maximize the operation
1233-
rate. The operation rate, latency and cpu utilization increase with threads
1234-
till op rate reaches a max and after that op rate drops while latency and cpu
1235-
keeps increasing.
1236-
"""
1237-
# taking average cpu utilization to consider uneven load across nodes.
1238-
average_cpu_usage = sum(
1239-
thread_data[current_thread_count]['cpu_utilization']
1240-
) / len(thread_data[current_thread_count]['cpu_utilization'])
1241-
median_latency = int(thread_data[current_thread_count]['median_latency'])
1242-
if (
1243-
median_latency
1244-
>= max_median_latency
1245-
or average_cpu_usage
1246-
>= max_cpu_utilization*1.01 # cpu utilization within 1% of the limit.
1247-
):
1248-
# decrease thread count
1249-
logging.info(
1250-
'latency is %s and cpu utilization is %s for %s threads, decreasing'
1251-
' threads',
1252-
median_latency,
1253-
average_cpu_usage,
1254-
current_thread_count,
1255-
)
1256-
right_thread_count = current_thread_count
1257-
elif (
1258-
thread_data[current_thread_count]['operation_rate']
1259-
< thread_data[left_thread_count]['operation_rate']
1260-
):
1261-
# move left on the curve because op rate dropped from left to right.
1262-
right_thread_count = current_thread_count
1263-
logging.info(
1264-
'op rate %s is %s and %s is %s, decreasing threads',
1265-
left_thread_count,
1266-
thread_data[left_thread_count]['operation_rate'],
1267-
current_thread_count,
1268-
thread_data[current_thread_count]['operation_rate'],
1269-
)
1270-
else:
1271-
# move right on the curve because op rate increased from left to right.
1272-
left_thread_count = current_thread_count
1273-
logging.info(
1274-
'op rate %s is %s and %s is %s, increasing threads',
1275-
left_thread_count,
1276-
thread_data[left_thread_count]['operation_rate'],
1277-
current_thread_count,
1278-
thread_data[current_thread_count]['operation_rate'],
1279-
)
1280-
return int(left_thread_count), int(right_thread_count)
1281-
1282-
12831132
@vm_util.Retry(
12841133
max_retries=20,
12851134
retryable_exceptions=(
@@ -1334,19 +1183,12 @@ def LogMemoryUsage(cassandra_vms):
13341183
vm.RemoteCommand('free -h')
13351184

13361185

1337-
def GetCpuUtilization(cassandra_vms):
1338-
"""Get cpu utilization during the test from sar output.
1339-
1340-
Args:
1341-
cassandra_vms: cassandra server vms.
1342-
1343-
Returns:
1344-
A list of cpu utilization for each cassandra node.
1345-
"""
1186+
def GetCpuUtilization(cassandra_vms, run_duration):
1187+
"""Get cpu utilization during the test from sar output."""
13461188
cpu_utilization = []
13471189
if (
13481190
CalculateNumberOfSarRequestsFromDuration(
1349-
CASSANDRA_STRESS_RUN_DURATION.value, SAR_CPU_UTILIZATION_INTERVAL
1191+
run_duration, SAR_CPU_UTILIZATION_INTERVAL
13501192
)
13511193
== 0
13521194
):

0 commit comments

Comments
 (0)