Skip to content

Commit c9e40b9

Browse files
committed
watching logs and pushing them to telemetry
1 parent 0a2725c commit c9e40b9

File tree

3 files changed

+167
-31
lines changed

3 files changed

+167
-31
lines changed

ads/aqua/modeldeployment/deployment.py

Lines changed: 89 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55

66
import json
7+
import re
78
import shlex
89
import threading
910
from datetime import datetime, timedelta
@@ -732,28 +733,18 @@ def _create_deployment(
732733
f"Aqua model deployment {deployment_id} created for model {aqua_model_id}. Work request Id is {deployment.dsc_model_deployment.workflow_req_id}"
733734
)
734735
status_list = []
735-
progress_thread_1 = threading.Thread(
736-
target=deployment.watch,
737-
args=(status_list),
738-
daemon=True,
739-
)
740-
progress_thread_1.start()
741736

742-
progress_thread_2 = threading.Thread(
737+
progress_thread = threading.Thread(
743738
target=self.get_deployment_status,
744739
args=(
745740
deployment_id,
746741
deployment.dsc_model_deployment.workflow_req_id,
747742
model_type,
748743
model_name,
749-
status_list,
750744
),
751745
daemon=True,
752746
)
753-
progress_thread_2.start()
754-
755-
progress_thread_1.join()
756-
progress_thread_2.join()
747+
progress_thread.start()
757748

758749
# we arbitrarily choose last 8 characters of OCID to identify MD in telemetry
759750
telemetry_kwargs = {"ocid": get_ocid_substring(deployment_id, key_len=8)}
@@ -1245,11 +1236,11 @@ def list_shapes(self, **kwargs) -> List[ComputeShapeSummary]:
12451236

12461237
def get_deployment_status(
12471238
self,
1239+
deployment,
12481240
model_deployment_id: str,
12491241
work_request_id: str,
12501242
model_type: str,
12511243
model_name: str,
1252-
status_list: List[str] = [],
12531244
) -> None:
12541245
"""Waits for the data science model deployment to be completed and log its status in telemetry.
12551246
@@ -1262,17 +1253,14 @@ def get_deployment_status(
12621253
The work request Id of the model deployment.
12631254
model_type: str
12641255
The type of aqua model to be deployed. Allowed values are: `custom`, `service` and `multi_model`.
1265-
status_list: List[str]
1266-
The list of status frmo streams the access and/or predict logs of model deployment.
12671256
12681257
Returns
12691258
-------
12701259
AquaDeployment
12711260
An Aqua deployment instance.
12721261
"""
1273-
12741262
ocid = get_ocid_substring(model_deployment_id, key_len=8)
1275-
telemetry_kwargs = {"ocid": ocid, "model_name": model_name}
1263+
status_list: List[str] = []
12761264

12771265
data_science_work_request: DataScienceWorkRequest = DataScienceWorkRequest(
12781266
work_request_id
@@ -1284,25 +1272,104 @@ def get_deployment_status(
12841272
max_wait_time=DEFAULT_WAIT_TIME,
12851273
poll_interval=DEFAULT_POLL_INTERVAL,
12861274
)
1287-
except Exception:
1275+
predict_logs = deployment.tail_logs("predict")
1276+
access_logs = deployment.tail_logs("access")
1277+
1278+
status = ""
1279+
if access_logs and len(access_logs) > 0:
1280+
print("access log list ############################")
1281+
print(access_logs)
1282+
status = access_logs[0]["message"]
1283+
1284+
if predict_logs and len(predict_logs) > 0:
1285+
print("predict_logs ############################")
1286+
print(predict_logs)
1287+
status += predict_logs[0]["message"]
1288+
1289+
status = re.sub(r"[^a-zA-Z0-9]", "", status)
1290+
telemetry_kwargs = {
1291+
"ocid": ocid,
1292+
"model_name": model_name,
1293+
"status": status,
1294+
}
1295+
print(telemetry_kwargs)
1296+
print("############################")
1297+
1298+
self.telemetry.record_event(
1299+
category=f"aqua/{model_type}/deployment/status",
1300+
action="LAST_LOG",
1301+
# detail=error_str,
1302+
**telemetry_kwargs,
1303+
)
1304+
1305+
except Exception as e:
12881306
if data_science_work_request._error_message:
12891307
error_str = ""
12901308
for error in data_science_work_request._error_message:
12911309
error_str = error_str + " " + error.message
12921310

12931311
status = ""
1294-
if len(status_list) > 0:
1295-
status = status_list[-1]
1296-
1297-
telemetry_kwargs["status"] = status
1312+
predict_logs = deployment.tail_logs("predict")
1313+
access_logs = deployment.tail_logs("access")
1314+
if access_logs and len(access_logs) > 0:
1315+
print(access_logs)
1316+
status = access_logs[0]["message"]
1317+
1318+
if predict_logs and len(predict_logs) > 0:
1319+
print("predict_logs ############################")
1320+
print(predict_logs)
1321+
status += predict_logs[0]["message"]
1322+
status = re.sub(r"[^a-zA-Z0-9]", "", status)
1323+
error_str = re.sub(r"[^a-zA-Z0-9]", "", error_str)
1324+
telemetry_kwargs = {
1325+
"ocid": ocid,
1326+
"model_name": model_name,
1327+
"status": error_str + " " + status,
1328+
}
1329+
print(telemetry_kwargs)
1330+
print("############################")
12981331

12991332
self.telemetry.record_event(
13001333
category=f"aqua/{model_type}/deployment/status",
13011334
action="FAILED",
13021335
detail=error_str,
13031336
**telemetry_kwargs,
13041337
)
1338+
else:
1339+
print(str(e))
1340+
status = str(e)
1341+
predict_logs = deployment.tail_logs("predict")
1342+
access_logs = deployment.tail_logs("access")
1343+
if access_logs and len(access_logs) > 0:
1344+
print("access log list ############################")
1345+
print(access_logs)
1346+
status = access_logs[0]["message"]
1347+
1348+
if predict_logs and len(predict_logs) > 0:
1349+
print("predict_logs ############################")
1350+
print(predict_logs)
1351+
status += predict_logs[0]["message"]
1352+
1353+
status = re.sub(r"[^a-zA-Z0-9]", "", status)
1354+
error_str = re.sub(r"[^a-zA-Z0-9]", "", error_str)
1355+
1356+
telemetry_kwargs = {
1357+
"ocid": ocid,
1358+
"model_name": model_name,
1359+
"status": error_str + " " + status,
1360+
}
1361+
print(telemetry_kwargs)
1362+
print("############################")
1363+
1364+
self.telemetry.record_event(
1365+
category=f"aqua/{model_type}/deployment/status",
1366+
action="FAILED",
1367+
# detail=error_str,
1368+
**telemetry_kwargs,
1369+
)
1370+
13051371
else:
1372+
telemetry_kwargs = {"ocid": ocid, "model_name": model_name}
13061373
self.telemetry.record_event_async(
13071374
category=f"aqua/{model_type}/deployment/status",
13081375
action="SUCCEEDED",

ads/common/oci_logging.py

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
#!/usr/bin/env python
2-
# -*- coding: utf-8; -*-
32

43
# Copyright (c) 2021, 2024 Oracle and/or its affiliates.
54
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
65

76
import datetime
87
import logging
98
import time
10-
from typing import Dict, Union, List
9+
from typing import Dict, List, Union
1110

11+
import oci.exceptions
1212
import oci.logging
1313
import oci.loggingsearch
14-
import oci.exceptions
14+
1515
from ads.common.decorator.utils import class_or_instance_method
1616
from ads.common.oci_mixin import OCIModelMixin, OCIWorkRequestMixin
1717
from ads.common.oci_resource import OCIResource, ResourceNotFoundError
1818

19-
2019
logger = logging.getLogger(__name__)
2120

2221
# Maximum number of log records to be returned by default.
@@ -862,9 +861,48 @@ def tail(
862861
time_start=time_start,
863862
log_filter=log_filter,
864863
)
865-
self._print(
866-
sorted(tail_logs, key=lambda log: log["time"])
864+
self._print(sorted(tail_logs, key=lambda log: log["time"]))
865+
866+
def get_tail_logs(
867+
self,
868+
source: str = None,
869+
limit: int = LOG_RECORDS_LIMIT,
870+
time_start: datetime.datetime = None,
871+
log_filter: str = None,
872+
) -> List[Union[oci.loggingsearch.models.SearchResult, dict]]:
873+
"""Returns the most recent consolidated log records.
874+
875+
Parameters
876+
----------
877+
source : str, optional
878+
Expression or OCID to filter the "source" field of the OCI log record.
879+
Defaults to None.
880+
limit : int, optional.
881+
Maximum number of records to be returned.
882+
If limit is set to None, all logs from time_start to now will be returned.
883+
Defaults to 100.
884+
time_start : datetime.datetime, optional
885+
Starting time for the log query.
886+
Defaults to None.
887+
log_filter : str, optional
888+
Expression for filtering the logs. This will be the WHERE clause of the query.
889+
Defaults to None.
890+
891+
Returns
892+
-------
893+
list
894+
A list of oci.loggingsearch.models.SearchResult objects or log records sorted in descending order by time
895+
Each log record is a dictionary with the following keys: `annotation`, `id`, `time`,
896+
`message` and `datetime`.
897+
"""
898+
tail_logs = self._search_and_format(
899+
source=source,
900+
limit=limit,
901+
sort_order=SortOrder.DESC,
902+
time_start=time_start,
903+
log_filter=log_filter,
867904
)
905+
return sorted(tail_logs, key=lambda log: log["time"])
868906

869907
def head(
870908
self,

ads/model/deployment/model_deployment.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -729,13 +729,46 @@ def update(
729729

730730
return self._update_from_oci_model(response)
731731

732+
def tail_logs(
733+
self, log_type: str = None, time_start: datetime = None, log_filter: str = None
734+
) -> List[Union[oci.loggingsearch.models.SearchResult, dict]]:
735+
"""Returns the most recent consolidated log records for the model deployment
736+
737+
Parameters
738+
----------
739+
source : str, optional
740+
Expression or OCID to filter the "source" field of the OCI log record.
741+
Defaults to None.
742+
limit : int, optional.
743+
Maximum number of records to be returned.
744+
If limit is set to None, all logs from time_start to now will be returned.
745+
Defaults to 100.
746+
time_start : datetime.datetime, optional
747+
Starting time for the log query.
748+
Defaults to None.
749+
log_filter : str, optional
750+
Expression for filtering the logs. This will be the WHERE clause of the query.
751+
Defaults to None.
752+
753+
Returns
754+
-------
755+
list
756+
A list of oci.loggingsearch.models.SearchResult objects or log records sorted in descending order by time
757+
Each log record is a dictionary with the following keys: `annotation`, `id`, `time`,
758+
`message` and `datetime`.
759+
"""
760+
return self.logs(log_type).get_tail_logs(
761+
source=self.model_deployment_id,
762+
time_start=time_start,
763+
log_filter=log_filter,
764+
)
765+
732766
def watch(
733767
self,
734768
log_type: str = None,
735769
time_start: datetime = None,
736770
interval: int = LOG_INTERVAL,
737771
log_filter: str = None,
738-
status_list: List[str] = None,
739772
) -> "ModelDeployment":
740773
"""Streams the access and/or predict logs of model deployment.
741774
@@ -764,8 +797,6 @@ def watch(
764797
status = ""
765798
while not self._stop_condition():
766799
status = self._check_and_print_status(status)
767-
if status not in status_list:
768-
status_list.append(status)
769800
time.sleep(interval)
770801

771802
time_start = time_start or self.time_created

0 commit comments

Comments
 (0)