Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 294 additions & 0 deletions extra/journal_analysis.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "e21c672d-c631-4b27-9b5e-2b248c00a2e1",
"metadata": {},
"source": [
"# Journal processing demonstration\n",
"\n",
"This notebook demonstrates how to load HyperQueue JSON journal export and\n",
"do a simple analysis over it.\n",
"\n",
"You can export journal file to JSON via:\n",
"\n",
"```bash\n",
"$ hq journal export <JOURNAL_FILE> > journal.json\n",
"```\n",
"\n",
"or get JSON events from live server via:\n",
"\n",
"```bash\n",
"$ hq journal replay > journal.json\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cb5b0606-8547-43b2-855e-ee3ca7c4523f",
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import dateutil.parser\n",
"from datetime import datetime\n",
"import pandas as pd\n",
"import plotly.express as px"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dd3625d2-80ef-4842-8395-dc86d713fdaa",
"metadata": {},
"outputs": [],
"source": [
"# Load journal from file\n",
"with open(\"journal.json\") as f:\n",
" journal_events = [json.loads(s) for s in f]\n",
"\n",
"# Show first 10 events\n",
"for event in journal_events[:10]:\n",
" print(event[\"time\"], event[\"event\"][\"type\"]) "
]
},
{
"cell_type": "markdown",
"id": "aaf8e4a1-a35a-4756-ae41-4259400d0f5b",
"metadata": {},
"source": [
"# Analysis 1: What tasks were assigned to each worker"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c9fef944-624c-47a5-9621-8aa17146b92a",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# Mapping from workers to tasks: dict[int, list[str]]\n",
"worker_tasks = {}\n",
"\n",
"for event in journal_events:\n",
" ev = event[\"event\"] \n",
" if ev[\"type\"] == \"worker-connected\":\n",
" # If worker is a new connected then create an entry in worker_tasks\n",
" worker_tasks[ev[\"id\"]] = []\n",
" if ev[\"type\"] == \"task-started\": \n",
" # This event is called when a task is started on a worker\n",
"\n",
" # Construct a string <job-id>@<task-id>\n",
" job_task_id = f\"{ev[\"job\"]}@{ev[\"task\"]}\"\n",
" \n",
" if \"worker\" in ev:\n",
" # Handle single-node tasks and put the task into record for the worker\n",
" worker_tasks[ev[\"worker\"]].append(job_task_id)\n",
" else:\n",
" # Handle multi-node tasks and put the task into record for the workers\n",
" for w in ev[\"workers\"]:\n",
" worker_tasks[w].append(job_task_id)\n",
"\n",
"print(\"Tasks assigned to workers:\")\n",
"for worker_id, tasks in worker_tasks.items():\n",
" print(f\"{worker_id}: {','.join(tasks)}\")"
]
},
{
"cell_type": "markdown",
"id": "6e6dbbfc-257d-4cb1-b6d1-a75addd3a344",
"metadata": {},
"source": [
"# Analsis 2: CPU utilization over time across all workers\n",
"\n",
"This tracks the total number of CPUs in use at each point in time"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d279364f-13b1-43a0-8950-37b393b79635",
"metadata": {},
"outputs": [],
"source": [
"def parse_time(tm):\n",
" \"\"\"\n",
" Parse time of an event\n",
" \"\"\"\n",
" try:\n",
" return datetime.strptime(tm, \"%Y-%m-%dT%H:%M:%S.%fZ\")\n",
" except ValueError:\n",
" return datetime.strptime(tm, \"%Y-%m-%dT%H:%M:%SZ\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ff2c3889-221f-4d4e-97f1-4bf49f16171d",
"metadata": {},
"outputs": [],
"source": [
"def task_configs(submit_desc):\n",
" \"\"\"\n",
" Extract resource configurations from a submit description\n",
" \"\"\"\n",
" task_desc = submit_desc[\"task_desc\"]\n",
" result = {} \n",
" if \"n_tasks\" in task_desc:\n",
" raise Exception(\"Task graphs not supported\")\n",
" for id_range in task_desc[\"ids\"][\"ranges\"]:\n",
" start = id_range[\"start\"]\n",
" for i in range(0, id_range[\"count\"], id_range[\"step\"]):\n",
" result[start + i] = task_desc[\"resources\"]\n",
" return result\n",
"\n",
"def get_resource_amount(resources, name, all_amount):\n",
" \"\"\"\n",
" Get a resource amount from task configution.\n",
" \n",
" The parameter \"all_amount\" are resources on a worker to resolve policy \"all\"\n",
" that takes all resources of the worker, i.e. it depends\n",
" on the specific worker how many resources we get.\n",
" \"\"\"\n",
" for r in resources[\"resources\"]: \n",
" if r[\"resource\"] == name:\n",
" if r[\"policy\"] == \"All\":\n",
" return all_amount\n",
" elif r[\"policy\"] in (\"ForceCompact\", \"Compact\", \"ForceTight\", \"Tight\", \"Scatter\"):\n",
" # Resources are represented in fixed point where 1.0 = 10_000\n",
" return list(r[\"policy\"].values())[0] / 10_000\n",
" return 0\n",
"\n",
"def get_worker_res_count(resources, name):\n",
" \"\"\"\n",
" Get an amount of a resources provided by a worker\n",
" \"\"\"\n",
" for r in resources:\n",
" if r[\"name\"] == name:\n",
" kind = r[\"kind\"] \n",
" if \"List\" in kind:\n",
" return len(kind[\"List\"][\"values\"])\n",
" elif \"Groups\" in kind:\n",
" return sum(len(g) for g in kind[\"Groups\"][\"groups\"])\n",
" elif \"Range\" in kind:\n",
" return kind[\"Range\"][\"end\"] - kind[\"Range\"][\"start\"] + 1\n",
" elif \"Sum\" in kind:\n",
" # Resources are represented in fixed point where 1.0 = 10_000\n",
" return kind[\"Sum\"][\"size\"] / 10_000\n",
" else:\n",
" raise Exception(\"Unknown resurce kind\")\n",
" return 0"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "69c9524e-cfa7-404b-bdaf-5e41605760ed",
"metadata": {},
"outputs": [],
"source": [
"# Read the starting time from the first event\n",
"BASE_TIME = parse_time(journal_events[0][\"time\"])\n",
"\n",
"# Job definitions, we need it to get resource requests of tasks\n",
"job_defs = {} \n",
"\n",
"# Worker deinitions, we need to be able to resolve allocation policy \"all\"\n",
"worker_resources = {}\n",
"\n",
"# Amount of cpus of currently running tasks\n",
"running_tasks = {}\n",
"\n",
"# Output variable. It is initialized by (0,0), i.e. at time 0, there are 0 running cpus\n",
"running_cpus = [(0, 0)]\n",
"\n",
"\n",
"for event in journal_events:\n",
" ev = event[\"event\"] \n",
" if ev[\"type\"] == \"job-created\":\n",
" # When a job is created, remember resource requests of tasks \n",
" # Note that they may be multiple submits into one job\n",
" job_defs.setdefault(ev[\"job\"], {}) \n",
" job_defs[ev[\"job\"]].update(task_configs(ev[\"submit_desc\"]))\n",
" elif ev[\"type\"] == \"worker-connected\":\n",
" # When a worker is connected, lets remember its resources\n",
" worker_resources[ev[\"id\"]] = ev[\"configuration\"][\"resources\"][\"resources\"] \n",
" elif ev[\"type\"] == \"task-started\": \n",
" # When task is started, compute allocated cpus and store them in `running_cpus`.\n",
" time = (parse_time(event[\"time\"]) - BASE_TIME).total_seconds()\n",
" \n",
" # Get task resource request\n",
" # There may be more resource request variants, so we have to choose the resource request that\n",
" # was actually started on the worker\n",
" task_def = job_defs[ev[\"job\"]][ev[\"task\"]]\n",
" task_resources = task_def[\"variants\"][ev.get(\"variant\", 0)] \n",
" \n",
" if \"worker\" in ev:\n",
" # Get CPUs of a where the task was started\n",
" worker_res = worker_resources[ev[\"worker\"]]\n",
" all_amount = get_worker_res_count(worker_res, \"cpus\")\n",
"\n",
" # Get amount of resources that task asked for\n",
" amount = get_resource_amount(task_resources, \"cpus\", all_amount)\n",
"\n",
" # Store how many CPUs we have asked for\n",
" running_tasks[(ev[\"job\"], ev[\"task\"])] = amount\n",
"\n",
" # Remember the current used CPUs in the given time\n",
" running_cpus.append((time, running_cpus[-1][1] + amount))\n",
" else:\n",
" raise Exception(\"This analysis support only single node tasks\")\n",
" elif ev[\"type\"] in (\"task-finished\", \"task-failed\", \"task-canceled\"):\n",
" # When task is finished/failed/canceled, we need to modify our counter\n",
" time = (parse_time(event[\"time\"]) - BASE_TIME).total_seconds()\n",
" amount = running_tasks.get((ev[\"job\"], ev[\"task\"]), 0)\n",
" if amount > 0:\n",
" running_cpus.append((time, running_cpus[-1][1] - all_amount)) "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "87e1af8f-7c20-427b-8a83-de6a876be43e",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "0fdea084-ff4d-482c-a08a-3bfd80544ed5",
"metadata": {},
"outputs": [],
"source": [
"# Let's visualize the result\n",
"df = pd.DataFrame(running_cpus, columns=[\"time\", \"cpus\"])\n",
"px.line(df, x=\"time\", y=\"cpus\", line_shape=\"vh\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading