From be5b81ac8abcc1b7c53b3cbdc65e770921ab98b6 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Mon, 23 Jun 2025 16:59:17 +0100 Subject: [PATCH 1/2] no-jira: add remote offline batch inference with vllm example --- .../batch-inference/remote_offline_bi.ipynb | 227 ++++++++++++++++++ .../batch-inference/requirements.txt | 4 + .../batch-inference/simple_batch_inf.py | 62 +++++ 3 files changed, 293 insertions(+) create mode 100644 demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb create mode 100644 demo-notebooks/additional-demos/batch-inference/requirements.txt create mode 100644 demo-notebooks/additional-demos/batch-inference/simple_batch_inf.py diff --git a/demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb b/demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb new file mode 100644 index 000000000..b80cb5d04 --- /dev/null +++ b/demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb @@ -0,0 +1,227 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Remote Offline Batch Inference with Ray Data & vLLM Example\n", + "\n", + "This notebook presumes:\n", + "- You are working on Openshift AI\n", + "- You have a Ray Cluster URL given to you to run workloads on\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "from codeflare_sdk import RayJobClient\n", + "\n", + "# Setup Authentication Configuration\n", + "auth_token = \"XXXX\"\n", + "header = {\"Authorization\": f\"Bearer {auth_token}\"}" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# Gather the dashboard URL (provided by the creator of the RayCluster)\n", + "ray_dashboard = \"XXXX\" # Replace with the Ray dashboard URL\n", + "\n", + "# Initialize the RayJobClient\n", + "client = RayJobClient(address=ray_dashboard, headers=header, verify=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Simple Example Explanation\n", + "\n", + "With the RayJobClient instantiated, let's run some batch inference. The following code is stored in `simple_batch_inf.py` (shown here in slightly simplified form) and is used as the entrypoint for the RayJob.\n", + "\n", + "What this processor configuration does:\n", + "- Sets up a vLLM engine with your model\n", + "- Configures GPU-friendly engine settings (half-precision dtype, reduced max model length)\n", + "- Defines batch processing parameters (8 requests per batch, 2 GPU workers)\n", + "\n", + "```python\n", + "import ray\n", + "from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig\n", + "\n", + "processor_config = vLLMEngineProcessorConfig(\n", + " model_source=\"replace-me\",\n", + " engine_kwargs=dict(\n", + " enable_lora=False,\n", + " dtype=\"half\",\n", + " max_model_len=1024,\n", + " ),\n", + " batch_size=8,\n", + " concurrency=2,\n", + ")\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "With the config defined, we can instantiate the processor. 
This enables batch inference by processing multiple requests through the vLLM engine, with two key steps:\n", + "- **Preprocess**: Converts each row into a structured chat format with system instructions and user queries, preparing the input for the LLM\n", + "- **Postprocess**: Extracts only the generated text from the model response, cleaning up the output\n", + "\n", + "The processor defines the pipeline that will be applied to each row in the dataset, enabling efficient batch processing through Ray Data's distributed execution framework.\n", + "\n", + "```python\n", + "processor = build_llm_processor(\n", + " processor_config,\n", + " preprocess=lambda row: dict(\n", + " messages=[\n", + " {\n", + " \"role\": \"system\",\n", + " \"content\": \"You are a calculator. Please only output the answer \"\n", + " \"of the given equation.\",\n", + " },\n", + " {\"role\": \"user\", \"content\": f\"{row['id']} ** 3 = ?\"},\n", + " ],\n", + " sampling_params=dict(\n", + " temperature=0.3,\n", + " max_tokens=20,\n", + " detokenize=False,\n", + " ),\n", + " ),\n", + " postprocess=lambda row: {\n", + " \"resp\": row[\"generated_text\"],\n", + " },\n", + ")\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we can run the batch inference pipeline on our data. It will:\n", + "- Download the model in the background, with vLLM serving it locally on the Ray Cluster for inference\n", + "- Generate a sample Ray Dataset with 32 rows (0-31) to process\n", + "- Run the LLM processor on the dataset, triggering the preprocessing, inference, and postprocessing steps\n", + "- Execute the lazy pipeline and load the results into memory\n", + "- Iterate through all outputs and print each response\n", + "\n", + "```python\n", + "ds = ray.data.range(32)\n", + "ds = processor(ds)\n", + "ds = ds.materialize()\n", + "\n", + "for out in ds.take_all():\n", + " print(out)\n", + " print(\"==========\")\n", + "```\n", + "\n", + "### Job Submission\n", + "\n", + "Now we can submit this job against the Ray Cluster using the `RayJobClient` from earlier." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-06-23 16:56:53,008\tINFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_d3badb03645503e8.zip.\n", + "2025-06-23 16:56:53,010\tINFO packaging.py:576 -- Creating a file package for local module './'.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "raysubmit_AJhmqzWsvHu6SqZD successfully submitted\n" + ] + } + ], + "source": [ + "entrypoint_command = \"python simple_batch_inf.py\"\n", + "\n", + "submission_id = client.submit_job(\n", + " entrypoint=entrypoint_command,\n", + " runtime_env={\"working_dir\": \"./\", \"pip\": \"requirements.txt\"},\n", + ")\n", + "\n", + "print(submission_id + \" successfully submitted\")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Get the job's status\n", + "client.get_job_status(submission_id)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'2025-06-23 15:47:22,272\\tINFO job_manager.py:531 -- Runtime env is setting up.\\nINFO 06-23 15:53:36 [__init__.py:244] Automatically detected platform 
cuda.\\n2025-06-23 15:53:54,307\\tINFO worker.py:1554 -- Using address 10.128.2.45:6379 set in the environment variable RAY_ADDRESS\\n2025-06-23 15:53:54,308\\tINFO worker.py:1694 -- Connecting to existing Ray cluster at address: 10.128.2.45:6379...\\n2025-06-23 15:53:54,406\\tINFO worker.py:1879 -- Connected to Ray cluster. View the dashboard at \\x1b[1m\\x1b[32mhttp://10.128.2.45:8265 \\x1b[39m\\x1b[22m\\nNo cloud storage mirror configured\\n2025-06-23 15:53:57,501\\tWARNING util.py:589 -- The argument ``compute`` is deprecated in Ray 2.9. Please specify argument ``concurrency`` instead. For more information, see https://docs.ray.io/en/master/data/transforming-data.html#stateful-transforms.\\n2025-06-23 15:53:58,095\\tINFO logging.py:290 -- Registered dataset logger for dataset dataset_33_0\\n2025-06-23 15:53:59,702\\tINFO streaming_executor.py:117 -- Starting execution of Dataset dataset_33_0. Full logs are in /tmp/ray/session_2025-06-23_10-53-41_019757_1/logs/ray-data\\n2025-06-23 15:53:59,702\\tINFO streaming_executor.py:118 -- Execution plan of Dataset dataset_33_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(_preprocess)] -> ActorPoolMapOperator[MapBatches(ChatTemplateUDF)] -> ActorPoolMapOperator[MapBatches(TokenizeUDF)] -> ActorPoolMapOperator[MapBatches(vLLMEngineStageUDF)] -> ActorPoolMapOperator[MapBatches(DetokenizeUDF)] -> TaskPoolMapOperator[Map(_postprocess)]\\n\\nRunning 0: 0.00 row [00:00, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m [2025-06-23 15:54:00,800 E 829 829] (raylet) node_manager.cc:3287: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: b72a45799ac9496bf52347fb9f9ef218722683d7bd8dd14702e821f0, IP: 10.128.2.45) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.128.2.45`\\n\\nRunning 0: 0.00 row [00:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m \\n\\nRunning 0: 0.00 row [00:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.\\n\\nRunning 0: 0.00 row [00:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m \\n\\nRunning 0: 0.00 row [01:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m [2025-06-23 15:55:00,824 E 829 829] (raylet) node_manager.cc:3287: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: b72a45799ac9496bf52347fb9f9ef218722683d7bd8dd14702e821f0, IP: 10.128.2.45) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.128.2.45`\\n\\nRunning 0: 0.00 row [01:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. 
To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.\\n\\nRunning 0: 0.00 row [01:01, ? row/s]'" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Get the job's logs\n", + "client.get_job_logs(submission_id)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/demo-notebooks/additional-demos/batch-inference/requirements.txt b/demo-notebooks/additional-demos/batch-inference/requirements.txt new file mode 100644 index 000000000..d9e8b73b2 --- /dev/null +++ b/demo-notebooks/additional-demos/batch-inference/requirements.txt @@ -0,0 +1,4 @@ +vllm +transformers +triton>=2.0.0 +torch>=2.0.0 diff --git a/demo-notebooks/additional-demos/batch-inference/simple_batch_inf.py b/demo-notebooks/additional-demos/batch-inference/simple_batch_inf.py new file mode 100644 index 000000000..c86ed15b4 --- /dev/null +++ b/demo-notebooks/additional-demos/batch-inference/simple_batch_inf.py @@ -0,0 +1,62 @@ +import ray +from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig + + +# 1. Construct a vLLM processor config. +processor_config = vLLMEngineProcessorConfig( + # The base model. + model_source="unsloth/Llama-3.2-1B-Instruct", + # vLLM engine config. + engine_kwargs=dict( + enable_lora=False, + # Older GPUs (e.g. T4) don't support bfloat16. You should remove + # this line if you're using newer GPUs. + dtype="half", + # Reduce the max model length to fit small GPUs. You should remove + # this line if you're using large GPUs. + max_model_len=1024, + ), + # The batch size used in Ray Data. + batch_size=8, + # Use one GPU in this example. + concurrency=1, + # If you save the LoRA adapter in S3, you can set the following path. + # dynamic_lora_loading_path="s3://your-lora-bucket/", +) + +# 2. Construct a processor using the processor config. +processor = build_llm_processor( + processor_config, + preprocess=lambda row: dict( + # No LoRA adapter is used; each row is turned into a chat-style request. + messages=[ + { + "role": "system", + "content": "You are a calculator. Please only output the answer " + "of the given equation.", + }, + {"role": "user", "content": f"{row['id']} ** 3 = ?"}, + ], + sampling_params=dict( + temperature=0.3, + max_tokens=20, + detokenize=False, + ), + ), + postprocess=lambda row: { + "resp": row["generated_text"], + }, +) + +# 3. Synthesize a dataset with 32 rows. +ds = ray.data.range(32) +# 4. Apply the processor to the dataset. Note that this line won't kick off +# anything because the processor executes lazily. +ds = processor(ds) +# Materialization kicks off the pipeline execution. +ds = ds.materialize() + +# 5. Print all outputs. 
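+#    Note: take_all() collects every output row to the driver process, which is +#    fine for this 32-row example; for larger datasets, consider iterating with +#    ds.iter_rows() or writing results out (e.g. ds.write_parquet(...)) instead.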
+for out in ds.take_all(): + print(out) + print("==========") From 1390ee8f502e69e7a9cb5443e5eee2a3ae2e91f4 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Tue, 24 Jun 2025 10:50:59 +0100 Subject: [PATCH 2/2] no-jira: remove output --- .../batch-inference/remote_offline_bi.ipynb | 47 ++----------------- 1 file changed, 4 insertions(+), 43 deletions(-) diff --git a/demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb b/demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb index b80cb5d04..02d28b137 100644 --- a/demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb +++ b/demo-notebooks/additional-demos/batch-inference/remote_offline_bi.ipynb @@ -7,7 +7,6 @@ "# Remote Offline Batch Inference with Ray Data & vLLM Example\n", "\n", "This notebook presumes:\n", - "- You are working on Openshift AI\n", "- You have a Ray Cluster URL given to you to run workloads on\n" ] }, @@ -132,23 +131,7 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2025-06-23 16:56:53,008\tINFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_d3badb03645503e8.zip.\n", - "2025-06-23 16:56:53,010\tINFO packaging.py:576 -- Creating a file package for local module './'.\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "raysubmit_AJhmqzWsvHu6SqZD successfully submitted\n" - ] - } - ], + "outputs": [], "source": [ "entrypoint_command = \"python simple_batch_inf.py\"\n", "\n", @@ -162,20 +145,9 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "" - ] - }, - "execution_count": 12, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "# Get the job's status\n", "client.get_job_status(submission_id)" @@ -185,18 +157,7 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "'2025-06-23 15:47:22,272\\tINFO job_manager.py:531 -- Runtime env is setting up.\\nINFO 06-23 15:53:36 [__init__.py:244] Automatically detected platform cuda.\\n2025-06-23 15:53:54,307\\tINFO worker.py:1554 -- Using address 10.128.2.45:6379 set in the environment variable RAY_ADDRESS\\n2025-06-23 15:53:54,308\\tINFO worker.py:1694 -- Connecting to existing Ray cluster at address: 10.128.2.45:6379...\\n2025-06-23 15:53:54,406\\tINFO worker.py:1879 -- Connected to Ray cluster. View the dashboard at \\x1b[1m\\x1b[32mhttp://10.128.2.45:8265 \\x1b[39m\\x1b[22m\\nNo cloud storage mirror configured\\n2025-06-23 15:53:57,501\\tWARNING util.py:589 -- The argument ``compute`` is deprecated in Ray 2.9. Please specify argument ``concurrency`` instead. For more information, see https://docs.ray.io/en/master/data/transforming-data.html#stateful-transforms.\\n2025-06-23 15:53:58,095\\tINFO logging.py:290 -- Registered dataset logger for dataset dataset_33_0\\n2025-06-23 15:53:59,702\\tINFO streaming_executor.py:117 -- Starting execution of Dataset dataset_33_0. 
Full logs are in /tmp/ray/session_2025-06-23_10-53-41_019757_1/logs/ray-data\\n2025-06-23 15:53:59,702\\tINFO streaming_executor.py:118 -- Execution plan of Dataset dataset_33_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(_preprocess)] -> ActorPoolMapOperator[MapBatches(ChatTemplateUDF)] -> ActorPoolMapOperator[MapBatches(TokenizeUDF)] -> ActorPoolMapOperator[MapBatches(vLLMEngineStageUDF)] -> ActorPoolMapOperator[MapBatches(DetokenizeUDF)] -> TaskPoolMapOperator[Map(_postprocess)]\\n\\nRunning 0: 0.00 row [00:00, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m [2025-06-23 15:54:00,800 E 829 829] (raylet) node_manager.cc:3287: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: b72a45799ac9496bf52347fb9f9ef218722683d7bd8dd14702e821f0, IP: 10.128.2.45) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.128.2.45`\\n\\nRunning 0: 0.00 row [00:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m \\n\\nRunning 0: 0.00 row [00:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.\\n\\nRunning 0: 0.00 row [00:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m \\n\\nRunning 0: 0.00 row [01:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m [2025-06-23 15:55:00,824 E 829 829] (raylet) node_manager.cc:3287: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: b72a45799ac9496bf52347fb9f9ef218722683d7bd8dd14702e821f0, IP: 10.128.2.45) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.128.2.45`\\n\\nRunning 0: 0.00 row [01:01, ? row/s]\\n \\n\\x1b[33m(raylet)\\x1b[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.\\n\\nRunning 0: 0.00 row [01:01, ? row/s]'" - ] - }, - "execution_count": 15, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "# Get the job's logs\n", "client.get_job_logs(submission_id)"