Skip to content

Commit

Permalink
feat: bulk save objects ignoring conflicts (#185)
Browse files Browse the repository at this point in the history
* feat: bulk save objects and ignore conflicts

Signed-off-by: Victor Garcia Reolid <[email protected]>

* style: apply black

Signed-off-by: Victor Garcia Reolid <[email protected]>

* rename belief_records to beliefs_data_frame

Signed-off-by: Victor Garcia Reolid <[email protected]>

* on conflict update

Signed-off-by: Victor Garcia Reolid <[email protected]>

* apply black

Signed-off-by: Victor Garcia Reolid <[email protected]>

* apply pre-commit

Signed-off-by: Victor Garcia Reolid <[email protected]>

* use column names instead of constraint name

Signed-off-by: Victor Garcia Reolid <[email protected]>

* apply pre-commit

Signed-off-by: Victor Garcia Reolid <[email protected]>

* allow_overwrite is False -> on_conflict_do_nothing

Signed-off-by: Victor Garcia Reolid <[email protected]>

* fix: update now failing test

Signed-off-by: F.N. Claessen <[email protected]>

* just in case it's empty...

Signed-off-by: Victor Garcia Reolid <[email protected]>

* update tests

Signed-off-by: Victor Garcia Reolid <[email protected]>

* fix copy/pasting issue

Signed-off-by: Victor Garcia Reolid <[email protected]>

* fix: update another now failing test

Signed-off-by: F.N. Claessen <[email protected]>

* Revert "fix: update another now failing test"

This reverts commit a68b5cb.

* refactor: move the check up to the start of the method

Signed-off-by: F.N. Claessen <[email protected]>

---------

Signed-off-by: Victor Garcia Reolid <[email protected]>
Signed-off-by: F.N. Claessen <[email protected]>
Co-authored-by: F.N. Claessen <[email protected]>
  • Loading branch information
victorgarcia98 and Flix6x authored Jul 17, 2024
1 parent 5b38bb0 commit a22cce8
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 16 deletions.
55 changes: 43 additions & 12 deletions timely_beliefs/beliefs/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
func,
select,
)
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property
from sqlalchemy.orm import Session, backref, declarative_mixin, relationship
Expand Down Expand Up @@ -251,7 +252,7 @@ def add_to_session(
beliefs_data_frame: "BeliefsDataFrame",
expunge_session: bool = False,
allow_overwrite: bool = False,
bulk_save_objects: bool = False,
bulk_save_objects: bool = True,
commit_transaction: bool = False,
):
"""Add a BeliefsDataFrame as timed beliefs to a database session.
Expand All @@ -274,23 +275,53 @@ def add_to_session(
if False, you can still add other data to the session
and commit it all within an atomic transaction
"""
if beliefs_data_frame.empty:
return
# Belief timing is stored as the belief horizon rather than as the belief time
belief_records = (
beliefs_data_frame.convert_index_from_belief_time_to_horizon()
.reset_index()
.to_dict("records")
beliefs_data_frame = (
beliefs_data_frame.convert_index_from_belief_time_to_horizon().reset_index()
)
beliefs = [cls(sensor=beliefs_data_frame.sensor, **d) for d in belief_records]
beliefs = [
cls(sensor=beliefs_data_frame.sensor, **d)
for d in beliefs_data_frame.to_dict("records")
]

if expunge_session:
session.expunge_all()
if not allow_overwrite:
if bulk_save_objects:
session.bulk_save_objects(beliefs)

if bulk_save_objects:
# serialize source and sensor
beliefs_data_frame["source_id"] = beliefs_data_frame["source"].apply(
lambda x: x.id
)
beliefs_data_frame["sensor_id"] = beliefs_data_frame.sensor.id
beliefs_data_frame = beliefs_data_frame.drop(columns=["source"])

smt = insert(cls).values(beliefs_data_frame.to_dict("records"))

if allow_overwrite:
smt = smt.on_conflict_do_update(
index_elements=[
"event_start",
"belief_horizon",
"source_id",
"sensor_id",
"cumulative_probability",
],
set_=dict(event_value=smt.excluded.event_value),
)
else:
session.add_all(beliefs)
smt = smt.on_conflict_do_nothing()

session.execute(smt)

else:
for belief in beliefs:
session.merge(belief)
if allow_overwrite:
for belief in beliefs:
session.merge(belief)
else:
session.add_all(beliefs)

if commit_transaction:
session.commit()

Expand Down
30 changes: 26 additions & 4 deletions timely_beliefs/tests/test_belief_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,31 @@ def test_adding_to_session(
assert len(bdf) == len(new_bdf)


@pytest.mark.parametrize("bulk_save_objects", [False, True])
def test_fail_adding_to_session(
bulk_save_objects: bool,
def test_adding_to_session_succeeds(
    time_slot_sensor: DBSensor,
    rolling_day_ahead_beliefs_about_time_slot_events,
):
    """Check that bulk-saving beliefs already read from the session raises no IntegrityError.

    The beliefs found for ``time_slot_sensor`` are read via
    ``DBTimedBelief.search_session`` and then written back with
    ``bulk_save_objects=True`` after expunging the session.

    :param time_slot_sensor: sensor whose beliefs are searched for and re-saved
    :param rolling_day_ahead_beliefs_about_time_slot_events: fixture requested for its
        side effect only (presumably it stores the beliefs being re-saved -- TODO confirm)
    """
    # Retrieve some data from the database
    bdf = DBTimedBelief.search_session(
        session=session,
        sensor=time_slot_sensor,
    )

    # Attempting to save the same data should not fail, even if we expunge everything from the session
    try:
        DBTimedBelief.add_to_session(
            session,
            bdf,
            expunge_session=True,
            bulk_save_objects=True,
            commit_transaction=True,
        )
    except IntegrityError as exception:
        # pytest.fail() raises its own failure exception and never returns,
        # so wrapping it in a `raise` statement is redundant and misleading.
        pytest.fail(f"DID RAISE {exception}")


def test_adding_to_session_fails(
time_slot_sensor: DBSensor,
rolling_day_ahead_beliefs_about_time_slot_events,
):
Expand All @@ -68,6 +90,6 @@ def test_fail_adding_to_session(
session,
bdf,
expunge_session=True,
bulk_save_objects=bulk_save_objects,
bulk_save_objects=False,
commit_transaction=True,
)

0 comments on commit a22cce8

Please sign in to comment.