diff --git a/benchmarks/autoscaling/7b.yaml b/benchmarks/autoscaling/7b.yaml
deleted file mode 100644
index 80eb2e2c..00000000
--- a/benchmarks/autoscaling/7b.yaml
+++ /dev/null
@@ -1,202 +0,0 @@
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  labels:
-    model.aibrix.ai/name: deepseek-coder-7b-instruct
-    model.aibrix.ai/port: "8000"
-  name: aibrix-model-deepseek-coder-7b-instruct
-  namespace: default
-spec:
-  replicas: 1
-  selector:
-    matchLabels:
-      model.aibrix.ai/name: deepseek-coder-7b-instruct
-  strategy:
-    type: Recreate
-  template:
-    metadata:
-      annotations:
-        prometheus.io/scrape: "true"
-        prometheus.io/port: "8000"
-        prometheus.io/path: "/metrics"
-      labels:
-        model.aibrix.ai/name: deepseek-coder-7b-instruct
-    spec:
-      terminationGracePeriodSeconds: 60
-      containers:
-        - command:
-            - python3
-            - -m
-            - vllm.entrypoints.openai.api_server
-            - --host
-            - "0.0.0.0"
-            - --port
-            - "8000"
-            - --model
-            - /models/deepseek-coder-6.7b-instruct
-            - --served-model-name
-            - deepseek-coder-7b-instruct
-            - --trust-remote-code
-            - --max-model-len
-            - "10240"
-            - --api-key
-            - sk-kFJ12nKsFVfVmGpj3QzX65s4RbN2xJqWzPYCjYu7wT3BlbLi
-            - --disable-fastapi-docs
-          image: aibrix-container-registry-cn-beijing.cr.volces.com/aibrix/vllm-openai:v0.6.2-distributed
-          imagePullPolicy: Always
-          livenessProbe:
-            failureThreshold: 3
-            httpGet:
-              path: /health
-              port: 8000
-              scheme: HTTP
-            initialDelaySeconds: 90
-            periodSeconds: 5
-            successThreshold: 1
-            timeoutSeconds: 1
-          name: vllm-openai
-          ports:
-            - containerPort: 8000
-              protocol: TCP
-          readinessProbe:
-            failureThreshold: 3
-            httpGet:
-              path: /health
-              port: 8000
-              scheme: HTTP
-            initialDelaySeconds: 90
-            periodSeconds: 5
-            successThreshold: 1
-            timeoutSeconds: 1
-          resources:
-            limits:
-              nvidia.com/gpu: "1"
-            requests:
-              nvidia.com/gpu: "1"
-          # We need to use dataset cache
-          volumeMounts:
-            - mountPath: /models
-              name: model-hostpath
-            - name: dshm
-              mountPath: /dev/shm
-          lifecycle:
-            preStop:
-              exec:
-                command:
-                  - /bin/sh
-                  - -c
-                  - |
-                    while true; do
-                      RUNNING=$(curl -s http://localhost:8000/metrics | grep 'vllm:num_requests_running' | grep -v '#' | awk '{print $2}')
-                      WAITING=$(curl -s http://localhost:8000/metrics | grep 'vllm:num_requests_waiting' | grep -v '#' | awk '{print $2}')
-                      if [ "$RUNNING" = "0.0" ] && [ "$WAITING" = "0.0" ]; then
-                        echo "Terminating: No active or waiting requests, safe to terminate" >> /proc/1/fd/1
-                        exit 0
-                      else
-                        echo "Terminating: Running: $RUNNING, Waiting: $WAITING" >> /proc/1/fd/1
-                        sleep 5
-                      fi
-                    done
-        - name: aibrix-runtime
-          image: aibrix-container-registry-cn-beijing.cr.volces.com/aibrix/runtime:v0.2.0
-          command:
-            - aibrix_runtime
-            - --port
-            - "8080"
-          env:
-            - name: INFERENCE_ENGINE
-              value: vllm
-            - name: INFERENCE_ENGINE_ENDPOINT
-              value: http://localhost:8000
-          ports:
-            - containerPort: 8080
-              protocol: TCP
-          volumeMounts:
-            - mountPath: /models
-              name: model-hostpath
-          livenessProbe:
-            httpGet:
-              path: /healthz
-              port: 8080
-            initialDelaySeconds: 3
-            periodSeconds: 2
-          readinessProbe:
-            httpGet:
-              path: /ready
-              port: 8080
-            initialDelaySeconds: 5
-            periodSeconds: 10
-      initContainers:
-        - name: init-model
-          image: aibrix-container-registry-cn-beijing.cr.volces.com/aibrix/runtime:v0.1.1
-          command:
-            - aibrix_download
-            - --model-uri
-            - tos://aibrix-artifact-testing/models/deepseek-ai/deepseek-coder-6.7b-instruct/
-            - --local-dir
-            - /models/
-          env:
-            - name: DOWNLOADER_MODEL_NAME
-              value: deepseek-coder-6.7b-instruct
-            - name: DOWNLOADER_NUM_THREADS
-              value: "16"
-            - name: DOWNLOADER_ALLOW_FILE_SUFFIX
-              value: json, safetensors
-            - name: TOS_ACCESS_KEY
-              valueFrom:
-                secretKeyRef:
-                  name: tos-credential
-                  key: TOS_ACCESS_KEY
-            - name: TOS_SECRET_KEY
-              valueFrom:
-                secretKeyRef:
-                  name: tos-credential
-                  key: TOS_SECRET_KEY
-            - name: TOS_ENDPOINT
-              value: tos-cn-beijing.ivolces.com
-            - name: TOS_REGION
-              value: cn-beijing
-          volumeMounts:
-            - mountPath: /models
-              name: model-hostpath
-      volumes:
-        - name: model-hostpath
-          hostPath:
-            path: /root/models
-            type: DirectoryOrCreate
-        - name: dshm
-          emptyDir:
-            medium: Memory
-            sizeLimit: "4Gi"
-      affinity:
-        nodeAffinity:
-          requiredDuringSchedulingIgnoredDuringExecution:
-            nodeSelectorTerms:
-              - matchExpressions:
-                  - key: machine.cluster.vke.volcengine.com/gpu-name
-                    operator: In
-                    values:
-                      - NVIDIA-L20
-
----
-
-apiVersion: v1
-kind: Service
-metadata:
-  labels:
-    model.aibrix.ai/name: deepseek-coder-7b-instruct
-    prometheus-discovery: "true"
-  annotations:
-    prometheus.io/scrape: "true"
-    prometheus.io/port: "8000"
-  name: deepseek-coder-7b-instruct
-  namespace: default
-spec:
-  ports:
-    - name: serve
-      port: 8000
-      protocol: TCP
-      targetPort: 8000
-  selector:
-    model.aibrix.ai/name: deepseek-coder-7b-instruct
-  type: LoadBalancer
diff --git a/benchmarks/autoscaling/README.md b/benchmarks/autoscaling/README.md
index 827ea337..df0ba6a6 100644
--- a/benchmarks/autoscaling/README.md
+++ b/benchmarks/autoscaling/README.md
@@ -43,10 +43,10 @@ For example,
 There are two plots that you can plot.
 
 ### Generating report
-`python plot-everything.py `
+`python /benchmarks/plot/plot-everything.py `
 
 For example,
-`python plot-everything.py experiment_results/25min_test`
+`python /benchmarks/plot/plot-everything.py experiment_results/25min_test`
 
 The directories should look like
 ```bash
@@ -60,12 +60,12 @@ ls experiment_results/25min_test
 
 ### Generating per pod graph
 ```bash
-python plot_per_pod.py 
+python /benchmarks/plot/plot_per_pod.py 
 ```
 
 For example,
 ```bash
-python plot_per_pod.py experiment_results/25min_test/25min_up_and_down-apa-least-request-20250209-064742/pod_logs
+python /benchmarks/plot/plot_per_pod.py experiment_results/25min_test/25min_up_and_down-apa-least-request-20250209-064742/pod_logs
 ```
 
 Move all experiment output dir under the same dir (e.g., 25min_test) to compare them in the report.
diff --git a/benchmarks/autoscaling/apa.yaml b/benchmarks/autoscaling/apa.yaml
deleted file mode 100644
index a4925276..00000000
--- a/benchmarks/autoscaling/apa.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-apiVersion: autoscaling.aibrix.ai/v1alpha1
-kind: PodAutoscaler
-metadata:
-  name: deepseek-coder-7b-instruct-apa
-  labels:
-    app.kubernetes.io/name: aibrix
-    app.kubernetes.io/managed-by: kustomize
-  namespace: default
-spec:
-  scaleTargetRef:
-    apiVersion: apps/v1
-    kind: Deployment
-    name: aibrix-model-deepseek-coder-7b-instruct
-  minReplicas: 1
-  maxReplicas: 10
-  targetMetric: "vllm:gpu_cache_usage_perc"
-  targetValue: "50"
-  scalingStrategy: "APA"
diff --git a/benchmarks/autoscaling/bench_workload_generator.py b/benchmarks/autoscaling/bench_workload_generator.py
deleted file mode 100644
index 0614810a..00000000
--- a/benchmarks/autoscaling/bench_workload_generator.py
+++ /dev/null
@@ -1,137 +0,0 @@
-import logging
-import math
-import os
-import random
-from typing import List, Any
-
-import matplotlib.pyplot as plt
-import numpy as np
-
-
-def generate_workload(input_requests: List[Any], A=1, B=1,
-                      sigma=0.1,
-                      only_rise: bool = False,
-                      omega: float = None,
-                      period=0.25,
-                      length: int = None,
-                      duration_sec: int = None,
-                      interval_sec: int = None,
-                      ) -> List[List[Any]]:
-    """
-    Generates a workload based on a given list of input requests and a concurrency function.
-
-    Reference: https://bytedance.larkoffice.com/docx/C114dkyJioiFDzxoaTScWqh8nof
-
-    The concurrency function is defined as:
-        concurrency(t) = trend(t) + noise
-        trend(t) = A * sin(omega * t) + B
-        noise ~ N(0, sigma^2)
-
-    Args:
-        input_requests (list): The list of all requests to be sent.
-        A (float, optional): The amplitude of the sine wave in the concurrency function. Defaults to 1.
-        B (float, optional): The vertical shift of the sine wave in the concurrency function. Defaults to 1.
-        sigma (float, optional): The standard deviation of the normal distribution for the noise. Defaults to 0.1.
-        omega (float, optional): if None, omega = pi / (2 * length / period)
-        period (float, optional): See omega. Defaults to 0.25.
-        only_rise: if True, the concurrency will monotonically increase
-        length (int, optional): if None, length = test_duration_sec / interval_sec
-        duration_sec (int, optional): See param: length
-        interval_sec (int, optional): See param: length
-
-    Returns:
-        list: A list of items, where each item is a list of requests to be sent concurrently.
-    """
-
-    def math_function(t):
-        """
-        Calculates the concurrency value based on the given concurrency function.
-
-        The concurrency function is defined as:
-        concurrency(t) = trend(t) + noise
-        trend(t) = A * sin(omega * t) + B
-        noise ~ N(0, sigma^2)
-
-        Args:
-            t (int): The discrete integer value of t, starting from 0.
-
-        Returns:
-            int: The concurrency value rounded to the nearest integer.
-        """
-        trend = A * math.sin(omega * t) + B
-        noise = random.gauss(0, sigma)
-        return round(trend + noise)
-
-    assert length is not None or (duration_sec is not None and interval_sec is not None), \
-        "duration_sec and interval_sec must be specified if length is not None"
-    if length is None:
-        length = int(duration_sec // interval_sec)
-    assert omega is not None or period is not None, "period must be specified if length is not None"
-    if omega is None:
-        omega = 2 * math.pi / (length / period)
-    workload = []
-    t = 0
-    input_length = len(input_requests)
-
-    previous_concurrency = -1
-    start_index, end_index = 0, 0
-    while t < length:
-        current_concurrency = math_function(t)
-        if only_rise:
-            current_concurrency = max(previous_concurrency, current_concurrency)
-            previous_concurrency = current_concurrency
-
-        # start from last end index
-        start_index = end_index
-        end_index += current_concurrency
-        workload.append([input_requests[i % input_length] for i in range(start_index, end_index)])
-        t += 1
-
-    return workload
-
-
-def plot_workload(workload_dict, interval_sec, output_path: str = None):
-    """
-    Plots the concurrency (item length) of the generated workload.
-
-    Args:
-        workload_dict (dict): A dictionary where the keys are workload names (labels) and the values are lists of lists representing the workload.
-        interval_sec (int):
-    """
-    for workload_name, workload in workload_dict.items():
-        concurrency_values = [len(item) for item in workload]
-        plt.plot(np.arange(len(concurrency_values)) * interval_sec, concurrency_values, label=workload_name)
-
-    plt.xlabel('Time (Sec)')
-    plt.ylabel('Concurrency')
-    plt.title('Workload Concurrency')
-    plt.legend()
-    if output_path is None:
-        plt.show()
-    else:
-        os.makedirs(os.path.dirname(output_path), exist_ok=True)
-        plt.savefig(output_path)
-        logging.info(f'Saved workload plot to {output_path}')
-
-
-if __name__ == '__main__':
-    # Generate input requests (ascending integers)
-    demo_requests = list(range(1, 10001))
-    interval = 30
-    # Generate workloads with different parameters
-    workload_dict = {
-        'quick_rising':
-            generate_workload(demo_requests, duration_sec=600, interval_sec=interval, A=5, period=5, only_rise=True),
-        'slow_rising':
-            generate_workload(demo_requests, duration_sec=600, interval_sec=interval, A=5, period=0.25,
-                              only_rise=True),
-        'slight_fluctuation':
-            generate_workload(demo_requests, duration_sec=600, interval_sec=interval, A=5, B=5, period=1,
-                              only_rise=False),
-        'severe_fluctuation':
-            generate_workload(demo_requests, duration_sec=600, interval_sec=interval, A=5, B=10, period=12,
-                              only_rise=False),
-    }
-
-    # Plot the workloads
-    plot_workload(workload_dict, interval_sec=interval)
diff --git a/benchmarks/autoscaling/benchmark.py b/benchmarks/autoscaling/benchmark.py
deleted file mode 100644
index f6834aea..00000000
--- a/benchmarks/autoscaling/benchmark.py
+++ /dev/null
@@ -1,332 +0,0 @@
-import dataclasses
-import json
-import os
-import random
-import time
-from datetime import datetime
-from typing import List, Optional, Tuple, Dict
-
-import openai
-import asyncio
-import logging
-
-from transformers import PreTrainedTokenizerBase
-
-from vllm.engine.arg_utils import EngineArgs
-from vllm.utils import FlexibleArgumentParser
-
-from bench_workload_generator import generate_workload, plot_workload
-
-try:
-    from vllm.transformers_utils.tokenizer import get_tokenizer
-except ImportError:
-    from backend_request_func import get_tokenizer
-
-
-def setup_logging(log_filename, level=logging.INFO):
-    """
-    Set the global log configuration. The logs will be written into the specified file and output to the console.
-
-    :param log_filename: logging output file
-    """
-
-    logging.basicConfig(
-        level=level,
-        format="%(asctime)s - %(levelname)s - %(message)s",
-        datefmt="%Y-%m-%d %H:%M:%S",
-    )
-    if not os.path.exists((log_dir := os.path.dirname(log_filename))):
-        os.makedirs(log_dir, exist_ok=True)
-
-    logger = logging.getLogger()
-    logger.setLevel(level)
-
-    # create a handler to file
-    file_handler = logging.FileHandler(log_filename)
-    file_handler.setLevel(level)
-
-    # create a handler to console
-    console_handler = logging.StreamHandler()
-    console_handler.setLevel(level)
-
-    formatter = logging.Formatter(
-        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
-    )
-    file_handler.setFormatter(formatter)
-    console_handler.setFormatter(formatter)
-    logger.addHandler(file_handler)
-
-    logging.info(f"save log to {log_filename}")
-
-
-# Function to wrap the prompt into OpenAI's chat completion message format.
-def wrap_prompt_as_chat_message(prompt: str):
-    """
-    Wrap the prompt into OpenAI's chat completion message format.
-
-    :param prompt: The user prompt to be converted.
-    :return: A list containing chat completion messages.
-    """
-    # Define a system message if needed (optional)
-    # system_message = {"role": "system", "content": "You are a helpful assistant."}
-
-    # Wrap the user prompt as a user message
-    user_message = {"role": "user", "content": prompt}
-
-    # Combine the system and user message into the expected message format
-    return [user_message]
-
-
-# Function to build OpenAI clients for each model endpoint.
-def build_openai_clients(model_endpoints: Dict[str, str]):
-    clients = {}
-    for model, endpoint in model_endpoints.items():
-        # Create an OpenAI client with custom base URL if necessary
-        client = openai.AsyncOpenAI(
-            api_key="sk-kFJ12nKsFVfVmGpj3QzX65s4RbN2xJqWzPYCjYu7wT3BlbLi",
-            base_url=endpoint,
-        )
-
-        clients[model] = client
-    return clients
-
-
-# Build the OpenAI endpoint dictionary from the deployment plans
-def build_openai_endpoints(deployment_plan):
-    endpoints = {
-        "deployment0": {
-            "deepseek-coder-7b-instruct": "http://0.0.0.0:8000/v1",
-        },
-        "deployment1": {
-            "model-1": "http://0.0.0.0:8071/v1",
-            "model-2": "http://0.0.0.0:8072/v1",
-            "model-3": "http://0.0.0.0:8073/v1",
-            "model-4": "http://0.0.0.0:8074/v1"
-        },
-        "deployment2": {
-            "model-1": "http://0.0.0.0:8071/v1",
-            "model-2": "http://0.0.0.0:8072/v1",
-            "model-3": "http://0.0.0.0:8073/v1",
-            "model-4": "http://0.0.0.0:8074/v1"
-        },
-        "deployment3": {
-            "model-1": "http://0.0.0.0:8070/v1",
-            "model-2": "http://0.0.0.0:8070/v1",
-            "model-3": "http://0.0.0.0:8070/v1",
-            "model-4": "http://0.0.0.0:8070/v1"
-        }
-    }
-    return endpoints[deployment_plan]
-
-
-PROMPT = "You are a helpful assistant in recognizes the content of tables in markdown format. Here is a table as fellows. You need to answer my question about the table.\n# Table\n|Opening|Opening|Sl. No.|Film|Cast|Director|Music Director|Notes|\n|----|----|----|----|----|----|----|----|\n|J A N|9|1|Agni Pushpam|Jayabharathi, Kamalahasan|Jeassy|M. K. Arjunan||\n|J A N|16|2|Priyamvada|Mohan Sharma, Lakshmi, KPAC Lalitha|K. S. Sethumadhavan|V. Dakshinamoorthy||\n|J A N|23|3|Yakshagaanam|Madhu, Sheela|Sheela|M. S. Viswanathan||\n|J A N|30|4|Paalkkadal|Sheela, Sharada|T. K. Prasad|A. T. Ummer||\n|F E B|5|5|Amma|Madhu, Srividya|M. Krishnan Nair|M. K. Arjunan||\n|F E B|13|6|Appooppan|Thikkurissi Sukumaran Nair, Kamal Haasan|P. Bhaskaran|M. S. Baburaj||\n|F E B|20|7|Srishti|Chowalloor Krishnankutty, Ravi Alummoodu|K. T. Muhammad|M. S. Baburaj||\n|F E B|20|8|Vanadevatha|Prem Nazir, Madhubala|Yusufali Kechery|G. Devarajan||\n|F E B|27|9|Samasya|Madhu, Kamalahaasan|K. Thankappan|Shyam||\n|F E B|27|10|Yudhabhoomi|K. P. Ummer, Vidhubala|Crossbelt Mani|R. K. Shekhar||\n|M A R|5|11|Seemantha Puthran|Prem Nazir, Jayabharathi|A. B. Raj|M. K. Arjunan||\n|M A R|12|12|Swapnadanam|Rani Chandra, Dr. Mohandas|K. G. George|Bhaskar Chandavarkar||\n|M A R|19|13|Thulavarsham|Prem Nazir, sreedevi, Sudheer|N. Sankaran Nair|V. Dakshinamoorthy||\n|M A R|20|14|Aruthu|Kaviyoor Ponnamma, Kamalahasan|Ravi|G. Devarajan||\n|M A R|26|15|Swimming Pool|Kamal Haasan, M. G. Soman|J. Sasikumar|M. K. Arjunan||\n\n# Question\nWhat' s the content in the (1,1) cells\n" # noqa: E501
-
-
-# Asynchronous request handler
-async def send_request(client, model, endpoint, prompt, output_file):
-    start_time = asyncio.get_event_loop().time()
-    start_ts = time.time()
-    try:
-        response = await client.chat.completions.create(
-            model=model,
-            messages=prompt,
-            temperature=0,
-            max_tokens=128
-        )
-        latency = asyncio.get_event_loop().time() - start_time
-        prompt_tokens = response.usage.prompt_tokens
-        output_tokens = response.usage.completion_tokens
-        total_tokens = response.usage.total_tokens
-        throughput = output_tokens / latency
-        output_text = response.choices[0].message.content
-
-        result = {
-            "model": model,
-            "endpoint": endpoint,
-            "start_timestamp": start_ts,
-            "latency": latency,
-            "output": output_text,
-            "prompt_tokens": prompt_tokens,
-            "output_tokens": output_tokens,
-            "total_tokens": total_tokens,
-            "throughput": throughput,
-        }
-
-        logging.info(
-            f"Request for {model} completed in {latency:.2f} seconds with throughput {throughput:.2f} tokens/s, answer: {output_text[:30]}...")
-        return result
-    except Exception as e:
-        logging.error(f"Error sending request to {model} at {endpoint}: {str(e)}")
-        result = {
-            "model": model,
-            "endpoint": endpoint,
-            "start_timestamp": start_ts,
-            "latency": asyncio.get_event_loop().time() - start_time,
-            "start_time": start_time,
-            "error": str(e),
-        }
-        return None
-    finally:
-        # Write result to JSONL file
-        output_file.write(json.dumps(result) + "\n")
-        output_file.flush()  # Ensure data is written immediately to the file
-
-
-# Benchmark requests and log results into the specified file
-async def benchmark_requests(clients, model_endpoints, prompts, num_requests, concurrency, output_file_path):
-    models = list(model_endpoints.keys())
-    total_models = len(models)
-    requests_per_model = num_requests // total_models
-
-    with open(output_file_path, 'a', encoding='utf-8') as output_file:
-        for start_idx in range(0, requests_per_model, concurrency):
-            batch_tasks = []
-            end_idx = min(start_idx + concurrency, requests_per_model)
-
-            for model_index, model in enumerate(models):
-                client = clients[model]
-                endpoint = model_endpoints[model]
-
-                for request_idx in range(start_idx, end_idx):
-                    prompt_idx = (model_index * requests_per_model + request_idx) % len(prompts)
-                    prompt, _, _, _ = prompts[prompt_idx]
-                    prompt = wrap_prompt_as_chat_message(prompt)
-
-                    task = asyncio.create_task(send_request(client, model, endpoint, prompt, output_file))
-                    batch_tasks.append(task)
-
-            logging.info(f"submit a batch with {concurrency * total_models} requests")
-            await asyncio.gather(*batch_tasks)
-            logging.info(f"Completed requests {start_idx} to {end_idx - 1} for each model.")
-
-    logging.info(f"All {num_requests} requests completed for deployment.")
-
-
-def sample_sharegpt_requests(
-    dataset_path: str,
-    num_requests: int,
-    tokenizer: PreTrainedTokenizerBase,
-    fixed_output_len: Optional[int] = None,
-) -> List[Tuple[str, int, int, None]]:
-    # Load the dataset
-    with open(dataset_path, encoding='utf-8') as f:
-        dataset = json.load(f)
-    dataset = [data for data in dataset if len(data["conversations"]) >= 2]
-    dataset = [(data["conversations"][0]["value"], data["conversations"][1]["value"]) for data in dataset]
-
-    filtered_dataset: List[Tuple[str, int, int]] = []
-    for i in range(len(dataset)):
-        if len(filtered_dataset) == num_requests:
-            break
-        prompt = dataset[i][0]
-        prompt_token_ids = tokenizer(prompt).input_ids
-        completion = dataset[i][1]
-        completion_token_ids = tokenizer(completion).input_ids
-        prompt_len = len(prompt_token_ids)
-        output_len = len(completion_token_ids) if fixed_output_len is None else fixed_output_len
-        if prompt_len < 4 or (fixed_output_len is None and output_len < 4):
-            continue
-        if prompt_len > 1024 or prompt_len + output_len > 2048:
-            continue
-        filtered_dataset.append((prompt, prompt_len, output_len, None))
-
-    return filtered_dataset
-
-
-def main(args):
-    WORKLOAD_MODE = args.concurrency is not None
-    tokenizer = get_tokenizer(args.model, trust_remote_code=True)
-    if args.dataset_path is not None:
-        logging.info(f"Start to sample {args.num_prompts} prompts from {args.dataset_path}")
-        num_prompts = args.num_prompts
-        if WORKLOAD_MODE:
-            # length * avearge bar
-            num_prompts = int(args.w_B * args.w_duration_sec / args.w_interval_sec)
-        input_requests = sample_sharegpt_requests(
-            dataset_path=args.dataset_path,
-            num_requests=num_prompts,
-            tokenizer=tokenizer,
-            fixed_output_len=args.sharegpt_output_len,
-        )
-    else:
-        prompt_len = len(tokenizer(PROMPT).input_ids)
-        input_requests = [(PROMPT, prompt_len, args.output_len, None)] * args.num_prompts
-
-    logging.info(f"Samples results: {input_requests[0]}")
-
-    openai_endpoints = build_openai_endpoints(args.deployment_endpoints)
-    openai_clients = build_openai_clients(openai_endpoints)
-
-    start_time = time.time()
-    if WORKLOAD_MODE:
-        logging.info(f"Starting benchmark for {args.num_prompts} prompts with deployment {args.deployment_endpoints}")
-        asyncio.run(benchmark_requests(openai_clients, openai_endpoints, input_requests, args.num_prompts, args.concurrency,
-                                       args.output_file_path))
-    else:
-        interval_sec = args.w_interval_sec
-        logging.info(f"args.concurrency is None, generate workload: "
-                     f"A={args.w_A}, B={args.w_B}, sigma={args.w_sigma}, period={args.w_period}, "
-                     f"duration_sec={args.w_duration_sec}, interval_sec={interval_sec}")
-        workloads = generate_workload(
-            input_requests,
-            A=args.w_A, B=args.w_B, sigma=args.w_sigma, only_rise=args.w_only_rise,
-            period=args.w_period, duration_sec=args.w_duration_sec, interval_sec=interval_sec,
-        )
-        # if you want to see the workload traffic trend
-        plot_workload({"llm": workloads}, interval_sec=interval_sec,
-                      output_path=f"workload_plot/{identifier}.png")
-        next_start = start_time + interval_sec
-        for idx, each_input_requests in enumerate(workloads):
-
-            if len(each_input_requests) == 0:
-                logging.info(f"===== Sending Batch[{idx}], concurrency={len(each_input_requests)}: not sending in the batch")
-            else:
-                logging.info(f"===== Sending Batch[{idx}], concurrency={len(each_input_requests)}: E.g. question: {each_input_requests[0][:30]}...")
-                asyncio.run(benchmark_requests(openai_clients, openai_endpoints, each_input_requests, len(each_input_requests),
-                                               len(each_input_requests), args.output_file_path))
-
-            # wait until passing args.w_interval_sec
-            wait_time = next_start - time.time()
-            if wait_time > 0:
-                time.sleep(wait_time)
-            next_start += interval_sec
-    end_time = time.time()
-    logging.info(f"Benchmark completed in {end_time - start_time:.2f} seconds")
-
-
-if __name__ == "__main__":
-    parser = FlexibleArgumentParser(description='Benchmark the performance of multi-loras.')
-    parser.add_argument("--dataset-path", type=str, default='/Users/kangrong.cn/data/AIBrix/ShareGPT_V3_unfiltered_cleaned_split.json', help="Path to the dataset.")
-    parser.add_argument("--sharegpt-output-len", type=int, default=None,
-                        help="Output length for each request. Overrides the output length from the ShareGPT dataset.")
-    parser.add_argument('--num-prompts', type=int, default=200, help="Number of the prompts sampled from dataset")
-    parser.add_argument('--concurrency', type=int, default=None, help="Number of the prompts concurrency, if None, use workload param to generate")
-    parser.add_argument('--output-len', type=int, default=10)
-    parser.add_argument('--output-file-path', type=str, default="output.jsonl")
-    parser.add_argument('--deployment-endpoints', type=str, default="deployment0")
-    """
-    A=1, B=1, sigma=0.1, only_rise: bool = False,
-    period=0.25, duration_sec: int = None, interval_sec: int = None,
-    """
-    parser.add_argument('--w-A', type=float, default=5)
-    parser.add_argument('--w-B', type=float, default=5)
-    parser.add_argument('--w-sigma', type=float, default=0.1)
-    parser.add_argument('--w-only-rise', action='store_true')
-    parser.add_argument('--w-period', type=float, default=12)
-    parser.add_argument('--w-duration-sec', type=int, default=600)
-    parser.add_argument('--w-interval-sec', type=int, default=10)
-
-    parser = EngineArgs.add_cli_args(parser)
-    args = parser.parse_args()
-
-    if args.concurrency is not None:
-        identifier = f"onestep_np{args.num_prompts}_c{args.concurrency}"
-    else:
-        identifier = f"workload_A{args.w_A}_B{args.w_B}_P{args.w_period}_D{args.w_duration_sec}s_I{args.w_interval_sec}s"
-    identifier += "_" + datetime.now().strftime("%Y%m%d_%H%M%S")
-    args.output_file_path = f"output_stats/output_{identifier}.jsonl"
-    os.makedirs(os.path.dirname(args.output_file_path), exist_ok=True)
-
-    setup_logging(f'logs/bench_{identifier}.log')
-    main(args)
diff --git a/benchmarks/autoscaling/debug.yaml b/benchmarks/autoscaling/debug.yaml
deleted file mode 100644
index 9b24a3b8..00000000
--- a/benchmarks/autoscaling/debug.yaml
+++ /dev/null
@@ -1,34 +0,0 @@
-apiVersion: v1
-kind: Pod
-metadata:
-  name: debug-pod
-  namespace: default
-spec:
-  containers:
-    - name: debug-container
-      image: aibrix-container-registry-cn-beijing.cr.volces.com/aibrix/ray:2.10.0
-      command: ["sh", "-c", "sleep infinity"]
-      resources:
-        limits:
-          cpu: "100m"
-          memory: "128Mi"
-        requests:
-          cpu: "100m"
-          memory: "128Mi"
-      volumeMounts:
-        - mountPath: /models
-          name: model-hostpath
-  volumes:
-    - name: model-hostpath
-      hostPath:
-        path: /root/models
-        type: DirectoryOrCreate
-  affinity:
-    nodeAffinity:
-      requiredDuringSchedulingIgnoredDuringExecution:
-        nodeSelectorTerms:
-          - matchExpressions:
-              - key: machine.cluster.vke.volcengine.com/gpu-name
-                operator: In
-                values:
-                  - NVIDIA-A10
diff --git a/benchmarks/autoscaling/hpa.yaml b/benchmarks/autoscaling/hpa.yaml
deleted file mode 100644
index 914c932b..00000000
--- a/benchmarks/autoscaling/hpa.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-apiVersion: autoscaling.aibrix.ai/v1alpha1
-kind: PodAutoscaler
-metadata:
-  name: deepseek-coder-7b-instruct-hpa
-  labels:
-    app.kubernetes.io/name: aibrix
-    app.kubernetes.io/managed-by: kustomize
-  namespace: default
-spec:
-  scaleTargetRef:
-    apiVersion: apps/v1
-    kind: Deployment
-    name: aibrix-model-deepseek-coder-7b-instruct
-  minReplicas: 1
-  maxReplicas: 10
-  targetMetric: "gpu_cache_usage_perc"
-  targetValue: "50"
-  scalingStrategy: "HPA"
\ No newline at end of file
diff --git a/benchmarks/autoscaling/kpa.yaml b/benchmarks/autoscaling/kpa.yaml
deleted file mode 100644
index f91ab827..00000000
--- a/benchmarks/autoscaling/kpa.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-apiVersion: autoscaling.aibrix.ai/v1alpha1
-kind: PodAutoscaler
-metadata:
-  name: deepseek-coder-7b-instruct-kpa
-  labels:
-    app.kubernetes.io/name: aibrix
-    app.kubernetes.io/managed-by: kustomize
-  namespace: default
-spec:
-  scaleTargetRef:
-    apiVersion: apps/v1
-    kind: Deployment
-    name: aibrix-model-deepseek-coder-7b-instruct
-  minReplicas: 1
-  maxReplicas: 10
-  targetMetric: "vllm:gpu_cache_usage_perc"
-  targetValue: "0.5"
-  scalingStrategy: "KPA"
\ No newline at end of file
diff --git a/benchmarks/autoscaling/run-test.sh b/benchmarks/autoscaling/run-test.sh
index 229ca3cc..027ad325 100755
--- a/benchmarks/autoscaling/run-test.sh
+++ b/benchmarks/autoscaling/run-test.sh
@@ -2,8 +2,8 @@
 input_workload_path=$1
 autoscaler=$2
 
-aibrix_repo="/Users/bytedance/projects/aibrix-2" # root dir of aibrix repo
-api_key="sk-kFJ12nKsFVfVmGpj3QzX65s4RbN2xJqWzPYCjYu7wT3BlbLi" # set your api key
+aibrix_repo="" # root dir of aibrix repo
+api_key="" # set your api key
 k8s_yaml_dir="deepseek-llm-7b-chat"
 target_deployment="deepseek-llm-7b-chat" # "aibrix-model-deepseek-llm-7b-chat"
 target_ai_model=deepseek-llm-7b-chat
@@ -66,7 +66,7 @@ kubectl delete -f ${k8s_yaml_dir}/deploy.yaml
 kubectl apply -f ${k8s_yaml_dir}/deploy.yaml
 kubectl apply -f ${k8s_yaml_dir}/${autoscaler}.yaml
 echo "kubectl apply -f ${k8s_yaml_dir}/${autoscaler}.yaml"
-python set_num_replicas.py --deployment ${target_deployment} --replicas 1
+python3 ${aibrix_repo}/benchmarks/utils/set_num_replicas.py --deployment ${target_deployment} --replicas 1
 echo "Set number of replicas to \"1\". Autoscaling experiment will start from 1 pod"
 
 echo "Restart aibrix-controller-manager deployment"
@@ -82,9 +82,9 @@ kubectl rollout restart deploy ${target_deployment} -n default
 sleep_before_pod_check=20
 echo "Sleep for ${sleep_before_pod_check} seconds after restarting deployment"
 sleep ${sleep_before_pod_check}
-python check_k8s_is_ready.py ${target_deployment}
-python check_k8s_is_ready.py aibrix-controller-manager
-python check_k8s_is_ready.py aibrix-gateway-plugins
+python3 ${aibrix_repo}/benchmarks/utils/check_k8s_is_ready.py ${target_deployment}
+python3 ${aibrix_repo}/benchmarks/utils/check_k8s_is_ready.py aibrix-controller-manager
+python3 ${aibrix_repo}/benchmarks/utils/check_k8s_is_ready.py aibrix-gateway-plugins
 
 # Start pod log monitoring
 pod_log_dir="${experiment_result_dir}/pod_logs"
@@ -94,14 +94,14 @@ mkdir -p ${pod_log_dir}
 cp ${input_workload_path} ${experiment_result_dir}
 
 # Start pod counter. It will run on background until the end of the experiment.
-python count_num_pods.py ${target_deployment} ${experiment_result_dir} &
+python3 ${aibrix_repo}/benchmarks/utils/count_num_pods.py ${target_deployment} ${experiment_result_dir} &
 COUNT_NUM_POD_PID=$!
echo "started count_num_pods.py with PID: $COUNT_NUM_POD_PID" # Streaming pod logs to files on the background -python streaming_pod_log_to_file.py ${target_deployment} default ${pod_log_dir} & pid_1=$! -python streaming_pod_log_to_file.py aibrix-controller-manager aibrix-system ${pod_log_dir} & pid_2=$! -python streaming_pod_log_to_file.py aibrix-gateway-plugins aibrix-system ${pod_log_dir} & pid_3=$! +python3 ${aibrix_repo}/benchmarks/utils/streaming_pod_log_to_file.py ${target_deployment} default ${pod_log_dir} & pid_1=$! +python3 ${aibrix_repo}/benchmarks/utils/streaming_pod_log_to_file.py aibrix-controller-manager aibrix-system ${pod_log_dir} & pid_2=$! +python3 ${aibrix_repo}/benchmarks/utils/streaming_pod_log_to_file.py aibrix-gateway-plugins aibrix-system ${pod_log_dir} & pid_3=$! # Run experiment!!! output_jsonl_path=${experiment_result_dir}/output.jsonl @@ -123,7 +123,7 @@ sleep 1 # Cleanup kubectl delete podautoscaler --all --all-namespaces -python set_num_replicas.py --deployment ${target_deployment} --replicas 1 +python3 ${aibrix_repo}/benchmarks/utils/set_num_replicas.py --deployment ${target_deployment} --replicas 1 kubectl delete -f ${k8s_yaml_dir}/deploy.yaml # Stop monitoring processes diff --git a/benchmarks/autoscaling/check_k8s_is_ready.py b/benchmarks/utils/check_k8s_is_ready.py similarity index 100% rename from benchmarks/autoscaling/check_k8s_is_ready.py rename to benchmarks/utils/check_k8s_is_ready.py diff --git a/benchmarks/autoscaling/count_num_pods.py b/benchmarks/utils/count_num_pods.py similarity index 100% rename from benchmarks/autoscaling/count_num_pods.py rename to benchmarks/utils/count_num_pods.py diff --git a/benchmarks/autoscaling/set_num_replicas.py b/benchmarks/utils/set_num_replicas.py similarity index 100% rename from benchmarks/autoscaling/set_num_replicas.py rename to benchmarks/utils/set_num_replicas.py diff --git a/benchmarks/autoscaling/streaming_pod_log_to_file.py b/benchmarks/utils/streaming_pod_log_to_file.py similarity index 66% rename from benchmarks/autoscaling/streaming_pod_log_to_file.py rename to benchmarks/utils/streaming_pod_log_to_file.py index 47b098ec..6a87affb 100644 --- a/benchmarks/autoscaling/streaming_pod_log_to_file.py +++ b/benchmarks/utils/streaming_pod_log_to_file.py @@ -11,24 +11,26 @@ def get_all_pods(namespace): pod_list = pod_list_output.decode('utf-8').split() return pod_list -def write_logs(keywords, fname, process): +def write_logs(include, exclude, fname, process): with open(fname, 'w') as log_file: while True: line = process.stdout.readline() if not line: break - if len(keywords) == 0: # If there is no keyword, write all logs + if include is None and exclude is None: log_file.write(line) - log_file.flush() - else: - for keyword in keywords: - if keyword in line: - # If there is keyword, write only the lines containing the keyword - log_file.write(line) - log_file.flush() - break + elif include is not None and exclude is None: + if include in line: + log_file.write(line) + elif include is None and exclude is not None: + if exclude not in line: + log_file.write(line) + elif include is not None and exclude is not None: + if include in line and exclude not in line: + log_file.write(line) + log_file.flush() -def save_proxy_logs_streaming(pod_log_dir, pod_name, namespace): +def save_proxy_logs_streaming(pod_log_dir, pod_name, namespace, include, exclude): if not os.path.exists(pod_log_dir): os.makedirs(pod_log_dir) @@ -40,11 +42,7 @@ def save_proxy_logs_streaming(pod_log_dir, pod_name, namespace): 
stderr=subprocess.PIPE, universal_newlines=True ) - if namespace == "default": - keywords = ["Avg prompt throughput:", "logger.py", "engine.py"] - else: - keywords = [] - log_thread = threading.Thread(target=write_logs, args=(keywords, fname, process)) + log_thread = threading.Thread(target=write_logs, args=(include, exclude, fname, process)) log_thread.start() return process, log_thread @@ -59,6 +57,12 @@ def signal_handler(sig, frame): target_deployment = sys.argv[1] namespace = sys.argv[2] pod_log_dir = sys.argv[3] + include = sys.argv[4] + exclude = sys.argv[5] + if include == "none": + include = None + if exclude == "none": + exclude = None running_processes = [] signal.signal(signal.SIGINT, signal_handler) @@ -68,7 +72,7 @@ def signal_handler(sig, frame): assert False for pod_name in all_pods: if target_deployment in pod_name: - process, thread = save_proxy_logs_streaming(pod_log_dir, pod_name, namespace) + process, thread = save_proxy_logs_streaming(pod_log_dir, pod_name, namespace, include, exclude) running_processes.append((process, thread)) print(f"Started streaming logs for {len(all_pods)} pods") @@ -79,4 +83,4 @@ def signal_handler(sig, frame): while True: signal.pause() except (KeyboardInterrupt, SystemExit): - signal_handler(None, None) \ No newline at end of file + signal_handler(None, None)