Skip to content

[AQUA] Track md logs for error logging #1219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
56 changes: 41 additions & 15 deletions ads/aqua/modeldeployment/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


import json
import re
import shlex
import threading
from datetime import datetime, timedelta
Expand Down Expand Up @@ -752,14 +753,16 @@ def _create_deployment(
).deploy(wait_for_completion=False)

deployment_id = deployment.id

logger.info(
f"Aqua model deployment {deployment_id} created for model {aqua_model_id}. Work request Id is {deployment.dsc_model_deployment.workflow_req_id}"
)
status_list = []

progress_thread = threading.Thread(
target=self.get_deployment_status,
args=(
deployment_id,
deployment,
deployment.dsc_model_deployment.workflow_req_id,
model_type,
model_name,
Expand Down Expand Up @@ -1265,7 +1268,7 @@ def list_shapes(self, **kwargs) -> List[ComputeShapeSummary]:

def get_deployment_status(
self,
model_deployment_id: str,
deployment: ModelDeployment,
work_request_id: str,
model_type: str,
model_name: str,
Expand All @@ -1287,37 +1290,60 @@ def get_deployment_status(
AquaDeployment
An Aqua deployment instance.
"""
ocid = get_ocid_substring(model_deployment_id, key_len=8)
telemetry_kwargs = {"ocid": ocid}

ocid = get_ocid_substring(deployment.id, key_len=8)
data_science_work_request: DataScienceWorkRequest = DataScienceWorkRequest(
work_request_id
)

try:
data_science_work_request.wait_work_request(
progress_bar_description="Creating model deployment",
max_wait_time=DEFAULT_WAIT_TIME,
poll_interval=DEFAULT_POLL_INTERVAL,
)
except Exception:
status = ""
logs = deployment.show_logs().sort_values(by="time", ascending=False)

if logs and len(logs) > 0:
status = logs.iloc[0]["message"]

status = re.sub(r"[^a-zA-Z0-9]", " ", status)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: any reason why we're removing the non-alphanumeric characters here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without removing the non-alphanumeric characters, I was getting a Bad Request error when calling the head_object endpoint.


if data_science_work_request._error_message:
error_str = ""
for error in data_science_work_request._error_message:
error_str = error_str + " " + error.message

self.telemetry.record_event(
category=f"aqua/{model_type}/deployment/status",
action="FAILED",
detail=error_str,
value=model_name,
**telemetry_kwargs,
)
error_str = re.sub(r"[^a-zA-Z0-9]", " ", error_str)
telemetry_kwargs = {
"ocid": ocid,
"model_name": model_name,
"work_request_error": error_str,
"status": status,
}

self.telemetry.record_event(
category=f"aqua/{model_type}/deployment/status",
action="FAILED",
**telemetry_kwargs,
)
else:
telemetry_kwargs = {
"ocid": ocid,
"model_name": model_name,
"status": status,
}

self.telemetry.record_event(
category=f"aqua/{model_type}/deployment/status",
action="FAILED",
**telemetry_kwargs,
)

else:
self.telemetry.record_event_async(
telemetry_kwargs = {"ocid": ocid, "model_name": model_name}
self.telemetry.record_event(
category=f"aqua/{model_type}/deployment/status",
action="SUCCEEDED",
value=model_name,
**telemetry_kwargs,
)
11 changes: 4 additions & 7 deletions ads/common/oci_logging.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2021, 2024 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import datetime
import logging
import time
from typing import Dict, Union, List
from typing import Dict, List, Union

import oci.exceptions
import oci.logging
import oci.loggingsearch
import oci.exceptions

from ads.common.decorator.utils import class_or_instance_method
from ads.common.oci_mixin import OCIModelMixin, OCIWorkRequestMixin
from ads.common.oci_resource import OCIResource, ResourceNotFoundError


logger = logging.getLogger(__name__)

# Maximum number of log records to be returned by default.
Expand Down Expand Up @@ -862,9 +861,7 @@ def tail(
time_start=time_start,
log_filter=log_filter,
)
self._print(
sorted(tail_logs, key=lambda log: log["time"])
)
self._print(sorted(tail_logs, key=lambda log: log["time"]))

def head(
self,
Expand Down
89 changes: 51 additions & 38 deletions ads/model/deployment/model_deployment.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2021, 2023 Oracle and/or its affiliates.
# Copyright (c) 2021, 2025 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/


import collections
import copy
import datetime
import oci
import warnings
import time
from typing import Dict, List, Union, Any
import warnings
from typing import Any, Dict, List, Union

import oci
import oci.loggingsearch
from ads.common import auth as authutil
import pandas as pd
from ads.model.serde.model_input import JsonModelInputSERDE
from oci.data_science.models import (
CreateModelDeploymentDetails,
LogDetails,
UpdateModelDeploymentDetails,
)

from ads.common import auth as authutil
from ads.common import utils as ads_utils
from ads.common.oci_logging import (
LOG_INTERVAL,
LOG_RECORDS_LIMIT,
Expand All @@ -30,10 +35,10 @@
from ads.model.deployment.common.utils import send_request
from ads.model.deployment.model_deployment_infrastructure import (
DEFAULT_BANDWIDTH_MBPS,
DEFAULT_MEMORY_IN_GBS,
DEFAULT_OCPUS,
DEFAULT_REPLICA,
DEFAULT_SHAPE_NAME,
DEFAULT_OCPUS,
DEFAULT_MEMORY_IN_GBS,
MODEL_DEPLOYMENT_INFRASTRUCTURE_TYPE,
ModelDeploymentInfrastructure,
)
Expand All @@ -45,18 +50,14 @@
ModelDeploymentRuntimeType,
OCIModelDeploymentRuntimeType,
)
from ads.model.serde.model_input import JsonModelInputSERDE
from ads.model.service.oci_datascience_model_deployment import (
OCIDataScienceModelDeployment,
)
from ads.common import utils as ads_utils

from .common import utils
from .common.utils import State
from .model_deployment_properties import ModelDeploymentProperties
from oci.data_science.models import (
LogDetails,
CreateModelDeploymentDetails,
UpdateModelDeploymentDetails,
)

DEFAULT_WAIT_TIME = 1200
DEFAULT_POLL_INTERVAL = 10
Expand Down Expand Up @@ -751,6 +752,8 @@ def watch(
log_filter : str, optional
Expression for filtering the logs. This will be the WHERE clause of the query.
Defaults to None.
status_list : List[str], optional
List of status of model deployment. This is used to store list of status from logs.

Returns
-------
Expand Down Expand Up @@ -964,7 +967,9 @@ def predict(
except oci.exceptions.ServiceError as ex:
# When bandwidth exceeds the allocated value, TooManyRequests error (429) will be raised by oci backend.
if ex.status == 429:
bandwidth_mbps = self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS
bandwidth_mbps = (
self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS
)
utils.get_logger().warning(
f"Load balancer bandwidth exceeds the allocated {bandwidth_mbps} Mbps."
"To estimate the actual bandwidth, use formula: (payload size in KB) * (estimated requests per second) * 8 / 1024."
Expand Down Expand Up @@ -1644,22 +1649,22 @@ def _build_model_deployment_configuration_details(self) -> Dict:
}

if infrastructure.subnet_id:
instance_configuration[
infrastructure.CONST_SUBNET_ID
] = infrastructure.subnet_id
instance_configuration[infrastructure.CONST_SUBNET_ID] = (
infrastructure.subnet_id
)

if infrastructure.private_endpoint_id:
if not hasattr(
oci.data_science.models.InstanceConfiguration, "private_endpoint_id"
):
# TODO: add oci version with private endpoint support.
raise EnvironmentError(
raise OSError(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious, did ruff suggest this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

"Private endpoint is not supported in the current OCI SDK installed."
)

instance_configuration[
infrastructure.CONST_PRIVATE_ENDPOINT_ID
] = infrastructure.private_endpoint_id
instance_configuration[infrastructure.CONST_PRIVATE_ENDPOINT_ID] = (
infrastructure.private_endpoint_id
)

scaling_policy = {
infrastructure.CONST_POLICY_TYPE: "FIXED_SIZE",
Expand Down Expand Up @@ -1704,7 +1709,7 @@ def _build_model_deployment_configuration_details(self) -> Dict:
oci.data_science.models,
"ModelDeploymentEnvironmentConfigurationDetails",
):
raise EnvironmentError(
raise OSError(
"Environment variable hasn't been supported in the current OCI SDK installed."
)

Expand All @@ -1720,9 +1725,9 @@ def _build_model_deployment_configuration_details(self) -> Dict:
and runtime.inference_server.upper()
== MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
):
environment_variables[
"CONTAINER_TYPE"
] = MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
environment_variables["CONTAINER_TYPE"] = (
MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
)
runtime.set_spec(runtime.CONST_ENV, environment_variables)
environment_configuration_details = {
runtime.CONST_ENVIRONMENT_CONFIG_TYPE: runtime.environment_config_type,
Expand All @@ -1734,17 +1739,17 @@ def _build_model_deployment_configuration_details(self) -> Dict:
oci.data_science.models,
"OcirModelDeploymentEnvironmentConfigurationDetails",
):
raise EnvironmentError(
raise OSError(
"Container runtime hasn't been supported in the current OCI SDK installed."
)
environment_configuration_details["image"] = runtime.image
environment_configuration_details["imageDigest"] = runtime.image_digest
environment_configuration_details["cmd"] = runtime.cmd
environment_configuration_details["entrypoint"] = runtime.entrypoint
environment_configuration_details["serverPort"] = runtime.server_port
environment_configuration_details[
"healthCheckPort"
] = runtime.health_check_port
environment_configuration_details["healthCheckPort"] = (
runtime.health_check_port
)

model_deployment_configuration_details = {
infrastructure.CONST_DEPLOYMENT_TYPE: "SINGLE_MODEL",
Expand All @@ -1754,7 +1759,7 @@ def _build_model_deployment_configuration_details(self) -> Dict:

if runtime.deployment_mode == ModelDeploymentMode.STREAM:
if not hasattr(oci.data_science.models, "StreamConfigurationDetails"):
raise EnvironmentError(
raise OSError(
"Model deployment mode hasn't been supported in the current OCI SDK installed."
)
model_deployment_configuration_details[
Expand Down Expand Up @@ -1786,9 +1791,13 @@ def _build_category_log_details(self) -> Dict:

logs = {}
if (
self.infrastructure.access_log and
self.infrastructure.access_log.get(self.infrastructure.CONST_LOG_GROUP_ID, None)
and self.infrastructure.access_log.get(self.infrastructure.CONST_LOG_ID, None)
self.infrastructure.access_log
and self.infrastructure.access_log.get(
self.infrastructure.CONST_LOG_GROUP_ID, None
)
and self.infrastructure.access_log.get(
self.infrastructure.CONST_LOG_ID, None
)
):
logs[self.infrastructure.CONST_ACCESS] = {
self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.access_log.get(
Expand All @@ -1799,9 +1808,13 @@ def _build_category_log_details(self) -> Dict:
),
}
if (
self.infrastructure.predict_log and
self.infrastructure.predict_log.get(self.infrastructure.CONST_LOG_GROUP_ID, None)
and self.infrastructure.predict_log.get(self.infrastructure.CONST_LOG_ID, None)
self.infrastructure.predict_log
and self.infrastructure.predict_log.get(
self.infrastructure.CONST_LOG_GROUP_ID, None
)
and self.infrastructure.predict_log.get(
self.infrastructure.CONST_LOG_ID, None
)
):
logs[self.infrastructure.CONST_PREDICT] = {
self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.predict_log.get(
Expand Down
Loading