From 5206c4ed27b32be54d97c8295fb49d84d8793925 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Mon, 8 Jul 2024 22:32:08 +0200 Subject: [PATCH] feat(api): support pagination on list_apps route (#77) --- spark_on_k8s/api/apps.py | 64 ++++++++++++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 15 deletions(-) diff --git a/spark_on_k8s/api/apps.py b/spark_on_k8s/api/apps.py index 48d6014..f00e0e7 100644 --- a/spark_on_k8s/api/apps.py +++ b/spark_on_k8s/api/apps.py @@ -1,6 +1,9 @@ from __future__ import annotations from fastapi import APIRouter + +# ruff: noqa: TCH002 +from fastapi.responses import Response from kubernetes_asyncio.client import CoreV1Api from pydantic import BaseModel @@ -24,25 +27,56 @@ class SparkApp(BaseModel): spark_history_proxy: bool = False +async def _list_apps(namespace: str, max_per_page: int, continue_from: str) -> tuple[str, list[SparkApp]]: + no_more_items = "no-more-items" + if continue_from == no_more_items: + return no_more_items, [] + core_client = CoreV1Api(await KubernetesClientSingleton.client()) + search_params = { + "namespace": namespace, + "label_selector": "spark-role=driver", + "limit": max_per_page, + } + if continue_from: + search_params["_continue"] = continue_from + driver_pods = await core_client.list_namespaced_pod(**search_params) + return ( + driver_pods.metadata._continue if driver_pods.metadata._continue else no_more_items, + [ + SparkApp( + app_id=pod.metadata.labels.get("spark-app-id", pod.metadata.name), + status=get_app_status(pod), + driver_logs=True, + spark_ui_proxy=pod.metadata.labels.get("spark-ui-proxy", False), + ) + for pod in driver_pods.items + ], + ) + + @router.get("/list_apps") -async def list_apps_default_namespace() -> list[SparkApp]: +async def list_apps_default_namespace( + response: Response, max_per_page: int = 10, continue_from: str = "" +) -> list[SparkApp]: """List spark apps in the default namespace.""" - return await list_apps(namespace=APIConfiguration.SPARK_ON_K8S_API_DEFAULT_NAMESPACE) + continue_from, apps = await _list_apps( + namespace=APIConfiguration.SPARK_ON_K8S_API_DEFAULT_NAMESPACE, + max_per_page=max_per_page, + continue_from=continue_from, + ) + response.headers["X-Continue"] = continue_from + return apps @router.get("/list_apps/{namespace}") -async def list_apps(namespace: str) -> list[SparkApp]: +async def list_apps( + response: Response, namespace: str, max_per_page: int = 10, continue_from: str = "" +) -> list[SparkApp]: """List spark apps in a namespace.""" - core_client = CoreV1Api(await KubernetesClientSingleton.client()) - driver_pods = await core_client.list_namespaced_pod( - namespace=namespace, label_selector="spark-role=driver" + continue_from, apps = await _list_apps( + namespace=namespace, + max_per_page=max_per_page, + continue_from=continue_from, ) - return [ - SparkApp( - app_id=pod.metadata.labels.get("spark-app-id", pod.metadata.name), - status=get_app_status(pod), - driver_logs=True, - spark_ui_proxy=pod.metadata.labels.get("spark-ui-proxy", False), - ) - for pod in driver_pods.items - ] + response.headers["X-Continue"] = continue_from + return apps