Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify cache, add support for mssql #9938

Merged
merged 2 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 72 additions & 47 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@
import importlib.metadata
import keyword
import re
import sys
import urllib.parse
import weakref
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple

import ibis
import ibis.common.exceptions as exc
import ibis.config
import ibis.expr.operations as ops
import ibis.expr.types as ir
from ibis import util
from ibis.common.caching import RefCountedCache

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator, Mapping, MutableMapping
Expand Down Expand Up @@ -777,7 +778,74 @@
self.drop_database(name=name, catalog=database, force=force)


class BaseBackend(abc.ABC, _FileIOHandler):
class CacheEntry(NamedTuple):
    """Bookkeeping record for a single cached (`.cache()`d) table.

    Holds a strong reference to the original (uncached) operation but only a
    *weak* reference to the cached operation, so the cached relation can be
    garbage collected — triggering ``finalizer`` — once user code no longer
    holds it.
    """

    # The original relation that was cached; used as the reuse-lookup key.
    orig_op: ops.Relation
    # Weak reference to the cached relation; calling it returns None once
    # the cached op has been collected.
    cached_op_ref: weakref.ref[ops.Relation]
    # Finalizer registered on the cached op; drops the backing table.
    finalizer: weakref.finalize


class CacheHandler:
"""A mixin for handling `.cache()`/`CachedTable` operations."""

    def __init__(self):
        # Two indexes over the same CacheEntry objects:
        #   - by generated table name, for release/finalization, and
        #   - by the original operation, for reuse on repeated `.cache()`.
        self._cache_name_to_entry = {}
        self._cache_op_to_entry = {}

    def _cached_table(self, table: ir.Table) -> ir.CachedTable:
        """Convert a Table to a CachedTable, materializing it at most once.

        Parameters
        ----------
        table
            Table expression to cache

        Returns
        -------
        CachedTable
            Cached table backed by a materialized table
        """
        # Look up an existing entry for this exact operation. The cached op
        # is held only weakly, so it may already have been collected even if
        # the entry itself is still present.
        entry = self._cache_op_to_entry.get(table.op())
        if entry is None or (cached_op := entry.cached_op_ref()) is None:
            cached_op = self._create_cached_table(util.gen_name("cached"), table).op()
            entry = CacheEntry(
                table.op(),
                weakref.ref(cached_op),
                # When the cached op is garbage collected, drop the backing
                # physical table, identified by its generated name.
                weakref.finalize(
                    cached_op, self._finalize_cached_table, cached_op.name
                ),
            )
            # Index by original op (for reuse) and by name (for release).
            self._cache_op_to_entry[table.op()] = entry
            self._cache_name_to_entry[cached_op.name] = entry
        return ir.CachedTable(cached_op)

def _finalize_cached_table(self, name: str) -> None:
"""Release a cached table given its name.

This is a no-op if the cached table is already released.

Parameters
----------
name
The name of the cached table.
"""
if (entry := self._cache_name_to_entry.pop(name, None)) is not None:
jcrist marked this conversation as resolved.
Show resolved Hide resolved
self._cache_op_to_entry.pop(entry.orig_op)
entry.finalizer.detach()
try:
self._drop_cached_table(name)
except Exception:

Check warning on line 836 in ibis/backends/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/__init__.py#L836

Added line #L836 was not covered by tests
# suppress exceptions during interpreter shutdown
if not sys.is_finalizing():
raise

Check warning on line 839 in ibis/backends/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/__init__.py#L839

Added line #L839 was not covered by tests

def _create_cached_table(self, name: str, expr: ir.Table) -> ir.Table:
return self.create_table(name, expr, schema=expr.schema(), temp=True)

def _drop_cached_table(self, name: str) -> None:
self.drop_table(name, force=True)


class BaseBackend(abc.ABC, _FileIOHandler, CacheHandler):
"""Base backend class.

All Ibis backends must subclass this class and implement all the
Expand All @@ -794,12 +862,7 @@
self._con_args: tuple[Any] = args
self._con_kwargs: dict[str, Any] = kwargs
self._can_reconnect: bool = True
# expression cache
self._query_cache = RefCountedCache(
populate=self._load_into_cache,
lookup=lambda name: self.table(name).op(),
finalize=self._clean_up_cached_table,
)
super().__init__()

@property
@abc.abstractmethod
Expand Down Expand Up @@ -1225,44 +1288,6 @@
f"{cls.name} backend has not implemented `has_operation` API"
)

    def _cached(self, expr: ir.Table):
        """Cache the provided expression.

        All subsequent operations on the returned expression will be performed on the cached data.

        Parameters
        ----------
        expr
            Table expression to cache

        Returns
        -------
        Expr
            Cached table

        """
        op = expr.op()
        # Reuse an existing cache entry for this operation if one exists;
        # otherwise materialize the expression into the cache.
        if (result := self._query_cache.get(op)) is None:
            result = self._query_cache.store(expr)
        return ir.CachedTable(result)

    def _release_cached(self, expr: ir.CachedTable) -> None:
        """Release the provided cached expression.

        Parameters
        ----------
        expr
            Cached expression to release

        """
        # The cache is keyed by the generated name of the cached operation.
        self._query_cache.release(expr.op().name)

    def _load_into_cache(self, name, expr):
        # Backend-specific hook: materialize `expr` as a table named `name`.
        # Backends that support caching must override this.
        raise NotImplementedError(self.name)

    def _clean_up_cached_table(self, name):
        # Backend-specific hook: drop the table backing a cache entry.
        # Backends that support caching must override this.
        raise NotImplementedError(self.name)

def _transpile_sql(self, query: str, *, dialect: str | None = None) -> str:
# only transpile if dialect was passed
if dialect is None:
Expand Down
9 changes: 1 addition & 8 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ class Backend(SQLBackend, CanCreateDatabase, CanCreateSchema):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.__session_dataset: bq.DatasetReference | None = None
self._query_cache.lookup = lambda name: self.table(
name,
database=(self._session_dataset.project, self._session_dataset.dataset_id),
).op()

@property
def _session_dataset(self):
Expand Down Expand Up @@ -1137,10 +1133,7 @@ def drop_view(
)
self.raw_sql(stmt.sql(self.name))

def _load_into_cache(self, name, expr):
self.create_table(name, expr, schema=expr.schema(), temp=True)

def _clean_up_cached_table(self, name):
def _drop_cached_table(self, name):
self.drop_table(
name,
database=(self._session_dataset.project, self._session_dataset.dataset_id),
Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/dask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,5 @@ def _convert_object(self, obj) -> dd.DataFrame:
pandas_df = super()._convert_object(obj)
return dd.from_pandas(pandas_df, npartitions=1)

def _load_into_cache(self, name, expr):
self.create_table(name, self.compile(expr).persist())
    def _create_cached_table(self, name, expr):
        # persist() the compiled dask collection so cached reads don't
        # recompute the underlying graph on every use.
        return self.create_table(name, self.compile(expr).persist())
11 changes: 7 additions & 4 deletions ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,15 +692,18 @@ def create_table(
new = raw_this.sql(self.dialect)
cur.execute(f"EXEC sp_rename '{old}', '{new}'")

if temp:
# If a temporary table, amend the output name/catalog/db accordingly
name = "##" + name
catalog = "tempdb"
db = "dbo"

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(
"##" * temp + name,
database=("tempdb" * temp or catalog, "dbo" * temp or db),
)
return self.table(name, database=(catalog, db))

# preserve the input schema if it was provided
return ops.DatabaseTable(
Expand Down
13 changes: 13 additions & 0 deletions ibis/backends/mssql/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,19 @@ def test_create_temp_table_from_obj(con):
con.drop_table("fuhreal")


@pytest.mark.parametrize("explicit_schema", [False, True])
def test_create_temp_table_from_expression(con, explicit_schema, temp_table):
    # Creating a temp table from an expression should work both when the
    # schema is passed explicitly and when it is inferred from the input.
    t = ibis.memtable(
        {"x": [1, 2, 3], "y": ["a", "b", "c"]}, schema={"x": "int64", "y": "str"}
    )
    t2 = con.create_table(
        temp_table, t, temp=True, schema=t.schema() if explicit_schema else None
    )
    # The round-tripped temp table must hold the same rows as the source.
    res = con.to_pandas(t.order_by("y"))
    sol = con.to_pandas(t2.order_by("y"))
    assert res.equals(sol)


def test_from_url():
user = MSSQL_USER
password = MSSQL_PASS
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,5 +647,5 @@ def _clean_up_tmp_table(self, name: str) -> None:
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f'DROP TABLE "{name}"')

def _clean_up_cached_table(self, name):
def _drop_cached_table(self, name):
self._clean_up_tmp_table(name)
6 changes: 3 additions & 3 deletions ibis/backends/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def _get_operations(cls):
def has_operation(cls, operation: type[ops.Value]) -> bool:
return operation in cls._get_operations()

def _clean_up_cached_table(self, name):
def _drop_cached_table(self, name):
del self.dictionary[name]

def to_pyarrow(
Expand Down Expand Up @@ -328,8 +328,8 @@ def execute(self, query, params=None, limit="default", **kwargs):

return PandasExecutor.execute(query.op(), backend=self, params=params)

def _load_into_cache(self, name, expr):
self.create_table(name, expr.execute())
def _create_cached_table(self, name, expr):
return self.create_table(name, expr.execute())


@lazy_singledispatch
Expand Down
6 changes: 3 additions & 3 deletions ibis/backends/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,10 @@ def to_pyarrow_batches(
table = self._to_pyarrow_table(expr, params=params, limit=limit, **kwargs)
return table.to_reader(chunk_size)

def _load_into_cache(self, name, expr):
self.create_table(name, self.compile(expr).cache())
def _create_cached_table(self, name, expr):
return self.create_table(name, self.compile(expr).cache())

def _clean_up_cached_table(self, name):
def _drop_cached_table(self, name):
self.drop_table(name, force=True)


Expand Down
5 changes: 3 additions & 2 deletions ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,16 +704,17 @@
)
return self.raw_sql(f"ANALYZE TABLE {table} COMPUTE STATISTICS{maybe_noscan}")

def _load_into_cache(self, name, expr):
def _create_cached_table(self, name, expr):
query = self.compile(expr)
t = self._session.sql(query).cache()
assert t.is_cached
t.createOrReplaceTempView(name)
# store the underlying spark dataframe so we can release memory when
# asked to, instead of when the session ends
self._cached_dataframes[name] = t
return self.table(name)

Check warning on line 715 in ibis/backends/pyspark/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/pyspark/__init__.py#L715

Added line #L715 was not covered by tests

def _clean_up_cached_table(self, name):
def _drop_cached_table(self, name):
self._session.catalog.dropTempView(name)
t = self._cached_dataframes.pop(name)
assert t.is_cached
Expand Down
6 changes: 0 additions & 6 deletions ibis/backends/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,6 @@ def drop_view(
with self._safe_raw_sql(src):
pass

    def _load_into_cache(self, name, expr):
        # Default SQL implementation: materialize into a temporary table.
        self.create_table(name, expr, schema=expr.schema(), temp=True)

    def _clean_up_cached_table(self, name):
        # force=True so cleaning up an already-dropped table is a no-op.
        self.drop_table(name, force=True)

def execute(
self,
expr: ir.Expr,
Expand Down
Loading
Loading