Skip to content

Commit

Permalink
Fix DB Issues (#1023)
Browse files Browse the repository at this point in the history
* Converting errors to UTF-8

* Exception logged when app records are not accessable

* Updated error log

* Added exception handing for feedback results

* Comment added for future reference

---------

Co-authored-by: Aaron <[email protected]>
Co-authored-by: Aaron Varghese <[email protected]>
Co-authored-by: Josh Reini <[email protected]>
  • Loading branch information
4 people authored Mar 22, 2024
1 parent 15fe242 commit 45ee7b1
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 45 deletions.
97 changes: 54 additions & 43 deletions trulens_eval/trulens_eval/database/sqlalchemy_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import MetaData
from sqlite3 import OperationalError

from trulens_eval import schema
from trulens_eval.app import App
Expand Down Expand Up @@ -519,57 +520,67 @@ def extract_apps(
[], columns=self.app_cols + self.rec_cols
) # prevent empty iterator
for _app in apps:
if _recs := _app.records:
df = pd.DataFrame(data=self.extract_records(_recs))

for col in self.app_cols:
if col == "type":
# Previous DBs did not contain entire app so we cannot
# deserialize AppDefinition here unless we fix prior DBs
# in migration. Because of this, loading just the
# `root_class` here.
df[col] = str(
Class.model_validate(
json.loads(_app.app_json).get('root_class')
try:
if _recs := _app.records:
df = pd.DataFrame(data=self.extract_records(_recs))

for col in self.app_cols:
if col == "type":
# Previous DBs did not contain entire app so we cannot
# deserialize AppDefinition here unless we fix prior DBs
# in migration. Because of this, loading just the
# `root_class` here.
df[col] = str(
Class.model_validate(
json.loads(_app.app_json).get('root_class')
)
)
)
else:
df[col] = getattr(_app, col)
else:
df[col] = getattr(_app, col)

yield df
yield df
except OperationalError as e:
print("Error encountered while attempting to retrieve an app. This issue may stem from a corrupted database.")
print(f"Error details: {e}")


def extract_records(self,
records: Iterable[orm.Record]) -> Iterable[pd.Series]:
for _rec in records:
calls = defaultdict(list)
values = defaultdict(list)

for _res in _rec.feedback_results:
calls[_res.name].append(json.loads(_res.calls_json)["calls"])
if _res.multi_result is not None and (multi_result :=
json.loads(
_res.multi_result
)) is not None:
for key, val in multi_result.items():
if val is not None: # avoid getting Nones into np.mean
name = f"{_res.name}:::{key}"
values[name] = val
self.feedback_columns.add(name)
elif _res.result is not None: # avoid getting Nones into np.mean
values[_res.name].append(_res.result)
self.feedback_columns.add(_res.name)

row = {
**{k: np.mean(v) for k, v in values.items()},
**{k + "_calls": flatten(v) for k, v in calls.items()},
}

for col in self.rec_cols:
row[col] = datetime.fromtimestamp(
_rec.ts
).isoformat() if col == "ts" else getattr(_rec, col)

yield row

try:
for _res in _rec.feedback_results:
calls[_res.name].append(json.loads(_res.calls_json)["calls"])
if _res.multi_result is not None and (multi_result :=
json.loads(
_res.multi_result
)) is not None:
for key, val in multi_result.items():
if val is not None: # avoid getting Nones into np.mean
name = f"{_res.name}:::{key}"
values[name] = val
self.feedback_columns.add(name)
elif _res.result is not None: # avoid getting Nones into np.mean
values[_res.name].append(_res.result)
self.feedback_columns.add(_res.name)

row = {
**{k: np.mean(v) for k, v in values.items()},
**{k + "_calls": flatten(v) for k, v in calls.items()},
}

for col in self.rec_cols:
row[col] = datetime.fromtimestamp(
_rec.ts
).isoformat() if col == "ts" else getattr(_rec, col)

yield row
except Exception as e:
# Handling unexpected errors, possibly due to database issues.
print("Error encountered while attempting to retrieve feedback results. This issue may stem from a corrupted database.")
print(f"Error details: {e}")


def flatten(nested: Iterable[Iterable[Any]]) -> List[Any]:
Expand Down
6 changes: 4 additions & 2 deletions trulens_eval/trulens_eval/feedback/feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,8 @@ def run(
return feedback_result

except:
exc_tb = traceback.format_exc()
# Convert traceback to a UTF-8 string, replacing errors to avoid encoding issues
exc_tb = traceback.format_exc().encode('utf-8', errors='replace').decode('utf-8')
logger.warning(f"Feedback Function exception caught: %s", exc_tb)
feedback_result.update(
error=exc_tb, status=FeedbackResultStatus.FAILED
Expand Down Expand Up @@ -950,7 +951,8 @@ def run_and_log(
).update(feedback_result_id=feedback_result_id)

except Exception:
exc_tb = traceback.format_exc()
# Convert traceback to a UTF-8 string, replacing errors to avoid encoding issues
exc_tb = traceback.format_exc().encode('utf-8', errors='replace').decode('utf-8')
db.insert_feedback(
feedback_result.update(
error=exc_tb, status=FeedbackResultStatus.FAILED
Expand Down

0 comments on commit 45ee7b1

Please sign in to comment.