[WIP] Add GPU Optimizer deployment and update configurations #480

Closed · wants to merge 5 commits
11 changes: 6 additions & 5 deletions benchmarks/generator/README.md
@@ -5,17 +5,18 @@ If no trace file path is specified, the generator will generate workload file ba

```
export SHARE_GPT_PATH=${PATH_TO_SHARE_GPT_FILE}
python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --num-requests 10000 --trace-type synthetic --output "output"
python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 600000 --trace-type synthetic --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output "output"
```
Here ```--interval-ms``` specifies the granularity of concurrently dispatched requests (in milliseconds), and ```--duration-ms``` specifies the total length of the trace in milliseconds.

The workload file will be stored under the ```output``` folder, named after the corresponding traffic pattern, and a plot illustrating the workload pattern will be saved under the ```plot``` directory.
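
As a rough illustration, the number of dispatch intervals is simply ```--duration-ms``` divided by ```--interval-ms```. The sketch below loads a generated file and checks this; it is a hedged sketch only, since the actual filename under ```output``` depends on the pattern name and the per-entry structure is an assumption rather than a documented format.

```
# Hedged sketch: the output filename and per-entry structure are assumptions,
# not guaranteed by the generator.
import json

interval_ms = 1000
duration_ms = 600000
num_intervals = duration_ms // interval_ms   # 600 dispatch points for the command above

with open("output/<pattern_name>.json") as f:  # placeholder path; actual name varies by pattern
    workload = json.load(f)

print(f"expected up to {num_intervals} intervals, file contains {len(workload)} entries")
```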


## Generate a workload file based on load summary .csv file
## Generate a workload file based on internal load summary .csv file
```
export SUMMARY_FILE=${PATH_TO_SUMMARY_FILE}
export SHARE_GPT_PATH=${PATH_TO_SHARE_GPT_FILE}
python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --num-requests 100000 --trace-type summary --trace-file "$SUMMARY_FILE" --output "output"
python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 3600000 --trace-type internal --trace-file "$SUMMARY_FILE" --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output "output"
```

This generator assumes the trace file to be in the following format
@@ -36,14 +37,14 @@ This generator generate workload file (in .json format) under ```output``` folde
A plot illustrating the workload pattern will be saved under the ```plot``` directory.
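
Conceptually, the generator groups per-timestamp request counts from the summary into ```--interval-ms``` buckets. The sketch below illustrates only that grouping idea; the column names are hypothetical and are not the actual schema of the internal summary file.

```
# Illustrative only: "timestamp_ms" and "request_count" are hypothetical column
# names, not the real schema of the internal summary file.
import pandas as pd

interval_ms = 1000
summary = pd.DataFrame({
    "timestamp_ms": [0, 250, 900, 1500, 2600],
    "request_count": [2, 1, 3, 4, 1],
})

# Bucket rows into interval_ms-wide windows and sum request counts per window.
summary["bucket"] = summary["timestamp_ms"] // interval_ms
requests_per_interval = summary.groupby("bucket")["request_count"].sum()
print(requests_per_interval)
```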


## Generate a workload file based on load summary .csv file
## Generate a workload file based on Azure LLM Trace

To produce a workload based on [Azure LLM Trace](https://github.com/Azure/AzurePublicDataset/tree/master/data), use the following commands:

```
export AZURE_TRACE_NAME=${PATH_TO_AZURE_TRACE_NAME}
export SHARE_GPT_PATH=${PATH_TO_SHARE_GPT_FILE}
python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --num-requests 100000 --trace-type azure --trace-file "$AZURE_TRACE_NAME" --group-interval-seconds 1 --model "meta-llama/Llama-2-7b-hf" --output "output"
python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 3600000 --trace-type azure --trace-file "$AZURE_TRACE_NAME" --group-interval-seconds 1 --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output "output"
```

Note that the trace file contains both input and output lengths. The dataset in ```$SHARE_GPT_PATH``` therefore needs to be tokenized so that requests can be sampled based on their input/output token lengths, which is why a tokenizer must be specified when generating from this trace. Use ```--group-interval-seconds``` to specify the grouping interval applied to the original trace. The workload file will be stored under the ```output``` folder, and a plot illustrating the workload pattern will be saved under the ```plot``` directory.
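
The length-matched sampling is implemented by the helpers added in ```sample_request.py``` below. A minimal, illustrative invocation might look like the following sketch; the ShareGPT path and the length values are placeholders, and in practice the input/output lengths come from the Azure trace rows.

```
# Illustrative sketch: the dataset path and length lists are placeholders.
from sample_request import load_sharegpt_requests, sample_sharegpt_requests_len_range
from utils import get_tokenizer

tokenizer = get_tokenizer("Qwen/Qwen2.5-Coder-7B-Instruct", trust_remote_code=True)
df = load_sharegpt_requests("/path/to/sharegpt.json", tokenizer)

requests = sample_sharegpt_requests_len_range(
    df,
    num_requests=2,
    input_lens=[128, 512],
    output_lens=[64, 256],
)
for prompt, prompt_len, completion_len, _ in requests:
    print(prompt_len, completion_len)
```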
103 changes: 103 additions & 0 deletions benchmarks/generator/sample_request.py
@@ -0,0 +1,103 @@
import logging
import json

import pandas as pd

from typing import Tuple, Optional, List
from transformers import PreTrainedTokenizerBase

def load_sharegpt_requests(
dataset_path: str,
tokenizer: PreTrainedTokenizerBase,
) -> pd.DataFrame:
# Load the dataset into a DataFrame
with open(dataset_path, encoding='utf-8') as f:
dataset = json.load(f)
dataset = [
(data["conversations"][0]["value"], data["conversations"][1]["value"])
for data in dataset if len(data["conversations"]) >= 2
]
df = pd.DataFrame(dataset, columns=["prompt", "completion"])
logging.warn(f"...Start dataframe transformation")
# Tokenize and calculate lengths
df["prompt_len"] = df["prompt"].apply(lambda x: len(tokenizer(x).input_ids))
df["completion_len"] = df["completion"].apply(lambda x: len(tokenizer(x).input_ids))
logging.warn(f"...Complete dataframe transformation")
return df

def sample_sharegpt_requests(
dataset_path: str,
num_requests: int,
tokenizer: Optional[PreTrainedTokenizerBase] = None,
fixed_output_len: Optional[int] = None,
) -> List[Tuple[str, int, int, None]]:
# Load the dataset
with open(dataset_path, encoding='utf-8') as f:
dataset = json.load(f)
dataset = [data for data in dataset if len(data["conversations"]) >= 2]
dataset = [(data["conversations"][0]["value"], data["conversations"][1]["value"]) for data in dataset]

filtered_dataset: List[Tuple[str, int, int]] = []
for i in range(len(dataset)):
if len(filtered_dataset) == num_requests:
break
prompt = dataset[i][0]
if tokenizer is not None:
prompt_token_ids = tokenizer(prompt).input_ids
completion = dataset[i][1]
completion_token_ids = tokenizer(completion).input_ids
prompt_len = len(prompt_token_ids)
output_len = len(completion_token_ids) if fixed_output_len is None else fixed_output_len
if prompt_len < 4 or (fixed_output_len is None and output_len < 4):
continue
if prompt_len > 1024 or prompt_len + output_len > 2048:
continue
filtered_dataset.append((prompt, prompt_len, output_len, None))
else:
filtered_dataset.append((prompt, -1, -1, None))

return filtered_dataset


def sample_sharegpt_requests_len_range(
df: pd.DataFrame,
num_requests: int,
input_lens: List[int],
output_lens: List[int],
initial_err_perc: Optional[float] = 0.5,
err_step: float = 0.05
) -> List[Tuple[str, int, int, None]]:
filtered_results = []

# Relaxation mechanism
for i in range(num_requests):
input_len = input_lens[i]
output_len = output_lens[i]
err_perc = initial_err_perc

while err_perc >= 0:
input_range = (int(input_len * err_perc), int(input_len * (1 + err_perc)))
output_range = (int(output_len * err_perc), int(output_len * (1 + err_perc)))

filtered = df[
(df["prompt_len"] >= input_range[0]) &
(df["prompt_len"] <= input_range[1]) &
(df["completion_len"] >= output_range[0]) &
(df["completion_len"] <= output_range[1])
]

if not filtered.empty:
# Select the first match or random sample
sample = filtered.iloc[0] # Or filtered.sample(1) for random
filtered_results.append((sample["prompt"], sample["prompt_len"], sample["completion_len"], None))
break # Stop relaxing for this request once a match is found

# Reduce err_perc for next iteration
logging.warn(f"Relax err_perc {err_perc} by {err_step}")
err_perc -= err_step

if err_perc < 0:
raise Exception(f"No match found for request {i + 1} even after relaxing err_perc to 0")

return filtered_results

140 changes: 34 additions & 106 deletions benchmarks/generator/utils.py
@@ -3,11 +3,10 @@
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from typing import Tuple, Optional, List, Union
from transformers import (AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerBase,
from typing import List, Union, Any, Optional
from transformers import (AutoTokenizer, PreTrainedTokenizer,
PreTrainedTokenizerFast)

def make_serializable(data):
@@ -25,124 +24,27 @@ def make_serializable(data):
else:
return data

def sample_sharegpt_requests(
dataset_path: str,
num_requests: int,
tokenizer: Optional[PreTrainedTokenizerBase] = None,
fixed_output_len: Optional[int] = None,
) -> List[Tuple[str, int, int, None]]:
# Load the dataset
with open(dataset_path, encoding='utf-8') as f:
dataset = json.load(f)
dataset = [data for data in dataset if len(data["conversations"]) >= 2]
dataset = [(data["conversations"][0]["value"], data["conversations"][1]["value"]) for data in dataset]

filtered_dataset: List[Tuple[str, int, int]] = []
for i in range(len(dataset)):
if len(filtered_dataset) == num_requests:
break
prompt = dataset[i][0]
if tokenizer is not None:
prompt_token_ids = tokenizer(prompt).input_ids
completion = dataset[i][1]
completion_token_ids = tokenizer(completion).input_ids
prompt_len = len(prompt_token_ids)
output_len = len(completion_token_ids) if fixed_output_len is None else fixed_output_len
if prompt_len < 4 or (fixed_output_len is None and output_len < 4):
continue
if prompt_len > 1024 or prompt_len + output_len > 2048:
continue
filtered_dataset.append((prompt, prompt_len, output_len, None))
else:
filtered_dataset.append((prompt, -1, -1, None))

return filtered_dataset

def load_sharegpt_requests(
dataset_path: str,
tokenizer: PreTrainedTokenizerBase,
) -> pd.DataFrame:
# Load the dataset into a DataFrame
with open(dataset_path, encoding='utf-8') as f:
dataset = json.load(f)
dataset = [
(data["conversations"][0]["value"], data["conversations"][1]["value"])
for data in dataset if len(data["conversations"]) >= 2
]
df = pd.DataFrame(dataset, columns=["prompt", "completion"])
logging.INFO(f"...Start dataframe transformation")
# Tokenize and calculate lengths
df["prompt_len"] = df["prompt"].apply(lambda x: len(tokenizer(x).input_ids))
df["completion_len"] = df["completion"].apply(lambda x: len(tokenizer(x).input_ids))
logging.INFO(f"...Complete dataframe transformation")
return df


def sample_sharegpt_requests_len_range(
df: pd.DataFrame,
num_requests: int,
input_lens: List[int],
output_lens: List[int],
initial_err_perc: Optional[float] = 0.5,
err_step: float = 0.05
) -> List[Tuple[str, int, int, None]]:
filtered_results = []

# Relaxation mechanism
for i in range(num_requests):
input_len = input_lens[i]
output_len = output_lens[i]
err_perc = initial_err_perc

while err_perc >= 0:
input_range = (int(input_len * err_perc), int(input_len * (1 + err_perc)))
output_range = (int(output_len * err_perc), int(output_len * (1 + err_perc)))

filtered = df[
(df["prompt_len"] >= input_range[0]) &
(df["prompt_len"] <= input_range[1]) &
(df["completion_len"] >= output_range[0]) &
(df["completion_len"] <= output_range[1])
]

if not filtered.empty:
# Select the first match or random sample
sample = filtered.iloc[0] # Or filtered.sample(1) for random
filtered_results.append((sample["prompt"], sample["prompt_len"], sample["completion_len"], None))
break # Stop relaxing for this request once a match is found

# Reduce err_perc for next iteration
logging.warn(f"Relax err_perc {err_perc} by {err_step}")
err_perc -= err_step

if err_perc < 0:
raise Exception(f"No match found for request {i + 1} even after relaxing err_perc to 0")

logging.info(f"Successfully found {len(filtered_results)} requests")
return filtered_results


def get_tokenizer(
pretrained_model_name_or_path: str, trust_remote_code: bool
) -> Union[PreTrainedTokenizer, PreTrainedTokenizerFast]:
return AutoTokenizer.from_pretrained(pretrained_model_name_or_path,
trust_remote_code=trust_remote_code)

def plot_workload(workload_dict, interval_sec, output_file: str = None):
def plot_workload(workload_dict, interval_ms, output_file: str = None):
"""
Plots the concurrency (item length) of the generated workload.

Args:
workload_dict (dict): A dictionary where the keys are workload names (labels) and the values are lists of lists representing the workload.
interval_sec (int):
interval_ms (int): Interval in milliseconds.
"""
fig, ax = plt.subplots()
for workload_name, workload in workload_dict.items():
concurrency_values = [len(item) for item in workload]
ax.plot(np.arange(len(concurrency_values)) * interval_sec, concurrency_values, label=workload_name)
concurrency_values = [len(item) for (_, item) in workload]
ax.plot(np.arange(len(concurrency_values)) * interval_ms, concurrency_values, label=workload_name)

ax.set_ylim(0,)
plt.xlabel('Time (Sec)')
plt.xlabel('Time (ms)')
plt.ylabel('Concurrency')
plt.title('Workload Concurrency')
plt.legend()
@@ -151,4 +53,30 @@ def plot_workload(workload_dict, interval_sec, output_file: str = None):
else:
os.makedirs(os.path.dirname(output_file), exist_ok=True)
plt.savefig(output_file)
logging.info(f'Saved workload plot to {output_file}')
logging.warn(f'Saved workload plot to {output_file}')

def save_workload(load_struct: List[Any],
output_path: str,
use_jsonl: Optional[bool] = False):
if use_jsonl:
with open(output_path + ".jsonl", "w") as file:
for row in load_struct:
json_line = json.dumps(row) # Convert list to JSON string
file.write(json_line + "\n")
logging.warn(f'Saved workload file to {output_path + ".jsonl"}')
else:
with open(output_path + ".json", 'w') as file:
json.dump(load_struct, file, indent=4)
logging.warn(f'Saved workload file to {output_path + ".json"}')

def load_workload(load_struct: List[Any],
input_path: str,
use_jsonl: Optional[bool] = False) -> List[Any]:
load_struct = None
if use_jsonl:
with open(input_path, "r") as file:
load_struct = [json.loads(line) for line in file]
else:
with open(input_path, "r") as file:
load_struct = json.load(file)
return load_struct
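
Taken together, the new helpers can be exercised along these lines. This is a usage sketch only: the ```(timestamp, requests)``` pair structure is inferred from ```plot_workload```'s iteration and may not match the generator's exact output format.

```
# Usage sketch; the workload structure below is an assumption inferred from
# plot_workload, not a documented format.
from utils import save_workload, load_workload, plot_workload

workload = [
    (0,    [{"prompt": "hi"}, {"prompt": "hello"}]),  # two requests in the first interval
    (1000, [{"prompt": "how are you"}]),              # one request in the second interval
]

save_workload(workload, "example", use_jsonl=True)               # writes example.jsonl
reloaded = load_workload(None, "example.jsonl", use_jsonl=True)  # first argument is unused by the current implementation
plot_workload({"example": reloaded}, interval_ms=1000, output_file="plot/example.png")
```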