From 90cd69073390d7612278b9f31b5fc85fb6a438e8 Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Wed, 4 Dec 2024 08:30:41 -0800 Subject: [PATCH 1/4] Add GPU Optimizer deployment and update configurations --- config/default/kustomization.yaml | 1 + config/gpu-optimizer/deployment.yaml | 30 +++++++++++++++++++ config/gpu-optimizer/kustomization.yaml | 4 +++ config/gpu-optimizer/rbac.yaml | 29 ++++++++++++++++++ config/gpu-optimizer/service.yaml | 13 ++++++++ .../overlays/vke/default/kustomization.yaml | 1 + development/simulator/deployment-a100.yaml | 4 +-- development/simulator/deployment-a40.yaml | 4 +-- python/aibrix/aibrix/gpu_optimizer/README.md | 6 ++-- 9 files changed, 85 insertions(+), 7 deletions(-) create mode 100644 config/gpu-optimizer/deployment.yaml create mode 100644 config/gpu-optimizer/kustomization.yaml create mode 100644 config/gpu-optimizer/rbac.yaml create mode 100644 config/gpu-optimizer/service.yaml diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index f472003f..2402b1b1 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -23,6 +23,7 @@ resources: - ../rbac - ../manager - ../gateway +- ../gpu-optimizer - ../dependency/kuberay-operator # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in # crd/kustomization.yaml diff --git a/config/gpu-optimizer/deployment.yaml b/config/gpu-optimizer/deployment.yaml new file mode 100644 index 00000000..668f39e3 --- /dev/null +++ b/config/gpu-optimizer/deployment.yaml @@ -0,0 +1,30 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: gpu-optimizer + namespace: aibrix-system +spec: + replicas: 1 + selector: + matchLabels: + app: gpu-optimizer + template: + metadata: + labels: + app: gpu-optimizer + spec: + serviceAccountName: gpu-optimizer + automountServiceAccountToken: true + containers: + - name: gpu-optimizer + image: aibrix/runtime:nightly + command: ["python", "-m", "aibrix.gpu_optimizer.app"] + ports: + - containerPort: 8080 + env: + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: REDIS_HOST + value: aibrix-redis-master.aibrix-system.svc.cluster.local \ No newline at end of file diff --git a/config/gpu-optimizer/kustomization.yaml b/config/gpu-optimizer/kustomization.yaml new file mode 100644 index 00000000..bb0c7530 --- /dev/null +++ b/config/gpu-optimizer/kustomization.yaml @@ -0,0 +1,4 @@ +resources: +- deployment.yaml +- service.yaml +- rbac.yaml \ No newline at end of file diff --git a/config/gpu-optimizer/rbac.yaml b/config/gpu-optimizer/rbac.yaml new file mode 100644 index 00000000..9c982ea0 --- /dev/null +++ b/config/gpu-optimizer/rbac.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: gpu-optimizer + namespace: aibrix-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + namespace: aibrix-system + name: gpu-optimizer +rules: + - apiGroups: ["apps"] + resources: ["deployments"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: gpu-optimizer + namespace: aibrix-system +subjects: + - kind: ServiceAccount + name: gpu-optimizer + namespace: aibrix-system +roleRef: + kind: Role + name: gpu-optimizer + apiGroup: rbac.authorization.k8s.io \ No newline at end of file diff --git a/config/gpu-optimizer/service.yaml b/config/gpu-optimizer/service.yaml new file mode 100644 index 00000000..6968aeed --- /dev/null +++ b/config/gpu-optimizer/service.yaml @@ 
-0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: gpu-optimizer + namespace: aibrix-system +spec: + selector: + app: gpu-optimizer + ports: + - protocol: TCP + port: 8080 + targetPort: 8080 + type: ClusterIP \ No newline at end of file diff --git a/config/overlays/vke/default/kustomization.yaml b/config/overlays/vke/default/kustomization.yaml index 4598d51d..e75188a6 100644 --- a/config/overlays/vke/default/kustomization.yaml +++ b/config/overlays/vke/default/kustomization.yaml @@ -7,6 +7,7 @@ resources: - ../../../rbac - manager - gateway +- ../../../gpu-optimizer - ../../../dependency/kuberay-operator diff --git a/development/simulator/deployment-a100.yaml b/development/simulator/deployment-a100.yaml index 8436f5ef..78f4e5bc 100644 --- a/development/simulator/deployment-a100.yaml +++ b/development/simulator/deployment-a100.yaml @@ -150,11 +150,11 @@ spec: apiVersion: apps/v1 kind: Deployment name: simulator-llama2-7b-a100 - minReplicas: 0 + minReplicas: 1 maxReplicas: 10 targetMetric: "avg_prompt_throughput_toks_per_s" # Ignore if metricsSources is configured metricsSources: - - endpoint: gpu-optimizer.aibrix-system.svc.cluster.local:8080 + - endpoint: aibrix-gpu-optimizer.aibrix-system.svc.cluster.local:8080 path: /metrics/aibrix-system/simulator-llama2-7b-a100 metric: "vllm:deployment_replicas" targetValue: "1" diff --git a/development/simulator/deployment-a40.yaml b/development/simulator/deployment-a40.yaml index 7d172142..0eec6d4c 100644 --- a/development/simulator/deployment-a40.yaml +++ b/development/simulator/deployment-a40.yaml @@ -149,11 +149,11 @@ spec: apiVersion: apps/v1 kind: Deployment name: simulator-llama2-7b-a40 - minReplicas: 0 + minReplicas: 1 maxReplicas: 10 targetMetric: "avg_prompt_throughput_toks_per_s" # Ignore if metricsSources is configured metricsSources: - - endpoint: gpu-optimizer.aibrix-system.svc.cluster.local:8080 + - endpoint: aibrix-gpu-optimizer.aibrix-system.svc.cluster.local:8080 path: /metrics/aibrix-system/simulator-llama2-7b-a40 metric: "vllm:deployment_replicas" targetValue: "1" diff --git a/python/aibrix/aibrix/gpu_optimizer/README.md b/python/aibrix/aibrix/gpu_optimizer/README.md index 347a2603..74965805 100644 --- a/python/aibrix/aibrix/gpu_optimizer/README.md +++ b/python/aibrix/aibrix/gpu_optimizer/README.md @@ -28,7 +28,7 @@ kubectl -n aibrix-system port-forward svc/aibrix-redis-master 6379:6379 1>/dev/n # Or use make make debug-init -python optimizer/profiling/gen-profile.py simulator-llama2-7b-a100 -o "redis://localhost:6379/?model=llama2-7b" +python optimizer/profiling/gen_profile.py simulator-llama2-7b-a100 -o "redis://localhost:6379/?model=llama2-7b" # Or use make make DP=simulator-llama2-7b-a100 gen-profile ``` @@ -36,7 +36,7 @@ make DP=simulator-llama2-7b-a100 gen-profile 5. Deploy GPU Optimizer ```shell kubectl apply -f deployment.yaml -kubectl -n aibrix-system port-forward svc/gpu-optimizer 8080:8080 1>/dev/null 2>&1 & +kubectl -n aibrix-system port-forward svc/aibrix-gpu-optimizer 8080:8080 1>/dev/null 2>&1 & # Or use make make deploy @@ -47,7 +47,7 @@ make deploy 5. Start workload and see how model scale. Benchmark toolkit can be used to generate workload as: ```shell # Make sure gateway's local access, see docs/development/simulator/README.md for details. 
-python optimizer/profiling/gpu-benchmark.py --backend=vllm --port 8888 --request-rate=10 --num-prompts=100 --input_len 2000 --output_len 128 --model=llama2-7b +python optimizer/profiling/gpu_benchmark.py --backend=vllm --port 8888 --request-rate=10 --num-prompts=100 --input_len 2000 --output_len 128 --model=llama2-7b ``` 6. Observability: visit http://localhost:8080/dash/llama2-7b for workload pattern visualization. A independent visualization demo can access by: From 93603ffaae1823e259ccc487a3d71ae5ff6982fe Mon Sep 17 00:00:00 2001 From: Jingyuan Date: Thu, 5 Dec 2024 11:18:14 -0800 Subject: [PATCH 2/4] [Bug] Avoid including sensitive info in Dockerfile ENV (#487) * Move huggingface_token to config.json Add missing zscaler root CA to image for huggingface lib to download tokenizer model successfully. * Remove huggingface token --------- Co-authored-by: Jingyuan Zhang --- development/app/Dockerfile | 11 ++++++++--- development/app/README.md | 8 +++++--- development/app/app.py | 30 +++++++++++++++++++++++++++-- development/app/config.json | 3 +++ development/app/entrypoint.sh | 3 +++ development/app/zscaler_root_ca.crt | 28 +++++++++++++++++++++++++++ 6 files changed, 75 insertions(+), 8 deletions(-) create mode 100644 development/app/config.json create mode 100755 development/app/entrypoint.sh create mode 100644 development/app/zscaler_root_ca.crt diff --git a/development/app/Dockerfile b/development/app/Dockerfile index c5419e1f..0342144f 100644 --- a/development/app/Dockerfile +++ b/development/app/Dockerfile @@ -13,15 +13,20 @@ WORKDIR /app COPY requirements.txt /app/ # Install dependencies -RUN apt update && apt install -y curl jq git +RUN apt update && apt install -y curl jq git git-lfs RUN pip install --no-cache-dir -r requirements.txt # Copy the rest of the application code into the container COPY ./*.py /app/ +COPY ./*.json /app/ + +# Python base image has no zScaler root CA, we need add it manually +COPY ./zscaler_root_ca.crt /usr/local/share/ca-certificates/ +RUN update-ca-certificates +ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt ENV MODEL_NAME=llama2-7b -ENV HUGGINGFACE_TOKEN="your huggingface token" ARG SIMULATION=disabled # Trigger profiling @@ -34,4 +39,4 @@ EXPOSE 8000 # Run the application, environment variable is necessary to apply ARG ENV SIMULATION=$SIMULATION -CMD python app.py --replica_config_device ${SIMULATION} +CMD ["python", "app.py"] diff --git a/development/app/README.md b/development/app/README.md index 8f455006..f119c518 100644 --- a/development/app/README.md +++ b/development/app/README.md @@ -32,9 +32,11 @@ kubectl delete -k config/mock ### Deploy the simulator app Alternatively, [vidur](https://github.com/microsoft/vidur) is integrated for high-fidality vLLM simulation: -0. Config HuggingFace token for model tokenizer by changing HUGGINGFACE_TOKEN in Dockerfile -``` -ENV HUGGINGFACE_TOKEN="your huggingface token" +0. Config HuggingFace token for model tokenizer by changing huggingface_token in config.json +```json +{ + "huggingface_token": "your huggingface token" +} ``` 1. 
Builder simulator base model image diff --git a/development/app/app.py b/development/app/app.py index c9b2b503..83da13ac 100644 --- a/development/app/app.py +++ b/development/app/app.py @@ -8,6 +8,7 @@ from datetime import datetime from random import randint import os +import json from typing import Optional try: @@ -26,19 +27,44 @@ DEPLOYMENT_NAME = os.getenv('DEPLOYMENT_NAME', 'llama2-70b') NAMESPACE = os.getenv('POD_NAMESPACE', 'default') DEFAULT_REPLICAS = int(os.getenv('DEFAULT_REPLICAS', '1')) -HUGGINGFACE_TOKEN = os.getenv('HUGGINGFACE_TOKEN', "your huggingface token") +SIMULATION=os.getenv('SIMULATION', 'disabled') modelMaps = { "llama2-7b": "meta-llama/Llama-2-7b-hf", "llama2-70b": "meta-llama/Llama-2-70b-hf" } -sys.argv.append(f"--replica_config_model_name={modelMaps.get(MODEL_NAME, MODEL_NAME)}") + +# Polifill the necessary arguments. +if "--replica_config_device" not in sys.argv: + sys.argv.append("--replica_config_device") + sys.argv.append(SIMULATION) +if "--replica_config_model_name" not in sys.argv: + sys.argv.append("--replica_config_model_name") + sys.argv.append(modelMaps.get(MODEL_NAME, MODEL_NAME)) tokenizer = None simulator: Optional[Simulator] = None logger = logging.getLogger(__name__) +def read_configs(file_path): + """ + Reads a JSON file that store sensitive information. + """ + try: + with open(file_path, "r") as f: + data = json.load(f) + if not isinstance(data, dict): + raise Exception("invalid config format, dict expected.") + return data + except Exception as e: + print(f"Error reading JSON file: {e}") + return {} + + +configs = read_configs("config.json") +HUGGINGFACE_TOKEN = configs.get("huggingface_token", "your huggingface token") + def get_token_count(text): try: # Encode the text diff --git a/development/app/config.json b/development/app/config.json new file mode 100644 index 00000000..9df272fe --- /dev/null +++ b/development/app/config.json @@ -0,0 +1,3 @@ +{ + "huggingface_token": "your huggingface token" +} \ No newline at end of file diff --git a/development/app/entrypoint.sh b/development/app/entrypoint.sh new file mode 100755 index 00000000..b51a95d7 --- /dev/null +++ b/development/app/entrypoint.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +python app.py "$@" # Pass all arguments to app.py \ No newline at end of file diff --git a/development/app/zscaler_root_ca.crt b/development/app/zscaler_root_ca.crt new file mode 100644 index 00000000..45e3a29f --- /dev/null +++ b/development/app/zscaler_root_ca.crt @@ -0,0 +1,28 @@ +-----BEGIN CERTIFICATE----- +MIIE0zCCA7ugAwIBAgIJANu+mC2Jt3uTMA0GCSqGSIb3DQEBCwUAMIGhMQswCQYD +VQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTERMA8GA1UEBxMIU2FuIEpvc2Ux +FTATBgNVBAoTDFpzY2FsZXIgSW5jLjEVMBMGA1UECxMMWnNjYWxlciBJbmMuMRgw +FgYDVQQDEw9ac2NhbGVyIFJvb3QgQ0ExIjAgBgkqhkiG9w0BCQEWE3N1cHBvcnRA +enNjYWxlci5jb20wHhcNMTQxMjE5MDAyNzU1WhcNNDIwNTA2MDAyNzU1WjCBoTEL +MAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExETAPBgNVBAcTCFNhbiBK +b3NlMRUwEwYDVQQKEwxac2NhbGVyIEluYy4xFTATBgNVBAsTDFpzY2FsZXIgSW5j +LjEYMBYGA1UEAxMPWnNjYWxlciBSb290IENBMSIwIAYJKoZIhvcNAQkBFhNzdXBw +b3J0QHpzY2FsZXIuY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA +qT7STSxZRTgEFFf6doHajSc1vk5jmzmM6BWuOo044EsaTc9eVEV/HjH/1DWzZtcr +fTj+ni205apMTlKBW3UYR+lyLHQ9FoZiDXYXK8poKSV5+Tm0Vls/5Kb8mkhVVqv7 +LgYEmvEY7HPY+i1nEGZCa46ZXCOohJ0mBEtB9JVlpDIO+nN0hUMAYYdZ1KZWCMNf +5J/aTZiShsorN2A38iSOhdd+mcRM4iNL3gsLu99XhKnRqKoHeH83lVdfu1XBeoQz +z5V6gA3kbRvhDwoIlTBeMa5l4yRdJAfdpkbFzqiwSgNdhbxTHnYYorDzKfr2rEFM +dsMU0DHdeAZf711+1CunuQIDAQABo4IBCjCCAQYwHQYDVR0OBBYEFLm33UrNww4M 
+hp1d3+wcBGnFTpjfMIHWBgNVHSMEgc4wgcuAFLm33UrNww4Mhp1d3+wcBGnFTpjf +oYGnpIGkMIGhMQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTERMA8G +A1UEBxMIU2FuIEpvc2UxFTATBgNVBAoTDFpzY2FsZXIgSW5jLjEVMBMGA1UECxMM +WnNjYWxlciBJbmMuMRgwFgYDVQQDEw9ac2NhbGVyIFJvb3QgQ0ExIjAgBgkqhkiG +9w0BCQEWE3N1cHBvcnRAenNjYWxlci5jb22CCQDbvpgtibd7kzAMBgNVHRMEBTAD +AQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAw0NdJh8w3NsJu4KHuVZUrmZgIohnTm0j+ +RTmYQ9IKA/pvxAcA6K1i/LO+Bt+tCX+C0yxqB8qzuo+4vAzoY5JEBhyhBhf1uK+P +/WVWFZN/+hTgpSbZgzUEnWQG2gOVd24msex+0Sr7hyr9vn6OueH+jj+vCMiAm5+u +kd7lLvJsBu3AO3jGWVLyPkS3i6Gf+rwAp1OsRrv3WnbkYcFf9xjuaf4z0hRCrLN2 +xFNjavxrHmsH8jPHVvgc1VD0Opja0l/BRVauTrUaoW6tE+wFG5rEcPGS80jjHK4S +pB5iDj2mUZH1T8lzYtuZy0ZPirxmtsk3135+CKNa2OCAhhFjE0xd +-----END CERTIFICATE----- From aa77efb5f83285c3079afbc0400916cd4a152c1e Mon Sep 17 00:00:00 2001 From: Le Xu Date: Thu, 5 Dec 2024 13:40:25 -0800 Subject: [PATCH 3/4] Refactor generator to generate time-based traces (#478) * adding timestamp and prompt in/output length to traces * name fix; plotting script fix * update README * addressing comments * addressing comments * add sample workload * add sample workload * update file format * update jsonl option --------- Co-authored-by: Le Xu --- benchmarks/generator/README.md | 11 +- benchmarks/generator/sample_request.py | 103 +++++++++++++ benchmarks/generator/utils.py | 140 +++++------------ benchmarks/generator/workload_generator.py | 166 ++++++++++++--------- 4 files changed, 235 insertions(+), 185 deletions(-) create mode 100644 benchmarks/generator/sample_request.py diff --git a/benchmarks/generator/README.md b/benchmarks/generator/README.md index 57006818..eaec9b5f 100644 --- a/benchmarks/generator/README.md +++ b/benchmarks/generator/README.md @@ -5,17 +5,18 @@ If no trace file path is specified, the generator will generate workload file ba ``` export SHARE_GPT_PATH=${PATH_TO_SHARE_GPT_FILE} -python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --num-requests 10000 --trace-type synthetic --output "output" +python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 600000 --trace-type synthetic --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output "output" ``` +Here ```--interval-ms``` specifies the granularity of concurently dispatched requests (in milliseconds). ```--duration-ms``` specifies the total length of the trace in milliseconds. The file would be stored under ```output``` folder based on the name of different patterns. And the plot illustrates the workload pattern will be under the ```plot``` directory. -## Generate a workload file based on load summary .csv file +## Generate a workload file based on internal load summary .csv file ``` export SUMMARY_FILE=${PATH_TO_SUMMARY_FILE} export SHARE_GPT_PATH=${PATH_TO_SHARE_GPT_FILE} -python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --num-requests 100000 --trace-type summary --trace-file "$SUMMARY_FILE" --output "output" +python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 3600000 --trace-type internal --trace-file "$SUMMARY_FILE" --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output "output" ``` This generator assumes trace file to be in the following format @@ -36,14 +37,14 @@ This generator generate workload file (in .json format) under ```output``` folde And the plot illustrates the workload pattern will be under the ```plot``` directory. 
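For reference, below is a minimal sketch of consuming a generated trace. It assumes the paired output written by `save_workload` in this patch: each entry is a `[timestamp_ms, requests]` pair and each request is a `[prompt, prompt_len, output_len, null]` tuple; the path `output/internal.jsonl` is only an example.

```python
# Minimal sketch: replaying a generated workload file (assumes the
# [timestamp_ms, requests] entry format written by save_workload;
# "output/internal.jsonl" is a hypothetical path).
import json
import time


def replay(path: str) -> None:
    with open(path, "r") as f:
        entries = [json.loads(line) for line in f]  # .jsonl: one entry per line
    start = time.time()
    for ts_ms, requests in entries:
        # Wait until this entry's timestamp, relative to the start of replay.
        delay = ts_ms / 1000.0 - (time.time() - start)
        if delay > 0:
            time.sleep(delay)
        for prompt, prompt_len, output_len, _ in requests:
            print(f"t={ts_ms}ms prompt_len={prompt_len} output_len={output_len}")


if __name__ == "__main__":
    replay("output/internal.jsonl")
```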
-## Generate a workload file based on load summary .csv file +## Generate a workload file based on Azure LLM Trace To produce a workload based on [Azure LLM Trace](https://github.com/Azure/AzurePublicDataset/tree/master/data), use the following commands: ``` export AZURE_TRACE_NAME=${PATH_TO_AZURE_TRACE_NAME} export SHARE_GPT_PATH=${PATH_TO_SHARE_GPT_FILE} -python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --num-requests 100000 --trace-type azure --trace-file "$AZURE_TRACE_NAME" --group-interval-seconds 1 --model "meta-llama/Llama-2-7b-hf" --output "output" +python workload_generator.py --prompt-file $SHARE_GPT_PATH --num-prompts 100 --interval-ms 1000 --duration-ms 3600000 --trace-type azure --trace-file "$AZURE_TRACE_NAME" --group-interval-seconds 1 --model "Qwen/Qwen2.5-Coder-7B-Instruct" --output "output" ``` Note that the trace file contains both input and output lengths. And therefore dataset in ```$SHARE_GPT_PATH``` needs to be tokenized to be able to sampled based on their input/output token lengths. Therefore it is required to specify tokenizer to generate based on this trace. Use ```--group-interval-seconds``` to specify grouping interval from the origianl trace. The file would be stored under ```output``` folder and the plot illustrates the workload pattern will be under the ```plot``` directory. \ No newline at end of file diff --git a/benchmarks/generator/sample_request.py b/benchmarks/generator/sample_request.py new file mode 100644 index 00000000..e3d84775 --- /dev/null +++ b/benchmarks/generator/sample_request.py @@ -0,0 +1,103 @@ +import logging +import json + +import pandas as pd + +from typing import Tuple, Optional, List +from transformers import PreTrainedTokenizerBase + +def load_sharegpt_requests( + dataset_path: str, + tokenizer: PreTrainedTokenizerBase, + ) -> pd.DataFrame: + # Load the dataset into a DataFrame + with open(dataset_path, encoding='utf-8') as f: + dataset = json.load(f) + dataset = [ + (data["conversations"][0]["value"], data["conversations"][1]["value"]) + for data in dataset if len(data["conversations"]) >= 2 + ] + df = pd.DataFrame(dataset, columns=["prompt", "completion"]) + logging.warn(f"...Start dataframe transformation") + # Tokenize and calculate lengths + df["prompt_len"] = df["prompt"].apply(lambda x: len(tokenizer(x).input_ids)) + df["completion_len"] = df["completion"].apply(lambda x: len(tokenizer(x).input_ids)) + logging.warn(f"...Complete dataframe transformation") + return df + +def sample_sharegpt_requests( + dataset_path: str, + num_requests: int, + tokenizer: Optional[PreTrainedTokenizerBase] = None, + fixed_output_len: Optional[int] = None, +) -> List[Tuple[str, int, int, None]]: + # Load the dataset + with open(dataset_path, encoding='utf-8') as f: + dataset = json.load(f) + dataset = [data for data in dataset if len(data["conversations"]) >= 2] + dataset = [(data["conversations"][0]["value"], data["conversations"][1]["value"]) for data in dataset] + + filtered_dataset: List[Tuple[str, int, int]] = [] + for i in range(len(dataset)): + if len(filtered_dataset) == num_requests: + break + prompt = dataset[i][0] + if tokenizer is not None: + prompt_token_ids = tokenizer(prompt).input_ids + completion = dataset[i][1] + completion_token_ids = tokenizer(completion).input_ids + prompt_len = len(prompt_token_ids) + output_len = len(completion_token_ids) if fixed_output_len is None else fixed_output_len + if prompt_len < 4 or (fixed_output_len is None and output_len < 4): + continue + if prompt_len > 1024 or 
prompt_len + output_len > 2048: + continue + filtered_dataset.append((prompt, prompt_len, output_len, None)) + else: + filtered_dataset.append((prompt, -1, -1, None)) + + return filtered_dataset + + +def sample_sharegpt_requests_len_range( + df: pd.DataFrame, + num_requests: int, + input_lens: List[int], + output_lens: List[int], + initial_err_perc: Optional[float] = 0.5, + err_step: float = 0.05 +) -> List[Tuple[str, int, int, None]]: + filtered_results = [] + + # Relaxation mechanism + for i in range(num_requests): + input_len = input_lens[i] + output_len = output_lens[i] + err_perc = initial_err_perc + + while err_perc >= 0: + input_range = (int(input_len * err_perc), int(input_len * (1 + err_perc))) + output_range = (int(output_len * err_perc), int(output_len * (1 + err_perc))) + + filtered = df[ + (df["prompt_len"] >= input_range[0]) & + (df["prompt_len"] <= input_range[1]) & + (df["completion_len"] >= output_range[0]) & + (df["completion_len"] <= output_range[1]) + ] + + if not filtered.empty: + # Select the first match or random sample + sample = filtered.iloc[0] # Or filtered.sample(1) for random + filtered_results.append((sample["prompt"], sample["prompt_len"], sample["completion_len"], None)) + break # Stop relaxing for this request once a match is found + + # Reduce err_perc for next iteration + logging.warn(f"Relax err_perc {err_perc} by {err_step}") + err_perc -= err_step + + if err_perc < 0: + raise Exception(f"No match found for request {i + 1} even after relaxing err_perc to 0") + + return filtered_results + diff --git a/benchmarks/generator/utils.py b/benchmarks/generator/utils.py index e9ce4831..5b2c8f6c 100644 --- a/benchmarks/generator/utils.py +++ b/benchmarks/generator/utils.py @@ -3,11 +3,10 @@ import os import numpy as np -import pandas as pd import matplotlib.pyplot as plt -from typing import Tuple, Optional, List, Union -from transformers import (AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerBase, +from typing import List, Union, Any, Optional +from transformers import (AutoTokenizer, PreTrainedTokenizer, PreTrainedTokenizerFast) def make_serializable(data): @@ -25,124 +24,27 @@ def make_serializable(data): else: return data -def sample_sharegpt_requests( - dataset_path: str, - num_requests: int, - tokenizer: Optional[PreTrainedTokenizerBase] = None, - fixed_output_len: Optional[int] = None, -) -> List[Tuple[str, int, int, None]]: - # Load the dataset - with open(dataset_path, encoding='utf-8') as f: - dataset = json.load(f) - dataset = [data for data in dataset if len(data["conversations"]) >= 2] - dataset = [(data["conversations"][0]["value"], data["conversations"][1]["value"]) for data in dataset] - - filtered_dataset: List[Tuple[str, int, int]] = [] - for i in range(len(dataset)): - if len(filtered_dataset) == num_requests: - break - prompt = dataset[i][0] - if tokenizer is not None: - prompt_token_ids = tokenizer(prompt).input_ids - completion = dataset[i][1] - completion_token_ids = tokenizer(completion).input_ids - prompt_len = len(prompt_token_ids) - output_len = len(completion_token_ids) if fixed_output_len is None else fixed_output_len - if prompt_len < 4 or (fixed_output_len is None and output_len < 4): - continue - if prompt_len > 1024 or prompt_len + output_len > 2048: - continue - filtered_dataset.append((prompt, prompt_len, output_len, None)) - else: - filtered_dataset.append((prompt, -1, -1, None)) - - return filtered_dataset - -def load_sharegpt_requests( - dataset_path: str, - tokenizer: PreTrainedTokenizerBase, - ) -> pd.DataFrame: - # 
Load the dataset into a DataFrame - with open(dataset_path, encoding='utf-8') as f: - dataset = json.load(f) - dataset = [ - (data["conversations"][0]["value"], data["conversations"][1]["value"]) - for data in dataset if len(data["conversations"]) >= 2 - ] - df = pd.DataFrame(dataset, columns=["prompt", "completion"]) - logging.INFO(f"...Start dataframe transformation") - # Tokenize and calculate lengths - df["prompt_len"] = df["prompt"].apply(lambda x: len(tokenizer(x).input_ids)) - df["completion_len"] = df["completion"].apply(lambda x: len(tokenizer(x).input_ids)) - logging.INFO(f"...Complete dataframe transformation") - return df - - -def sample_sharegpt_requests_len_range( - df: pd.DataFrame, - num_requests: int, - input_lens: List[int], - output_lens: List[int], - initial_err_perc: Optional[float] = 0.5, - err_step: float = 0.05 -) -> List[Tuple[str, int, int, None]]: - filtered_results = [] - - # Relaxation mechanism - for i in range(num_requests): - input_len = input_lens[i] - output_len = output_lens[i] - err_perc = initial_err_perc - - while err_perc >= 0: - input_range = (int(input_len * err_perc), int(input_len * (1 + err_perc))) - output_range = (int(output_len * err_perc), int(output_len * (1 + err_perc))) - - filtered = df[ - (df["prompt_len"] >= input_range[0]) & - (df["prompt_len"] <= input_range[1]) & - (df["completion_len"] >= output_range[0]) & - (df["completion_len"] <= output_range[1]) - ] - - if not filtered.empty: - # Select the first match or random sample - sample = filtered.iloc[0] # Or filtered.sample(1) for random - filtered_results.append((sample["prompt"], sample["prompt_len"], sample["completion_len"], None)) - break # Stop relaxing for this request once a match is found - - # Reduce err_perc for next iteration - logging.warn(f"Relax err_perc {err_perc} by {err_step}") - err_perc -= err_step - - if err_perc < 0: - raise Exception(f"No match found for request {i + 1} even after relaxing err_perc to 0") - - logging.info(f"Successfully found {len(filtered_results)} requests") - return filtered_results - - def get_tokenizer( pretrained_model_name_or_path: str, trust_remote_code: bool ) -> Union[PreTrainedTokenizer, PreTrainedTokenizerFast]: return AutoTokenizer.from_pretrained(pretrained_model_name_or_path, trust_remote_code=trust_remote_code) -def plot_workload(workload_dict, interval_sec, output_file: str = None): +def plot_workload(workload_dict, interval_ms, output_file: str = None): """ Plots the concurrency (item length) of the generated workload. Args: workload_dict (dict): A dictionary where the keys are workload names (labels) and the values are lists of lists representing the workload. - interval_sec (int): + interval_ms (int): Interval in milliseconds. 
""" fig, ax = plt.subplots() for workload_name, workload in workload_dict.items(): - concurrency_values = [len(item) for item in workload] - ax.plot(np.arange(len(concurrency_values)) * interval_sec, concurrency_values, label=workload_name) + concurrency_values = [len(item) for (_, item) in workload] + ax.plot(np.arange(len(concurrency_values)) * interval_ms, concurrency_values, label=workload_name) ax.set_ylim(0,) - plt.xlabel('Time (Sec)') + plt.xlabel('Time (ms)') plt.ylabel('Concurrency') plt.title('Workload Concurrency') plt.legend() @@ -151,4 +53,30 @@ def plot_workload(workload_dict, interval_sec, output_file: str = None): else: os.makedirs(os.path.dirname(output_file), exist_ok=True) plt.savefig(output_file) - logging.info(f'Saved workload plot to {output_file}') \ No newline at end of file + logging.warn(f'Saved workload plot to {output_file}') + +def save_workload(load_struct: List[Any], + output_path: str, + use_jsonl: Optional[bool] = False): + if use_jsonl: + with open(output_path + ".jsonl", "w") as file: + for row in load_struct: + json_line = json.dumps(row) # Convert list to JSON string + file.write(json_line + "\n") + logging.warn(f'Saved workload file to {output_path + ".jsonl"}') + else: + with open(output_path + ".json", 'w') as file: + json.dump(load_struct, file, indent=4) + logging.warn(f'Saved workload file to {output_path + ".json"}') + +def load_workload(load_struct: List[Any], + input_path: str, + use_jsonl: Optional[bool] = False) -> List[Any]: + load_struct = None + if use_jsonl: + with open(input_path, "r") as file: + load_struct = [json.loads(line) for line in file] + else: + with open(input_path, "r") as file: + load_struct = json.load(file) + return load_struct \ No newline at end of file diff --git a/benchmarks/generator/workload_generator.py b/benchmarks/generator/workload_generator.py index 4adaa20c..4b39a11d 100644 --- a/benchmarks/generator/workload_generator.py +++ b/benchmarks/generator/workload_generator.py @@ -1,52 +1,50 @@ import logging import math import random -import json import pandas as pd import argparse import csv from typing import Tuple, List, Any from transformers import PreTrainedTokenizerBase -from datetime import datetime, timedelta -from utils import (sample_sharegpt_requests, load_sharegpt_requests,sample_sharegpt_requests_len_range, get_tokenizer, plot_workload, make_serializable) +from datetime import timedelta +from sample_request import (load_sharegpt_requests, sample_sharegpt_requests, sample_sharegpt_requests_len_range) +from utils import (get_tokenizer, plot_workload, make_serializable, save_workload) -def generate_from_summary_csv(input_requests: List[Any], - file_path: str, - sampling_granularity_seconds: int = 15, +def generate_from_internal_csv(file_path: str, + duration_ms: int, + summary_interval_ms: int, + interval_ms: int = 1000, ) -> List[List[Any]]: - total_requests_from_trace = [] + total_requests_from_summary = [] with open(file_path, 'r') as file: reader = csv.DictReader(file) for row in reader: if 'Total' in row: total_value = row['Total'] if total_value: - total_requests_from_trace.append(float(total_value)) + total_requests_from_summary.append(float(total_value)) workloads = [] base = 0 - end = False - for interval_requests in total_requests_from_trace: - if end: - break - mean_rate = round(interval_requests/sampling_granularity_seconds) - for _ in range(0, sampling_granularity_seconds): - bound = min(base + mean_rate, len(input_requests)) - workloads.append(input_requests[base : bound]) + ts = 0 + for 
interval_requests in total_requests_from_summary: + mean_rate = round(interval_requests/(summary_interval_ms / interval_ms)) + for ts_delta in list(range(0, summary_interval_ms, interval_ms)): + workloads.append((ts + ts_delta, range(base, base + mean_rate))) base += mean_rate - if base >= len(input_requests): - end = True - break + ts += summary_interval_ms + if ts > duration_ms: + break return workloads -def generate_synthetic(input_requests: List[Any], A=1, B=1, +def generate_synthetic(A=1, B=1, sigma=0.1, only_rise: bool = False, omega: float = None, period=0.25, length: int = None, - duration_sec: int = None, - interval_sec: int = None, + duration_ms: int = None, + interval_ms: int = None, ) -> List[List[Any]]: """ Generates a workload based on a given list of input requests and a concurrency function. @@ -64,9 +62,9 @@ def generate_synthetic(input_requests: List[Any], A=1, B=1, omega (float, optional): if None, omega = pi / (2 * length / period) period (float, optional): See omega. Defaults to 0.25. only_rise: if True, the concurrency will monotonically increase - length (int, optional): if None, length = test_duration_sec / interval_sec - duration_sec (int, optional): See param: length - interval_sec (int, optional): See param: length + length (int, optional): if None, length = duration_ms / interval_ms + duration_ms (int, optional): See param: length + interval_ms (int, optional): See param: length Returns: list: A list of items, where each item is a list of requests to be sent concurrently. @@ -91,19 +89,20 @@ def math_function(t): noise = random.gauss(0, sigma) return round(trend + noise) - assert length is not None or (duration_sec is not None and interval_sec is not None), \ - "duration_sec and interval_sec must be specified if length is not None" + assert length is not None or (duration_ms is not None and interval_ms is not None), \ + "duration_ms and interval_ms must be specified if length is not None" if length is None: - length = int(duration_sec // interval_sec) + length = int(duration_ms // interval_ms) + 1 assert omega is not None or period is not None, "period must be specified if length is not None" if omega is None: omega = 2 * math.pi / (length / period) workload = [] t = 0 - input_length = len(input_requests) previous_concurrency = -1 start_index, end_index = 0, 0 + ts = 0 + base_req_id = 0 while t < length: current_concurrency = math_function(t) if only_rise: @@ -113,32 +112,38 @@ def math_function(t): # start from last end index start_index = end_index end_index += current_concurrency - workload.append([input_requests[i % input_length] for i in range(start_index, end_index)]) + workload.append((ts, [base_req_id + i for i in range(start_index, end_index)])) + base_req_id += current_concurrency + ts += interval_ms t += 1 return workload -def pair_requests_with_prompts(workload: List[List[Any]], prompts: List[Tuple[str, int, int, None]], output_file: str = 'output/output.json') -> List[List[Tuple[Any, str]]]: +def pair_requests_with_prompts_round_robin(workload: List[List[Any]], + prompts: List[Tuple[str, int, int, None]], + output_file: str = 'output/output', + to_jsonl: bool = False + ) -> List[List[Tuple[Any, str]]]: paired_workload = [] prompt_count = len(prompts) - - for requests in workload: + for ts, requests in workload: requests_with_prompts = [ prompts[request % prompt_count] for request in requests ] - paired_workload.append(requests_with_prompts) + paired_workload.append((ts, requests_with_prompts)) # Save to file - with open(output_file, 'w') as file: - 
json.dump(paired_workload, file) + save_workload(paired_workload, output_file, use_jsonl = to_jsonl) return paired_workload # generated_workload = generate_from_azure_csv(demo_requests, file_path=args.trace_file, sampling_granularity_seconds=15, output_file=args.output) def generate_from_azure_csv(file_path: str, prompt_file_path: str, + duration_ms: int, tokenizer: PreTrainedTokenizerBase, - group_interval_seconds: int = 1, + interval_ms: int, output_file: str = 'output/output.json', + to_jsonl: bool = False, ) -> List[List[Any]]: # Load the CSV file df = pd.read_csv(file_path) @@ -147,7 +152,7 @@ def generate_from_azure_csv(file_path: str, df['TIMESTAMP'] = pd.to_datetime(df['TIMESTAMP']) # Define the grouping time range (e.g., 1 second) - time_range = timedelta(seconds=group_interval_seconds) + time_range = timedelta(milliseconds=interval_ms) # Initialize a list to hold the grouped requests grouped_requests = [] @@ -156,10 +161,11 @@ def generate_from_azure_csv(file_path: str, df.set_index('TIMESTAMP', inplace=True) current_time = df.index.min() end_time = df.index.max() - logging.INFO(f"Start generation from time {current_time} to {end_time}") + logging.warn(f"Start generation from time {current_time} to {end_time}") sharegpt_df = load_sharegpt_requests(dataset_path = prompt_file_path, tokenizer = tokenizer) + ts = 0 while current_time <= end_time: # Select requests within the current time range mask = (df.index >= current_time) & (df.index < current_time + time_range) @@ -169,7 +175,6 @@ def generate_from_azure_csv(file_path: str, for _, row in group.iterrows(): input_lens.append(int(row['ContextTokens'])) output_lens.append(int(row['GeneratedTokens'])) - logging.info(f"Sample iteration {len(grouped_requests)} for {len(input_lens)} requests") sampled_requests = sample_sharegpt_requests_len_range( df = sharegpt_df, num_requests = len(input_lens), @@ -180,64 +185,77 @@ def generate_from_azure_csv(file_path: str, ) if sampled_requests: # Only add non-empty groups - grouped_requests.append(sampled_requests) - + grouped_requests.append((ts, sampled_requests)) + ts += interval_ms + if ts > duration_ms: + break # Move to the next time range current_time += time_range # Print or process grouped_requests as needed # Save to file grouped_requests = make_serializable(grouped_requests) - with open(output_file, 'w') as file: - json.dump(grouped_requests, file) + save_workload(grouped_requests, output_file, use_jsonl = to_jsonl) - print(grouped_requests) return grouped_requests if __name__ == '__main__': parser = argparse.ArgumentParser(description='Workload Generator') - parser.add_argument('--prompt-file', type=str, required=True, help='File containing prompts') - parser.add_argument('--num-prompts', type=int, default=100, help='Number of prompts to sample') - parser.add_argument('--num-requests', type=int, default=10000, help='Number of requests in total') - parser.add_argument('--group-interval-seconds', type=int, default=1, help='Grouping interval seconds') - parser.add_argument('--trace-type', type=str, required=True, default="synthetic", help='Type of trace consumed') - parser.add_argument('--trace-file', type=str, required=False, default=None, help='File containing trace CSV') - parser.add_argument('--model', type=str, required=False, default="meta-llama/Llama-2-7b-hf", help='Target model tokenizer') - parser.add_argument('--output', type=str, required=False, default="output/output", help='Output path') + parser.add_argument('--prompt-file', type=str, required=True, help='File containing 
prompts.') + parser.add_argument('--num-prompts', type=int, default=100, help='Number of prompts to sample.') + parser.add_argument('--group-interval-seconds', type=int, default=1, help='Grouping interval seconds.') + parser.add_argument('--trace-type', type=str, required=True, default="synthetic", help='Type of trace consumed. Choose among: synthetic, internal, azure') + parser.add_argument('--trace-file', type=str, required=False, default=None, help='File containing original trace file csv, which workload generator depends upon to convert to workload format. This is only needed for for internal/azure trace type. ') + parser.add_argument('--model', type=str, required=False, default="Qwen/Qwen2.5-Coder-7B-Instruct", help='Target model tokenizer.') + parser.add_argument('--output', type=str, required=False, default="output", help='Output path to the workload.') + parser.add_argument('--interval-ms', type=int, required=False, default=1000, help='Granularity of request injection interval in milliseconds.') + parser.add_argument('--duration-ms', type=int, default=60000, help='Duration of the trace generated.') + parser.add_argument('--to-jsonl', dest='to_jsonl', action='store_true', help='Set output data format to .jsonl (default .json).') args = parser.parse_args() - # Load prompts from a file - prompts = sample_sharegpt_requests(args.prompt_file, args.num_prompts) - - # Generate input requests (ascending integers)quit - demo_requests = list(range(1, 1 + args.num_requests)) - interval = 30 # Generate workloads and pair with prompts workload_dict = {} + tokenizer = get_tokenizer(pretrained_model_name_or_path = args.model, trust_remote_code = True) if args.trace_type == "synthetic": + # Load prompts from a file + prompts = sample_sharegpt_requests(dataset_path = args.prompt_file, num_requests = args.num_prompts, tokenizer = tokenizer) # Generate workloads with different parameters scenarios = { - 'Quick Rising': {'duration_sec': 600, 'interval_sec': interval, 'A': 5, 'period': 5, 'only_rise': True}, - 'Slow Rising': {'duration_sec': 600, 'interval_sec': interval, 'A': 5, 'period': 0.25, 'only_rise': True}, - 'Slight Fluctuation': {'duration_sec': 600, 'interval_sec': interval, 'A': 5, 'B': 5, 'period': 1, 'only_rise': False}, - 'Severe Fluctuation': {'duration_sec': 600, 'interval_sec': interval, 'A': 5, 'B': 10, 'period': 12, 'only_rise': False}, + 'Quick Rising': {'duration_ms': args.duration_ms, 'interval_ms': args.interval_ms, 'A': 5, 'period': 5, 'only_rise': True}, + 'Slow Rising': {'duration_ms': args.duration_ms, 'interval_ms': args.interval_ms, 'A': 5, 'period': 0.25, 'only_rise': True}, + 'Slight Fluctuation': {'duration_ms': args.duration_ms, 'interval_ms': args.interval_ms, 'A': 5, 'B': 5, 'period': 1, 'only_rise': False}, + 'Severe Fluctuation': {'duration_ms': args.duration_ms, 'interval_ms': args.interval_ms, 'A': 5, 'B': 10, 'period': 12, 'only_rise': False}, } for scenario_name, params in scenarios.items(): - generated_workload = generate_synthetic(demo_requests, **params) - paired_workload = pair_requests_with_prompts(generated_workload, prompts, f"{args.output}/{scenario_name}.json") + generated_workload = generate_synthetic(**params) + paired_workload = pair_requests_with_prompts_round_robin(workload = generated_workload, + prompts = prompts, + output_file = f"{args.output}/{scenario_name}", + to_jsonl = args.to_jsonl) workload_dict[scenario_name] = paired_workload # Plot the workloads - plot_workload(workload_dict, interval_sec=interval, output_file=f"plot/synthetic.pdf") - 
elif args.trace_type == "summary": - generated_workload = generate_from_summary_csv(demo_requests, file_path=args.trace_file, sampling_granularity_seconds=15) - generated_workload = pair_requests_with_prompts(generated_workload, prompts, f"{args.output}/summary.json") - workload_dict["summary"] = generated_workload + plot_workload(workload_dict, interval_ms=args.interval_ms, output_file=f"plot/synthetic.pdf") + elif args.trace_type == "internal": + # Load prompts from a file + prompts = sample_sharegpt_requests(dataset_path = args.prompt_file, num_requests = args.num_prompts, tokenizer = tokenizer) + # Generate input requests (ascending integers)quit + generated_workload = generate_from_internal_csv(file_path=args.trace_file, duration_ms = args.duration_ms, summary_interval_ms=15000, interval_ms=args.interval_ms) + generated_workload = pair_requests_with_prompts_round_robin(workload = generated_workload, + prompts = prompts, + output_file = f"{args.output}/internal", + to_jsonl = args.to_jsonl) + workload_dict["internal"] = generated_workload # Plot the workloads - plot_workload(workload_dict, interval_sec=interval, output_file=f"plot/summary.pdf") + plot_workload(workload_dict, interval_ms=args.interval_ms, output_file=f"plot/internal.pdf") elif args.trace_type == "azure": - tokenizer = get_tokenizer(pretrained_model_name_or_path = args.model, trust_remote_code = True) - generated_workload = generate_from_azure_csv(file_path=args.trace_file, prompt_file_path = args.prompt_file, tokenizer = tokenizer, group_interval_seconds=1, output_file=f"{args.output}/azure.json") + generated_workload = generate_from_azure_csv(file_path=args.trace_file, + prompt_file_path = args.prompt_file, + duration_ms = args.duration_ms, + tokenizer = tokenizer, + interval_ms = args.interval_ms, + output_file = f"{args.output}/azure", + to_jsonl = args.to_jsonl) workload_dict["azure"] = generated_workload # Plot the workloads - plot_workload(workload_dict, interval_sec=interval, output_file=f"plot/azure.pdf") + plot_workload(workload_dict, interval_ms=args.interval_ms, output_file=f"plot/azure.pdf") From d2be10abc0d4c485a05256dd4966e31793a5f9f3 Mon Sep 17 00:00:00 2001 From: Jingyuan Zhang Date: Thu, 5 Dec 2024 15:37:45 -0800 Subject: [PATCH 4/4] Fix k8s accessibility regard namespaces. GPU optimizer now monitor all namespaces with model label. --- development/app/README.md | 2 +- .../patch_podautoscaler_a40.yaml | 2 +- .../simulator/patch_podautoscaler_a100.yaml | 2 +- .../podautoscaler/podautoscaler.yaml | 2 +- python/aibrix/aibrix/gpu_optimizer/Makefile | 16 +++++---- python/aibrix/aibrix/gpu_optimizer/app.py | 26 ++++++++++---- .../aibrix/gpu_optimizer/deployment.yaml | 36 ++++++++----------- .../gpu_optimizer/load_monitor/clusterer.py | 1 - .../gpu_optimizer/load_monitor/monitor.py | 16 +++++++-- 9 files changed, 61 insertions(+), 42 deletions(-) diff --git a/development/app/README.md b/development/app/README.md index f119c518..140b6c11 100644 --- a/development/app/README.md +++ b/development/app/README.md @@ -41,7 +41,7 @@ Alternatively, [vidur](https://github.com/microsoft/vidur) is integrated for hig 1. Builder simulator base model image ```dockerfile -docker build -t aibrix/vllm-simulator:nightly --build-arg GPU_TYPE=a100 -f Dockerfile . +docker build -t aibrix/vllm-simulator:nightly --build-arg SIMULATION=a100 -f Dockerfile . 
``` 1.b (Optional) Load container image to docker context diff --git a/development/app/config/heterogeneous/simulator_a40/patch_podautoscaler_a40.yaml b/development/app/config/heterogeneous/simulator_a40/patch_podautoscaler_a40.yaml index 93b2d37d..339c87a2 100644 --- a/development/app/config/heterogeneous/simulator_a40/patch_podautoscaler_a40.yaml +++ b/development/app/config/heterogeneous/simulator_a40/patch_podautoscaler_a40.yaml @@ -9,7 +9,7 @@ spec: kind: Deployment name: simulator-llama2-7b-a40 metricsSources: - - endpoint: gpu-optimizer.aibrix-system.svc.cluster.local:8080 + - endpoint: aibrix-gpu-optimizer.aibrix-system.svc.cluster.local:8080 path: /metrics/default/simulator-llama2-7b-a40 metric: "vllm:deployment_replicas" targetValue: "1" \ No newline at end of file diff --git a/development/app/config/simulator/patch_podautoscaler_a100.yaml b/development/app/config/simulator/patch_podautoscaler_a100.yaml index c04a2c19..09836449 100644 --- a/development/app/config/simulator/patch_podautoscaler_a100.yaml +++ b/development/app/config/simulator/patch_podautoscaler_a100.yaml @@ -9,7 +9,7 @@ spec: kind: Deployment name: simulator-llama2-7b-a100 metricsSources: - - endpoint: gpu-optimizer.aibrix-system.svc.cluster.local:8080 + - endpoint: aibrix-gpu-optimizer.aibrix-system.svc.cluster.local:8080 path: /metrics/default/simulator-llama2-7b-a100 metric: "vllm:deployment_replicas" targetValue: "1" \ No newline at end of file diff --git a/development/app/config/templates/podautoscaler/podautoscaler.yaml b/development/app/config/templates/podautoscaler/podautoscaler.yaml index 945fb13c..a75a60c5 100644 --- a/development/app/config/templates/podautoscaler/podautoscaler.yaml +++ b/development/app/config/templates/podautoscaler/podautoscaler.yaml @@ -16,7 +16,7 @@ spec: maxReplicas: 10 targetMetric: "avg_prompt_throughput_toks_per_s" # Ignore if metricsSources is configured metricsSources: - - endpoint: gpu-optimizer.aibrix-system.svc.cluster.local:8080 + - endpoint: aibrix-gpu-optimizer.aibrix-system.svc.cluster.local:8080 path: /metrics/default/simulator-llama2-7b metric: "vllm:deployment_replicas" targetValue: "1" diff --git a/python/aibrix/aibrix/gpu_optimizer/Makefile b/python/aibrix/aibrix/gpu_optimizer/Makefile index e4a8ac3a..a7a1842c 100644 --- a/python/aibrix/aibrix/gpu_optimizer/Makefile +++ b/python/aibrix/aibrix/gpu_optimizer/Makefile @@ -7,13 +7,13 @@ DATASET ?= [set your DATASET path] deploy: kubectl apply -f deployment.yaml sleep 2 - kubectl -n aibrix-system port-forward svc/gpu-optimizer 8080:8080 1>/dev/null 2>&1 & + kubectl -n aibrix-system port-forward svc/aibrix-gpu-optimizer 8080:8080 1>/dev/null 2>&1 & .PHONY: clean clean: kubectl delete -f deployment.yaml sleep 1 - curl http://localhost:8080/metrics/aibrix-system/simulator-llama2-7b + curl http://localhost:8080/metrics/aibrix-system/simulator-llama2-7b-a100 .PHONY: benchmark benchmark: @@ -33,27 +33,31 @@ debug: .PHONY: debug-init-simulator debug-init-simulator: - curl http://localhost:8080/monitor/aibrix-system/simulator-llama2-7b \ + curl http://localhost:8080/monitor/default/simulator-llama2-7b-a100 \ -H "Content-Type: application/json" \ -H "Authorization: Bearer any_key" \ -d '{}' .PHONY: debug-scale-simulator debug-scale-simulator: - curl http://localhost:8080/scale/aibrix-system/simulator-llama2-7b/2 \ + curl http://localhost:8080/scale/default/simulator-llama2-7b-a100/2 \ -H "Content-Type: application/json" \ -H "Authorization: Bearer any_key" \ -d '{}' .PHONY: debug-stop-simulator debug-stop-simulator: -
curl -X DELETE http://localhost:8080/monitor/aibrix-system/simulator-llama2-7b \ + curl -X DELETE http://localhost:8080/monitor/default/simulator-llama2-7b-a100 \ -H "Content-Type: application/json" \ -H "Authorization: Bearer any_key" +.PHONY: debug-update-profile +debug-update-profile: + curl http://localhost:8080/update_profile/llama2-7b + .PHONY: debug-metrics debug-metrics: - curl http://localhost:8080/metrics/aibrix-system/simulator-llama2-7b + curl http://localhost:8080/metrics/aibrix-system/simulator-llama2-7b-a100 .PHONY: debug-workload debug-workload: diff --git a/python/aibrix/aibrix/gpu_optimizer/app.py b/python/aibrix/aibrix/gpu_optimizer/app.py index 4368a2ac..6bb344dd 100644 --- a/python/aibrix/aibrix/gpu_optimizer/app.py +++ b/python/aibrix/aibrix/gpu_optimizer/app.py @@ -27,7 +27,6 @@ from aibrix.gpu_optimizer.load_monitor.visualizer import mount_to as mount_visulizer from aibrix.gpu_optimizer.utils import ExcludePathsFilter -NAMESPACE = os.getenv("NAMESPACE", "aibrix-system") MODEL_LABEL = "model.aibrix.ai/name" MIN_REPLICAS_LABEL = "model.aibrix.ai/min_replicas" REDIS_HOST = os.getenv("REDIS_HOST", "localhost") @@ -186,6 +185,22 @@ async def stop_deployment_optimization(request): ) +@app.route("/update_profile/{model_name}") +async def update_profile(request): + model_name = request.path_params["model_name"] + monitor = model_monitors.get(model_name, None) + if monitor is None: + return JSONResponse({"error": f"{model_name} not monitored"}, status_code=404) + + if monitor.load_profiles(): + return JSONResponse({"message": f"workload profile of {model_name} updated"}) + else: + return JSONResponse( + {"error": f"failed to update workload profile of {model_name}"}, + status_code=500, + ) + + @app.route("/scale/{namespace}/{deployment_name}/{replicas}", methods=["POST"]) async def scale_deployment(request): namespace = request.path_params["namespace"] @@ -249,10 +264,8 @@ def main(signal, timeout): apps_v1 = client.AppsV1Api() # List existing deployments - logger.info(f"Looking for deployments in {NAMESPACE} with {MODEL_LABEL}") - deployments = apps_v1.list_namespaced_deployment( - namespace=NAMESPACE, label_selector=MODEL_LABEL - ) + logger.info(f"Looking for deployments with {MODEL_LABEL}") + deployments = apps_v1.list_deployment_for_all_namespaces(label_selector=MODEL_LABEL) watch_version = deployments.metadata.resource_version logger.debug(f"last watch version: {watch_version}") for deployment in deployments.items: @@ -284,8 +297,7 @@ def main(signal, timeout): w = watch.Watch() signal["watch"] = w for event in w.stream( - apps_v1.list_namespaced_deployment, - namespace=NAMESPACE, + apps_v1.list_deployment_for_all_namespaces, label_selector=MODEL_LABEL, resource_version=watch_version, timeout_seconds=timeout, diff --git a/python/aibrix/aibrix/gpu_optimizer/deployment.yaml b/python/aibrix/aibrix/gpu_optimizer/deployment.yaml index 1e122b93..a2b949cc 100644 --- a/python/aibrix/aibrix/gpu_optimizer/deployment.yaml +++ b/python/aibrix/aibrix/gpu_optimizer/deployment.yaml @@ -1,61 +1,55 @@ apiVersion: v1 kind: ServiceAccount metadata: - name: pod-autoscaler + name: aibrix-gpu-optimizer-sa namespace: aibrix-system --- apiVersion: rbac.authorization.k8s.io/v1 -kind: Role +kind: ClusterRole metadata: - namespace: aibrix-system - name: deployment-reader + name: gpu-optimizer-clusterrole rules: - apiGroups: ["apps"] resources: ["deployments"] verbs: ["get", "list", "watch"] --- apiVersion: rbac.authorization.k8s.io/v1 -kind: RoleBinding +kind: ClusterRoleBinding metadata: - name: 
deployment-reader-binding - namespace: aibrix-system + name: aibrix-gpu-optimizer-clusterrole-binding subjects: - kind: ServiceAccount - name: pod-autoscaler + name: aibrix-gpu-optimizer-sa namespace: aibrix-system roleRef: - kind: Role - name: deployment-reader + kind: ClusterRole + name: gpu-optimizer-clusterrole apiGroup: rbac.authorization.k8s.io --- apiVersion: apps/v1 kind: Deployment metadata: - name: gpu-optimizer + name: aibrix-gpu-optimizer namespace: aibrix-system spec: replicas: 1 selector: matchLabels: - app: gpu-optimizer + app: aibrix-gpu-optimizer template: metadata: labels: - app: gpu-optimizer + app: aibrix-gpu-optimizer spec: - serviceAccountName: pod-autoscaler + serviceAccountName: aibrix-gpu-optimizer-sa automountServiceAccountToken: true # Important! containers: - - name: gpu-optimizer + - name: aibrix-gpu-optimizer image: aibrix/runtime:nightly command: ["python", "-m", "aibrix.gpu_optimizer.app", "--debug"] ports: - containerPort: 8080 env: - - name: NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - name: REDIS_HOST value: aibrix-redis-master.aibrix-system.svc.cluster.local --- @@ -63,11 +57,11 @@ spec: apiVersion: v1 kind: Service metadata: - name: gpu-optimizer + name: aibrix-gpu-optimizer namespace: aibrix-system spec: selector: - app: gpu-optimizer + app: aibrix-gpu-optimizer ports: - protocol: TCP port: 8080 diff --git a/python/aibrix/aibrix/gpu_optimizer/load_monitor/clusterer.py b/python/aibrix/aibrix/gpu_optimizer/load_monitor/clusterer.py index 5fc887d9..409f23d8 100644 --- a/python/aibrix/aibrix/gpu_optimizer/load_monitor/clusterer.py +++ b/python/aibrix/aibrix/gpu_optimizer/load_monitor/clusterer.py @@ -127,7 +127,6 @@ def validate(self) -> bool: if len(self.clusterers) < self.buffer_size: self.clusterers.append(self.clusterers[current].clone()) self.frontier = len(self.clusterers) - 1 - logger.debug("test") logger.debug( "moving buffer created: %s, buffers: %s", self._reason, diff --git a/python/aibrix/aibrix/gpu_optimizer/load_monitor/monitor.py b/python/aibrix/aibrix/gpu_optimizer/load_monitor/monitor.py index 091fc152..dbc295ee 100644 --- a/python/aibrix/aibrix/gpu_optimizer/load_monitor/monitor.py +++ b/python/aibrix/aibrix/gpu_optimizer/load_monitor/monitor.py @@ -137,7 +137,12 @@ def add_deployment( profile = self._match_profile(key, deployment_name) if profile is not None: # No lock required here since the deployment has not been added to deployments. - self._optimizer.set_profile(profile) + try: + self._optimizer.set_profile(profile) + except Exception as e: + logger.warning( + f"Failed to set GPU profile for {key}. Optimizer will skip the GPU: {e}" + ) else: logger.warning( f"No GPU profile found for {key}. Optimizer will skip the GPU." 
@@ -197,12 +202,13 @@ def clear_outdated_deployments(self) -> int: del self.deployments[key] return len(self.deployments) - def load_profiles(self, profile_reader: Optional[ProfileReader] = None): + def load_profiles(self, profile_reader: Optional[ProfileReader] = None) -> bool: """Load profiles from a file""" try: if profile_reader is None: if self._profile_reader is None: - return + logger.error("Profile reader not initialized") + return False profile_reader = self._profile_reader else: self._profile_reader = profile_reader @@ -211,9 +217,13 @@ def load_profiles(self, profile_reader: Optional[ProfileReader] = None): for profile in profiles: if self._update_profile(profile): logger.debug(f"Profile of {profile.gpu} updated.") + + return True except Exception as e: logger.error(f"Failed to load profiles: {e}") + return False + def _update_profile(self, profile: GPUProfile) -> bool: """Update a profile, will update the formal alias copy, too.""" key = profile.gpu
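As a usage note for the renamed `aibrix-gpu-optimizer` service and the new `/update_profile` route, here is a small, illustrative Python sketch that exercises the optimizer's HTTP API through the port-forward set up by `make deploy` (localhost:8080). The paths and names mirror the Makefile targets in this patch series; the POST method for `/monitor` is assumed from the curl flags shown there, and response bodies are printed verbatim because their exact format is not shown in these patches.

```python
# Illustrative only: poke the GPU optimizer's HTTP API through the
# port-forward set up by `make deploy` (localhost:8080). Paths mirror the
# Makefile targets in this patch series; response bodies are printed verbatim.
import json
import urllib.request

BASE = "http://localhost:8080"
HEADERS = {"Content-Type": "application/json", "Authorization": "Bearer any_key"}


def post(path: str, body: dict) -> str:
    # POST is assumed here based on the -d '{}' curl flags in the Makefile.
    req = urllib.request.Request(
        f"{BASE}{path}",
        data=json.dumps(body).encode(),
        headers=HEADERS,
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode()


def get(path: str) -> str:
    with urllib.request.urlopen(f"{BASE}{path}") as resp:
        return resp.read().decode()


if __name__ == "__main__":
    # Start monitoring a deployment (namespace/deployment as in the Makefile).
    print(post("/monitor/default/simulator-llama2-7b-a100", {}))
    # Reload the workload profile for a model (endpoint added in this patch).
    print(get("/update_profile/llama2-7b"))
    # Metrics path scraped by the PodAutoscaler's metricsSources.
    print(get("/metrics/default/simulator-llama2-7b-a100"))
```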