From 0a2725cabba9369137a0baeaf858f0307f5e48d3 Mon Sep 17 00:00:00 2001 From: Agrim Khanna Date: Thu, 26 Jun 2025 22:53:36 +0530 Subject: [PATCH 1/7] watching md predict/access logs for error --- ads/aqua/modeldeployment/deployment.py | 41 ++++++++--- ads/model/deployment/model_deployment.py | 92 ++++++++++++++---------- 2 files changed, 84 insertions(+), 49 deletions(-) diff --git a/ads/aqua/modeldeployment/deployment.py b/ads/aqua/modeldeployment/deployment.py index d84a5e465..973cc71f1 100644 --- a/ads/aqua/modeldeployment/deployment.py +++ b/ads/aqua/modeldeployment/deployment.py @@ -727,21 +727,33 @@ def _create_deployment( ).deploy(wait_for_completion=False) deployment_id = deployment.id + logger.info( f"Aqua model deployment {deployment_id} created for model {aqua_model_id}. Work request Id is {deployment.dsc_model_deployment.workflow_req_id}" ) + status_list = [] + progress_thread_1 = threading.Thread( + target=deployment.watch, + args=(status_list), + daemon=True, + ) + progress_thread_1.start() - progress_thread = threading.Thread( + progress_thread_2 = threading.Thread( target=self.get_deployment_status, args=( deployment_id, deployment.dsc_model_deployment.workflow_req_id, model_type, model_name, + status_list, ), daemon=True, ) - progress_thread.start() + progress_thread_2.start() + + progress_thread_1.join() + progress_thread_2.join() # we arbitrarily choose last 8 characters of OCID to identify MD in telemetry telemetry_kwargs = {"ocid": get_ocid_substring(deployment_id, key_len=8)} @@ -1237,6 +1249,7 @@ def get_deployment_status( work_request_id: str, model_type: str, model_name: str, + status_list: List[str] = [], ) -> None: """Waits for the data science model deployment to be completed and log its status in telemetry. @@ -1249,14 +1262,17 @@ def get_deployment_status( The work request Id of the model deployment. model_type: str The type of aqua model to be deployed. Allowed values are: `custom`, `service` and `multi_model`. + status_list: List[str] + The list of status frmo streams the access and/or predict logs of model deployment. Returns ------- AquaDeployment An Aqua deployment instance. """ + ocid = get_ocid_substring(model_deployment_id, key_len=8) - telemetry_kwargs = {"ocid": ocid} + telemetry_kwargs = {"ocid": ocid, "model_name": model_name} data_science_work_request: DataScienceWorkRequest = DataScienceWorkRequest( work_request_id @@ -1274,18 +1290,21 @@ def get_deployment_status( for error in data_science_work_request._error_message: error_str = error_str + " " + error.message - self.telemetry.record_event( - category=f"aqua/{model_type}/deployment/status", - action="FAILED", - detail=error_str, - value=model_name, - **telemetry_kwargs, - ) + status = "" + if len(status_list) > 0: + status = status_list[-1] + telemetry_kwargs["status"] = status + + self.telemetry.record_event( + category=f"aqua/{model_type}/deployment/status", + action="FAILED", + detail=error_str, + **telemetry_kwargs, + ) else: self.telemetry.record_event_async( category=f"aqua/{model_type}/deployment/status", action="SUCCEEDED", - value=model_name, **telemetry_kwargs, ) diff --git a/ads/model/deployment/model_deployment.py b/ads/model/deployment/model_deployment.py index 56a70c112..f8db73370 100644 --- a/ads/model/deployment/model_deployment.py +++ b/ads/model/deployment/model_deployment.py @@ -1,22 +1,27 @@ #!/usr/bin/env python -# -*- coding: utf-8; -*- -# Copyright (c) 2021, 2023 Oracle and/or its affiliates. +# Copyright (c) 2021, 2025 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ import collections import copy import datetime -import oci -import warnings import time -from typing import Dict, List, Union, Any +import warnings +from typing import Any, Dict, List, Union +import oci import oci.loggingsearch -from ads.common import auth as authutil import pandas as pd -from ads.model.serde.model_input import JsonModelInputSERDE +from oci.data_science.models import ( + CreateModelDeploymentDetails, + LogDetails, + UpdateModelDeploymentDetails, +) + +from ads.common import auth as authutil +from ads.common import utils as ads_utils from ads.common.oci_logging import ( LOG_INTERVAL, LOG_RECORDS_LIMIT, @@ -30,10 +35,10 @@ from ads.model.deployment.common.utils import send_request from ads.model.deployment.model_deployment_infrastructure import ( DEFAULT_BANDWIDTH_MBPS, + DEFAULT_MEMORY_IN_GBS, + DEFAULT_OCPUS, DEFAULT_REPLICA, DEFAULT_SHAPE_NAME, - DEFAULT_OCPUS, - DEFAULT_MEMORY_IN_GBS, MODEL_DEPLOYMENT_INFRASTRUCTURE_TYPE, ModelDeploymentInfrastructure, ) @@ -45,18 +50,14 @@ ModelDeploymentRuntimeType, OCIModelDeploymentRuntimeType, ) +from ads.model.serde.model_input import JsonModelInputSERDE from ads.model.service.oci_datascience_model_deployment import ( OCIDataScienceModelDeployment, ) -from ads.common import utils as ads_utils + from .common import utils from .common.utils import State from .model_deployment_properties import ModelDeploymentProperties -from oci.data_science.models import ( - LogDetails, - CreateModelDeploymentDetails, - UpdateModelDeploymentDetails, -) DEFAULT_WAIT_TIME = 1200 DEFAULT_POLL_INTERVAL = 10 @@ -734,6 +735,7 @@ def watch( time_start: datetime = None, interval: int = LOG_INTERVAL, log_filter: str = None, + status_list: List[str] = None, ) -> "ModelDeployment": """Streams the access and/or predict logs of model deployment. @@ -751,6 +753,8 @@ def watch( log_filter : str, optional Expression for filtering the logs. This will be the WHERE clause of the query. Defaults to None. + status_list : List[str], optional + List of status of model deployment. This is used to store list of status from logs. Returns ------- @@ -760,6 +764,8 @@ def watch( status = "" while not self._stop_condition(): status = self._check_and_print_status(status) + if status not in status_list: + status_list.append(status) time.sleep(interval) time_start = time_start or self.time_created @@ -964,7 +970,9 @@ def predict( except oci.exceptions.ServiceError as ex: # When bandwidth exceeds the allocated value, TooManyRequests error (429) will be raised by oci backend. if ex.status == 429: - bandwidth_mbps = self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS + bandwidth_mbps = ( + self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS + ) utils.get_logger().warning( f"Load balancer bandwidth exceeds the allocated {bandwidth_mbps} Mbps." "To estimate the actual bandwidth, use formula: (payload size in KB) * (estimated requests per second) * 8 / 1024." @@ -1644,22 +1652,22 @@ def _build_model_deployment_configuration_details(self) -> Dict: } if infrastructure.subnet_id: - instance_configuration[ - infrastructure.CONST_SUBNET_ID - ] = infrastructure.subnet_id + instance_configuration[infrastructure.CONST_SUBNET_ID] = ( + infrastructure.subnet_id + ) if infrastructure.private_endpoint_id: if not hasattr( oci.data_science.models.InstanceConfiguration, "private_endpoint_id" ): # TODO: add oci version with private endpoint support. - raise EnvironmentError( + raise OSError( "Private endpoint is not supported in the current OCI SDK installed." ) - instance_configuration[ - infrastructure.CONST_PRIVATE_ENDPOINT_ID - ] = infrastructure.private_endpoint_id + instance_configuration[infrastructure.CONST_PRIVATE_ENDPOINT_ID] = ( + infrastructure.private_endpoint_id + ) scaling_policy = { infrastructure.CONST_POLICY_TYPE: "FIXED_SIZE", @@ -1704,7 +1712,7 @@ def _build_model_deployment_configuration_details(self) -> Dict: oci.data_science.models, "ModelDeploymentEnvironmentConfigurationDetails", ): - raise EnvironmentError( + raise OSError( "Environment variable hasn't been supported in the current OCI SDK installed." ) @@ -1720,9 +1728,9 @@ def _build_model_deployment_configuration_details(self) -> Dict: and runtime.inference_server.upper() == MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON ): - environment_variables[ - "CONTAINER_TYPE" - ] = MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON + environment_variables["CONTAINER_TYPE"] = ( + MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON + ) runtime.set_spec(runtime.CONST_ENV, environment_variables) environment_configuration_details = { runtime.CONST_ENVIRONMENT_CONFIG_TYPE: runtime.environment_config_type, @@ -1734,7 +1742,7 @@ def _build_model_deployment_configuration_details(self) -> Dict: oci.data_science.models, "OcirModelDeploymentEnvironmentConfigurationDetails", ): - raise EnvironmentError( + raise OSError( "Container runtime hasn't been supported in the current OCI SDK installed." ) environment_configuration_details["image"] = runtime.image @@ -1742,9 +1750,9 @@ def _build_model_deployment_configuration_details(self) -> Dict: environment_configuration_details["cmd"] = runtime.cmd environment_configuration_details["entrypoint"] = runtime.entrypoint environment_configuration_details["serverPort"] = runtime.server_port - environment_configuration_details[ - "healthCheckPort" - ] = runtime.health_check_port + environment_configuration_details["healthCheckPort"] = ( + runtime.health_check_port + ) model_deployment_configuration_details = { infrastructure.CONST_DEPLOYMENT_TYPE: "SINGLE_MODEL", @@ -1754,7 +1762,7 @@ def _build_model_deployment_configuration_details(self) -> Dict: if runtime.deployment_mode == ModelDeploymentMode.STREAM: if not hasattr(oci.data_science.models, "StreamConfigurationDetails"): - raise EnvironmentError( + raise OSError( "Model deployment mode hasn't been supported in the current OCI SDK installed." ) model_deployment_configuration_details[ @@ -1786,9 +1794,13 @@ def _build_category_log_details(self) -> Dict: logs = {} if ( - self.infrastructure.access_log and - self.infrastructure.access_log.get(self.infrastructure.CONST_LOG_GROUP_ID, None) - and self.infrastructure.access_log.get(self.infrastructure.CONST_LOG_ID, None) + self.infrastructure.access_log + and self.infrastructure.access_log.get( + self.infrastructure.CONST_LOG_GROUP_ID, None + ) + and self.infrastructure.access_log.get( + self.infrastructure.CONST_LOG_ID, None + ) ): logs[self.infrastructure.CONST_ACCESS] = { self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.access_log.get( @@ -1799,9 +1811,13 @@ def _build_category_log_details(self) -> Dict: ), } if ( - self.infrastructure.predict_log and - self.infrastructure.predict_log.get(self.infrastructure.CONST_LOG_GROUP_ID, None) - and self.infrastructure.predict_log.get(self.infrastructure.CONST_LOG_ID, None) + self.infrastructure.predict_log + and self.infrastructure.predict_log.get( + self.infrastructure.CONST_LOG_GROUP_ID, None + ) + and self.infrastructure.predict_log.get( + self.infrastructure.CONST_LOG_ID, None + ) ): logs[self.infrastructure.CONST_PREDICT] = { self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.predict_log.get( From c9e40b9350c8826e060cbc408cb8993b34a2fe81 Mon Sep 17 00:00:00 2001 From: Agrim Khanna Date: Fri, 27 Jun 2025 18:40:22 +0530 Subject: [PATCH 2/7] watching logs and pushing them to telemetry --- ads/aqua/modeldeployment/deployment.py | 111 ++++++++++++++++++----- ads/common/oci_logging.py | 50 ++++++++-- ads/model/deployment/model_deployment.py | 37 +++++++- 3 files changed, 167 insertions(+), 31 deletions(-) diff --git a/ads/aqua/modeldeployment/deployment.py b/ads/aqua/modeldeployment/deployment.py index 973cc71f1..9bc3303ec 100644 --- a/ads/aqua/modeldeployment/deployment.py +++ b/ads/aqua/modeldeployment/deployment.py @@ -4,6 +4,7 @@ import json +import re import shlex import threading from datetime import datetime, timedelta @@ -732,28 +733,18 @@ def _create_deployment( f"Aqua model deployment {deployment_id} created for model {aqua_model_id}. Work request Id is {deployment.dsc_model_deployment.workflow_req_id}" ) status_list = [] - progress_thread_1 = threading.Thread( - target=deployment.watch, - args=(status_list), - daemon=True, - ) - progress_thread_1.start() - progress_thread_2 = threading.Thread( + progress_thread = threading.Thread( target=self.get_deployment_status, args=( deployment_id, deployment.dsc_model_deployment.workflow_req_id, model_type, model_name, - status_list, ), daemon=True, ) - progress_thread_2.start() - - progress_thread_1.join() - progress_thread_2.join() + progress_thread.start() # we arbitrarily choose last 8 characters of OCID to identify MD in telemetry telemetry_kwargs = {"ocid": get_ocid_substring(deployment_id, key_len=8)} @@ -1245,11 +1236,11 @@ def list_shapes(self, **kwargs) -> List[ComputeShapeSummary]: def get_deployment_status( self, + deployment, model_deployment_id: str, work_request_id: str, model_type: str, model_name: str, - status_list: List[str] = [], ) -> None: """Waits for the data science model deployment to be completed and log its status in telemetry. @@ -1262,17 +1253,14 @@ def get_deployment_status( The work request Id of the model deployment. model_type: str The type of aqua model to be deployed. Allowed values are: `custom`, `service` and `multi_model`. - status_list: List[str] - The list of status frmo streams the access and/or predict logs of model deployment. Returns ------- AquaDeployment An Aqua deployment instance. """ - ocid = get_ocid_substring(model_deployment_id, key_len=8) - telemetry_kwargs = {"ocid": ocid, "model_name": model_name} + status_list: List[str] = [] data_science_work_request: DataScienceWorkRequest = DataScienceWorkRequest( work_request_id @@ -1284,17 +1272,62 @@ def get_deployment_status( max_wait_time=DEFAULT_WAIT_TIME, poll_interval=DEFAULT_POLL_INTERVAL, ) - except Exception: + predict_logs = deployment.tail_logs("predict") + access_logs = deployment.tail_logs("access") + + status = "" + if access_logs and len(access_logs) > 0: + print("access log list ############################") + print(access_logs) + status = access_logs[0]["message"] + + if predict_logs and len(predict_logs) > 0: + print("predict_logs ############################") + print(predict_logs) + status += predict_logs[0]["message"] + + status = re.sub(r"[^a-zA-Z0-9]", "", status) + telemetry_kwargs = { + "ocid": ocid, + "model_name": model_name, + "status": status, + } + print(telemetry_kwargs) + print("############################") + + self.telemetry.record_event( + category=f"aqua/{model_type}/deployment/status", + action="LAST_LOG", + # detail=error_str, + **telemetry_kwargs, + ) + + except Exception as e: if data_science_work_request._error_message: error_str = "" for error in data_science_work_request._error_message: error_str = error_str + " " + error.message status = "" - if len(status_list) > 0: - status = status_list[-1] - - telemetry_kwargs["status"] = status + predict_logs = deployment.tail_logs("predict") + access_logs = deployment.tail_logs("access") + if access_logs and len(access_logs) > 0: + print(access_logs) + status = access_logs[0]["message"] + + if predict_logs and len(predict_logs) > 0: + print("predict_logs ############################") + print(predict_logs) + status += predict_logs[0]["message"] + status = re.sub(r"[^a-zA-Z0-9]", "", status) + error_str = re.sub(r"[^a-zA-Z0-9]", "", error_str) + telemetry_kwargs = { + "ocid": ocid, + "model_name": model_name, + "status": error_str + " " + status, + } + print(telemetry_kwargs) + print("############################") self.telemetry.record_event( category=f"aqua/{model_type}/deployment/status", @@ -1302,7 +1335,41 @@ def get_deployment_status( detail=error_str, **telemetry_kwargs, ) + else: + print(str(e)) + status = str(e) + predict_logs = deployment.tail_logs("predict") + access_logs = deployment.tail_logs("access") + if access_logs and len(access_logs) > 0: + print("access log list ############################") + print(access_logs) + status = access_logs[0]["message"] + + if predict_logs and len(predict_logs) > 0: + print("predict_logs ############################") + print(predict_logs) + status += predict_logs[0]["message"] + + status = re.sub(r"[^a-zA-Z0-9]", "", status) + error_str = re.sub(r"[^a-zA-Z0-9]", "", error_str) + + telemetry_kwargs = { + "ocid": ocid, + "model_name": model_name, + "status": error_str + " " + status, + } + print(telemetry_kwargs) + print("############################") + + self.telemetry.record_event( + category=f"aqua/{model_type}/deployment/status", + action="FAILED", + # detail=error_str, + **telemetry_kwargs, + ) + else: + telemetry_kwargs = {"ocid": ocid, "model_name": model_name} self.telemetry.record_event_async( category=f"aqua/{model_type}/deployment/status", action="SUCCEEDED", diff --git a/ads/common/oci_logging.py b/ads/common/oci_logging.py index 502e4f813..dee80327c 100644 --- a/ads/common/oci_logging.py +++ b/ads/common/oci_logging.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -# -*- coding: utf-8; -*- # Copyright (c) 2021, 2024 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ @@ -7,16 +6,16 @@ import datetime import logging import time -from typing import Dict, Union, List +from typing import Dict, List, Union +import oci.exceptions import oci.logging import oci.loggingsearch -import oci.exceptions + from ads.common.decorator.utils import class_or_instance_method from ads.common.oci_mixin import OCIModelMixin, OCIWorkRequestMixin from ads.common.oci_resource import OCIResource, ResourceNotFoundError - logger = logging.getLogger(__name__) # Maximum number of log records to be returned by default. @@ -862,9 +861,48 @@ def tail( time_start=time_start, log_filter=log_filter, ) - self._print( - sorted(tail_logs, key=lambda log: log["time"]) + self._print(sorted(tail_logs, key=lambda log: log["time"])) + + def get_tail_logs( + self, + source: str = None, + limit: int = LOG_RECORDS_LIMIT, + time_start: datetime.datetime = None, + log_filter: str = None, + ) -> List[Union[oci.loggingsearch.models.SearchResult, dict]]: + """Returns the most recent consolidated log records. + + Parameters + ---------- + source : str, optional + Expression or OCID to filter the "source" field of the OCI log record. + Defaults to None. + limit : int, optional. + Maximum number of records to be returned. + If limit is set to None, all logs from time_start to now will be returned. + Defaults to 100. + time_start : datetime.datetime, optional + Starting time for the log query. + Defaults to None. + log_filter : str, optional + Expression for filtering the logs. This will be the WHERE clause of the query. + Defaults to None. + + Returns + ------- + list + A list of oci.loggingsearch.models.SearchResult objects or log records sorted in descending order by time + Each log record is a dictionary with the following keys: `annotation`, `id`, `time`, + `message` and `datetime`. + """ + tail_logs = self._search_and_format( + source=source, + limit=limit, + sort_order=SortOrder.DESC, + time_start=time_start, + log_filter=log_filter, ) + return sorted(tail_logs, key=lambda log: log["time"]) def head( self, diff --git a/ads/model/deployment/model_deployment.py b/ads/model/deployment/model_deployment.py index f8db73370..c9beb84dd 100644 --- a/ads/model/deployment/model_deployment.py +++ b/ads/model/deployment/model_deployment.py @@ -729,13 +729,46 @@ def update( return self._update_from_oci_model(response) + def tail_logs( + self, log_type: str = None, time_start: datetime = None, log_filter: str = None + ) -> List[Union[oci.loggingsearch.models.SearchResult, dict]]: + """Returns the most recent consolidated log records for the model deployment + + Parameters + ---------- + source : str, optional + Expression or OCID to filter the "source" field of the OCI log record. + Defaults to None. + limit : int, optional. + Maximum number of records to be returned. + If limit is set to None, all logs from time_start to now will be returned. + Defaults to 100. + time_start : datetime.datetime, optional + Starting time for the log query. + Defaults to None. + log_filter : str, optional + Expression for filtering the logs. This will be the WHERE clause of the query. + Defaults to None. + + Returns + ------- + list + A list of oci.loggingsearch.models.SearchResult objects or log records sorted in descending order by time + Each log record is a dictionary with the following keys: `annotation`, `id`, `time`, + `message` and `datetime`. + """ + return self.logs(log_type).get_tail_logs( + source=self.model_deployment_id, + time_start=time_start, + log_filter=log_filter, + ) + def watch( self, log_type: str = None, time_start: datetime = None, interval: int = LOG_INTERVAL, log_filter: str = None, - status_list: List[str] = None, ) -> "ModelDeployment": """Streams the access and/or predict logs of model deployment. @@ -764,8 +797,6 @@ def watch( status = "" while not self._stop_condition(): status = self._check_and_print_status(status) - if status not in status_list: - status_list.append(status) time.sleep(interval) time_start = time_start or self.time_created From 4e974174d355caf94ab8d9e1d7149da4565d7cc0 Mon Sep 17 00:00:00 2001 From: Agrim Khanna Date: Mon, 30 Jun 2025 21:57:25 +0530 Subject: [PATCH 3/7] watching logs and pushing them to telemetry --- ads/aqua/modeldeployment/deployment.py | 67 +++--------------------- ads/model/deployment/model_deployment.py | 14 +++-- 2 files changed, 15 insertions(+), 66 deletions(-) diff --git a/ads/aqua/modeldeployment/deployment.py b/ads/aqua/modeldeployment/deployment.py index 9bc3303ec..93060806f 100644 --- a/ads/aqua/modeldeployment/deployment.py +++ b/ads/aqua/modeldeployment/deployment.py @@ -1260,7 +1260,6 @@ def get_deployment_status( An Aqua deployment instance. """ ocid = get_ocid_substring(model_deployment_id, key_len=8) - status_list: List[str] = [] data_science_work_request: DataScienceWorkRequest = DataScienceWorkRequest( work_request_id @@ -1272,99 +1271,45 @@ def get_deployment_status( max_wait_time=DEFAULT_WAIT_TIME, poll_interval=DEFAULT_POLL_INTERVAL, ) + except Exception as e: + status = "" predict_logs = deployment.tail_logs("predict") access_logs = deployment.tail_logs("access") - - status = "" if access_logs and len(access_logs) > 0: - print("access log list ############################") - print(access_logs) status = access_logs[0]["message"] if predict_logs and len(predict_logs) > 0: - print("predict_logs ############################") - print(predict_logs) status += predict_logs[0]["message"] + status = re.sub(r"[^a-zA-Z0-9]", " ", status) - status = re.sub(r"[^a-zA-Z0-9]", "", status) - telemetry_kwargs = { - "ocid": ocid, - "model_name": model_name, - "status": status, - } - print(telemetry_kwargs) - print("############################") - - self.telemetry.record_event( - category=f"aqua/{model_type}/deployment/status", - action="LAST_LOG", - # detail=error_str, - **telemetry_kwargs, - ) - - except Exception as e: if data_science_work_request._error_message: error_str = "" for error in data_science_work_request._error_message: error_str = error_str + " " + error.message - status = "" - predict_logs = deployment.tail_logs("predict") - access_logs = deployment.tail_logs("access") - if access_logs and len(access_logs) > 0: - print(access_logs) - status = access_logs[0]["message"] - - if predict_logs and len(predict_logs) > 0: - print("predict_logs ############################") - print(predict_logs) - status += predict_logs[0]["message"] - status = re.sub(r"[^a-zA-Z0-9]", "", status) - error_str = re.sub(r"[^a-zA-Z0-9]", "", error_str) + error_str = re.sub(r"[^a-zA-Z0-9]", " ", error_str) telemetry_kwargs = { "ocid": ocid, "model_name": model_name, "status": error_str + " " + status, } - print(telemetry_kwargs) - print("############################") self.telemetry.record_event( category=f"aqua/{model_type}/deployment/status", action="FAILED", - detail=error_str, **telemetry_kwargs, ) else: - print(str(e)) - status = str(e) - predict_logs = deployment.tail_logs("predict") - access_logs = deployment.tail_logs("access") - if access_logs and len(access_logs) > 0: - print("access log list ############################") - print(access_logs) - status = access_logs[0]["message"] - - if predict_logs and len(predict_logs) > 0: - print("predict_logs ############################") - print(predict_logs) - status += predict_logs[0]["message"] - - status = re.sub(r"[^a-zA-Z0-9]", "", status) - error_str = re.sub(r"[^a-zA-Z0-9]", "", error_str) - + status = +" " + str(e) telemetry_kwargs = { "ocid": ocid, "model_name": model_name, - "status": error_str + " " + status, + "status": status, } - print(telemetry_kwargs) - print("############################") self.telemetry.record_event( category=f"aqua/{model_type}/deployment/status", action="FAILED", - # detail=error_str, **telemetry_kwargs, ) diff --git a/ads/model/deployment/model_deployment.py b/ads/model/deployment/model_deployment.py index c9beb84dd..53f599b64 100644 --- a/ads/model/deployment/model_deployment.py +++ b/ads/model/deployment/model_deployment.py @@ -757,11 +757,15 @@ def tail_logs( Each log record is a dictionary with the following keys: `annotation`, `id`, `time`, `message` and `datetime`. """ - return self.logs(log_type).get_tail_logs( - source=self.model_deployment_id, - time_start=time_start, - log_filter=log_filter, - ) + try: + logs = self.logs(log_type).get_tail_logs( + source=self.model_deployment_id, + time_start=time_start, + log_filter=log_filter, + ) + return logs + except LogNotConfiguredError: + return [] def watch( self, From 06ccb9f9b0efe0d4b24ae238553933da0f985482 Mon Sep 17 00:00:00 2001 From: Agrim Khanna Date: Mon, 30 Jun 2025 22:40:48 +0530 Subject: [PATCH 4/7] replaced async call with sync call --- ads/aqua/modeldeployment/deployment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ads/aqua/modeldeployment/deployment.py b/ads/aqua/modeldeployment/deployment.py index 93060806f..4ddc7b7a0 100644 --- a/ads/aqua/modeldeployment/deployment.py +++ b/ads/aqua/modeldeployment/deployment.py @@ -1315,7 +1315,7 @@ def get_deployment_status( else: telemetry_kwargs = {"ocid": ocid, "model_name": model_name} - self.telemetry.record_event_async( + self.telemetry.record_event( category=f"aqua/{model_type}/deployment/status", action="SUCCEEDED", **telemetry_kwargs, From b8028941ab9dab95bed15c38815c9b8499d927db Mon Sep 17 00:00:00 2001 From: Agrim Khanna Date: Mon, 30 Jun 2025 23:04:10 +0530 Subject: [PATCH 5/7] added deployment object in get_deployment_status method --- ads/aqua/modeldeployment/deployment.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ads/aqua/modeldeployment/deployment.py b/ads/aqua/modeldeployment/deployment.py index 4ddc7b7a0..5fa99526c 100644 --- a/ads/aqua/modeldeployment/deployment.py +++ b/ads/aqua/modeldeployment/deployment.py @@ -737,6 +737,7 @@ def _create_deployment( progress_thread = threading.Thread( target=self.get_deployment_status, args=( + deployment, deployment_id, deployment.dsc_model_deployment.workflow_req_id, model_type, From b60cab78fe70682d96207d8d8edcfadd063d5e48 Mon Sep 17 00:00:00 2001 From: Agrim Khanna Date: Tue, 1 Jul 2025 12:14:00 +0530 Subject: [PATCH 6/7] fixed unit test of get_deployment_status --- tests/unitary/with_extras/aqua/test_deployment.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/unitary/with_extras/aqua/test_deployment.py b/tests/unitary/with_extras/aqua/test_deployment.py index 2e6329fa8..16768ae07 100644 --- a/tests/unitary/with_extras/aqua/test_deployment.py +++ b/tests/unitary/with_extras/aqua/test_deployment.py @@ -2369,6 +2369,7 @@ def test_validate_multimodel_deployment_feasibility_positive_single( ) def test_get_deployment_status(self): + model_deployment = copy.deepcopy(TestDataset.model_deployment_object[0]) deployment_id = "fakeid.datasciencemodeldeployment.oc1.iad.xxx" work_request_id = "fakeid.workrequest.oc1.iad.xxx" model_type = "custom" @@ -2382,7 +2383,11 @@ def test_get_deployment_status(self): "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.wait_work_request" ) as mock_wait: self.app.get_deployment_status( - deployment_id, work_request_id, model_type, model_name + model_deployment, + deployment_id, + work_request_id, + model_type, + model_name, ) mock_ds_work_request.assert_called_with(work_request_id) From 83dc9fc9852f7c9fd549578f8c4318eea0868013 Mon Sep 17 00:00:00 2001 From: Agrim Khanna Date: Sat, 5 Jul 2025 21:51:12 +0530 Subject: [PATCH 7/7] added test cases and PR review comments --- ads/aqua/modeldeployment/deployment.py | 24 ++--- ads/common/oci_logging.py | 41 --------- ads/model/deployment/model_deployment.py | 38 -------- .../with_extras/aqua/test_deployment.py | 90 ++++++++++++++----- 4 files changed, 79 insertions(+), 114 deletions(-) diff --git a/ads/aqua/modeldeployment/deployment.py b/ads/aqua/modeldeployment/deployment.py index d81f905a7..9834eddd1 100644 --- a/ads/aqua/modeldeployment/deployment.py +++ b/ads/aqua/modeldeployment/deployment.py @@ -763,7 +763,6 @@ def _create_deployment( target=self.get_deployment_status, args=( deployment, - deployment_id, deployment.dsc_model_deployment.workflow_req_id, model_type, model_name, @@ -1269,8 +1268,7 @@ def list_shapes(self, **kwargs) -> List[ComputeShapeSummary]: def get_deployment_status( self, - deployment, - model_deployment_id: str, + deployment: ModelDeployment, work_request_id: str, model_type: str, model_name: str, @@ -1292,27 +1290,23 @@ def get_deployment_status( AquaDeployment An Aqua deployment instance. """ - ocid = get_ocid_substring(model_deployment_id, key_len=8) - + ocid = get_ocid_substring(deployment.id, key_len=8) data_science_work_request: DataScienceWorkRequest = DataScienceWorkRequest( work_request_id ) - try: data_science_work_request.wait_work_request( progress_bar_description="Creating model deployment", max_wait_time=DEFAULT_WAIT_TIME, poll_interval=DEFAULT_POLL_INTERVAL, ) - except Exception as e: + except Exception: status = "" - predict_logs = deployment.tail_logs("predict") - access_logs = deployment.tail_logs("access") - if access_logs and len(access_logs) > 0: - status = access_logs[0]["message"] + logs = deployment.show_logs().sort_values(by="time", ascending=False) + + if logs and len(logs) > 0: + status = logs.iloc[0]["message"] - if predict_logs and len(predict_logs) > 0: - status += predict_logs[0]["message"] status = re.sub(r"[^a-zA-Z0-9]", " ", status) if data_science_work_request._error_message: @@ -1324,7 +1318,8 @@ def get_deployment_status( telemetry_kwargs = { "ocid": ocid, "model_name": model_name, - "status": error_str + " " + status, + "work_request_error": error_str, + "status": status, } self.telemetry.record_event( @@ -1333,7 +1328,6 @@ def get_deployment_status( **telemetry_kwargs, ) else: - status = +" " + str(e) telemetry_kwargs = { "ocid": ocid, "model_name": model_name, diff --git a/ads/common/oci_logging.py b/ads/common/oci_logging.py index dee80327c..7875f11c4 100644 --- a/ads/common/oci_logging.py +++ b/ads/common/oci_logging.py @@ -863,47 +863,6 @@ def tail( ) self._print(sorted(tail_logs, key=lambda log: log["time"])) - def get_tail_logs( - self, - source: str = None, - limit: int = LOG_RECORDS_LIMIT, - time_start: datetime.datetime = None, - log_filter: str = None, - ) -> List[Union[oci.loggingsearch.models.SearchResult, dict]]: - """Returns the most recent consolidated log records. - - Parameters - ---------- - source : str, optional - Expression or OCID to filter the "source" field of the OCI log record. - Defaults to None. - limit : int, optional. - Maximum number of records to be returned. - If limit is set to None, all logs from time_start to now will be returned. - Defaults to 100. - time_start : datetime.datetime, optional - Starting time for the log query. - Defaults to None. - log_filter : str, optional - Expression for filtering the logs. This will be the WHERE clause of the query. - Defaults to None. - - Returns - ------- - list - A list of oci.loggingsearch.models.SearchResult objects or log records sorted in descending order by time - Each log record is a dictionary with the following keys: `annotation`, `id`, `time`, - `message` and `datetime`. - """ - tail_logs = self._search_and_format( - source=source, - limit=limit, - sort_order=SortOrder.DESC, - time_start=time_start, - log_filter=log_filter, - ) - return sorted(tail_logs, key=lambda log: log["time"]) - def head( self, source: str = None, diff --git a/ads/model/deployment/model_deployment.py b/ads/model/deployment/model_deployment.py index 53f599b64..a66c8ac6b 100644 --- a/ads/model/deployment/model_deployment.py +++ b/ads/model/deployment/model_deployment.py @@ -729,44 +729,6 @@ def update( return self._update_from_oci_model(response) - def tail_logs( - self, log_type: str = None, time_start: datetime = None, log_filter: str = None - ) -> List[Union[oci.loggingsearch.models.SearchResult, dict]]: - """Returns the most recent consolidated log records for the model deployment - - Parameters - ---------- - source : str, optional - Expression or OCID to filter the "source" field of the OCI log record. - Defaults to None. - limit : int, optional. - Maximum number of records to be returned. - If limit is set to None, all logs from time_start to now will be returned. - Defaults to 100. - time_start : datetime.datetime, optional - Starting time for the log query. - Defaults to None. - log_filter : str, optional - Expression for filtering the logs. This will be the WHERE clause of the query. - Defaults to None. - - Returns - ------- - list - A list of oci.loggingsearch.models.SearchResult objects or log records sorted in descending order by time - Each log record is a dictionary with the following keys: `annotation`, `id`, `time`, - `message` and `datetime`. - """ - try: - logs = self.logs(log_type).get_tail_logs( - source=self.model_deployment_id, - time_start=time_start, - log_filter=log_filter, - ) - return logs - except LogNotConfiguredError: - return [] - def watch( self, log_type: str = None, diff --git a/tests/unitary/with_extras/aqua/test_deployment.py b/tests/unitary/with_extras/aqua/test_deployment.py index 16768ae07..da8830a77 100644 --- a/tests/unitary/with_extras/aqua/test_deployment.py +++ b/tests/unitary/with_extras/aqua/test_deployment.py @@ -2368,7 +2368,7 @@ def test_validate_multimodel_deployment_feasibility_positive_single( "test_data/deployment/aqua_summary_multi_model_single.json", ) - def test_get_deployment_status(self): + def test_get_deployment_status_success(self): model_deployment = copy.deepcopy(TestDataset.model_deployment_object[0]) deployment_id = "fakeid.datasciencemodeldeployment.oc1.iad.xxx" work_request_id = "fakeid.workrequest.oc1.iad.xxx" @@ -2376,26 +2376,76 @@ def test_get_deployment_status(self): model_name = "model_name" with patch( - "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.__init__" - ) as mock_ds_work_request: - mock_ds_work_request.return_value = None - with patch( - "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.wait_work_request" - ) as mock_wait: - self.app.get_deployment_status( - model_deployment, - deployment_id, - work_request_id, - model_type, - model_name, - ) + "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.__init__", + return_value=None, + ) as mock_ds_work_request, patch( + "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.wait_work_request" + ) as mock_wait: + self.app.get_deployment_status( + oci.data_science.models.ModelDeploymentSummary(**model_deployment), + work_request_id, + model_type, + model_name, + ) - mock_ds_work_request.assert_called_with(work_request_id) - mock_wait.assert_called_with( - progress_bar_description="Creating model deployment", - max_wait_time=DEFAULT_WAIT_TIME, - poll_interval=DEFAULT_POLL_INTERVAL, - ) + mock_ds_work_request.assert_called_once_with(work_request_id) + mock_wait.assert_called_once_with( + progress_bar_description="Creating model deployment", + max_wait_time=DEFAULT_WAIT_TIME, + poll_interval=DEFAULT_POLL_INTERVAL, + ) + + def raise_exception(*args, **kwargs): + raise Exception("Work request failed") + + def test_get_deployment_status_failed(self): + model_deployment = copy.deepcopy(TestDataset.model_deployment_object[0]) + deployment_id = "fakeid.datasciencemodeldeployment.oc1.iad.xxx" + work_request_id = "fakeid.workrequest.oc1.iad.xxx" + model_type = "custom" + model_name = "model_name" + with patch( + "ads.telemetry.client.TelemetryClient.record_event" + ) as mock_record_event, patch( + "ads.aqua.modeldeployment.deployment.DataScienceWorkRequest" + ) as mock_ds_work_request_class, patch( + "ads.model.deployment.model_deployment.ModelDeployment.show_logs" + ) as mock_show_log: + mock_ds_work_request_instance = MagicMock() + mock_ds_work_request_class.return_value = mock_ds_work_request_instance + + mock_ds_work_request_instance._error_message = [ + MagicMock(message="Some error occurred") + ] + + mock_ds_work_request_instance.wait_work_request.side_effect = ( + self.raise_exception + ) + + logs_df = MagicMock() + logs_df.sort_values.return_value = logs_df + logs_df.empty = False + logs_df.iloc.__getitem__.return_value = { + "message": "Error: deployment failed!" + } + mock_show_log.return_value = logs_df + + self.app.get_deployment_status( + ModelDeployment(), + work_request_id, + model_type, + model_name, + ) + mock_record_event.assert_called_once() + args, kwargs = mock_record_event.call_args + self.assertEqual(kwargs["category"], f"aqua/{model_type}/deployment/status") + self.assertEqual(kwargs["action"], "FAILED") + self.assertIn("work_request_error", kwargs) + self.assertIn("status", kwargs) + self.assertIn("ocid", kwargs) + self.assertIn("model_name", kwargs) + + mock_ds_work_request_class.assert_called_once_with(work_request_id) class TestBaseModelSpec: