Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid micro-writes in the Rendezvous component #8450

Merged
merged 1 commit into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/tribler/core/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def finalize(self, ipv8: IPv8, session: Session, community: Community) -> None:
from tribler.core.rendezvous.community import RendezvousCommunity
from tribler.core.rendezvous.rendezvous_hook import RendezvousHook

rendezvous_hook = RendezvousHook(cast(RendezvousCommunity, community).composition.database)
rendezvous_hook = RendezvousHook(cast(RendezvousCommunity, community).composition.database, community)
ipv8.network.add_peer_observer(rendezvous_hook)


Expand Down
2 changes: 1 addition & 1 deletion src/tribler/core/rendezvous/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def add(self, peer: Peer, start_timestamp: float, stop_timestamp: float) -> None
"""
Write a peer's session time to the database.
"""
with db_session(immediate=True):
with db_session():
address = peer.address
family = socket.AF_INET6 if isinstance(address, UDPv6Address) else socket.AF_INET
self.Certificate(public_key=peer.public_key.key_to_bin(),
Expand Down
32 changes: 28 additions & 4 deletions src/tribler/core/rendezvous/rendezvous_hook.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import time

from ipv8.community import Community
from ipv8.peerdiscovery.network import Network, PeerObserver
from ipv8.types import Peer
from pony.orm import db_session

from tribler.core.rendezvous.database import RendezvousDatabase

Expand All @@ -12,18 +14,40 @@ class RendezvousHook(PeerObserver):
Keep track of peers that we have seen.
"""

def __init__(self, rendezvous_db: RendezvousDatabase) -> None:
def __init__(self, rendezvous_db: RendezvousDatabase, community: Community) -> None:
"""
Write rendezvous info to the given database.
"""
self.rendezvous_db = rendezvous_db

self.write_queue: list[tuple[Peer, float, float]] = []

self.community = community
self.community.register_shutdown_task(self.shutdown, community.network)
self.community.register_task("Peer write scheduler", self.schedule_write_peers, interval=60.0)

def consume_write_queue(self, queue: list[tuple[Peer, float, float]]) -> None:
"""
Consume the queue in bulk.
"""
with db_session:
for entry in queue:
self.rendezvous_db.add(*entry)

async def schedule_write_peers(self) -> None:
"""
Write the peers in an executor.
"""
forward = self.write_queue
self.write_queue = []
await self.community.register_executor_task("Peer write executor", self.consume_write_queue, forward)

def shutdown(self, network: Network) -> None:
"""
Write all data to disk.
"""
for peer in network.verified_peers:
self.on_peer_removed(peer)
self.consume_write_queue([(peer, peer.creation_time, self.current_time) for peer in network.verified_peers
if self.current_time >= peer.creation_time])
if self.rendezvous_db:
self.rendezvous_db.shutdown()

Expand All @@ -44,6 +68,6 @@ def on_peer_removed(self, peer: Peer) -> None:
Callback for when a peer is removed: write its online time to the database.
"""
if self.current_time >= peer.creation_time:
self.rendezvous_db.add(peer, peer.creation_time, self.current_time)
self.write_queue.append((peer, peer.creation_time, self.current_time))
else:
logging.exception("%s was first seen in the future! Something is seriously wrong!", peer)
47 changes: 36 additions & 11 deletions src/tribler/test_unit/core/rendezvous/test_rendezvous_hook.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,41 @@
from __future__ import annotations

from asyncio import sleep

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.peerdiscovery.network import Network
from ipv8.taskmanager import TaskManager
from ipv8.test.base import TestBase

from tribler.core.rendezvous.database import RendezvousDatabase
from tribler.core.rendezvous.rendezvous_hook import RendezvousHook


class MockCommunity(TaskManager):
"""
Fake community for testing.
"""

def __init__(self) -> None:
"""
Fake a Community, just add a network field to a TaskManager.
"""
super().__init__()
self.network = Network()


class MockedRendezvousHook(RendezvousHook):
"""
A mocked RendezvousHook that allows for time to be controlled.
"""

def __init__(self, rendezvous_db: RendezvousDatabase, mocked_time: float | None = None) -> None:
def __init__(self, rendezvous_db: RendezvousDatabase, task_manager: MockCommunity,
mocked_time: float | None = None) -> None:
"""
Create a new MockedRendezvousHook with a certain time set.
"""
super().__init__(rendezvous_db)
super().__init__(rendezvous_db, task_manager)
self.mocked_time = mocked_time

@property
Expand All @@ -41,64 +58,72 @@ def setUp(self) -> None:
Create a peer and a memory-based database.
"""
super().setUp()
self.community = MockCommunity()
self.peer = Peer(default_eccrypto.generate_key("curve25519").pub())
self.memdb = RendezvousDatabase(":memory:")
self.hook = MockedRendezvousHook(self.memdb)
self.hook = MockedRendezvousHook(self.memdb, self.community)

async def tearDown(self) -> None:
"""
Shut down the database.
"""
self.memdb.shutdown()
self.hook.shutdown(Network())
await self.community.shutdown_task_manager()
await super().tearDown()

def test_peer_added(self) -> None:
async def test_peer_added(self) -> None:
"""
Test if peers are not added to the database after an add hook yet.
"""
self.hook.on_peer_added(self.peer)
await sleep(0)

retrieved = self.memdb.get(self.peer)

self.assertEqual(0, len(retrieved))
self.assertEqual(0, len(self.hook.write_queue))

def test_peer_removed(self) -> None:
async def test_peer_removed(self) -> None:
"""
Test if peers are correctly added to the database after a remove hook.
"""
self.hook.on_peer_added(self.peer)
self.hook.mocked_time = self.peer.creation_time + 1.0
self.hook.on_peer_removed(self.peer)
await sleep(0) # Schedule the addition
self.hook.consume_write_queue(self.hook.write_queue) # Consume the queue manually

retrieved = self.memdb.get(self.peer)

self.assertEqual(1, len(self.hook.write_queue))
self.assertEqual(1, len(retrieved))
self.assertEqual((self.peer.creation_time, self.hook.mocked_time), (retrieved[0].start, retrieved[0].stop))

def test_peer_store_on_shutdown(self) -> None:
"""
Test if peers are correctly added to the database when shutting down.
This should happen immediately, not on a thread.
"""
network = Network()
network.add_verified_peer(self.peer)
self.community.network.add_verified_peer(self.peer)
self.hook.on_peer_added(self.peer)
self.hook.mocked_time = self.peer.creation_time + 1.0
self.hook.shutdown(network)
self.hook.shutdown(self.community.network)

retrieved = self.memdb.get(self.peer)

self.assertEqual(1, len(retrieved))
self.assertEqual((self.peer.creation_time, self.hook.mocked_time), (retrieved[0].start, retrieved[0].stop))

def test_peer_ignore_future(self) -> None:
async def test_peer_ignore_future(self) -> None:
"""
Test if peers are not added from the future.
"""
self.hook.on_peer_added(self.peer)
self.hook.mocked_time = self.peer.creation_time - 1.0
self.hook.on_peer_removed(self.peer)
await sleep(0)

retrieved = self.memdb.get(self.peer)

self.assertEqual(0, len(retrieved))
self.assertEqual(0, len(self.hook.write_queue))