diff --git a/docs/docs_requirements.txt b/docs/docs_requirements.txt index fd94f18af..77f4b0bd5 100644 --- a/docs/docs_requirements.txt +++ b/docs/docs_requirements.txt @@ -19,5 +19,6 @@ notebook <7 black >= 24.4.2 +# Formatting packages pinned for consistency with format checker. yapf == 0.32.0 -isort == 5.10.1 +isort == 5.10.1 \ No newline at end of file diff --git a/trulens_eval/examples/experimental/dummy_example.ipynb b/trulens_eval/examples/experimental/dummy_example.ipynb index ea4399954..1453f877d 100644 --- a/trulens_eval/examples/experimental/dummy_example.ipynb +++ b/trulens_eval/examples/experimental/dummy_example.ipynb @@ -50,7 +50,7 @@ "from trulens_eval import Feedback\n", "from trulens_eval import Tru\n", "from trulens_eval.feedback.provider.hugs import Dummy\n", - "from trulens_eval.schema import FeedbackMode\n", + "from trulens_eval.schema.feedback import FeedbackMode\n", "from trulens_eval.tru_custom_app import TruCustomApp\n", "from trulens_eval.utils.threading import TP\n", "\n", @@ -76,6 +76,15 @@ ")" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "tru.get_records_and_feedback(limit=10)[0]" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/trulens_eval/trulens_eval/database/base.py b/trulens_eval/trulens_eval/database/base.py index 542ec5588..654c9f126 100644 --- a/trulens_eval/trulens_eval/database/base.py +++ b/trulens_eval/trulens_eval/database/base.py @@ -270,13 +270,19 @@ def get_apps(self) -> Iterable[JSON]: @abc.abstractmethod def get_records_and_feedback( self, - app_ids: Optional[List[mod_types_schema.AppID]] = None + app_ids: Optional[List[mod_types_schema.AppID]] = None, + offset: Optional[int] = None, + limit: Optional[int] = None ) -> Tuple[pd.DataFrame, Sequence[str]]: """Get records fom the database. Args: app_ids: If given, retrieve only the records for the given apps. Otherwise all apps are retrieved. + + offset: Database row offset. 
+ + limit: Limit on rows (records) returned. Returns: A dataframe with the records. diff --git a/trulens_eval/trulens_eval/database/orm.py b/trulens_eval/trulens_eval/database/orm.py index a7431fb08..6ef3481d6 100644 --- a/trulens_eval/trulens_eval/database/orm.py +++ b/trulens_eval/trulens_eval/database/orm.py @@ -22,7 +22,6 @@ from trulens_eval.schema import app as mod_app_schema from trulens_eval.schema import feedback as mod_feedback_schema from trulens_eval.schema import record as mod_record_schema -from trulens_eval.schema import types as mod_types_schema from trulens_eval.utils.json import json_str_of_obj TYPE_JSON = Text @@ -222,6 +221,7 @@ class Record(base): backref=backref('records', cascade="all,delete"), primaryjoin='AppDefinition.app_id == Record.app_id', foreign_keys=app_id, + order_by="(Record.ts,Record.record_id)" ) @classmethod @@ -282,6 +282,8 @@ class FeedbackResult(base): backref=backref('feedback_results', cascade="all,delete"), primaryjoin='Record.record_id == FeedbackResult.record_id', foreign_keys=record_id, + order_by= + "(FeedbackResult.last_ts,FeedbackResult.feedback_result_id)" ) feedback_definition = relationship( @@ -290,6 +292,8 @@ class FeedbackResult(base): primaryjoin= "FeedbackDefinition.feedback_definition_id == FeedbackResult.feedback_definition_id", foreign_keys=feedback_definition_id, + order_by= + "(FeedbackResult.last_ts,FeedbackResult.feedback_result_id)" ) @classmethod diff --git a/trulens_eval/trulens_eval/database/sqlalchemy.py b/trulens_eval/trulens_eval/database/sqlalchemy.py index 8523067cb..25d5e385c 100644 --- a/trulens_eval/trulens_eval/database/sqlalchemy.py +++ b/trulens_eval/trulens_eval/database/sqlalchemy.py @@ -14,10 +14,7 @@ import numpy as np import pandas as pd from pydantic import Field -from sqlalchemy import create_engine -from sqlalchemy import Engine -from sqlalchemy import func -from sqlalchemy import select +import sqlalchemy as sa from sqlalchemy.orm import joinedload from sqlalchemy.orm import 
sessionmaker from sqlalchemy.sql import text as sql_text @@ -75,7 +72,7 @@ class SQLAlchemyDB(DB): session_params: dict = Field(default_factory=dict) """Sqlalchemy-related session.""" - engine: Optional[Engine] = None + engine: Optional[sa.Engine] = None """Sqlalchemy engine.""" session: Optional[sessionmaker] = None @@ -113,7 +110,7 @@ def __init__( ) def _reload_engine(self): - self.engine = create_engine(**self.engine_params) + self.engine = sa.create_engine(**self.engine_params) self.session = sessionmaker(self.engine, **self.session_params) @classmethod @@ -415,7 +412,7 @@ def get_feedback_defs( """See [DB.get_feedback_defs][trulens_eval.database.base.DB.get_feedback_defs].""" with self.session.begin() as session: - q = select(self.orm.FeedbackDefinition) + q = sa.select(self.orm.FeedbackDefinition) if feedback_definition_id: q = q.filter_by(feedback_definition_id=feedback_definition_id) fb_defs = (row[0] for row in session.execute(q)) @@ -484,9 +481,9 @@ def _feedback_query( limit: Optional[int] = None ): if count: - q = func.count(self.orm.FeedbackResult.feedback_result_id) + q = sa.func.count(self.orm.FeedbackResult.feedback_result_id) else: - q = select(self.orm.FeedbackResult) + q = sa.select(self.orm.FeedbackResult) if record_id: q = q.filter_by(record_id=record_id) @@ -515,7 +512,7 @@ def _feedback_query( q = q.limit(limit) if shuffle: - q = q.order_by(func.random()) + q = q.order_by(sa.func.random()) return q @@ -573,7 +570,9 @@ def get_feedback( def get_records_and_feedback( self, - app_ids: Optional[List[str]] = None + app_ids: Optional[List[str]] = None, + offset: Optional[int] = None, + limit: Optional[int] = None ) -> Tuple[pd.DataFrame, Sequence[str]]: """See [DB.get_records_and_feedback][trulens_eval.database.base.DB.get_records_and_feedback].""" @@ -582,18 +581,42 @@ def get_records_and_feedback( # for large databases without the use of pagination. 
with self.session.begin() as session: - stmt = select(self.orm.AppDefinition).options( - joinedload(self.orm.AppDefinition.records)\ - .joinedload(self.orm.Record.feedback_results) - ) + stmt = sa.select(self.orm.Record) + # NOTE: We are selecting records here because offset and limit need + # to be with respect to those rows instead of AppDefinition or + # FeedbackResult rows. if app_ids: - stmt = stmt.where(self.orm.AppDefinition.app_id.in_(app_ids)) + stmt = stmt.where(self.orm.Record.app_id.in_(app_ids)) + + stmt = stmt.options(joinedload(self.orm.Record.feedback_results)) + # NOTE(piotrm): The joinedload here makes it so that the + # feedback_results get loaded eagerly instead of lazily when + # accessed later. + + # TODO(piotrm): The subsequent logic in helper methods end up + # reading all of the records and feedback_results in order to create + # a DataFrame so there is no reason to not eagerly get all of this + # data. Ideally, though, we would be making some sort of lazy + # DataFrame and then could use the lazy join feature of sqlalchemy. + + stmt = stmt.order_by(self.orm.Record.ts, self.orm.Record.record_id) + # NOTE: feedback_results order is governed by the order_by on the + # orm.FeedbackResult.record backref definition. Here, we need to + # order Records as we did not use an auto join to retrieve them. If + # records were to be retrieved from AppDefinition.records via auto + # join, though, the orm backref ordering would be able to take hold. + + stmt = stmt.limit(limit).offset(offset) - ex = session.execute(stmt).unique() # unique needed for joinedload - apps = (row[0] for row in ex) + ex = session.execute(stmt).unique() + # unique needed for joinedload above. - return AppsExtractor().get_df_and_cols(apps) + records = [rec[0] for rec in ex] + # TODO: Make the iteration of records lazy in some way. See + # TODO(piotrm) above. + + return AppsExtractor().get_df_and_cols(records=records) # Use this Perf for missing Perfs. 
@@ -699,6 +722,8 @@ def _extract(_cost_json: Union[str, dict]) -> Tuple[int, float]: class AppsExtractor: + """Utilities for creating dataframes from orm instances.""" + app_cols = ["app_id", "app_json", "type"] rec_cols = [ "record_id", "input", "output", "tags", "record_json", "cost_json", @@ -711,9 +736,33 @@ def __init__(self): self.feedback_columns = set() def get_df_and_cols( - self, apps: Iterable[orm.AppDefinition] + self, + apps: Optional[List[orm.AppDefinition]] = None, + records: Optional[List[orm.Record]] = None ) -> Tuple[pd.DataFrame, Sequence[str]]: - df = pd.concat(self.extract_apps(apps)) + """Produces a records dataframe which joins in information from apps and + feedback results. + + Args: + apps: If given, includes all records of all of the apps in this + iterable. + + records: If given, includes only these records. Mutually exclusive + with `apps`. + """ + + assert apps is None or records is None, "`apps` and `records` are mutually exclusive" + + if apps is not None: + df = pd.concat(self.extract_apps(apps)) + + elif records is not None: + apps = set(record.app for record in records) + df = pd.concat(self.extract_apps(apps=apps, records=records)) + + else: + raise ValueError("`apps` or `records` must be provided") + df["latency"] = _extract_latency(df["perf_json"]) df.reset_index( drop=True, inplace=True @@ -722,14 +771,35 @@ def get_df_and_cols( return df, list(self.feedback_columns) def extract_apps( - self, apps: Iterable[orm.AppDefinition] + self, + apps: Iterable[orm.AppDefinition], + records: Optional[List[orm.Record]] = None ) -> Iterable[pd.DataFrame]: + """ + Creates record rows with app information. + + TODO: The means for enumerating records in this method is not ideal as + it does a lot of filtering. + """ + yield pd.DataFrame( [], columns=self.app_cols + self.rec_cols ) # prevent empty iterator for _app in apps: try: - if _recs := _app.records: + if records is None: + # If records not provided, get all of them for `_app`. 
+ _recs = _app.records + else: + # Otherwise get only the ones in `records`. WARNING: Avoid + # using _app.records here as doing so might get all of the + # records even the ones not in `records` + _recs = ( + record for record in records + if record.app_id == _app.app_id + ) + + if _recs: df = pd.DataFrame(data=self.extract_records(_recs)) for col in self.app_cols: diff --git a/trulens_eval/trulens_eval/pages/Evaluations.py b/trulens_eval/trulens_eval/pages/Evaluations.py index a67444aab..883d52e8e 100644 --- a/trulens_eval/trulens_eval/pages/Evaluations.py +++ b/trulens_eval/trulens_eval/pages/Evaluations.py @@ -55,8 +55,6 @@ tru = Tru() lms = tru.db -df_results, feedback_cols = lms.get_records_and_feedback([]) - # TODO: remove code redundancy / redundant database calls feedback_directions = { ( @@ -157,33 +155,32 @@ def extract_metadata(row: pd.Series) -> str: return str(record_data["meta"]) -if df_results.empty: - st.write("No records yet...") +apps = list(app['app_id'] for app in lms.get_apps()) +if "app" in st.session_state: + app = st.session_state.app else: - apps = list(df_results.app_id.unique()) - if "app" in st.session_state: - app = st.session_state.app - else: - app = apps + app = apps - st.query_params['app'] = app +st.query_params['app'] = app - options = st.multiselect("Filter Applications", apps, default=app) +options = st.multiselect("Filter Applications", apps, default=app) - if len(options) == 0: - st.header("All Applications") - app_df = df_results +df_results, feedback_cols = lms.get_records_and_feedback(app_ids=options) - elif len(options) == 1: - st.header(options[0]) +if len(options) == 0: + st.header("All Applications") - app_df = df_results[df_results.app_id.isin(options)] +elif len(options) == 1: + st.header(options[0]) - else: - st.header("Multiple Applications Selected") +else: + st.header("Multiple Applications Selected") - app_df = df_results[df_results.app_id.isin(options)] +if df_results.empty: + st.write("No records yet...") 
+else: + app_df = df_results tab1, tab2 = st.tabs(["Records", "Feedback Functions"]) diff --git a/trulens_eval/trulens_eval/tru.py b/trulens_eval/trulens_eval/tru.py index 8a970c6c8..4cc022c44 100644 --- a/trulens_eval/trulens_eval/tru.py +++ b/trulens_eval/trulens_eval/tru.py @@ -688,7 +688,9 @@ def get_apps(self) -> List[serial.JSONized[mod_app_schema.AppDefinition]]: def get_records_and_feedback( self, - app_ids: Optional[List[mod_types_schema.AppID]] = None + app_ids: Optional[List[mod_types_schema.AppID]] = None, + offset: Optional[int] = None, + limit: Optional[int] = None ) -> Tuple[pandas.DataFrame, List[str]]: """Get records, their feeback results, and feedback names. @@ -696,6 +698,10 @@ def get_records_and_feedback( app_ids: A list of app ids to filter records by. If empty or not given, all apps' records will be returned. + offset: Record row offset. + + limit: Limit on the number of records to return. + Returns: Dataframe of records with their feedback results. @@ -705,7 +711,9 @@ def get_records_and_feedback( if app_ids is None: app_ids = [] - df, feedback_columns = self.db.get_records_and_feedback(app_ids) + df, feedback_columns = self.db.get_records_and_feedback( + app_ids, offset=offset, limit=limit + ) return df, feedback_columns