From 685e6bb3745748799cc8238b6e48668ef4f70dac Mon Sep 17 00:00:00 2001 From: qstokkink Date: Fri, 21 Feb 2025 14:38:49 +0100 Subject: [PATCH] Avoid micro-writes in the Rendezvous component --- src/tribler/core/components.py | 2 +- src/tribler/core/rendezvous/database.py | 2 +- .../core/rendezvous/rendezvous_hook.py | 32 +++++++++++-- .../core/rendezvous/test_rendezvous_hook.py | 47 ++++++++++++++----- 4 files changed, 66 insertions(+), 17 deletions(-) diff --git a/src/tribler/core/components.py b/src/tribler/core/components.py index a11e396a04c..0ce6be8682d 100644 --- a/src/tribler/core/components.py +++ b/src/tribler/core/components.py @@ -199,7 +199,7 @@ def finalize(self, ipv8: IPv8, session: Session, community: Community) -> None: from tribler.core.rendezvous.community import RendezvousCommunity from tribler.core.rendezvous.rendezvous_hook import RendezvousHook - rendezvous_hook = RendezvousHook(cast(RendezvousCommunity, community).composition.database) + rendezvous_hook = RendezvousHook(cast(RendezvousCommunity, community).composition.database, community) ipv8.network.add_peer_observer(rendezvous_hook) diff --git a/src/tribler/core/rendezvous/database.py b/src/tribler/core/rendezvous/database.py index 401f28b90bf..a588fa6a231 100644 --- a/src/tribler/core/rendezvous/database.py +++ b/src/tribler/core/rendezvous/database.py @@ -38,7 +38,7 @@ def add(self, peer: Peer, start_timestamp: float, stop_timestamp: float) -> None """ Write a peer's session time to the database. """ - with db_session(immediate=True): + with db_session(): address = peer.address family = socket.AF_INET6 if isinstance(address, UDPv6Address) else socket.AF_INET self.Certificate(public_key=peer.public_key.key_to_bin(), diff --git a/src/tribler/core/rendezvous/rendezvous_hook.py b/src/tribler/core/rendezvous/rendezvous_hook.py index 03c631f1e1f..cd45b3e5b1f 100644 --- a/src/tribler/core/rendezvous/rendezvous_hook.py +++ b/src/tribler/core/rendezvous/rendezvous_hook.py @@ -1,8 +1,10 @@ import logging import time +from ipv8.community import Community from ipv8.peerdiscovery.network import Network, PeerObserver from ipv8.types import Peer +from pony.orm import db_session from tribler.core.rendezvous.database import RendezvousDatabase @@ -12,18 +14,40 @@ class RendezvousHook(PeerObserver): Keep track of peers that we have seen. """ - def __init__(self, rendezvous_db: RendezvousDatabase) -> None: + def __init__(self, rendezvous_db: RendezvousDatabase, community: Community) -> None: """ Write rendezvous info to the given database. """ self.rendezvous_db = rendezvous_db + self.write_queue: list[tuple[Peer, float, float]] = [] + + self.community = community + self.community.register_shutdown_task(self.shutdown, community.network) + self.community.register_task("Peer write scheduler", self.schedule_write_peers, interval=60.0) + + def consume_write_queue(self, queue: list[tuple[Peer, float, float]]) -> None: + """ + Consume the queue in bulk. + """ + with db_session: + for entry in queue: + self.rendezvous_db.add(*entry) + + async def schedule_write_peers(self) -> None: + """ + Write the peers in an executor. + """ + forward = self.write_queue + self.write_queue = [] + await self.community.register_executor_task("Peer write executor", self.consume_write_queue, forward) + def shutdown(self, network: Network) -> None: """ Write all data to disk. """ - for peer in network.verified_peers: - self.on_peer_removed(peer) + self.consume_write_queue([(peer, peer.creation_time, self.current_time) for peer in network.verified_peers + if self.current_time >= peer.creation_time]) if self.rendezvous_db: self.rendezvous_db.shutdown() @@ -44,6 +68,6 @@ def on_peer_removed(self, peer: Peer) -> None: Callback for when a peer is removed: write its online time to the database. """ if self.current_time >= peer.creation_time: - self.rendezvous_db.add(peer, peer.creation_time, self.current_time) + self.write_queue.append((peer, peer.creation_time, self.current_time)) else: logging.exception("%s was first seen in the future! Something is seriously wrong!", peer) diff --git a/src/tribler/test_unit/core/rendezvous/test_rendezvous_hook.py b/src/tribler/test_unit/core/rendezvous/test_rendezvous_hook.py index 5a54d30057a..0321f411b2a 100644 --- a/src/tribler/test_unit/core/rendezvous/test_rendezvous_hook.py +++ b/src/tribler/test_unit/core/rendezvous/test_rendezvous_hook.py @@ -1,24 +1,41 @@ from __future__ import annotations +from asyncio import sleep + from ipv8.keyvault.crypto import default_eccrypto from ipv8.peer import Peer from ipv8.peerdiscovery.network import Network +from ipv8.taskmanager import TaskManager from ipv8.test.base import TestBase from tribler.core.rendezvous.database import RendezvousDatabase from tribler.core.rendezvous.rendezvous_hook import RendezvousHook +class MockCommunity(TaskManager): + """ + Fake community for testing. + """ + + def __init__(self) -> None: + """ + Fake a Community, just add a network field to a TaskManager. + """ + super().__init__() + self.network = Network() + + class MockedRendezvousHook(RendezvousHook): """ A mocked RendezvousHook that allows for time to be controlled. """ - def __init__(self, rendezvous_db: RendezvousDatabase, mocked_time: float | None = None) -> None: + def __init__(self, rendezvous_db: RendezvousDatabase, task_manager: MockCommunity, + mocked_time: float | None = None) -> None: """ Create a new MockedRendezvousHook with a certain time set. """ - super().__init__(rendezvous_db) + super().__init__(rendezvous_db, task_manager) self.mocked_time = mocked_time @property @@ -41,64 +58,72 @@ def setUp(self) -> None: Create a peer and a memory-based database. """ super().setUp() + self.community = MockCommunity() self.peer = Peer(default_eccrypto.generate_key("curve25519").pub()) self.memdb = RendezvousDatabase(":memory:") - self.hook = MockedRendezvousHook(self.memdb) + self.hook = MockedRendezvousHook(self.memdb, self.community) async def tearDown(self) -> None: """ Shut down the database. """ - self.memdb.shutdown() - self.hook.shutdown(Network()) + await self.community.shutdown_task_manager() await super().tearDown() - def test_peer_added(self) -> None: + async def test_peer_added(self) -> None: """ Test if peers are not added to the database after an add hook yet. """ self.hook.on_peer_added(self.peer) + await sleep(0) retrieved = self.memdb.get(self.peer) self.assertEqual(0, len(retrieved)) + self.assertEqual(0, len(self.hook.write_queue)) - def test_peer_removed(self) -> None: + async def test_peer_removed(self) -> None: """ Test if peers are correctly added to the database after a remove hook. """ self.hook.on_peer_added(self.peer) self.hook.mocked_time = self.peer.creation_time + 1.0 self.hook.on_peer_removed(self.peer) + await sleep(0) # Schedule the addition + self.hook.consume_write_queue(self.hook.write_queue) # Consume the queue manually retrieved = self.memdb.get(self.peer) + self.assertEqual(1, len(self.hook.write_queue)) self.assertEqual(1, len(retrieved)) self.assertEqual((self.peer.creation_time, self.hook.mocked_time), (retrieved[0].start, retrieved[0].stop)) def test_peer_store_on_shutdown(self) -> None: """ Test if peers are correctly added to the database when shutting down. + + This should happen immediately, not on a thread. """ - network = Network() - network.add_verified_peer(self.peer) + self.community.network.add_verified_peer(self.peer) self.hook.on_peer_added(self.peer) self.hook.mocked_time = self.peer.creation_time + 1.0 - self.hook.shutdown(network) + self.hook.shutdown(self.community.network) retrieved = self.memdb.get(self.peer) self.assertEqual(1, len(retrieved)) self.assertEqual((self.peer.creation_time, self.hook.mocked_time), (retrieved[0].start, retrieved[0].stop)) - def test_peer_ignore_future(self) -> None: + async def test_peer_ignore_future(self) -> None: """ Test if peers are not added from the future. """ self.hook.on_peer_added(self.peer) self.hook.mocked_time = self.peer.creation_time - 1.0 self.hook.on_peer_removed(self.peer) + await sleep(0) retrieved = self.memdb.get(self.peer) self.assertEqual(0, len(retrieved)) + self.assertEqual(0, len(self.hook.write_queue))