Skip to content

Commit

Permalink
Avoid micro-writes in the Rendezvous component
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Feb 21, 2025
1 parent 405df9f commit b8e3989
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/tribler/core/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def finalize(self, ipv8: IPv8, session: Session, community: Community) -> None:
from tribler.core.rendezvous.community import RendezvousCommunity
from tribler.core.rendezvous.rendezvous_hook import RendezvousHook

rendezvous_hook = RendezvousHook(cast(RendezvousCommunity, community).composition.database)
rendezvous_hook = RendezvousHook(cast(RendezvousCommunity, community).composition.database, community)
ipv8.network.add_peer_observer(rendezvous_hook)


Expand Down
2 changes: 1 addition & 1 deletion src/tribler/core/rendezvous/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def add(self, peer: Peer, start_timestamp: float, stop_timestamp: float) -> None
"""
Write a peer's session time to the database.
"""
with db_session(immediate=True):
with db_session():
address = peer.address
family = socket.AF_INET6 if isinstance(address, UDPv6Address) else socket.AF_INET
self.Certificate(public_key=peer.public_key.key_to_bin(),
Expand Down
32 changes: 28 additions & 4 deletions src/tribler/core/rendezvous/rendezvous_hook.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import time

from ipv8.community import Community
from ipv8.peerdiscovery.network import Network, PeerObserver
from ipv8.types import Peer
from pony.orm import db_session

from tribler.core.rendezvous.database import RendezvousDatabase

Expand All @@ -12,18 +14,40 @@ class RendezvousHook(PeerObserver):
Keep track of peers that we have seen.
"""

def __init__(self, rendezvous_db: RendezvousDatabase) -> None:
def __init__(self, rendezvous_db: RendezvousDatabase, community: Community) -> None:
"""
Write rendezvous info to the given database.
"""
self.rendezvous_db = rendezvous_db

self.write_queue: list[tuple[Peer, float, float]] = []

self.community = community
self.community.register_shutdown_task(self.shutdown, community.network)
self.community.register_task("Peer write scheduler", self.schedule_write_peers, interval=60.0)

def consume_write_queue(self, queue: list[tuple[Peer, float, float]]) -> None:
"""
Consume the queue in bulk.
"""
with db_session:
for entry in queue:
self.rendezvous_db.add(*entry)

async def schedule_write_peers(self) -> None:
"""
Write the peers in an executor.
"""
forward = self.write_queue
self.write_queue = []
await self.community.register_executor_task("Peer write executor", self.consume_write_queue, forward)

def shutdown(self, network: Network) -> None:
"""
Write all data to disk.
"""
for peer in network.verified_peers:
self.on_peer_removed(peer)
self.consume_write_queue([(peer, peer.creation_time, self.current_time) for peer in network.verified_peers
if self.current_time >= peer.creation_time])
if self.rendezvous_db:
self.rendezvous_db.shutdown()

Expand All @@ -44,6 +68,6 @@ def on_peer_removed(self, peer: Peer) -> None:
Callback for when a peer is removed: write its online time to the database.
"""
if self.current_time >= peer.creation_time:
self.rendezvous_db.add(peer, peer.creation_time, self.current_time)
self.write_queue.append((peer, peer.creation_time, self.current_time))
else:
logging.exception("%s was first seen in the future! Something is seriously wrong!", peer)
47 changes: 36 additions & 11 deletions src/tribler/test_unit/core/rendezvous/test_rendezvous_hook.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,41 @@
from __future__ import annotations

from asyncio import sleep

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.peerdiscovery.network import Network
from ipv8.taskmanager import TaskManager
from ipv8.test.base import TestBase

from tribler.core.rendezvous.database import RendezvousDatabase
from tribler.core.rendezvous.rendezvous_hook import RendezvousHook


class MockCommunity(TaskManager):
"""
Fake community for testing.
"""

def __init__(self) -> None:
"""
Fake a Community, just add a network field to a TaskManager.
"""
super().__init__()
self.network = Network()


class MockedRendezvousHook(RendezvousHook):
"""
A mocked RendezvousHook that allows for time to be controlled.
"""

def __init__(self, rendezvous_db: RendezvousDatabase, mocked_time: float | None = None) -> None:
def __init__(self, rendezvous_db: RendezvousDatabase, task_manager: MockCommunity,
mocked_time: float | None = None) -> None:
"""
Create a new MockedRendezvousHook with a certain time set.
"""
super().__init__(rendezvous_db)
super().__init__(rendezvous_db, task_manager)
self.mocked_time = mocked_time

@property
Expand All @@ -41,64 +58,72 @@ def setUp(self) -> None:
Create a peer and a memory-based database.
"""
super().setUp()
self.community = MockCommunity()
self.peer = Peer(default_eccrypto.generate_key("curve25519").pub())
self.memdb = RendezvousDatabase(":memory:")
self.hook = MockedRendezvousHook(self.memdb)
self.hook = MockedRendezvousHook(self.memdb, self.community)

async def tearDown(self) -> None:
"""
Shut down the database.
"""
self.memdb.shutdown()
self.hook.shutdown(Network())
await self.community.shutdown_task_manager()
await super().tearDown()

def test_peer_added(self) -> None:
async def test_peer_added(self) -> None:
"""
Test if peers are not added to the database after an add hook yet.
"""
self.hook.on_peer_added(self.peer)
await sleep(0)

retrieved = self.memdb.get(self.peer)

self.assertEqual(0, len(retrieved))
self.assertEqual(0, len(self.hook.write_queue))

def test_peer_removed(self) -> None:
async def test_peer_removed(self) -> None:
"""
Test if peers are correctly added to the database after a remove hook.
"""
self.hook.on_peer_added(self.peer)
self.hook.mocked_time = self.peer.creation_time + 1.0
self.hook.on_peer_removed(self.peer)
await sleep(0) # Schedule the addition
self.hook.consume_write_queue(self.hook.write_queue) # Consume the queue manually

retrieved = self.memdb.get(self.peer)

self.assertEqual(1, len(self.hook.write_queue))
self.assertEqual(1, len(retrieved))
self.assertEqual((self.peer.creation_time, self.hook.mocked_time), (retrieved[0].start, retrieved[0].stop))

def test_peer_store_on_shutdown(self) -> None:
"""
Test if peers are correctly added to the database when shutting down.
This should happen immediately, not on a thread.
"""
network = Network()
network.add_verified_peer(self.peer)
self.community.network.add_verified_peer(self.peer)
self.hook.on_peer_added(self.peer)
self.hook.mocked_time = self.peer.creation_time + 1.0
self.hook.shutdown(network)
self.hook.shutdown(self.community.network)

retrieved = self.memdb.get(self.peer)

self.assertEqual(1, len(retrieved))
self.assertEqual((self.peer.creation_time, self.hook.mocked_time), (retrieved[0].start, retrieved[0].stop))

def test_peer_ignore_future(self) -> None:
async def test_peer_ignore_future(self) -> None:
"""
Test if peers are not added from the future.
"""
self.hook.on_peer_added(self.peer)
self.hook.mocked_time = self.peer.creation_time - 1.0
self.hook.on_peer_removed(self.peer)
await sleep(0)

retrieved = self.memdb.get(self.peer)

self.assertEqual(0, len(retrieved))
self.assertEqual(0, len(self.hook.write_queue))

0 comments on commit b8e3989

Please sign in to comment.