use closest request pair sizes; minor fixes
Le Xu committed Feb 28, 2025
1 parent c225afc commit 58e7465
Showing 3 changed files with 29 additions and 30 deletions.
benchmarks/generator/sample_request.py: 28 changes (11 additions, 17 deletions)
@@ -4,6 +4,7 @@
import random

import pandas as pd
+import numpy as np

from typing import Tuple, Optional, List
from transformers import PreTrainedTokenizerBase
@@ -110,16 +111,8 @@ def sample_requests_len_range(
err_perc = initial_err_perc

while err_perc <= 1:
-input_range = range(0, sys.maxsize)
-output_range = range(0, sys.maxsize)
-if input_len is not None:
-    input_range = (int(input_len * (1 - err_perc)), int(input_len * (1 + err_perc)))
-else:
-    input_range = (0, sys.maxsize)
-if output_len is not None:
-    output_range = (int(output_len * (1 - err_perc)), int(output_len * (1 + err_perc)))
-else:
-    output_range = (0, sys.maxsize)
+input_range = (int(input_len * (1 - err_perc)), int(input_len * (1 + err_perc))) if input_len else (0, sys.maxsize)
+output_range = (int(output_len * (1 - err_perc)), int(output_len * (1 + err_perc))) if output_len else (0, sys.maxsize)
filtered = df[
(df["prompt_len"] >= input_range[0]) &
(df["prompt_len"] <= input_range[1]) &
@@ -139,13 +132,14 @@
logging.debug(f"Relax err_perc {err_perc} by {err_step} new err_perc {err_perc + err_step} input_range {input_range} output_range {output_range}")
err_perc += err_step

-if err_perc >= 1:
-    logging.warn(f"No match found for request {i + 1} even after relaxing err_perc to {err_perc} fallback to random")
-    total_rows = len(df)
-    sample = df.iloc[random.randint(0, total_rows - 1)]
-    filtered_results.append({"prompt": sample["prompt"],
-                             "prompt_length": sample["prompt_len"],
-                             "output_length": sample["completion_len"]})
+if err_perc >= 1:
+    df["distance"] = np.sqrt((df["prompt_len"] - input_len) ** 2 + (df["completion_len"] - output_len) ** 2)
+    closest_sample = df.nsmallest(1, "distance").iloc[0]
+    closest_input, closest_output = closest_sample["prompt_len"], closest_sample["completion_len"]
+    filtered_results.append({"prompt": closest_sample["prompt"],
+                             "prompt_length": closest_sample["prompt_len"],
+                             "output_length": closest_sample["completion_len"]})
+    logging.warn(f"No exact match found for request {i + 1}, target input/output lengths {input_len}/{output_len}, use closest QA pair input {closest_input} output {closest_output}.")

return filtered_results
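The replaced fallback no longer samples a random row once the tolerance loop is exhausted; it picks the trace row whose (prompt_len, completion_len) pair is closest in Euclidean distance to the target lengths. Below is a minimal standalone sketch of that selection, assuming only a DataFrame with prompt, prompt_len, and completion_len columns; the helper name closest_request is hypothetical, not the repository's API:

```python
import numpy as np
import pandas as pd

def closest_request(df: pd.DataFrame, input_len: int, output_len: int) -> pd.Series:
    """Pick the row whose (prompt_len, completion_len) pair is nearest to the
    requested (input_len, output_len) in Euclidean distance."""
    distance = np.sqrt((df["prompt_len"] - input_len) ** 2 +
                       (df["completion_len"] - output_len) ** 2)
    return df.loc[distance.idxmin()]

# Toy data: three candidate requests, target lengths (100, 50).
df = pd.DataFrame({"prompt": ["a", "b", "c"],
                   "prompt_len": [90, 400, 120],
                   "completion_len": [60, 10, 200]})
print(closest_request(df, 100, 50))  # selects row "a" (distance ~14.1)
```

Selecting with idxmin over a distance Series is equivalent to the commit's nsmallest(1, "distance").iloc[0], but avoids adding a distance column to the frame.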

benchmarks/generator/utils.py: 17 changes (11 additions, 6 deletions)
@@ -184,7 +184,7 @@ def plot_workload(workload_name: str,
# Convert workload data to a DataFrame
data = []
for entry in workload:
-timestamp_sec = entry["timestamp"] / 1000 # Convert ms to sec
+timestamp_sec = int(entry["timestamp"] / 1000) # Convert ms to sec
num_requests = len(entry["requests"])
total_prompt_tokens = np.mean([req["prompt_length"] for req in entry["requests"]]) if entry["requests"] else 0
total_output_tokens = np.mean([req["output_length"] for req in entry["requests"]]) if entry["requests"] else 0
@@ -200,22 +200,27 @@
df["time_bin"] = pd.cut(df["timestamp"], bins, labels=bins[:-1])

# Aggregate within each bin
-binned_df = df.groupby("time_bin").sum()
+# binned_df = df.groupby("time_bin").sum()
+binned_df = df.groupby("time_bin").agg({
+    "num_requests": "sum",
+    "total_prompt_tokens": "mean",
+    "total_output_tokens": "mean"
+})

# Convert index back to numeric
binned_df.index = binned_df.index.astype(float)

+print(binned_df)
# Plotting
-fig, (ax_qps, ax_input, ax_output) = plt.subplots(3, 1, figsize=(15, 12))
+fig, (ax_qps, ax_input, ax_output) = plt.subplots(3, 1, figsize=(10, 8))

ax_qps.plot(binned_df.index, binned_df["num_requests"], label="Total Requests")
ax_input.plot(binned_df.index, binned_df["total_prompt_tokens"], label="Total Prompt Tokens")
ax_output.plot(binned_df.index, binned_df["total_output_tokens"], label="Total Output Tokens")

# Formatting plots
for ax, ylabel, title in zip([ax_qps, ax_input, ax_output],
["Requests per Second", "Prompt Token Count", "Output Token Count"],
["Total Requests Sent per Second", "Total Prompt Tokens per Second", "Total Output Tokens per Second"]):
["Requests per Minute", "Prompt Token Count", "Output Token Count"],
["Total Requests Sent per Minute", "Total Prompt Tokens per Minute", "Total Output Tokens per Minute"]):
ax.set_xlabel("Time (seconds)")
ax.set_ylabel(ylabel)
ax.set_title(title)
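The aggregation change above sums request counts per time bin but averages the per-entry token means, so widening a bin raises the request curve without inflating the token curves. A small self-contained sketch of the same cut-then-agg pattern; the column names follow the commit, while the data and bin width are made up for illustration:

```python
import pandas as pd

# Made-up per-second samples shaped like the rows built in plot_workload.
df = pd.DataFrame({"timestamp": [0, 1, 2, 3],
                   "num_requests": [2, 3, 1, 4],
                   "total_prompt_tokens": [100.0, 120.0, 90.0, 110.0],
                   "total_output_tokens": [40.0, 60.0, 50.0, 30.0]})

bins = [0, 2, 4]  # two 2-second bins: [0, 2) and [2, 4)
df["time_bin"] = pd.cut(df["timestamp"], bins=bins, labels=bins[:-1], right=False)

# Sum request counts per bin, but average the per-entry token means.
binned = df.groupby("time_bin", observed=True).agg({
    "num_requests": "sum",
    "total_prompt_tokens": "mean",
    "total_output_tokens": "mean",
})
print(binned)  # bin 0: 5 requests, means 110/50; bin 2: 5 requests, means 100/40
```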
benchmarks/generator/workload_generator.py: 14 changes (7 additions, 7 deletions)
@@ -51,7 +51,7 @@ def generate_from_internal_csv(prompt_file_path: str,
output_len_dist = []
rps_dist = []
for rps_config in rps_configs:
-rps_segment = generate_poisson_dist(target = rps_config['mean_rps'], sample_size = rps_config['total_seconds'], generate_poisson_dist = 10)
+rps_segment = generate_poisson_dist(target = rps_config['mean_rps'], sample_size = rps_config['total_seconds'], smooth_window_size = 10)
rps_dist.extend(rps_segment)
if internal_trace_type == "maas":
for config in input_len_configs:
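The corrected keyword suggests generate_poisson_dist draws a per-second rate series around a target mean and smooths it over a window. The repository's implementation is not shown in this diff, so the following is a hypothetical stand-in that merely matches the call site's signature (target, sample_size, smooth_window_size):

```python
import numpy as np

def generate_poisson_dist(target: float, sample_size: int,
                          smooth_window_size: int = 10) -> list:
    """Hypothetical stand-in, not the repository's code: draw one Poisson
    rate sample per second around `target`, then apply a moving average of
    `smooth_window_size` seconds to avoid abrupt single-second spikes."""
    rng = np.random.default_rng(seed=0)
    samples = rng.poisson(lam=target, size=sample_size).astype(float)
    kernel = np.ones(smooth_window_size) / smooth_window_size
    return np.convolve(samples, kernel, mode="same").tolist()

rps = generate_poisson_dist(target=5, sample_size=60, smooth_window_size=10)
print(round(sum(rps) / len(rps), 2))  # mean stays near the 5 RPS target
```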
@@ -146,7 +146,7 @@ def generate_constant(prompt_file_path: str,
num_requests=qps,
input_lens=[None] * qps,
output_lens=[None] * qps,
-initial_err_perc=0.5,
+initial_err_perc=0.1,
err_step=0.05
)
if concurrent_reqs: # Only add non-empty groups
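Lowering initial_err_perc from 0.5 to 0.1 tightens the first acceptance window around the target lengths; err_step=0.05 then widens it by five points per retry until a match is found or the closest-pair fallback fires. A tiny trace of that schedule for a purely illustrative target length of 100 tokens:

```python
# Relaxation schedule after the change (target length 100 is hypothetical).
initial_err_perc, err_step = 0.1, 0.05
err_perc, target = initial_err_perc, 100
while err_perc <= 1:
    low, high = int(target * (1 - err_perc)), int(target * (1 + err_perc))
    print(f"err_perc={err_perc:.2f}: accept lengths in [{low}, {high}]")
    err_perc += err_step
# First pass accepts [90, 110]; by err_perc=1.0 the window is [0, 200],
# after which sample_requests_len_range falls back to the closest pair.
```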
@@ -312,7 +312,7 @@ def generate_from_azure_csv(file_path: str,
num_requests=len(input_lens),
input_lens=input_lens,
output_lens=output_lens,
-initial_err_perc=0.5,
+initial_err_perc=0.1,
err_step=0.05
)

@@ -451,7 +451,7 @@ def pair_requests_with_prompts_round_robin(workload: List[List[Any]],
interval_ms=args.interval_ms,
output_file=f"{args.output_dir}/{args.trace_type}",
to_jsonl=(args.output_format == "jsonl"),
-)
+)
elif args.trace_type == "internal":
generated_workload = generate_from_internal_csv(prompt_file_path=args.prompt_file,
duration_ms=args.duration_ms,
Expand All @@ -465,7 +465,7 @@ def pair_requests_with_prompts_round_robin(workload: List[List[Any]],
input_scale=args.input_scale,
output_scale=args.output_scale,
internal_trace_type=args.internal_trace_type,
-)
+)

elif args.trace_type == "azure":
generated_workload = generate_from_azure_csv(file_path=args.traffic_file,
@@ -485,5 +485,5 @@ def pair_requests_with_prompts_round_robin(workload: List[List[Any]],
plot_workload(
workload_name = workload_name,
workload = workload,
-bin_size_sec = int(args.interval_ms/1000),
-output_dir = f"./plot")
+bin_size_sec = int(args.interval_ms/1000) * 60,
+output_dir = f"{args.output_dir}")
