diff --git a/benchmarks/client/README.md b/benchmarks/client/README.md new file mode 100644 index 00000000..d7b73097 --- /dev/null +++ b/benchmarks/client/README.md @@ -0,0 +1,35 @@ +## Test client locally + +Start the vLLM server: + + +```shell +export API_KEY=${API_KEY} +python3 -m vllm.entrypoints.openai.api_server --host 0.0.0.0 \ +--port "8000" \ +--model /root/models/deepseek-llm-7b-chat \ +--trust-remote-code \ +--max-model-len "4096" \ +--api-key ${API_KEY} \ +--enable-chunked-prefill +``` + +Run the client against a sample workload (generated by [the workload generator](../generator/README.md)). Turn on `--streaming` to collect fine-grained metrics such as `TTFT` and `TPOT`: + +```shell +export API_KEY=${API_KEY} +python3 client.py \ +--workload-path "../generator/output/constant.jsonl" \ +--endpoint "http://localhost:8000" \ +--model /root/models/deepseek-llm-7b-chat \ +--api-key ${API_KEY} \ +--streaming \ +--output-file-path output.jsonl +``` +The output will be stored as a `.jsonl` file at `output.jsonl`. + +Run analysis on the collected metrics. For the streaming client, you can specify a goodput target (e2e/ttft/tpot, with the threshold in seconds) like the following: + +```shell +python analyze.py --trace output.jsonl --output output --goodput-target tpot:0.5 +```
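Each line of `output.jsonl` is one JSON record with the fields written by `client.py` (`prompt_tokens`, `output_tokens`, `total_tokens`, `latency`, `throughput`, `start_time`, plus `ttft`/`tpot` in streaming mode). As a minimal sketch (illustrative, not part of the patch), one way to sanity-check such a file before running the full analysis, assuming it sits in the current directory:

```python
import json

# Illustrative only: field names follow what client.py writes to output.jsonl.
with open("output.jsonl") as f:
    records = [json.loads(line) for line in f]

latencies = sorted(r["latency"] for r in records)
print(f"requests: {len(records)}")
print(f"median latency (s): {latencies[len(latencies) // 2]:.3f}")
# ttft/tpot are only present when the client ran with --streaming
if records and "ttft" in records[0]:
    print(f"mean TTFT (s): {sum(r['ttft'] for r in records) / len(records):.3f}")
```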
diff --git a/benchmarks/client/analyze.py b/benchmarks/client/analyze.py new file mode 100644 index 00000000..8c949a9c --- /dev/null +++ b/benchmarks/client/analyze.py @@ -0,0 +1,133 @@ +import json +import argparse +import os +import re +import matplotlib.pyplot as plt +import pandas as pd +import numpy as np + +def parse_goodput_target(goodput_target): + pattern = r'^(e2e|tpot|ttft):(-?\d+(\.\d+)?)$' + match = re.match(pattern, goodput_target) + + if match: + metric = match.group(1) + threshold = float(match.group(2)) # Convert to float + else: + raise ValueError(f"Invalid goodput spec: {goodput_target}") + return metric, threshold + +def main(args): + input_file = args.trace + output_path = args.output + data = [] + with open(input_file, "r") as f: + for line in f: + data.append(json.loads(line)) + # Extract metrics + timestamps = [item.get("start_time", f"Entry {i}") for i, item in enumerate(data)] + prompt_tokens = [item["prompt_tokens"] for item in data] + output_tokens = [item["output_tokens"] for item in data] + total_tokens = [item["total_tokens"] for item in data] + latencies = [item["latency"] for item in data] + throughputs = [item["throughput"] for item in data] + tokens_per_second = [item["total_tokens"] / item["latency"] for item in data] + ttft = [item["ttft"] if "ttft" in item else 0.0 for item in data] # Time to First Token + tpot = [item["tpot"] if "tpot" in item else 0.0 for item in data] # Time per Output Token + + goodput = None + if args.goodput_target is not None: + metric, threshold = parse_goodput_target(args.goodput_target) + if metric == "e2e": + goodput = len([item for item in latencies if item <= threshold]) / float(len(latencies)) + elif metric == "ttft": + goodput = len([item for item in ttft if item <= threshold]) / float(len(ttft)) + elif metric == "tpot": + goodput = len([item for item in tpot if item <= threshold]) / float(len(tpot)) + else: + raise ValueError(f"Invalid goodput target: {args.goodput_target}") + + # Sort data by start_time + sorted_indices = np.argsort(timestamps) + timestamps = [timestamps[i] for i in sorted_indices] + prompt_tokens = [prompt_tokens[i] for i in sorted_indices] + output_tokens = [output_tokens[i] for i in sorted_indices] + total_tokens = [total_tokens[i] for i in sorted_indices] + latencies = [latencies[i] for i in sorted_indices] + throughputs = [throughputs[i] for i in sorted_indices] + tokens_per_second = [tokens_per_second[i] for i in sorted_indices] + ttft = [ttft[i] for i in sorted_indices] + tpot = [tpot[i] for i in sorted_indices] + + # Convert timestamps to pandas datetime (if timestamps are actual time values) + try: + timestamps = pd.to_datetime(timestamps, unit='s') + except Exception: + timestamps = pd.Series(timestamps) + + # Helper function to calculate statistics + def calculate_statistics(values): + values = sorted(values) + avg = sum(values) / len(values) + median = np.median(values) + percentile_99 = np.percentile(values, 99) + return avg, median, percentile_99 + + # Calculate statistics for each metric + stats = { + "End-to-End Latency (s)": calculate_statistics(latencies), + "Throughput": calculate_statistics(throughputs), + "Tokens per Second": calculate_statistics(tokens_per_second), + "Prompt Tokens": calculate_statistics(prompt_tokens), + "Output Tokens": calculate_statistics(output_tokens), + "Total Tokens": calculate_statistics(total_tokens), + "Time to First Token (TTFT)": calculate_statistics(ttft), + "Time per Output Token (TPOT)": calculate_statistics(tpot), + } + + # Print statistics + for metric, (avg, median, p99) in stats.items(): + print(f"{metric} Statistics: Average = {avg:.4f}, Median = {median:.4f}, 99th Percentile = {p99:.4f}") + if goodput is not None: + print(f"Goodput (fraction of requests meeting target): {goodput:.4f}") + + # Create a DataFrame for plotting + df = pd.DataFrame({ + "Timestamp": timestamps, + "Prompt Tokens": prompt_tokens, + "Output Tokens": output_tokens, + "Total Tokens": total_tokens, + "End-to-End Latency (s)": latencies, + "Throughput": throughputs, + "Tokens per Second": tokens_per_second, + "Time to First Token (TTFT)": ttft, + "Time per Output Token (TPOT)": tpot, + }).set_index("Timestamp") + + # Plot each metric in a separate subplot + num_metrics = len(df.columns) + fig, axes = plt.subplots(num_metrics, 1, figsize=(12, 4 * num_metrics), sharex=True) + + for ax, (column, values) in zip(axes, df.items()): + ax.plot(df.index, values, marker='o', linestyle='-', label=column) + ax.set_ylabel(column) + ax.legend() + ax.grid() + + axes[-1].set_xlabel("Time") # Only set x-axis label for the last subplot + plt.suptitle("Time Series Analysis of LLM Performance Metrics") + plt.xticks(rotation=45) + plt.tight_layout(rect=[0, 0, 1, 0.96]) # Adjust layout to fit the title + os.makedirs(output_path, exist_ok=True) + plt.savefig(f"{output_path}/performance_metrics_time_series.pdf") + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Extract and plot performance metrics from a JSONL trace file.') + parser.add_argument('--trace', type=str, required=True, help='Input trace containing collected metrics.') + parser.add_argument('--output', type=str, default="output", help='Output path.') + parser.add_argument('--goodput-target', type=str, required=False, default=None, help='Goodput target in the format metric:threshold_in_seconds, where metric is one of e2e, ttft, or tpot.') + + args = parser.parse_args() + main(args) + \ No newline at end of file
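A note on the goodput figure printed by `analyze.py`: it is the fraction of requests whose chosen metric meets the threshold, not a request rate. A minimal standalone sketch of the same computation for a `tpot:0.5` target, with made-up TPOT values (the real script reads them from the trace):

```python
# Goodput for a tpot:0.5 target: share of requests whose time-per-output-token
# is at or below 0.5 seconds. The values below are made up for illustration.
tpot_values = [0.12, 0.34, 0.48, 0.66, 0.91]
threshold = 0.5

goodput = sum(1 for t in tpot_values if t <= threshold) / len(tpot_values)
print(f"goodput: {goodput:.2f}")  # 3 of 5 requests meet the target -> 0.60
```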
diff --git a/benchmarks/client/client.py b/benchmarks/client/client.py new file mode 100755 index 00000000..1f57a7ae --- /dev/null +++ b/benchmarks/client/client.py @@ -0,0 +1,222 @@ +import argparse +import logging +import time +import asyncio +import openai +import json +import io +import traceback + + +from typing import List +from utils import (load_workload, wrap_prompt_as_chat_message) + +logging.basicConfig(level=logging.INFO) + +async def send_request_streaming(client: openai.AsyncOpenAI, + model: str, + endpoint: str, + prompt: List, + output_file: io.TextIOWrapper, + ): + start_time = asyncio.get_event_loop().time() + first_response_time = None + try: + stream = await client.chat.completions.create( + model=model, + messages=prompt, + temperature=0, + max_tokens=2048, + stream=True, + stream_options={"include_usage": True}, + ) + text_chunks = [] + prompt_tokens = 0 + output_tokens = 0 + total_tokens = 0 + + async for chunk in stream: + if chunk.choices: + if chunk.choices[0].delta.content is not None: + if not first_response_time: + first_response_time = asyncio.get_event_loop().time() + output_text = chunk.choices[0].delta.content + text_chunks.append(output_text) + prompt_tokens = chunk.usage.prompt_tokens + output_tokens = chunk.usage.completion_tokens + total_tokens = chunk.usage.total_tokens + response = "".join(text_chunks) + response_time = asyncio.get_event_loop().time() + latency = response_time - start_time + throughput = output_tokens / latency + ttft = first_response_time - start_time + tpot = (response_time - first_response_time) / output_tokens + result = { + "input": prompt, + "output": response, + "prompt_tokens": prompt_tokens, + "output_tokens": output_tokens, + "total_tokens": total_tokens, + "latency": latency, + "throughput": throughput, + "start_time": start_time, + "current_time": asyncio.get_event_loop().time(), + "ttft": ttft, + "tpot": tpot, + } + logging.info(result) + # Write result to JSONL file + output_file.write(json.dumps(result) + "\n") + output_file.flush() # Ensure data is written immediately to the file + return result + except Exception as e: + logging.error(f"Error sending request to {endpoint}: {str(e)}") + traceback.print_exc() + return None + +async def benchmark_streaming(client: openai.AsyncOpenAI, + endpoint: str, + model: str, + load_struct: List, + output_file: io.TextIOWrapper): + + batch_tasks = [] + base_time = time.time() + num_requests = 0 + for requests_dict in load_struct: + ts = int(requests_dict["timestamp"]) + requests = requests_dict["requests"] + cur_time = time.time() + target_time = base_time + ts / 1000.0 + logging.warning(f"Preparing to launch {len(requests)} streaming tasks in {target_time - cur_time} seconds") + if target_time > cur_time: + await asyncio.sleep(target_time - cur_time) + formatted_prompts = [wrap_prompt_as_chat_message(request["prompt"]) for request in requests] + for formatted_prompt in formatted_prompts: + task = asyncio.create_task( + send_request_streaming(client = client, + model = model, + endpoint = endpoint, + prompt = formatted_prompt, + output_file = output_file) + ) + batch_tasks.append(task) + num_requests += len(requests) + await asyncio.gather(*batch_tasks) + logging.warning(f"All {num_requests} requests completed for deployment.")
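Both `benchmark_streaming` above and `benchmark_batch` below consume the structure returned by `load_workload`: one entry per dispatch time (`timestamp`, in milliseconds from the start of the run) holding a list of requests, each with a `prompt` field. A minimal sketch (the file name is illustrative, not part of the patch) of writing such a workload by hand for a quick local test:

```python
import json

# Keys follow what benchmark_streaming/benchmark_batch read: "timestamp" (ms offset),
# "requests", and "prompt" inside each request. The file name is arbitrary.
workload = [
    {"timestamp": 0, "requests": [{"prompt": "What is Kubernetes?"}]},
    {"timestamp": 1000, "requests": [{"prompt": "Explain paging."},
                                     {"prompt": "Write a haiku about GPUs."}]},
]

with open("tiny_workload.jsonl", "w") as f:
    for entry in workload:
        f.write(json.dumps(entry) + "\n")
```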
+ +# Asynchronous request handler +async def send_request_batch(client, model, endpoint, prompt, output_file): + start_time = asyncio.get_event_loop().time() + try: + response = await client.chat.completions.create( + model=model, + messages=prompt, + temperature=0, + max_tokens=2048 + ) + + latency = asyncio.get_event_loop().time() - start_time + prompt_tokens = response.usage.prompt_tokens + output_tokens = response.usage.completion_tokens + total_tokens = response.usage.total_tokens + throughput = output_tokens / latency + output_text = response.choices[0].message.content + + result = { + "input": prompt, + "output": output_text, + "prompt_tokens": prompt_tokens, + "output_tokens": output_tokens, + "total_tokens": total_tokens, + "start_time": start_time, + "current_time": asyncio.get_event_loop().time(), + "latency": latency, + "throughput": throughput + } + logging.info(result) + # Write result to JSONL file + output_file.write(json.dumps(result) + "\n") + output_file.flush() # Ensure data is written immediately to the file + + return result + except Exception as e: + logging.error(f"Error sending request to {endpoint}: {str(e)}") + return None + + +async def benchmark_batch(client: openai.AsyncOpenAI, + endpoint: str, + model: str, + load_struct: List, + output_file: io.TextIOWrapper): + batch_tasks = [] + base_time = time.time() + num_requests = 0 + for requests_dict in load_struct: + ts = int(requests_dict["timestamp"]) + requests = requests_dict["requests"] + cur_time = time.time() + target_time = base_time + ts / 1000.0 + logging.warning(f"Preparing to launch {len(requests)} batched tasks in {target_time - cur_time} seconds") + if target_time > cur_time: + await asyncio.sleep(target_time - cur_time) + formatted_prompts = [wrap_prompt_as_chat_message(request["prompt"]) for request in requests] + for formatted_prompt in formatted_prompts: + task = asyncio.create_task( + send_request_batch(client, model, endpoint, formatted_prompt, output_file) + ) + batch_tasks.append(task) + num_requests += len(requests) + await asyncio.gather(*batch_tasks) + logging.warning(f"All {num_requests} requests completed for deployment.") + + +def main(args): + logging.info(f"Starting benchmark on endpoint {args.endpoint}") + with open(args.output_file_path, 'w', encoding='utf-8') as output_file: + load_struct = load_workload(args.workload_path) + client = openai.AsyncOpenAI( + api_key=args.api_key, + base_url=args.endpoint + "/v1", + ) + if args.routing_strategy is not None: + client.default_headers["routing-strategy"] = args.routing_strategy + if not args.streaming: + logging.info("Using batch client") + start_time = time.time() + asyncio.run(benchmark_batch( + client = client, + endpoint=args.endpoint, + model=args.model, + load_struct=load_struct, + output_file=output_file, + )) + end_time = time.time() + logging.info(f"Benchmark completed in {end_time - start_time:.2f} seconds") + else: + logging.info("Using streaming client") + start_time = time.time() + asyncio.run(benchmark_streaming( + client = client, + endpoint=args.endpoint, + model=args.model, + load_struct=load_struct, + output_file=output_file, + )) + end_time = time.time() + logging.info(f"Benchmark completed in {end_time - start_time:.2f} seconds") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Benchmark client') + parser.add_argument("--workload-path", type=str, default=None, help="File path to the workload file.") + parser.add_argument('--endpoint', type=str, required=True) + parser.add_argument("--model", type=str, required=True, help="Name of the model.") + parser.add_argument("--api-key", type=str, required=True, help="API key to the service. 
") + parser.add_argument('--output-file-path', type=str, default="output.jsonl") + parser.add_argument("--streaming", action="store_true", help="Use streaming client.") + parser.add_argument("--routing-strategy", type=str, required=False, default=None, help="Routing strategy to use.") + + args = parser.parse_args() + main(args) diff --git a/benchmarks/client/utils.py b/benchmarks/client/utils.py new file mode 100644 index 00000000..dfa0a18f --- /dev/null +++ b/benchmarks/client/utils.py @@ -0,0 +1,23 @@ +import json +from typing import List, Any + +def load_workload(input_path: str) -> List[Any]: + load_struct = None + if input_path.endswith(".jsonl"): + with open(input_path, "r") as file: + load_struct = [json.loads(line) for line in file] + else: + with open(input_path, "r") as file: + load_struct = json.load(file) + return load_struct + +# Function to wrap the prompt into OpenAI's chat completion message format. +def wrap_prompt_as_chat_message(prompt: str): + """ + Wrap the prompt into OpenAI's chat completion message format. + + :param prompt: The user prompt to be converted. + :return: A list containing chat completion messages. + """ + user_message = {"role": "user", "content": prompt} + return [user_message] \ No newline at end of file diff --git a/benchmarks/generator/README.md b/benchmarks/generator/README.md index c7fdba15..572b87e8 100644 --- a/benchmarks/generator/README.md +++ b/benchmarks/generator/README.md @@ -14,7 +14,7 @@ export SHAREGPT_FILE_PATH=/tmp/ShareGPT_V3_unfiltered_cleaned_split.json ```shell export TARGET_QPS=1 -python workload_generator.py --prompt-file $SHAREGPT_FILE_PATH --interval-ms 1000 --duration-ms 300000 --target-qps $ta --trace-type constant --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output-dir "output" --output-format jsonl +python workload_generator.py --prompt-file $SHAREGPT_FILE_PATH --interval-ms 1000 --duration-ms 300000 --target-qps $TARGET_QPS --trace-type constant --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output-dir "output" --output-format jsonl ``` ### Generate a workload file based on workload patterns (synthetic patterns) @@ -115,29 +115,5 @@ python workload_generator.py --prompt-file $SHAREGPT_FILE_PATH --num-prompts 100 Note that the trace file contains both input and output lengths. And therefore dataset in `$SHAREGPT_FILE_PATH` needs to be tokenized to be able to sampled based on their input/output token lengths. Therefore it is required to specify tokenizer to generate based on this trace. Use `--group-interval-seconds` to specify grouping interval from the original trace. The file would be stored under `output` folder and the plot illustrates the workload pattern will be under the `plot` directory. -## Run Workload Generator -Starting vllm server: - -```shell -python3 -m vllm.entrypoints.openai.api_server --host 0.0.0.0 \ ---port "8000" \ ---model /root/models/deepseek-coder-6.7b-instruct \ ---trust-remote-code \ ---max-model-len "14304" \ ---api-key sk-kFJ12nKsFVfVmGpj3QzX65s4RbN2xJqWzPYCjYu7wT3BlbLi \ ---enable-chunked-prefill -``` - -Using a sample workload in a client: - -```shell -python3 client.py \ ---workload-path "output/quick_rising.jsonl" \ ---endpoint "http://localhost:8000" \ ---model /root/models/deepseek-coder-6.7b-instruct \ ---api-key sk-kFJ12nKsFVfVmGpj3QzX65s4RbN2xJqWzPYCjYu7wT3BlbLi \ ---output-file-path output.jsonl -``` - -The output will be stored as a `.jsonl` file in `output.jsonl` +Use [client](../client/README.md) to test generated trace locally. 
\ No newline at end of file diff --git a/benchmarks/generator/client.py b/benchmarks/generator/client.py deleted file mode 100755 index a0298050..00000000 --- a/benchmarks/generator/client.py +++ /dev/null @@ -1,97 +0,0 @@ -import argparse -import logging -import time -import asyncio -import openai -import json - -from utils import (load_workload, wrap_prompt_as_chat_message) - - -# Asynchronous request handler -async def send_request(client, model, endpoint, prompt, output_file): - start_time = asyncio.get_event_loop().time() - try: - response = await client.chat.completions.create( - model=model, - messages=prompt, - temperature=0, - max_tokens=2048 - ) - - latency = asyncio.get_event_loop().time() - start_time - prompt_tokens = response.usage.prompt_tokens - output_tokens = response.usage.completion_tokens - total_tokens = response.usage.total_tokens - throughput = output_tokens / latency - output_text = response.choices[0].message.content - - result = { - "input": prompt, - "output": output_text, - "prompt_tokens": prompt_tokens, - "output_tokens": output_tokens, - "total_tokens": total_tokens, - "latency": latency, - "throughput": throughput - } - - # Write result to JSONL file - output_file.write(json.dumps(result) + "\n") - output_file.flush() # Ensure data is written immediately to the file - - logging.warning( - f"Request completed in {latency:.2f} seconds with throughput {throughput:.2f} tokens/s, request {prompt} response {response}") - return result - except Exception as e: - logging.error(f"Error sending request to at {endpoint}: {str(e)}") - return None - - -async def benchmark(endpoint, model, api_key, workload_path, output_file_path): - client = openai.AsyncOpenAI( - api_key=api_key, - base_url=endpoint + "/v1", - ) - with open(output_file_path, 'a', encoding='utf-8') as output_file: - load_struct = load_workload(workload_path) - batch_tasks = [] - base_time = time.time() - num_requests = 0 - for requests_dict in load_struct: - ts = int(requests_dict["timestamp"]) - requests = requests_dict["requests"] - cur_time = time.time() - target_time = base_time + ts / 1000.0 - logging.warning(f"Prepare to launch {len(requests)} tasks after {target_time - cur_time}") - if target_time > cur_time: - await asyncio.sleep(target_time - cur_time) - formatted_prompts = [wrap_prompt_as_chat_message(request["prompt"]) for request in requests] - for formatted_prompt in formatted_prompts: - task = asyncio.create_task( - send_request(client, model, endpoint, formatted_prompt, output_file) - ) - batch_tasks.append(task) - num_requests += len(requests) - await asyncio.gather(*batch_tasks) - logging.warning(f"All {num_requests} requests completed for deployment.") - - -def main(args): - logging.info(f"Starting benchmark on endpoint {args.endpoint}") - start_time = time.time() - asyncio.run(benchmark(args.endpoint, args.model, args.api_key, args.workload_path, args.output_file_path)) - end_time = time.time() - logging.info(f"Benchmark completed in {end_time - start_time:.2f} seconds") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Workload Generator') - parser.add_argument("--workload-path", type=str, default=None, help="File path to the workload file.") - parser.add_argument('--endpoint', type=str, required=True) - parser.add_argument("--model", type=str, required=True, help="Name of the model.") - parser.add_argument("--api-key", type=str, required=True, help="API key to the service. 
") - parser.add_argument('--output-file-path', type=str, default="output.jsonl") - - args = parser.parse_args() - main(args) diff --git a/benchmarks/generator/utils.py b/benchmarks/generator/utils.py index 2a1383b0..7e4f9298 100644 --- a/benchmarks/generator/utils.py +++ b/benchmarks/generator/utils.py @@ -249,29 +249,8 @@ def save_workload(load_struct: List[Any], json.dump(load_struct, file, indent=4) logging.warn(f'Saved workload file to {output_path + ".json"}') - -def load_workload(input_path: str) -> List[Any]: - load_struct = None - if input_path.endswith(".jsonl"): - with open(input_path, "r") as file: - load_struct = [json.loads(line) for line in file] - else: - with open(input_path, "r") as file: - load_struct = json.load(file) - return load_struct - def load_config(config_path: str) -> Dict[str, Any]: with open(config_path, "r") as file: config = json.load(file) return config -# Function to wrap the prompt into OpenAI's chat completion message format. -def wrap_prompt_as_chat_message(prompt: str): - """ - Wrap the prompt into OpenAI's chat completion message format. - - :param prompt: The user prompt to be converted. - :return: A list containing chat completion messages. - """ - user_message = {"role": "user", "content": prompt} - return [user_message]