diff --git a/benchmarks/generator/sample_request.py b/benchmarks/generator/sample_request.py
index 1a0d9ded..0e8013e6 100644
--- a/benchmarks/generator/sample_request.py
+++ b/benchmarks/generator/sample_request.py
@@ -4,6 +4,7 @@
 import random
 
 import pandas as pd
+import numpy as np
 
 from typing import Tuple, Optional, List
 from transformers import PreTrainedTokenizerBase
@@ -110,16 +111,8 @@ def sample_requests_len_range(
         err_perc = initial_err_perc
 
         while err_perc <= 1:
-            input_range = range(0, sys.maxsize)
-            output_range = range(0, sys.maxsize)
-            if input_len is not None:
-                input_range = (int(input_len * (1 - err_perc)), int(input_len * (1 + err_perc)))
-            else:
-                input_range = (0, sys.maxsize)
-            if output_len is not None:
-                output_range = (int(output_len * (1 - err_perc)), int(output_len * (1 + err_perc)))
-            else:
-                output_range = (0, sys.maxsize)
+            input_range = (int(input_len * (1 - err_perc)), int(input_len * (1 + err_perc))) if input_len else (0, sys.maxsize)
+            output_range = (int(output_len * (1 - err_perc)), int(output_len * (1 + err_perc))) if output_len else (0, sys.maxsize)
             filtered = df[
                 (df["prompt_len"] >= input_range[0]) &
                 (df["prompt_len"] <= input_range[1]) &
@@ -139,13 +132,14 @@ def sample_requests_len_range(
             logging.debug(f"Relax err_perc {err_perc} by {err_step} new err_perc {err_perc + err_step} input_range {input_range} output_range {output_range}")
             err_perc += err_step
 
-        if err_perc >= 1:
-            logging.warn(f"No match found for request {i + 1} even after relaxing err_perc to {err_perc} fallback to random")
-            total_rows = len(df)
-            sample = df.iloc[random.randint(0, total_rows - 1)]
-            filtered_results.append({"prompt": sample["prompt"],
-                                     "prompt_length": sample["prompt_len"],
-                                     "output_length": sample["completion_len"]})
+        if err_perc >= 1:
+            df["distance"] = np.sqrt((df["prompt_len"] - input_len) ** 2 + (df["completion_len"] - output_len) ** 2)
+            closest_sample = df.nsmallest(1, "distance").iloc[0]
+            closest_input, closest_output = closest_sample["prompt_len"], closest_sample["completion_len"]
+            filtered_results.append({"prompt": closest_sample["prompt"],
+                                     "prompt_length": closest_sample["prompt_len"],
+                                     "output_length": closest_sample["completion_len"]})
+            logging.warn(f"No exact match found for request {i + 1}, target input/output lengths {input_len}/{output_len}, use closest QA pair input {closest_input} output {closest_output}.")
 
     return filtered_results
 
diff --git a/benchmarks/generator/utils.py b/benchmarks/generator/utils.py
index 7e4f9298..11de9237 100644
--- a/benchmarks/generator/utils.py
+++ b/benchmarks/generator/utils.py
@@ -184,7 +184,7 @@ def plot_workload(workload_name: str,
     # Convert workload data to a DataFrame
     data = []
     for entry in workload:
-        timestamp_sec = entry["timestamp"] / 1000 # Convert ms to sec
+        timestamp_sec = int(entry["timestamp"] / 1000) # Convert ms to sec
         num_requests = len(entry["requests"])
         total_prompt_tokens = np.mean([req["prompt_length"] for req in entry["requests"]]) if entry["requests"] else 0
         total_output_tokens = np.mean([req["output_length"] for req in entry["requests"]]) if entry["requests"] else 0
@@ -200,13 +200,18 @@ def plot_workload(workload_name: str,
     df["time_bin"] = pd.cut(df["timestamp"], bins, labels=bins[:-1])
 
     # Aggregate within each bin
-    binned_df = df.groupby("time_bin").sum()
+    # binned_df = df.groupby("time_bin").sum()
+    binned_df = df.groupby("time_bin").agg({
+        "num_requests": "sum",
+        "total_prompt_tokens": "mean",
+        "total_output_tokens": "mean"
+    })
 
     # Convert index back to numeric
     binned_df.index = binned_df.index.astype(float)
-
+    print(binned_df)
     # Plotting
-    fig, (ax_qps, ax_input, ax_output) = plt.subplots(3, 1, figsize=(15, 12))
+    fig, (ax_qps, ax_input, ax_output) = plt.subplots(3, 1, figsize=(10, 8))
 
     ax_qps.plot(binned_df.index, binned_df["num_requests"], label="Total Requests")
     ax_input.plot(binned_df.index, binned_df["total_prompt_tokens"], label="Total Prompt Tokens")
@@ -214,8 +219,8 @@
 
     # Formatting plots
     for ax, ylabel, title in zip([ax_qps, ax_input, ax_output],
-                                 ["Requests per Second", "Prompt Token Count", "Output Token Count"],
-                                 ["Total Requests Sent per Second", "Total Prompt Tokens per Second", "Total Output Tokens per Second"]):
+                                 ["Requests per Minute", "Prompt Token Count", "Output Token Count"],
+                                 ["Total Requests Sent per Minute", "Total Prompt Tokens per Minute", "Total Output Tokens per Minute"]):
         ax.set_xlabel("Time (seconds)")
         ax.set_ylabel(ylabel)
         ax.set_title(title)
diff --git a/benchmarks/generator/workload_generator.py b/benchmarks/generator/workload_generator.py
index 6406c39f..a98f4de1 100644
--- a/benchmarks/generator/workload_generator.py
+++ b/benchmarks/generator/workload_generator.py
@@ -51,7 +51,7 @@ def generate_from_internal_csv(prompt_file_path: str,
     output_len_dist = []
     rps_dist = []
     for rps_config in rps_configs:
-        rps_segment = generate_poisson_dist(target = rps_config['mean_rps'], sample_size = rps_config['total_seconds'], generate_poisson_dist = 10)
+        rps_segment = generate_poisson_dist(target = rps_config['mean_rps'], sample_size = rps_config['total_seconds'], smooth_window_size = 10)
         rps_dist.extend(rps_segment)
     if internal_trace_type == "maas":
         for config in input_len_configs:
@@ -146,7 +146,7 @@ def generate_constant(prompt_file_path: str,
            num_requests=qps,
            input_lens=[None] * qps,
            output_lens=[None] * qps,
-           initial_err_perc=0.5,
+           initial_err_perc=0.1,
            err_step=0.05
        )
        if concurrent_reqs: # Only add non-empty groups
@@ -312,7 +312,7 @@ def generate_from_azure_csv(file_path: str,
            num_requests=len(input_lens),
            input_lens=input_lens,
            output_lens=output_lens,
-           initial_err_perc=0.5,
+           initial_err_perc=0.1,
            err_step=0.05
        )
 
@@ -451,7 +451,7 @@ def pair_requests_with_prompts_round_robin(workload: List[List[Any]],
                                        interval_ms=args.interval_ms,
                                        output_file=f"{args.output_dir}/{args.trace_type}",
                                        to_jsonl=(args.output_format == "jsonl"),
-                                        )
+                                       )
    elif args.trace_type == "internal":
        generated_workload = generate_from_internal_csv(prompt_file_path=args.prompt_file,
                                                        duration_ms=args.duration_ms,
@@ -465,7 +465,7 @@ def pair_requests_with_prompts_round_robin(workload: List[List[Any]],
                                                        input_scale=args.input_scale,
                                                        output_scale=args.output_scale,
                                                        internal_trace_type=args.internal_trace_type,
-                                                        )
+                                                       )
 
    elif args.trace_type == "azure":
        generated_workload = generate_from_azure_csv(file_path=args.traffic_file,
@@ -485,5 +485,5 @@ def pair_requests_with_prompts_round_robin(workload: List[List[Any]],
     plot_workload(
         workload_name = workload_name,
         workload = workload,
-        bin_size_sec = int(args.interval_ms/1000),
-        output_dir = f"./plot")
+        bin_size_sec = int(args.interval_ms/1000) * 60,
+        output_dir = f"{args.output_dir}")
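
For context, a minimal standalone sketch of the closest-pair fallback added in sample_request.py: when no QA pair falls inside the relaxed length window, the row with the smallest Euclidean distance to the target (input_len, output_len) is reused instead of a random sample. The toy DataFrame and target lengths below are invented for illustration; only the column names mirror the diff.

import numpy as np
import pandas as pd

# Hypothetical QA-pair table; in the generator this comes from the prompt trace.
df = pd.DataFrame({
    "prompt": ["p1", "p2", "p3"],
    "prompt_len": [120, 480, 1010],
    "completion_len": [30, 250, 500],
})

input_len, output_len = 1000, 480  # made-up target lengths

# Euclidean distance in (prompt_len, completion_len) space; take the closest row.
df["distance"] = np.sqrt((df["prompt_len"] - input_len) ** 2 +
                         (df["completion_len"] - output_len) ** 2)
closest = df.nsmallest(1, "distance").iloc[0]
print(closest["prompt"], closest["prompt_len"], closest["completion_len"])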
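
Likewise, a small sketch of the per-bin aggregation now used in plot_workload. Request counts are summed per time bin, while total_prompt_tokens and total_output_tokens are per-entry means (see the np.mean lines in the diff) and are therefore averaged per bin rather than summed. The timestamps, bin edges, and values here are hypothetical.

import pandas as pd

df = pd.DataFrame({
    "timestamp": [0, 1, 2, 61, 62],          # seconds (hypothetical)
    "num_requests": [2, 3, 1, 4, 2],
    "total_prompt_tokens": [100, 120, 90, 200, 150],
    "total_output_tokens": [40, 50, 30, 80, 60],
})

bins = [0, 60, 120]                          # hypothetical 60-second bin edges
df["time_bin"] = pd.cut(df["timestamp"], bins, labels=bins[:-1], include_lowest=True)

binned_df = df.groupby("time_bin").agg({
    "num_requests": "sum",                   # total requests per bin
    "total_prompt_tokens": "mean",           # average prompt tokens per bin
    "total_output_tokens": "mean",           # average output tokens per bin
})
print(binned_df)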