diff --git a/ads/aqua/modeldeployment/deployment.py b/ads/aqua/modeldeployment/deployment.py
index f86881b75..9834eddd1 100644
--- a/ads/aqua/modeldeployment/deployment.py
+++ b/ads/aqua/modeldeployment/deployment.py
@@ -4,6 +4,7 @@


 import json
+import re
 import shlex
 import threading
 from datetime import datetime, timedelta
@@ -752,14 +753,16 @@ def _create_deployment(
         ).deploy(wait_for_completion=False)

         deployment_id = deployment.id
+
         logger.info(
             f"Aqua model deployment {deployment_id} created for model {aqua_model_id}. Work request Id is {deployment.dsc_model_deployment.workflow_req_id}"
         )
+
         status_list = []
         progress_thread = threading.Thread(
             target=self.get_deployment_status,
             args=(
-                deployment_id,
+                deployment,
                 deployment.dsc_model_deployment.workflow_req_id,
                 model_type,
                 model_name,
@@ -1265,7 +1268,7 @@ def list_shapes(self, **kwargs) -> List[ComputeShapeSummary]:

     def get_deployment_status(
         self,
-        model_deployment_id: str,
+        deployment: ModelDeployment,
         work_request_id: str,
         model_type: str,
         model_name: str,
@@ -1287,13 +1290,10 @@ def get_deployment_status(
         AquaDeployment
             An Aqua deployment instance.
         """
-        ocid = get_ocid_substring(model_deployment_id, key_len=8)
-        telemetry_kwargs = {"ocid": ocid}
-
+        ocid = get_ocid_substring(deployment.id, key_len=8)
         data_science_work_request: DataScienceWorkRequest = DataScienceWorkRequest(
             work_request_id
         )
-
         try:
             data_science_work_request.wait_work_request(
                 progress_bar_description="Creating model deployment",
@@ -1301,23 +1301,49 @@
                 poll_interval=DEFAULT_POLL_INTERVAL,
             )
         except Exception:
+            status = ""
+            logs = deployment.show_logs().sort_values(by="time", ascending=False)
+
+            if logs is not None and not logs.empty:
+                status = logs.iloc[0]["message"]
+
+            status = re.sub(r"[^a-zA-Z0-9]", " ", status)
+
             if data_science_work_request._error_message:
                 error_str = ""
                 for error in data_science_work_request._error_message:
                     error_str = error_str + " " + error.message

-                self.telemetry.record_event(
-                    category=f"aqua/{model_type}/deployment/status",
-                    action="FAILED",
-                    detail=error_str,
-                    value=model_name,
-                    **telemetry_kwargs,
-                )
+                error_str = re.sub(r"[^a-zA-Z0-9]", " ", error_str)
+                telemetry_kwargs = {
+                    "ocid": ocid,
+                    "model_name": model_name,
+                    "work_request_error": error_str,
+                    "status": status,
+                }
+
+                self.telemetry.record_event(
+                    category=f"aqua/{model_type}/deployment/status",
+                    action="FAILED",
+                    **telemetry_kwargs,
+                )
+            else:
+                telemetry_kwargs = {
+                    "ocid": ocid,
+                    "model_name": model_name,
+                    "status": status,
+                }
+
+                self.telemetry.record_event(
+                    category=f"aqua/{model_type}/deployment/status",
+                    action="FAILED",
+                    **telemetry_kwargs,
+                )
         else:
-            self.telemetry.record_event_async(
+            telemetry_kwargs = {"ocid": ocid, "model_name": model_name}
+            self.telemetry.record_event(
                 category=f"aqua/{model_type}/deployment/status",
                 action="SUCCEEDED",
-                value=model_name,
                 **telemetry_kwargs,
             )

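Note on the FAILED branch above: the newest log message and any work-request error messages are stripped down to letters, digits, and spaces before they are attached to the telemetry event. Below is a minimal, self-contained sketch of that payload assembly; the helper names are hypothetical and not part of the ADS code.

```python
import re


def _sanitize(text: str) -> str:
    # Same substitution as the FAILED branch: anything that is not a letter
    # or a digit becomes a space.
    return re.sub(r"[^a-zA-Z0-9]", " ", text or "")


def build_failed_event_kwargs(ocid: str, model_name: str, status: str, errors: list) -> dict:
    # Assemble the kwargs that would be passed to record_event(action="FAILED").
    kwargs = {"ocid": ocid, "model_name": model_name, "status": _sanitize(status)}
    if errors:
        kwargs["work_request_error"] = _sanitize(" ".join(errors))
    return kwargs


print(build_failed_event_kwargs("xxxx1234", "model_name", "Error: deployment failed!", []))
# {'ocid': 'xxxx1234', 'model_name': 'model_name', 'status': 'Error  deployment failed '}
```
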
diff --git a/ads/common/oci_logging.py b/ads/common/oci_logging.py
index 502e4f813..7875f11c4 100644
--- a/ads/common/oci_logging.py
+++ b/ads/common/oci_logging.py
@@ -1,5 +1,4 @@
 #!/usr/bin/env python
-# -*- coding: utf-8; -*-
 # Copyright (c) 2021, 2024 Oracle and/or its affiliates.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

@@ -7,16 +6,16 @@
 import datetime
 import logging
 import time
-from typing import Dict, Union, List
+from typing import Dict, List, Union

+import oci.exceptions
 import oci.logging
 import oci.loggingsearch
-import oci.exceptions
+
 from ads.common.decorator.utils import class_or_instance_method
 from ads.common.oci_mixin import OCIModelMixin, OCIWorkRequestMixin
 from ads.common.oci_resource import OCIResource, ResourceNotFoundError

-
 logger = logging.getLogger(__name__)

 # Maximum number of log records to be returned by default.
@@ -862,9 +861,7 @@ def tail(
             time_start=time_start,
             log_filter=log_filter,
         )
-        self._print(
-            sorted(tail_logs, key=lambda log: log["time"])
-        )
+        self._print(sorted(tail_logs, key=lambda log: log["time"]))

     def head(
         self,
diff --git a/ads/model/deployment/model_deployment.py b/ads/model/deployment/model_deployment.py
index 56a70c112..a66c8ac6b 100644
--- a/ads/model/deployment/model_deployment.py
+++ b/ads/model/deployment/model_deployment.py
@@ -1,22 +1,27 @@
 #!/usr/bin/env python
-# -*- coding: utf-8; -*-
-# Copyright (c) 2021, 2023 Oracle and/or its affiliates.
+# Copyright (c) 2021, 2025 Oracle and/or its affiliates.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

 import collections
 import copy
 import datetime
-import oci
-import warnings
 import time
-from typing import Dict, List, Union, Any
+import warnings
+from typing import Any, Dict, List, Union

+import oci
 import oci.loggingsearch
-from ads.common import auth as authutil
 import pandas as pd
-from ads.model.serde.model_input import JsonModelInputSERDE
+from oci.data_science.models import (
+    CreateModelDeploymentDetails,
+    LogDetails,
+    UpdateModelDeploymentDetails,
+)
+
+from ads.common import auth as authutil
+from ads.common import utils as ads_utils
 from ads.common.oci_logging import (
     LOG_INTERVAL,
     LOG_RECORDS_LIMIT,
@@ -30,10 +35,10 @@
 from ads.model.deployment.common.utils import send_request
 from ads.model.deployment.model_deployment_infrastructure import (
     DEFAULT_BANDWIDTH_MBPS,
+    DEFAULT_MEMORY_IN_GBS,
+    DEFAULT_OCPUS,
     DEFAULT_REPLICA,
     DEFAULT_SHAPE_NAME,
-    DEFAULT_OCPUS,
-    DEFAULT_MEMORY_IN_GBS,
     MODEL_DEPLOYMENT_INFRASTRUCTURE_TYPE,
     ModelDeploymentInfrastructure,
 )
@@ -45,18 +50,14 @@
     ModelDeploymentRuntimeType,
     OCIModelDeploymentRuntimeType,
 )
+from ads.model.serde.model_input import JsonModelInputSERDE
 from ads.model.service.oci_datascience_model_deployment import (
     OCIDataScienceModelDeployment,
 )
-from ads.common import utils as ads_utils
+
 from .common import utils
 from .common.utils import State
 from .model_deployment_properties import ModelDeploymentProperties
-from oci.data_science.models import (
-    LogDetails,
-    CreateModelDeploymentDetails,
-    UpdateModelDeploymentDetails,
-)

 DEFAULT_WAIT_TIME = 1200
 DEFAULT_POLL_INTERVAL = 10
@@ -751,6 +752,8 @@ def watch(
         log_filter : str, optional
             Expression for filtering the logs. This will be the WHERE clause of the query.
             Defaults to None.
+        status_list : List[str], optional
+            List of model deployment statuses. This is used to collect status messages from the logs.

         Returns
         -------
@@ -964,7 +967,9 @@ def predict(
         except oci.exceptions.ServiceError as ex:
             # When bandwidth exceeds the allocated value, TooManyRequests error (429) will be raised by oci backend.
             if ex.status == 429:
-                bandwidth_mbps = self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS
+                bandwidth_mbps = (
+                    self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS
+                )
                 utils.get_logger().warning(
                     f"Load balancer bandwidth exceeds the allocated {bandwidth_mbps} Mbps."
                     "To estimate the actual bandwidth, use formula: (payload size in KB) * (estimated requests per second) * 8 / 1024."
@@ -1644,22 +1649,22 @@ def _build_model_deployment_configuration_details(self) -> Dict:
         }

         if infrastructure.subnet_id:
-            instance_configuration[
-                infrastructure.CONST_SUBNET_ID
-            ] = infrastructure.subnet_id
+            instance_configuration[infrastructure.CONST_SUBNET_ID] = (
+                infrastructure.subnet_id
+            )

         if infrastructure.private_endpoint_id:
             if not hasattr(
                 oci.data_science.models.InstanceConfiguration, "private_endpoint_id"
             ):
                 # TODO: add oci version with private endpoint support.
-                raise EnvironmentError(
+                raise OSError(
                     "Private endpoint is not supported in the current OCI SDK installed."
                 )
-            instance_configuration[
-                infrastructure.CONST_PRIVATE_ENDPOINT_ID
-            ] = infrastructure.private_endpoint_id
+            instance_configuration[infrastructure.CONST_PRIVATE_ENDPOINT_ID] = (
+                infrastructure.private_endpoint_id
+            )

         scaling_policy = {
             infrastructure.CONST_POLICY_TYPE: "FIXED_SIZE",
@@ -1704,7 +1709,7 @@ def _build_model_deployment_configuration_details(self) -> Dict:
             oci.data_science.models,
             "ModelDeploymentEnvironmentConfigurationDetails",
         ):
-            raise EnvironmentError(
+            raise OSError(
                 "Environment variable hasn't been supported in the current OCI SDK installed."
             )

@@ -1720,9 +1725,9 @@
                 and runtime.inference_server.upper()
                 == MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
             ):
-                environment_variables[
-                    "CONTAINER_TYPE"
-                ] = MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
+                environment_variables["CONTAINER_TYPE"] = (
+                    MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
+                )
             runtime.set_spec(runtime.CONST_ENV, environment_variables)
             environment_configuration_details = {
                 runtime.CONST_ENVIRONMENT_CONFIG_TYPE: runtime.environment_config_type,
@@ -1734,7 +1739,7 @@ def _build_model_deployment_configuration_details(self) -> Dict:
                 oci.data_science.models,
                 "OcirModelDeploymentEnvironmentConfigurationDetails",
             ):
-                raise EnvironmentError(
+                raise OSError(
                     "Container runtime hasn't been supported in the current OCI SDK installed."
                 )
             environment_configuration_details["image"] = runtime.image
@@ -1742,9 +1747,9 @@
             environment_configuration_details["cmd"] = runtime.cmd
             environment_configuration_details["entrypoint"] = runtime.entrypoint
             environment_configuration_details["serverPort"] = runtime.server_port
-            environment_configuration_details[
-                "healthCheckPort"
-            ] = runtime.health_check_port
+            environment_configuration_details["healthCheckPort"] = (
+                runtime.health_check_port
+            )

         model_deployment_configuration_details = {
             infrastructure.CONST_DEPLOYMENT_TYPE: "SINGLE_MODEL",
@@ -1754,7 +1759,7 @@

         if runtime.deployment_mode == ModelDeploymentMode.STREAM:
             if not hasattr(oci.data_science.models, "StreamConfigurationDetails"):
-                raise EnvironmentError(
+                raise OSError(
                     "Model deployment mode hasn't been supported in the current OCI SDK installed."
                 )
             model_deployment_configuration_details[
@@ -1786,9 +1791,13 @@ def _build_category_log_details(self) -> Dict:

         logs = {}
         if (
-            self.infrastructure.access_log and
-            self.infrastructure.access_log.get(self.infrastructure.CONST_LOG_GROUP_ID, None)
-            and self.infrastructure.access_log.get(self.infrastructure.CONST_LOG_ID, None)
+            self.infrastructure.access_log
+            and self.infrastructure.access_log.get(
+                self.infrastructure.CONST_LOG_GROUP_ID, None
+            )
+            and self.infrastructure.access_log.get(
+                self.infrastructure.CONST_LOG_ID, None
+            )
         ):
             logs[self.infrastructure.CONST_ACCESS] = {
                 self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.access_log.get(
@@ -1799,9 +1808,13 @@
                 ),
             }
         if (
-            self.infrastructure.predict_log and
-            self.infrastructure.predict_log.get(self.infrastructure.CONST_LOG_GROUP_ID, None)
-            and self.infrastructure.predict_log.get(self.infrastructure.CONST_LOG_ID, None)
+            self.infrastructure.predict_log
+            and self.infrastructure.predict_log.get(
+                self.infrastructure.CONST_LOG_GROUP_ID, None
+            )
+            and self.infrastructure.predict_log.get(
+                self.infrastructure.CONST_LOG_ID, None
+            )
         ):
             logs[self.infrastructure.CONST_PREDICT] = {
                 self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.predict_log.get(
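The 429 handler in predict() above points at a back-of-the-envelope formula for the bandwidth a deployment needs. A quick worked example, where the payload size and request rate are made-up numbers:

```python
# (payload size in KB) * (estimated requests per second) * 8 / 1024 -> Mbps
payload_kb = 50            # assumed average payload size
requests_per_second = 30   # assumed request rate
estimated_mbps = payload_kb * requests_per_second * 8 / 1024
print(f"{estimated_mbps:.1f} Mbps")  # 11.7 Mbps, to be compared against the configured bandwidth_mbps
```
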
diff --git a/tests/unitary/with_extras/aqua/test_deployment.py b/tests/unitary/with_extras/aqua/test_deployment.py
index 2e6329fa8..da8830a77 100644
--- a/tests/unitary/with_extras/aqua/test_deployment.py
+++ b/tests/unitary/with_extras/aqua/test_deployment.py
@@ -2368,29 +2368,84 @@ def test_validate_multimodel_deployment_feasibility_positive_single(
             "test_data/deployment/aqua_summary_multi_model_single.json",
         )

-    def test_get_deployment_status(self):
+    def test_get_deployment_status_success(self):
+        model_deployment = copy.deepcopy(TestDataset.model_deployment_object[0])
         deployment_id = "fakeid.datasciencemodeldeployment.oc1.iad.xxx"
         work_request_id = "fakeid.workrequest.oc1.iad.xxx"
         model_type = "custom"
         model_name = "model_name"

         with patch(
-            "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.__init__"
-        ) as mock_ds_work_request:
-            mock_ds_work_request.return_value = None
-            with patch(
-                "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.wait_work_request"
-            ) as mock_wait:
-                self.app.get_deployment_status(
-                    deployment_id, work_request_id, model_type, model_name
-                )
+            "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.__init__",
+            return_value=None,
+        ) as mock_ds_work_request, patch(
+            "ads.model.service.oci_datascience_model_deployment.DataScienceWorkRequest.wait_work_request"
+        ) as mock_wait:
+            self.app.get_deployment_status(
+                oci.data_science.models.ModelDeploymentSummary(**model_deployment),
+                work_request_id,
+                model_type,
+                model_name,
+            )

-                mock_ds_work_request.assert_called_with(work_request_id)
-                mock_wait.assert_called_with(
-                    progress_bar_description="Creating model deployment",
-                    max_wait_time=DEFAULT_WAIT_TIME,
-                    poll_interval=DEFAULT_POLL_INTERVAL,
-                )
+            mock_ds_work_request.assert_called_once_with(work_request_id)
+            mock_wait.assert_called_once_with(
+                progress_bar_description="Creating model deployment",
+                max_wait_time=DEFAULT_WAIT_TIME,
+                poll_interval=DEFAULT_POLL_INTERVAL,
+            )
+
+    def raise_exception(*args, **kwargs):
+        raise Exception("Work request failed")
+
"fakeid.datasciencemodeldeployment.oc1.iad.xxx" + work_request_id = "fakeid.workrequest.oc1.iad.xxx" + model_type = "custom" + model_name = "model_name" + with patch( + "ads.telemetry.client.TelemetryClient.record_event" + ) as mock_record_event, patch( + "ads.aqua.modeldeployment.deployment.DataScienceWorkRequest" + ) as mock_ds_work_request_class, patch( + "ads.model.deployment.model_deployment.ModelDeployment.show_logs" + ) as mock_show_log: + mock_ds_work_request_instance = MagicMock() + mock_ds_work_request_class.return_value = mock_ds_work_request_instance + + mock_ds_work_request_instance._error_message = [ + MagicMock(message="Some error occurred") + ] + + mock_ds_work_request_instance.wait_work_request.side_effect = ( + self.raise_exception + ) + + logs_df = MagicMock() + logs_df.sort_values.return_value = logs_df + logs_df.empty = False + logs_df.iloc.__getitem__.return_value = { + "message": "Error: deployment failed!" + } + mock_show_log.return_value = logs_df + + self.app.get_deployment_status( + ModelDeployment(), + work_request_id, + model_type, + model_name, + ) + mock_record_event.assert_called_once() + args, kwargs = mock_record_event.call_args + self.assertEqual(kwargs["category"], f"aqua/{model_type}/deployment/status") + self.assertEqual(kwargs["action"], "FAILED") + self.assertIn("work_request_error", kwargs) + self.assertIn("status", kwargs) + self.assertIn("ocid", kwargs) + self.assertIn("model_name", kwargs) + + mock_ds_work_request_class.assert_called_once_with(work_request_id) class TestBaseModelSpec: