Add KFTO Distributed training notebook and scripts #103
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,310 @@ | ||
| { | ||
| "cells": [ | ||
| { | ||
| "cell_type": "markdown", | ||
| "metadata": {}, | ||
| "source": [ | ||
| "# Training the Fraud Detection model with Kubeflow Training Operator" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "markdown", | ||
| "metadata": {}, | ||
| "source": [ | ||
| "The example fraud detection model is very small and quickly trained. However, for many large models, training requires multiple GPUs and often multiple machines. In this notebook, you learn how to train a model by using Kubeflow Training Operator on OpenShift AI to scale out the model training. You use the Training Operator SDK to create a PyTorchJob executing the provided training script." | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "markdown", | ||
| "metadata": {}, | ||
| "source": [ | ||
| "### Preparing Training Operator SDK\n", | ||
| "\n", | ||
| "The Training Operator SDK is not available by default in TensorFlow notebooks, so it must be installed first." | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "code", | ||
| "execution_count": null, | ||
| "metadata": {}, | ||
| "outputs": [], | ||
| "source": [ | ||
| "%pip install -qqU kubeflow-training==1.9.2" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "markdown", | ||
| "metadata": {}, | ||
| "source": [ | ||
| "### Preparing the data\n", | ||
| "\n", | ||
| "Normally, the training data for your model would be available in a shared location. For this example, the data is local. You must upload it to your object storage so that you can see how loading data from a shared source works. The training script downloads the training data and distributes it among the workers by using `DistributedSampler`." | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "code", | ||
| "execution_count": null, | ||
| "metadata": { | ||
| "tags": [] | ||
| }, | ||
| "outputs": [], | ||
| "source": [ | ||
| "import sys\n", | ||
| "sys.path.append('./utils')\n", | ||
| "\n", | ||
| "import utils.s3\n", | ||
| "\n", | ||
| "utils.s3.upload_directory_to_s3(\"data\", \"data\")\n", | ||
| "print(\"---\")\n", | ||
| "utils.s3.list_objects(\"data\")" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "markdown", | ||
| "metadata": {}, | ||
| "source": [ | ||
| "### Authenticate to the cluster by using the OpenShift console login\n", | ||
| "\n", | ||
| "Training Operator SDK requires authenticated access to the OpenShift cluster to create PyTorchJobs. The easiest way to get access details is through the OpenShift web console. \n", | ||
| " \n", | ||
| "\n", | ||
| "1. To generate the command, select **Copy login command** from the username drop-down menu at the top right of the web console.\n", | ||
| "\n", | ||
| " <figure>\n", | ||
| " <img src=\"./assets/copy-login.png\" alt=\"copy login\" >\n", | ||
| "    </figure>\n", | ||
| "\n", | ||
| "2. Click **Display token**.\n", | ||
| "\n", | ||
| "3. Below **Log in with this token**, take note of the parameters for token and server.\n", | ||
| " For example:\n", | ||
| " ```\n", | ||
| " oc login --token=sha256~LongString --server=https://api.your-cluster.domain.com:6443\n", | ||
| " ``` \n", | ||
| " - token: `sha256~LongString`\n", | ||
| " - server: `https://api.your-cluster.domain.com:6443`\n", | ||
| " \n", | ||
| "4. In the following code cell replace the token and server values with the values that you noted in Step 3.\n", | ||
| " For example:\n", | ||
| " ```\n", | ||
| " api_server = \"https://api.your-cluster.domain.com:6443\"\n", | ||
| " token = \"sha256~LongString\"\n", | ||
| " ```\n" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "code", | ||
| "execution_count": null, | ||
| "metadata": { | ||
| "tags": [] | ||
| }, | ||
| "outputs": [], | ||
| "source": [ | ||
| "from kubernetes import client\n", | ||
| "\n", | ||
| "api_server = \"https://XXXX\"\n", | ||
| "token = \"sha256~XXXX\"\n", | ||
| "\n", | ||
| "configuration = client.Configuration()\n", | ||
| "configuration.host = api_server\n", | ||
| "configuration.api_key = {\"authorization\": f\"Bearer {token}\"}\n", | ||
| "# Uncomment if your cluster API server uses a self-signed certificate or an untrusted CA\n", | ||
| "#configuration.verify_ssl = False" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "markdown", | ||
| "metadata": { | ||
| "tags": [] | ||
| }, | ||
| "source": [ | ||
| "## Running the distributed training" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "markdown", | ||
| "metadata": {}, | ||
| "source": [ | ||
| "### Initialize the Training client\n", | ||
| "\n", | ||
| "Initialize the `TrainingClient` with the cluster configuration that you created in the previous step." | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "code", | ||
| "execution_count": null, | ||
| "metadata": {}, | ||
| "outputs": [], | ||
| "source": [ | ||
| "from kubeflow.training import TrainingClient\n", | ||
| "\n", | ||
| "client = TrainingClient(client_configuration=configuration)" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "markdown", | ||
| "metadata": {}, | ||
| "source": [ | ||
| "### Create the PyTorchJob\n", | ||
| "\n", | ||
| "Submit the PyTorchJob by using the Training Operator SDK client.\n", | ||
| "\n", | ||
| "The training function is imported from the `kfto-scripts` folder.\n", | ||
| "\n", | ||
| "The training script loads the training dataset and distributes it among the worker nodes, performs the distributed training, evaluates the model by using the test dataset, exports the trained model to ONNX format, and uploads it to the S3 bucket specified in the provided connection.\n", | ||
| "\n", | ||
| "Important: If the Kueue component is enabled in RHOAI, you must create all Kueue-related resources (ResourceFlavor, ClusterQueue, and LocalQueue), provide the LocalQueue name in the code cell below, and uncomment the `labels` declaration in the `create_job` call." | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "code", | ||
| "execution_count": null, | ||
| "metadata": { | ||
| "tags": [] | ||
| }, | ||
| "outputs": [], | ||
| "source": [ | ||
| "import sys\n", | ||
| "import os\n", | ||
| "sys.path.append(\"./kfto-scripts\") # needed to make training function available in the notebook\n", | ||
| "from train_pytorch_cpu import train_func\n", | ||
| "from kubernetes.client import (\n", | ||
| " V1EnvVar,\n", | ||
| " V1EnvVarSource,\n", | ||
| " V1SecretKeySelector\n", | ||
| ")\n", | ||
| "\n", | ||
| "# The job name serves as a unique identifier to retrieve job-related information using the SDK\n", | ||
| "job_name = \"fraud-detection\"\n", | ||
| "\n", | ||
| "# Specifies Kueue LocalQueue name.\n", | ||
| "# If Kueue component is enabled then you must create all Kueue related resources (ResourceFlavor, ClusterQueue and LocalQueue) and provide LocalQueue name here.\n", | ||
| "local_queue_name = \"local-queue\"\n", | ||
| "\n", | ||
| "client.create_job(\n", | ||

> **Review comments**
>
> **Contributor:** When running this with a PyTorch notebook image version 2025.1 I get this error: Should a different image be used?
>
> **Author:** Hi @RHRolun , Actually the
>
> **Contributor:** @sutaakar
>
> **Author:** AFAIK it will require latest KFTO SDK, which should be available in next RHOAI release - 2.22. Considering that the PR will get merged after a while, 2.22 should be out at that time, so I think the SDK upgrade doesn't need to be included?
>
> **Contributor:** JFI: The Kubeflow-training SDK v1.9.2 is also available in RHOAI release - 2.21.
>
> **Author:** Another option is to remove the

||
| " job_kind=\"PyTorchJob\",\n", | ||
| " name=job_name,\n", | ||
| " train_func=train_func,\n", | ||
| " num_workers=2,\n", | ||
| " num_procs_per_worker=\"1\",\n", | ||
| " resources_per_worker={\n", | ||
| " \"memory\": \"4Gi\",\n", | ||
| " \"cpu\": 1,\n", | ||
| " },\n", | ||
| " base_image=\"quay.io/modh/training:py311-cuda124-torch251\",\n", | ||
| " # Uncomment the following line to add the queue-name label if Kueue component is enabled in RHOAI and all Kueue related resources are created. Replace `local_queue_name` with the name of your LocalQueue\n", | ||
| "# labels={\"kueue.x-k8s.io/queue-name\": local_queue_name},\n", | ||
| " env_vars=[\n", | ||
| " V1EnvVar(name=\"AWS_ACCESS_KEY_ID\", value=os.environ.get(\"AWS_ACCESS_KEY_ID\")),\n", | ||
| " V1EnvVar(name=\"AWS_S3_BUCKET\", value=os.environ.get(\"AWS_S3_BUCKET\")),\n", | ||
| " V1EnvVar(name=\"AWS_S3_ENDPOINT\", value=os.environ.get(\"AWS_S3_ENDPOINT\")),\n", | ||
| " V1EnvVar(name=\"AWS_SECRET_ACCESS_KEY\", value=os.environ.get(\"AWS_SECRET_ACCESS_KEY\")),\n", | ||
| " ],\n", | ||
| " packages_to_install=[\n", | ||
| " \"s3fs\",\n", | ||
| " \"boto3\",\n", | ||
| " \"scikit-learn\",\n", | ||
| " \"onnx\",\n", | ||
| " ],\n", | ||
| ")" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "markdown", | ||
| "metadata": { | ||
| "tags": [] | ||
| }, | ||
| "source": [ | ||
| "### Query important job information" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "code", | ||
| "execution_count": null, | ||
| "metadata": { | ||
| "tags": [] | ||
| }, | ||
| "outputs": [], | ||
| "source": [ | ||
| "import time\n", | ||
| "\n", | ||
| "\n", | ||
| "# Wait until the job finishes\n", | ||
| "print(f\"PyTorchJob '{job_name}' is running.\", end='')\n", | ||
| "while True:\n", | ||
| " try:\n", | ||
| " if client.is_job_running(name=job_name):\n", | ||
| " print(\".\", end='')\n", | ||
| " elif client.is_job_succeeded(name=job_name):\n", | ||
| " print(\".\")\n", | ||
| " print([x.message for x in client.get_job_conditions(name=job_name) if x.type == \"Succeeded\"][0])\n", | ||
| " break\n", | ||
| " elif client.is_job_failed(name=job_name):\n", | ||
| " print(\".\")\n", | ||
| " print([x.message for x in client.get_job_conditions(name=job_name) if x.type == \"Failed\"][0])\n", | ||
| " break\n", | ||
| " else:\n", | ||
| " print(f\"PyTorchJob '{job_name}' status not available or no conditions found.\")\n", | ||
| " break\n", | ||
| "\n", | ||
| " except Exception as e:\n", | ||
| " print(f\"Error getting PyTorchJob status: {e}.\")\n", | ||
| "\n", | ||
| " time.sleep(3)" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "code", | ||
| "execution_count": null, | ||
| "metadata": {}, | ||
| "outputs": [], | ||
| "source": [ | ||
| "# Get the job logs\n", | ||
| "print(client.get_job_logs(name=job_name)[0][\"fraud-detection-master-0\"])" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "markdown", | ||
| "metadata": { | ||
| "tags": [] | ||
| }, | ||
| "source": [ | ||
| "### Delete jobs\n", | ||
| "\n", | ||
| "When finished you can delete the PyTorchJob." | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "code", | ||
| "execution_count": null, | ||
| "metadata": { | ||
| "tags": [] | ||
| }, | ||
| "outputs": [], | ||
| "source": [ | ||
| "client.delete_job(name=job_name)" | ||
| ] | ||
| } | ||
| ], | ||
| "metadata": { | ||
| "kernelspec": { | ||
| "display_name": "Python 3.11", | ||
| "language": "python", | ||
| "name": "python3" | ||
| }, | ||
| "language_info": { | ||
| "codemirror_mode": { | ||
| "name": "ipython", | ||
| "version": 3 | ||
| }, | ||
| "file_extension": ".py", | ||
| "mimetype": "text/x-python", | ||
| "name": "python", | ||
| "nbconvert_exporter": "python", | ||
| "pygments_lexer": "ipython3", | ||
| "version": "3.11.11" | ||
| } | ||
| }, | ||
| "nbformat": 4, | ||
| "nbformat_minor": 4 | ||
| } | ||