diff --git a/examples/floodsub/basic_example.py b/examples/floodsub/basic_example.py
new file mode 100644
index 000000000..6fb51098b
--- /dev/null
+++ b/examples/floodsub/basic_example.py
@@ -0,0 +1,128 @@
+#!/usr/bin/env python3
+"""
+Basic FloodSub Example
+
+This is a simple example that demonstrates FloodSub publishing and subscribing
+without relying on test utilities. It shows the core functionality.
+
+Run this example with:
+
+    python examples/floodsub/basic_example.py
+"""
+
+import logging
+import sys
+
+from multiaddr import Multiaddr
+import trio
+
+from libp2p import new_host
+from libp2p.crypto.secp256k1 import create_new_key_pair
+from libp2p.peer.peerinfo import info_from_p2p_addr
+from libp2p.pubsub.floodsub import FloodSub
+from libp2p.pubsub.pubsub import Pubsub
+from libp2p.tools.async_service import background_trio_service
+from libp2p.tools.constants import FLOODSUB_PROTOCOL_ID
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger("floodsub_basic")
+
+LISTEN_ADDR = Multiaddr("/ip4/127.0.0.1/tcp/0")
+
+
+async def main() -> None:
+    """Demonstrate basic FloodSub publishing and subscribing."""
+    logger.info("Starting basic FloodSub example...")
+
+    # Create two hosts
+    key_pair1 = create_new_key_pair()
+    key_pair2 = create_new_key_pair()
+    host1 = new_host(
+        key_pair=key_pair1,
+        listen_addrs=[LISTEN_ADDR],
+    )
+    host2 = new_host(
+        key_pair=key_pair2,
+        listen_addrs=[LISTEN_ADDR],
+    )
+
+    # Create FloodSub routers
+    floodsub1 = FloodSub(protocols=[FLOODSUB_PROTOCOL_ID])
+    floodsub2 = FloodSub(protocols=[FLOODSUB_PROTOCOL_ID])
+
+    # Create Pubsub instances
+    pubsub1 = Pubsub(
+        host=host1,
+        router=floodsub1,
+        strict_signing=False,  # Disable for simplicity
+    )
+    pubsub2 = Pubsub(
+        host=host2,
+        router=floodsub2,
+        strict_signing=False,  # Disable for simplicity
+    )
+
+    # Start both pubsub services
+    async with background_trio_service(pubsub1):
+        async with background_trio_service(pubsub2):
+            await pubsub1.wait_until_ready()
+            await pubsub2.wait_until_ready()
+
+            logger.info(f"Host 1 ID: {host1.get_id()}")
+            logger.info(f"Host 2 ID: {host2.get_id()}")
+
+            # Start listening on both hosts; listen() needs at least one
+            # multiaddr, otherwise the hosts never get a dialable address
+            logger.info("Starting hosts...")
+            await host1.get_network().listen(LISTEN_ADDR)
+            await host2.get_network().listen(LISTEN_ADDR)
+            await trio.sleep(0.5)  # Wait for hosts to start listening
+
+            # Connect the hosts
+            logger.info("Connecting hosts...")
+            peer_info = info_from_p2p_addr(
+                host2.get_addrs()[0].encapsulate(
+                    Multiaddr(f"/p2p/{host2.get_id().pretty()}")
+                )
+            )
+            await host1.connect(peer_info)
+            await trio.sleep(1)  # Wait for connection
+
+            # Subscribe to topic on host2
+            topic = "test-topic"
+            logger.info(f"Subscribing to topic: {topic}")
+            subscription = await pubsub2.subscribe(topic)
+            await trio.sleep(0.5)  # Wait for subscription to propagate
+
+            # Publish messages from host1
+            messages = [
+                "Hello from FloodSub!",
+                "This is message number 2",
+                "FloodSub is working great!",
+            ]
+            for i, message in enumerate(messages):
+                logger.info(f"Publishing message {i + 1}: {message}")
+                await pubsub1.publish(topic, message.encode())
+                await trio.sleep(0.5)
+
+            # Receive messages on host2
+            logger.info("Receiving messages...")
+            for i in range(len(messages)):
+                message = await subscription.get()
+                logger.info(f"Received message {i + 1}: {message.data.decode()}")
+                logger.info(f"  From peer: {message.from_id.hex()}")
+                logger.info(f"  Topics: {message.topicIDs}")
+
+            logger.info("Basic FloodSub example completed successfully!")
+
+
+if __name__ == "__main__":
+    try:
+        trio.run(main)
+    except KeyboardInterrupt:
+        logger.info("Example interrupted by user")
+        sys.exit(0)
+    except Exception as e:
+        logger.error(f"Example failed: {e}")
+        sys.exit(1)
diff --git a/libp2p/__init__.py b/libp2p/__init__.py
index 912f3f1af..d88f72626 100644
--- a/libp2p/__init__.py
+++ b/libp2p/__init__.py
@@ -183,7 +183,7 @@ def new_swarm(
     tls_client_config: ssl.SSLContext | None = None,
     tls_server_config: ssl.SSLContext | None = None,
 ) -> INetworkService:
-    logger.debug(f"new_swarm: enable_quic={enable_quic}, listen_addrs={listen_addrs}")
+    logger.debug(
+        "new_swarm: enable_quic=%s, listen_addrs=%s", enable_quic, listen_addrs
+    )
     """
     Create a swarm instance based on the parameters.
@@ -227,7 +227,7 @@ def new_swarm(
     )
 
     addr = listen_addrs[0]
-    logger.debug(f"new_swarm: Creating transport for address: {addr}")
+    logger.debug("new_swarm: Creating transport for address: %s", addr)
     transport_maybe = create_transport_for_multiaddr(
         addr,
         temp_upgrader,
@@ -241,14 +241,17 @@ def new_swarm(
         raise ValueError(f"Unsupported transport for listen_addrs: {listen_addrs}")
 
     transport = transport_maybe
-    logger.debug(f"new_swarm: Created transport: {type(transport)}")
+    logger.debug("new_swarm: Created transport: %s", type(transport))
 
     # If enable_quic is True but we didn't get a QUIC transport, force QUIC
     if enable_quic and not isinstance(transport, QUICTransport):
-        logger.debug(f"new_swarm: Forcing QUIC transport (enable_quic=True but got {type(transport)})")
+        logger.debug(
+            "new_swarm: Forcing QUIC transport (enable_quic=True but got %s)",
+            type(transport),
+        )
         transport = QUICTransport(key_pair.private_key, config=quic_transport_opt)
 
-    logger.debug(f"new_swarm: Final transport type: {type(transport)}")
+    logger.debug("new_swarm: Final transport type: %s", type(transport))
 
     # Generate X25519 keypair for Noise
     noise_key_pair = create_new_x25519_key_pair()
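Note on the pattern above: replacing f-strings with %-style arguments in logger calls is not just cosmetic. The standard logging module defers formatting until a handler actually emits the record, so the arguments are never rendered when the level is disabled. A minimal, self-contained illustration:

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("demo")

    class Expensive:
        def __repr__(self) -> str:
            print("expensive repr computed!")
            return "Expensive()"

    # f-string: repr() runs eagerly even though DEBUG is disabled
    logger.debug(f"value={Expensive()}")

    # lazy %-style: arguments are only formatted if the record is emitted,
    # so the expensive repr never runs here
    logger.debug("value=%s", Expensive())

The same reasoning is why exception messages must stay eager: %-style arguments passed to an exception constructor just become a tuple, which is why the `raise TypeError(...)` in the next file keeps its f-string.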
logger.debug("Refreshing routing table") + await self.peer_routing.refresh_routing_table() + async def _run_main_loop(self) -> None: """Run the main DHT service loop.""" # Main service loop @@ -217,7 +226,7 @@ async def _run_main_loop(self) -> None: # Clean up expired values and provider records expired_values = self.value_store.cleanup_expired() if expired_values > 0: - logger.debug(f"Cleaned up {expired_values} expired values") + logger.debug("Cleaned up %d expired values", expired_values) self.provider_store.cleanup_expired() @@ -322,12 +331,12 @@ async def switch_mode(self, new_mode: DHTMode) -> DHTMode: """ # Validate that new_mode is a DHTMode enum if not isinstance(new_mode, DHTMode): - raise TypeError(f"new_mode must be DHTMode enum, got {type(new_mode)}") + raise TypeError("new_mode must be DHTMode enum, got %s", type(new_mode)) if new_mode == DHTMode.CLIENT: self.routing_table.cleanup_routing_table() self.mode = new_mode - logger.info(f"Switched to {new_mode.value} mode") + logger.info("Switched to %s mode", new_mode.value) return self.mode async def handle_stream(self, stream: INetStream) -> None: @@ -338,9 +347,9 @@ async def handle_stream(self, stream: INetStream) -> None: stream.close return peer_id = stream.muxed_conn.peer_id - logger.debug(f"Received DHT stream from peer {peer_id}") + logger.debug("Received DHT stream from peer %s", peer_id) await self.add_peer(peer_id) - logger.debug(f"Added peer {peer_id} to routing table") + logger.debug("Added peer %s to routing table", peer_id) closer_peer_envelope: Envelope | None = None provider_peer_envelope: Envelope | None = None @@ -371,7 +380,9 @@ async def handle_stream(self, stream: INetStream) -> None: message = Message() message.ParseFromString(msg_bytes) logger.debug( - f"Received DHT message from {peer_id}, type: {message.type}" + "Received DHT message from %s, type: %s", + peer_id, + message.type, ) # Handle FIND_NODE message @@ -383,7 +394,30 @@ async def handle_stream(self, stream: INetStream) -> None: closest_peers = self.routing_table.find_local_closest_peers( target_key, 20 ) - logger.debug(f"Found {len(closest_peers)} peers close to target") + + # Fallback to connected peers if + # routing table has insufficient peers + if len(closest_peers) < self.MIN_PEERS_THRESHOLD: + logger.debug( + "Routing table has insufficient peers (%d < %d) " + "for FIND_NODE in KadDHT, " + "using connected peers as fallback", + len(closest_peers), + self.MIN_PEERS_THRESHOLD, + ) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance & use as response + fallback_peers = sort_peer_ids_by_distance( + target_key, connected_peers + )[:20] + closest_peers = fallback_peers + logger.debug( + "Using %d connected peers as FIND_NODE fallback in DHT", + len(closest_peers), + ) + + logger.debug("Found %d peers close to target", len(closest_peers)) # Consume the source signed_peer_record if sent if not maybe_consume_signed_record(message, self.host, peer_id): @@ -437,14 +471,15 @@ async def handle_stream(self, stream: INetStream) -> None: await stream.write(varint.encode(len(response_bytes))) await stream.write(response_bytes) logger.debug( - f"Sent FIND_NODE response with{len(response.closerPeers)} peers" + "Sent FIND_NODE response with %d peers", + len(response.closerPeers), ) # Handle ADD_PROVIDER message elif message.type == Message.MessageType.ADD_PROVIDER: # Process ADD_PROVIDER key = message.key - logger.debug(f"Received ADD_PROVIDER for key {key.hex()}") + logger.debug("Received 
ADD_PROVIDER for key %s", key.hex()) # Consume the source signed-peer-record if sent if not maybe_consume_signed_record(message, self.host, peer_id): @@ -461,8 +496,9 @@ async def handle_stream(self, stream: INetStream) -> None: provider_id = ID(provider_proto.id) if provider_id != peer_id: logger.warning( - f"Provider ID {provider_id} doesn't" - f"match sender {peer_id}, ignoring" + "Provider ID %s doesn't match sender %s, ignoring", + provider_id, + peer_id, ) continue @@ -472,13 +508,15 @@ async def handle_stream(self, stream: INetStream) -> None: try: addrs.append(Multiaddr(addr_bytes)) except Exception as e: - logger.warning(f"Failed to parse address: {e}") + logger.warning("Failed to parse address: %s", e) # Add to provider store provider_info = PeerInfo(provider_id, addrs) self.provider_store.add_provider(key, provider_info) logger.debug( - f"Added provider {provider_id} for key {key.hex()}" + "Added provider %s for key %s", + provider_id, + key.hex(), ) # Process the signed-records of provider if sent @@ -492,7 +530,7 @@ async def handle_stream(self, stream: INetStream) -> None: await stream.close() return except Exception as e: - logger.warning(f"Failed to process provider info: {e}") + logger.warning("Failed to process provider info: %s", e) # Send acknowledgement response = Message() @@ -512,7 +550,7 @@ async def handle_stream(self, stream: INetStream) -> None: elif message.type == Message.MessageType.GET_PROVIDERS: # Process GET_PROVIDERS key = message.key - logger.debug(f"Received GET_PROVIDERS request for key {key.hex()}") + logger.debug("Received GET_PROVIDERS request for key %s", key.hex()) # Consume the source signed_peer_record if sent if not maybe_consume_signed_record(message, self.host, peer_id): @@ -525,7 +563,9 @@ async def handle_stream(self, stream: INetStream) -> None: # Find providers for the key providers = self.provider_store.get_providers(key) logger.debug( - f"Found {len(providers)} providers for key {key.hex()}" + "Found %d providers for key %s", + len(providers), + key.hex(), ) # Create response @@ -564,6 +604,31 @@ async def handle_stream(self, stream: INetStream) -> None: closest_peers = self.routing_table.find_local_closest_peers( key, 20 ) + + # Fallback to connected peers if + # routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(closest_peers) < MIN_PEERS_THRESHOLD: + logger.debug( + "Routing table has insufficient peers" + " (%d < %d) for provider response, " + "using connected peers as fallback", + len(closest_peers), + MIN_PEERS_THRESHOLD, + ) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort by distance to target and use as response + fallback_peers = sort_peer_ids_by_distance( + key, connected_peers + )[:20] + closest_peers = fallback_peers + logger.debug( + "Using %d connected peers as " + "fallback for provider response", + len(closest_peers), + ) + logger.debug( f"No providers found, including {len(closest_peers)}" "closest peers" @@ -653,9 +718,35 @@ async def handle_stream(self, stream: INetStream) -> None: closest_peers = self.routing_table.find_local_closest_peers( key, 20 ) + + # Fallback to connected peers + # if routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(closest_peers) < MIN_PEERS_THRESHOLD: + logger.debug( + "Routing table has insufficient peers" + " (%d < %d) for GET_VALUE response, " + "using connected peers as fallback", + len(closest_peers), + MIN_PEERS_THRESHOLD, + ) + connected_peers = 
self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance to target + # and use as response + fallback_peers = sort_peer_ids_by_distance( + key, connected_peers + )[:20] + closest_peers = fallback_peers + logger.debug( + "Using %d connected peers as " + "fallback for GET_VALUE response", + len(closest_peers), + ) + logger.debug( - "No value found," - f"including {len(closest_peers)} closest peers" + "No value found, including %d closest peers", + len(closest_peers), ) for peer in closest_peers: @@ -715,12 +806,16 @@ async def handle_stream(self, stream: INetStream) -> None: ) self.value_store.put(key, value) - logger.debug(f"Stored value {value.hex()} for key {key.hex()}") + logger.debug( + "Stored value %s for key %s", value.hex(), key.hex() + ) success = True except Exception as e: logger.warning( - f"Failed to store value {value.hex()} for key " - f"{key.hex()}: {e}" + "Failed to store value %s for key %s: %s", + value.hex(), + key.hex(), + str(e), ) finally: # Send acknowledgement @@ -740,25 +835,20 @@ async def handle_stream(self, stream: INetStream) -> None: logger.debug("Sent PUT_VALUE acknowledgement") except Exception as proto_err: - logger.warning(f"Failed to parse protobuf message: {proto_err}") + logger.warning("Failed to parse protobuf message: %s", str(proto_err)) await stream.close() except Exception as e: - logger.error(f"Error handling DHT stream: {e}") + logger.error("Error handling DHT stream: %s", str(e)) await stream.close() - async def refresh_routing_table(self) -> None: - """Refresh the routing table.""" - logger.debug("Refreshing routing table") - await self.peer_routing.refresh_routing_table() - # Peer routing methods async def find_peer(self, peer_id: ID) -> PeerInfo | None: """ Find a peer with the given ID. """ - logger.debug(f"Finding peer: {peer_id}") + logger.debug("Finding peer: %s", str(peer_id)) return await self.peer_routing.find_peer(peer_id) # Value storage and retrieval methods @@ -767,7 +857,7 @@ async def put_value(self, key: bytes, value: bytes) -> None: """ Store a value in the DHT. """ - logger.debug(f"Storing value for key {key.hex()}") + logger.debug("Storing value for key %s", key.hex()) if key.decode("utf-8").startswith("/"): if self.validator is not None: @@ -796,16 +886,35 @@ async def put_value(self, key: bytes, value: bytes) -> None: except UnicodeDecodeError: decoded_value = value.hex() logger.debug( - f"Stored value locally for key {key.hex()} with value {decoded_value}" + "Stored value locally for key %s with value %s", key.hex(), decoded_value ) # 2. 
Get closest peers, excluding self + routing_table_peers = self.routing_table.find_local_closest_peers(key) + + # Fallback to connected peers if routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(routing_table_peers) < MIN_PEERS_THRESHOLD: + logger.debug( + "Routing table has insufficient peers (%d < %d) for put_value, " + "using connected peers as fallback", + len(routing_table_peers), + MIN_PEERS_THRESHOLD, + ) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance to target and use as fallback + fallback_peers = sort_peer_ids_by_distance(key, connected_peers) + routing_table_peers = fallback_peers + logger.debug( + "Using %d connected peers as fallback for put_value", + len(routing_table_peers), + ) + closest_peers = [ - peer - for peer in self.routing_table.find_local_closest_peers(key) - if peer != self.local_peer_id + peer for peer in routing_table_peers if peer != self.local_peer_id ] - logger.debug(f"Found {len(closest_peers)} peers to store value at") + logger.debug("Found %d peers to store value at", len(closest_peers)) # 3. Store at remote peers in batches of ALPHA, in parallel stored_count = 0 @@ -821,11 +930,13 @@ async def store_one(idx: int, peer: ID) -> None: ) batch_results[idx] = success if success: - logger.debug(f"Stored value at peer {peer}") + logger.debug("Stored value at peer %s", str(peer)) else: - logger.debug(f"Failed to store value at peer {peer}") + logger.debug("Failed to store value at peer %s", str(peer)) except Exception as e: - logger.debug(f"Error storing value at peer {peer}: {e}") + logger.debug( + "Error storing value at peer %s: %s", str(peer), str(e) + ) async with trio.open_nursery() as nursery: for idx, peer in enumerate(batch): @@ -833,10 +944,10 @@ async def store_one(idx: int, peer: ID) -> None: stored_count += sum(batch_results) - logger.info(f"Successfully stored value at {stored_count} peers") + logger.info("Successfully stored value at %d peers", stored_count) async def get_value(self, key: bytes) -> bytes | None: - logger.debug(f"Getting value for key: {key.hex()}") + logger.debug("Getting value for key: %s", key.hex()) # 1. Check local store first value_record = self.value_store.get(key) @@ -845,12 +956,31 @@ async def get_value(self, key: bytes) -> bytes | None: return value_record.value # 2. Get closest peers, excluding self + routing_table_peers = self.routing_table.find_local_closest_peers(key) + + # Fallback to connected peers if routing table has insufficient peers + MIN_PEERS_THRESHOLD = 5 # Configurable minimum + if len(routing_table_peers) < MIN_PEERS_THRESHOLD: + logger.debug( + "Routing table has insufficient peers (%d < %d) for get_value, " + "using connected peers as fallback", + len(routing_table_peers), + MIN_PEERS_THRESHOLD, + ) + connected_peers = self.host.get_connected_peers() + if connected_peers: + # Sort connected peers by distance to target and use as fallback + fallback_peers = sort_peer_ids_by_distance(key, connected_peers) + routing_table_peers = fallback_peers + logger.debug( + "Using %d connected peers as fallback for get_value", + len(routing_table_peers), + ) + closest_peers = [ - peer - for peer in self.routing_table.find_local_closest_peers(key) - if peer != self.local_peer_id + peer for peer in routing_table_peers if peer != self.local_peer_id ] - logger.debug(f"Searching {len(closest_peers)} peers for value") + logger.debug("Searching %d peers for value", len(closest_peers)) # 3. 
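The fallback block above reappears, nearly verbatim, in the FIND_NODE, GET_PROVIDERS, and GET_VALUE handlers as well as in put_value and get_value. A possible consolidation (a sketch only; the helper name is hypothetical and not part of this diff):

    from libp2p.kad_dht.utils import sort_peer_ids_by_distance
    from libp2p.peer.id import ID


    def closest_with_fallback(dht: "KadDHT", key: bytes, count: int = 20) -> list[ID]:
        """Return routing-table peers closest to ``key``, falling back to
        currently connected peers when the table is too sparse."""
        peers = dht.routing_table.find_local_closest_peers(key, count)
        if len(peers) >= dht.MIN_PEERS_THRESHOLD:
            return peers
        connected = dht.host.get_connected_peers()
        if not connected:
            return peers
        # Same XOR-distance ordering the handlers above rely on
        return sort_peer_ids_by_distance(key, connected)[:count]

Each call site would then reduce to one line, and the threshold would live in a single place instead of five.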
diff --git a/libp2p/kad_dht/peer_routing.py b/libp2p/kad_dht/peer_routing.py
index f5313cb60..546ac9937 100644
--- a/libp2p/kad_dht/peer_routing.py
+++ b/libp2p/kad_dht/peer_routing.py
@@ -88,14 +88,14 @@ async def find_peer(self, peer_id: ID) -> PeerInfo | None:
         # First check if the peer is in our routing table
         peer_info = self.routing_table.get_peer_info(peer_id)
         if peer_info:
-            logger.debug(f"Found peer {peer_id} in routing table")
+            logger.debug("Found peer %s in routing table", peer_id)
             return peer_info
 
         # Then check if the peer is in our peerstore
         try:
             addrs = self.host.get_peerstore().addrs(peer_id)
             if addrs:
-                logger.debug(f"Found peer {peer_id} in peerstore")
+                logger.debug("Found peer %s in peerstore", peer_id)
                 return PeerInfo(peer_id, addrs)
         except Exception:
             pass
@@ -103,7 +103,7 @@ async def find_peer(self, peer_id: ID) -> PeerInfo | None:
         # If not found locally, search the network
         try:
             closest_peers = await self.find_closest_peers_network(peer_id.to_bytes())
-            logger.info(f"Closest peers found: {closest_peers}")
+            logger.info("Closest peers found: %s", closest_peers)
 
             # Check if we found the peer we're looking for
             for found_peer in closest_peers:
@@ -116,10 +116,10 @@ async def find_peer(self, peer_id: ID) -> PeerInfo | None:
                     pass
 
         except Exception as e:
-            logger.error(f"Error searching for peer {peer_id}: {e}")
+            logger.error("Error searching for peer %s: %s", peer_id, e)
 
         # Not found
-        logger.info(f"Peer {peer_id} not found")
+        logger.info("Peer %s not found", peer_id)
         return None
 
     async def _query_single_peer_for_closest(
@@ -149,7 +149,7 @@ async def _query_single_peer_for_closest(
                 len([p for p in result if p not in new_peers[: -len(result)]]),
             )
         except Exception as e:
-            logger.debug(f"Query to peer {peer} failed: {e}")
+            logger.debug("Query to peer %s failed: %s", peer, e)
 
     async def find_closest_peers_network(
         self, target_key: bytes, count: int = 20
@@ -179,7 +179,7 @@ async def find_closest_peers_network(
         # Iterative lookup until convergence
         while rounds < MAX_PEER_LOOKUP_ROUNDS:
             rounds += 1
-            logger.debug(f"Lookup round {rounds}/{MAX_PEER_LOOKUP_ROUNDS}")
+            logger.debug("Lookup round %d/%d", rounds, MAX_PEER_LOOKUP_ROUNDS)
 
             # Find peers we haven't queried yet
             peers_to_query = [p for p in closest_peers if p not in queried_peers]
@@ -214,7 +214,7 @@ async def find_closest_peers_network(
             closest_peers = sort_peer_ids_by_distance(target_key, all_candidates)[
                 :count
             ]
-            logger.debug(f"Updated closest peers count: {len(closest_peers)}")
+            logger.debug("Updated closest peers count: %d", len(closest_peers))
 
             # Check if we made any progress (found closer peers)
             if closest_peers == old_closest_peers:
@@ -222,8 +222,9 @@ async def find_closest_peers_network(
                 break
 
         logger.info(
-            f"Network lookup completed after {rounds} rounds, "
-            f"found {len(closest_peers)} peers"
+            "Network lookup completed after %d rounds, found %d peers",
+            rounds,
+            len(closest_peers),
        )
 
         return closest_peers
@@ -242,15 +243,15 @@ async def _query_peer_for_closest(self, peer: ID, target_key: bytes) -> list[ID]:
                 peer_info = PeerInfo(peer, addrs)
                 await self.routing_table.add_peer(peer_info)
         except Exception as e:
-            logger.debug(f"Failed to add peer {peer} to routing table: {e}")
+            logger.debug("Failed to add peer %s to routing table: %s", peer, e)
 
         # Open a stream to the peer using the Kademlia protocol
-        logger.debug(f"Opening stream to {peer} for closest peers query")
+        logger.debug("Opening stream to %s for closest peers query", peer)
         try:
             stream = await self.host.new_stream(peer, [PROTOCOL_ID])
-            logger.debug(f"Stream opened to {peer}")
+            logger.debug("Stream opened to %s", peer)
         except Exception as e:
-            logger.warning(f"Failed to open stream to {peer}: {e}")
+            logger.warning("Failed to open stream to %s: %s", peer, e)
             return []
 
         # Create and send FIND_NODE request using protobuf
@@ -265,7 +266,7 @@ async def _query_peer_for_closest(self, peer: ID, target_key: bytes) -> list[ID]:
             # Serialize and send the protobuf message with varint length prefix
             proto_bytes = find_node_msg.SerializeToString()
             logger.debug(
-                f"Sending FIND_NODE: {proto_bytes.hex()} (len={len(proto_bytes)})"
+                "Sending FIND_NODE: %s (len=%d)", proto_bytes.hex(), len(proto_bytes)
             )
             await stream.write(varint.encode(len(proto_bytes)))
             await stream.write(proto_bytes)
@@ -283,17 +284,24 @@ async def _query_peer_for_closest(self, peer: ID, target_key: bytes) -> list[ID]:
                 if b[0] & 0x80 == 0:
                     break
             response_length = varint.decode_bytes(length_bytes)
+            if response_length is None or response_length <= 0:
+                logger.warning("Invalid response length received from peer")
+                return []
 
             # Read response data
             response_bytes = b""
-            remaining = response_length
+            remaining = int(response_length)
             while remaining > 0:
                 chunk = await stream.read(remaining)
                 if not chunk:
-                    logger.debug(f"Connection closed by peer {peer} while reading data")
+                    logger.debug(
+                        "Connection closed by peer %s while reading data",
+                        peer,
+                    )
                     return []
                 response_bytes += chunk
                 remaining -= len(chunk)
 
             # Parse the protobuf response
             response_msg = Message()
@@ -334,7 +348,7 @@ async def _query_peer_for_closest(self, peer: ID, target_key: bytes) -> list[ID]:
                     self.host.get_peerstore().add_addrs(new_peer_id, addrs, 3600)
 
         except Exception as e:
-            logger.debug(f"Error querying peer {peer} for closest: {e}")
+            logger.debug("Error querying peer %s for closest: %s", peer, e)
 
         finally:
             if stream:
@@ -427,10 +441,10 @@ async def _handle_kad_stream(self, stream: INetStream) -> None:
                     await stream.write(response_bytes)
 
             except Exception as parse_err:
-                logger.error(f"Failed to parse protocol buffer message: {parse_err}")
+                logger.error("Failed to parse protocol buffer message: %s", parse_err)
 
         except Exception as e:
-            logger.debug(f"Error handling Kademlia stream: {e}")
+            logger.debug("Error handling Kademlia stream: %s", e)
 
         finally:
             await stream.close()
@@ -457,4 +471,4 @@ async def refresh_routing_table(self) -> None:
                 peer_info = PeerInfo(peer_id, addrs)
                 await self.routing_table.add_peer(peer_info)
             except Exception as e:
-                logger.debug(f"Failed to add discovered peer {peer_id}: {e}")
+                logger.debug("Failed to add discovered peer %s: %s", peer_id, e)
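The response-reading loop above implements varint length-prefixed framing by hand: accumulate prefix bytes until the continuation bit (0x80) clears, decode the length, then read exactly that many payload bytes. A standalone sketch of the same wire format over an in-memory buffer, using the same varint package this module imports:

    import io

    import varint  # same dependency peer_routing.py uses


    def write_frame(buf: io.BytesIO, payload: bytes) -> None:
        # Prefix the payload with its length as an unsigned varint
        buf.write(varint.encode(len(payload)))
        buf.write(payload)


    def read_frame(buf: io.BytesIO) -> bytes:
        # Collect length bytes until the high (continuation) bit is clear
        length_bytes = b""
        while True:
            b = buf.read(1)
            if not b:
                raise EOFError("stream closed while reading length prefix")
            length_bytes += b
            if b[0] & 0x80 == 0:
                break
        length = varint.decode_bytes(length_bytes)
        payload = buf.read(length)
        if len(payload) != length:
            raise EOFError("stream closed while reading payload")
        return payload


    buf = io.BytesIO()
    write_frame(buf, b"hello")
    buf.seek(0)
    assert read_frame(buf) == b"hello"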
diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py
index f0e846418..ec6352537 100644
--- a/libp2p/pubsub/gossipsub.py
+++ b/libp2p/pubsub/gossipsub.py
@@ -108,6 +108,7 @@ def __init__(
         time_to_live: int = 60,
         gossip_window: int = 3,
         gossip_history: int = 5,
+        flood_publish: bool = False,
         heartbeat_initial_delay: float = 0.1,
         heartbeat_interval: int = 120,
         direct_connect_initial_delay: float = 0.1,
@@ -138,6 +139,17 @@ def __init__(
         # Create message cache
         self.mcache = MessageCache(gossip_window, gossip_history)
 
+        # Whether to flood publish to all peers instead of following gossipsub
+        # mesh/fanout logic when acting as the original publisher.
+        # When enabled, this behaves as a hybrid between FloodSub and GossipSub:
+        # - When this node is the original publisher: the message is sent to ALL
+        #   peers who are subscribed to the topic (flood publishing behavior)
+        # - When this node is forwarding a message: regular GossipSub behavior is used
+        # This provides better reliability at publication time with a reasonable
+        # bandwidth cost since it only affects the original publisher.
+        # Default is False.
+        self.flood_publish = flood_publish
+
         # Create heartbeat timer
         self.heartbeat_initial_delay = heartbeat_initial_delay
         self.heartbeat_interval = heartbeat_interval
@@ -300,43 +312,54 @@ def _get_peers_to_send(
             if topic not in self.pubsub.peer_topics:
                 continue
 
-            # direct peers
-            _direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
-            send_to.update(_direct_peers)
-
-            # floodsub peers
-            floodsub_peers: set[ID] = {
-                peer_id
-                for peer_id in self.pubsub.peer_topics[topic]
-                if peer_id in self.peer_protocol
-                and self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID
-            }
-            send_to.update(floodsub_peers)
-
-            # gossipsub peers
-            gossipsub_peers: set[ID] = set()
-            if topic in self.mesh:
-                gossipsub_peers = self.mesh[topic]
-            else:
-                # When we publish to a topic that we have not subscribed to, we
-                # randomly pick `self.degree` number of peers who have subscribed
-                # to the topic and add them as our `fanout` peers.
-                topic_in_fanout: bool = topic in self.fanout
-                fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
-                fanout_size = len(fanout_peers)
-                if not topic_in_fanout or (
-                    topic_in_fanout and fanout_size < self.degree
-                ):
-                    if topic in self.pubsub.peer_topics:
-                        # Combine fanout peers with selected peers
-                        fanout_peers.update(
-                            self._get_in_topic_gossipsub_peers_from_minus(
-                                topic, self.degree - fanout_size, fanout_peers
-                            )
-                        )
-                self.fanout[topic] = fanout_peers
-                gossipsub_peers = fanout_peers
-            send_to.update(gossipsub_peers)
+            # If flood_publish is enabled and we are the original publisher,
+            # send to all peers in the topic (flood publishing behavior)
+            if self.flood_publish and msg_forwarder == self.pubsub.my_id:
+                for peer in self.pubsub.peer_topics[topic]:
+                    # TODO: add score threshold check when peer scoring is implemented
+                    # if direct peer then skip score check
+                    send_to.add(peer)
+            else:
+                # Regular GossipSub routing logic
+                # direct peers
+                _direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
+                send_to.update(_direct_peers)
+
+                # floodsub peers
+                floodsub_peers: set[ID] = {
+                    peer_id
+                    for peer_id in self.pubsub.peer_topics[topic]
+                    if peer_id in self.peer_protocol
+                    and self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID
+                }
+                send_to.update(floodsub_peers)
+
+                # gossipsub peers
+                gossipsub_peers: set[ID] = set()
+                if topic in self.mesh:
+                    gossipsub_peers = self.mesh[topic]
+                else:
+                    # When we publish to a topic that we have not subscribed to,
+                    # we randomly pick `self.degree` number of peers who have
+                    # subscribed to the topic and add them as our `fanout` peers.
+                    topic_in_fanout: bool = topic in self.fanout
+                    fanout_peers: set[ID] = (
+                        self.fanout[topic] if topic_in_fanout else set()
+                    )
+                    fanout_size = len(fanout_peers)
+                    if not topic_in_fanout or (
+                        topic_in_fanout and fanout_size < self.degree
+                    ):
+                        if topic in self.pubsub.peer_topics:
+                            # Combine fanout peers with selected peers
+                            fanout_peers.update(
+                                self._get_in_topic_gossipsub_peers_from_minus(
+                                    topic, self.degree - fanout_size, fanout_peers
+                                )
+                            )
+                    self.fanout[topic] = fanout_peers
+                    gossipsub_peers = fanout_peers
+                send_to.update(gossipsub_peers)
 
         # Excludes `msg_forwarder` and `origin`
         yield from send_to.difference([msg_forwarder, origin])
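For reference, opting in to the new behavior is a single constructor argument. A sketch, assuming `gossipsub.PROTOCOL_ID` mirrors the `floodsub.PROTOCOL_ID` constant referenced above, with arbitrary degree settings:

    from libp2p.pubsub import gossipsub

    # Router for a node that mostly originates messages: publishes it
    # initiates reach every subscribed peer, while forwarded traffic
    # still follows normal mesh/fanout routing.
    router = gossipsub.GossipSub(
        protocols=[gossipsub.PROTOCOL_ID],  # assumed module-level constant
        degree=6,
        degree_low=4,
        degree_high=12,
        flood_publish=True,
    )

The trade-off is deliberate: flooding only at the publisher bounds the extra bandwidth to one hop while removing the publisher's dependence on mesh health.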
""" self.host = host - self.router = router - - self._msg_id_constructor = msg_id_constructor - # Attach this new Pubsub object to the router - self.router.attach(self) + from typing import cast + + self.router.attach(cast("Pubsub", self)) # type: ignore peer_send, peer_receive = trio.open_memory_channel[ID](0) dead_peer_send, dead_peer_receive = trio.open_memory_channel[ID](0) @@ -219,6 +226,9 @@ def __init__( # Set of blacklisted peer IDs self.blacklisted_peers = set() + # Store the message ID constructor function + self._msg_id_constructor = msg_id_constructor + self.event_handle_peer_queue_started = trio.Event() self.event_handle_dead_peer_queue_started = trio.Event() diff --git a/libp2p/security/noise/messages.py b/libp2p/security/noise/messages.py index f7e2dceb9..d2c5b3093 100644 --- a/libp2p/security/noise/messages.py +++ b/libp2p/security/noise/messages.py @@ -51,8 +51,14 @@ def make_handshake_payload_sig( id_privkey: PrivateKey, noise_static_pubkey: PublicKey ) -> bytes: data = make_data_to_be_signed(noise_static_pubkey) - logger.debug(f"make_handshake_payload_sig: signing data length: {len(data)}") - logger.debug(f"make_handshake_payload_sig: signing data hex: {data.hex()}") + logger.debug( + "make_handshake_payload_sig: signing data length: %d", + len(data), + ) + logger.debug( + "make_handshake_payload_sig: signing data hex: %s", + data.hex(), + ) return id_privkey.sign(data) @@ -66,26 +72,32 @@ def verify_handshake_payload_sig( """ expected_data = make_data_to_be_signed(noise_static_pubkey) logger.debug( - f"verify_handshake_payload_sig: payload.id_pubkey type: " - f"{type(payload.id_pubkey)}" + "verify_handshake_payload_sig: payload.id_pubkey type: %s", + type(payload.id_pubkey), ) logger.debug( - f"verify_handshake_payload_sig: noise_static_pubkey type: " - f"{type(noise_static_pubkey)}" + "verify_handshake_payload_sig: noise_static_pubkey type: %s", + type(noise_static_pubkey), ) logger.debug( - f"verify_handshake_payload_sig: expected_data length: {len(expected_data)}" + "verify_handshake_payload_sig: expected_data length: %d", + len(expected_data), ) logger.debug( - f"verify_handshake_payload_sig: expected_data hex: {expected_data.hex()}" + "verify_handshake_payload_sig: expected_data hex: %s", + expected_data.hex(), ) logger.debug( - f"verify_handshake_payload_sig: payload.id_sig length: {len(payload.id_sig)}" + "verify_handshake_payload_sig: payload.id_sig length: %d", + len(payload.id_sig), ) try: result = payload.id_pubkey.verify(expected_data, payload.id_sig) logger.debug(f"verify_handshake_payload_sig: verification result: {result}") return result except Exception as e: - logger.error(f"verify_handshake_payload_sig: verification exception: {e}") + logger.error( + "verify_handshake_payload_sig: verification exception: %s", + e, + ) return False diff --git a/libp2p/stream_muxer/yamux/yamux.py b/libp2p/stream_muxer/yamux/yamux.py index bb84a5db6..52d123692 100644 --- a/libp2p/stream_muxer/yamux/yamux.py +++ b/libp2p/stream_muxer/yamux/yamux.py @@ -105,15 +105,20 @@ async def write(self, data: bytes) -> None: # Flow control: Check if we have enough send window total_len = len(data) sent = 0 - logger.debug(f"Stream {self.stream_id}: Starts writing {total_len} bytes ") + logger.debug( + "Stream %d: Starts writing %d bytes", + self.stream_id, + total_len, + ) + while sent < total_len: # Wait for available window with timeout timeout = False async with self.window_lock: if self.send_window == 0: logger.debug( - f"Stream {self.stream_id}: " - "Window is zero, waiting for 
update" + "Stream %d: Window is zero, waiting for update", + self.stream_id, ) # Release lock and wait with timeout self.window_lock.release() @@ -194,7 +199,8 @@ async def read(self, n: int | None = -1) -> bytes: # If the stream is closed for receiving and the buffer is empty, raise EOF if self.recv_closed and not self.conn.stream_buffers.get(self.stream_id): logger.debug( - f"Stream {self.stream_id}: Stream closed for receiving and buffer empty" + "Stream %d: Stream closed for receiving and buffer empty", + self.stream_id, ) raise MuxedStreamEOF("Stream is closed for receiving") @@ -206,7 +212,7 @@ async def read(self, n: int | None = -1) -> bytes: # If buffer is not available, check if stream is closed if buffer is None: - logger.debug(f"Stream {self.stream_id}: No buffer available") + logger.debug("Stream %d: No buffer available", self.stream_id) raise MuxedStreamEOF("Stream buffer closed") # If we have data in buffer, process it @@ -218,34 +224,36 @@ async def read(self, n: int | None = -1) -> bytes: # Send window update for the chunk we just read async with self.window_lock: self.recv_window += len(chunk) - logger.debug(f"Stream {self.stream_id}: Update {len(chunk)}") + logger.debug("Stream %d: Update %d", self.stream_id, len(chunk)) await self.send_window_update(len(chunk), skip_lock=True) # If stream is closed (FIN received) and buffer is empty, break if self.recv_closed and len(buffer) == 0: - logger.debug(f"Stream {self.stream_id}: Closed with empty buffer") + logger.debug("Stream %d: Closed with empty buffer", self.stream_id) break # If stream was reset, raise reset error if self.reset_received: - logger.debug(f"Stream {self.stream_id}: Stream was reset") + logger.debug("Stream %d: Stream was reset", self.stream_id) raise MuxedStreamReset("Stream was reset") # Wait for more data or stream closure - logger.debug(f"Stream {self.stream_id}: Waiting for data or FIN") + logger.debug("Stream %d: Waiting for data or FIN", self.stream_id) await self.conn.stream_events[self.stream_id].wait() self.conn.stream_events[self.stream_id] = trio.Event() - # After loop exit, first check if we have data to return - if data: - logger.debug( - f"Stream {self.stream_id}: Returning {len(data)} bytes after loop" - ) + # After loop exit, first check if we have data to return + if data: + logger.debug( + "Stream %d: Returning %d bytes after loop", + self.stream_id, + len(data), + ) return data # No data accumulated, now check why we exited the loop if self.conn.event_shutting_down.is_set(): - logger.debug(f"Stream {self.stream_id}: Connection shutting down") + logger.debug("Stream %d: Connection shutting down", self.stream_id) raise MuxedStreamEOF("Connection shut down") # Return empty data @@ -255,8 +263,9 @@ async def read(self, n: int | None = -1) -> bytes: async with self.window_lock: self.recv_window += len(data) logger.debug( - f"Stream {self.stream_id}: Sending window update after read, " - f"increment={len(data)}" + "Stream %d: Sending window update after read, increment=%d", + self.stream_id, + len(data), ) await self.send_window_update(len(data), skip_lock=True) return data @@ -264,7 +273,7 @@ async def read(self, n: int | None = -1) -> bytes: async def close(self) -> None: async with self.close_lock: if not self.send_closed: - logger.debug(f"Half-closing stream {self.stream_id} (local end)") + logger.debug("Half-closing stream %d (local end)", self.stream_id) header = struct.pack( YAMUX_HEADER_FORMAT, 0, TYPE_DATA, FLAG_FIN, self.stream_id, 0 ) @@ -359,7 +368,7 @@ def __init__( self._nursery: 
Nursery | None = None async def start(self) -> None: - logger.debug(f"Starting Yamux for {self.peer_id}") + logger.debug("Starting Yamux for %s", self.peer_id) if self.event_started.is_set(): return async with trio.open_nursery() as nursery: @@ -372,7 +381,7 @@ def is_initiator(self) -> bool: return self.is_initiator_value async def close(self, error_code: int = GO_AWAY_NORMAL) -> None: - logger.debug(f"Closing Yamux connection with code {error_code}") + logger.debug("Closing Yamux connection with code %s", error_code) async with self.streams_lock: if not self.event_shutting_down.is_set(): try: @@ -381,7 +390,7 @@ async def close(self, error_code: int = GO_AWAY_NORMAL) -> None: ) await self.secured_conn.write(header) except Exception as e: - logger.debug(f"Failed to send GO_AWAY: {e}") + logger.debug("Failed to send GO_AWAY: %s", e) self.event_shutting_down.set() for stream in self.streams.values(): stream.closed = True @@ -392,12 +401,12 @@ async def close(self, error_code: int = GO_AWAY_NORMAL) -> None: self.stream_events.clear() try: await self.secured_conn.close() - logger.debug(f"Successfully closed secured_conn for peer {self.peer_id}") + logger.debug("Successfully closed secured_conn for peer %s", self.peer_id) except Exception as e: - logger.debug(f"Error closing secured_conn for peer {self.peer_id}: {e}") + logger.debug("Error closing secured_conn for peer %s: %s", self.peer_id, e) self.event_closed.set() if self.on_close: - logger.debug(f"Calling on_close in Yamux.close for peer {self.peer_id}") + logger.debug("Calling on_close in Yamux.close for peer %s", self.peer_id) if inspect.iscoroutinefunction(self.on_close): if self.on_close is not None: await self.on_close() @@ -426,7 +435,7 @@ async def open_stream(self) -> YamuxStream: header = struct.pack( YAMUX_HEADER_FORMAT, 0, TYPE_DATA, FLAG_SYN, stream_id, 0 ) - logger.debug(f"Sending SYN header for stream {stream_id}") + logger.debug("Sending SYN header for stream %s", stream_id) await self.secured_conn.write(header) return stream except Exception as e: @@ -437,39 +446,45 @@ async def accept_stream(self) -> IMuxedStream: logger.debug("Waiting for new stream") try: stream = await self.new_stream_receive_channel.receive() - logger.debug(f"Received stream {stream.stream_id}") + logger.debug("Received stream %s", stream.stream_id) return stream except trio.EndOfChannel: raise MuxedStreamError("No new streams available") async def read_stream(self, stream_id: int, n: int = -1) -> bytes: - logger.debug(f"Reading from stream {self.peer_id}:{stream_id}, n={n}") + logger.debug("Reading from stream %s:%s, n=%s", self.peer_id, stream_id, n) if n is None: n = -1 while True: async with self.streams_lock: if stream_id not in self.streams: - logger.debug(f"Stream {self.peer_id}:{stream_id} unknown") + logger.debug("Stream %s:%s unknown", self.peer_id, stream_id) raise MuxedStreamEOF("Stream closed") if self.event_shutting_down.is_set(): logger.debug( - f"Stream {self.peer_id}:{stream_id}: connection shutting down" + "Stream %s:%s: connection shutting down", + self.peer_id, + stream_id, ) raise MuxedStreamEOF("Connection shut down") stream = self.streams[stream_id] buffer = self.stream_buffers.get(stream_id) logger.debug( - f"Stream {self.peer_id}:{stream_id}: " - f"closed={stream.closed}, " - f"recv_closed={stream.recv_closed}, " - f"reset_received={stream.reset_received}, " - f"buffer_len={len(buffer) if buffer else 0}" + "Stream %s:%s: closed=%s, recv_closed=%s," + " reset_received=%s, buffer_len=%s", + self.peer_id, + stream_id, + 
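The read path above is credit-based flow control: every byte drained from the receive buffer is credited back to the sender through a window update, and a sender with no credit blocks. A detached sketch of that accounting (the window size here is illustrative, not yamux's configured default):

    DEFAULT_WINDOW = 256 * 1024  # illustrative credit, not the spec value


    class WindowAccounting:
        """Credit-based flow control: the sender may only have
        `send_window` unacknowledged bytes in flight."""

        def __init__(self) -> None:
            self.send_window = DEFAULT_WINDOW  # credit granted by the peer
            self.recv_window = DEFAULT_WINDOW  # credit we granted the peer

        def on_send(self, n: int) -> None:
            if n > self.send_window:
                raise RuntimeError("would exceed peer's advertised window")
            self.send_window -= n

        def on_receive(self, n: int) -> None:
            self.recv_window -= n  # peer spent its credit

        def on_consume(self, n: int) -> int:
            # Application read n bytes out of the buffer; hand the credit
            # back via a WINDOW_UPDATE frame carrying this increment.
            self.recv_window += n
            return n


    acct = WindowAccounting()
    acct.on_send(1024)                  # we transmit 1 KiB
    acct.on_receive(2048)               # peer transmits 2 KiB to us
    increment = acct.on_consume(2048)   # reader drained it; send update
    assert increment == 2048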
diff --git a/libp2p/tools/constants.py b/libp2p/tools/constants.py
index 5a62d1694..2349e16aa 100644
--- a/libp2p/tools/constants.py
+++ b/libp2p/tools/constants.py
@@ -72,6 +72,7 @@ class GossipsubParams(NamedTuple):
     px_peers_count: int = 16
     prune_back_off: int = 60
     unsubscribe_back_off: int = 10
+    flood_publish: bool = False
 
 
 GOSSIPSUB_PARAMS = GossipsubParams()
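Because GossipsubParams is a NamedTuple with defaults, callers can opt in per test without mutating the shared GOSSIPSUB_PARAMS instance:

    from libp2p.tools.constants import GOSSIPSUB_PARAMS

    # NamedTuple._replace returns a copy with the given fields overridden,
    # so the module-level default (flood_publish=False) stays untouched.
    flood_params = GOSSIPSUB_PARAMS._replace(flood_publish=True)
    assert flood_params.flood_publish and not GOSSIPSUB_PARAMS.flood_publish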
diff --git a/libp2p/transport/quic/connection.py b/libp2p/transport/quic/connection.py
index fb4cff4a7..6aa5f8671 100644
--- a/libp2p/transport/quic/connection.py
+++ b/libp2p/transport/quic/connection.py
@@ -177,9 +177,11 @@ def __init__(
         }
 
         logger.debug(
-            f"Created QUIC connection to {remote_peer_id} "
-            f"(initiator: {is_initiator}, addr: {remote_addr}, "
-            "security: {security_manager is not None})"
+            "Created QUIC connection to %s (initiator: %s, addr: %s, security: %s)",
+            remote_peer_id,
+            is_initiator,
+            remote_addr,
+            security_manager is not None,
         )
 
     def _calculate_initial_stream_id(self) -> int:
@@ -240,9 +242,11 @@ def get_connection_id_stats(self) -> dict[str, Any]:
         """Get connection ID statistics and current state."""
         return {
             "available_connection_ids": len(self._available_connection_ids),
-            "current_connection_id": self._current_connection_id.hex()
-            if self._current_connection_id
-            else None,
+            "current_connection_id": (
+                self._current_connection_id.hex()
+                if self._current_connection_id
+                else None
+            ),
             "retired_connection_ids": len(self._retired_connection_ids),
             "connection_ids_issued": self._stats["connection_ids_issued"],
             "connection_ids_retired": self._stats["connection_ids_retired"],
@@ -287,7 +291,7 @@ async def start(self) -> None:
         self._started = True
         self.event_started.set()
 
-        logger.debug(f"Starting QUIC connection to {self._remote_peer_id}")
+        logger.debug("Starting QUIC connection to %s", self._remote_peer_id)
 
         try:
             # If this is a client connection, we need to establish the connection
@@ -298,10 +302,10 @@ async def start(self) -> None:
                 self._established = True
                 self._connected_event.set()
 
-            logger.debug(f"QUIC connection to {self._remote_peer_id} started")
+            logger.debug("QUIC connection to %s started", self._remote_peer_id)
 
         except Exception as e:
-            logger.error(f"Failed to start connection: {e}")
+            logger.error("Failed to start connection: %s", e)
             raise QUICConnectionError(f"Connection start failed: {e}") from e
 
     async def _initiate_connection(self) -> None:
@@ -321,10 +325,10 @@ async def _initiate_connection(self) -> None:
             # Send initial packet(s)
             await self._transmit()
 
-            logger.debug(f"Initiated QUIC connection to {self._remote_addr}")
+            logger.debug("Initiated QUIC connection to %s", self._remote_addr)
 
         except Exception as e:
-            logger.error(f"Failed to initiate connection: {e}")
+            logger.error("Failed to initiate connection: %s", e)
             raise QUICConnectionError(f"Connection initiation failed: {e}") from e
 
     async def connect(self, nursery: trio.Nursery) -> None:
@@ -375,9 +379,11 @@ async def connect(self, nursery: trio.Nursery) -> None:
             if peer_id:
                 self.peer_id = peer_id
 
-            logger.debug(f"QUICConnection {id(self)}: Peer identity verified")
+            logger.debug("QUICConnection %s: Peer identity verified", id(self))
             self._established = True
-            logger.debug(f"QUIC connection established with {self._remote_peer_id}")
+            logger.debug(
+                "QUIC connection established with %s", self._remote_peer_id
+            )
 
         except Exception as e:
             logger.error(f"Failed to establish connection: {e}")
@@ -469,7 +475,7 @@ async def _client_packet_receiver(self) -> None:
                 try:
                     # Receive UDP packets
                     data, addr = await self._socket.recvfrom(65536)
-                    logger.debug(f"Client received {len(data)} bytes from {addr}")
+                    logger.debug("Client received %d bytes from %s", len(data), addr)
 
                     # Feed packet to QUIC connection
                     self._quic.receive_datagram(data, addr, now=time.time())
@@ -881,7 +887,7 @@ async def _process_event_batch(self) -> None:
             for event in event_list:
                 await self._handle_quic_event(event)
 
-        logger.debug(f"Processed batch of {len(self._event_batch)} events")
+        logger.debug("Processed batch of %d events", len(self._event_batch))
 
     async def _handle_stream_data_batch(
         self, events_list: list[events.StreamDataReceived]
@@ -953,7 +959,7 @@ async def _create_inbound_stream(self, stream_id: int) -> QUICStream:
             self._stream_accept_queue.append(stream)
             self._stream_accept_event.set()
 
-        logger.debug(f"Created inbound stream {stream_id}")
+        logger.debug("Created inbound stream %d", stream_id)
         return stream
 
     async def _process_quic_events(self) -> None:
@@ -963,8 +969,7 @@ async def _process_quic_events(self) -> None:
 
     async def _handle_quic_event(self, event: events.QuicEvent) -> None:
         """Handle a single QUIC event with COMPLETE event type coverage."""
-        logger.debug(f"Handling QUIC event: {type(event).__name__}")
-        logger.debug(f"QUIC event: {type(event).__name__}")
+        logger.debug("Handling QUIC event: %s", type(event).__name__)
 
         try:
             if isinstance(event, events.ConnectionTerminated):
@@ -1004,8 +1010,7 @@ async def _handle_connection_id_issued(
 
-        This is the CRITICAL missing functionality that was causing your issue!
+        Tracks connection IDs issued by the peer so they can be used for routing.
         """
-        logger.debug(f"🆔 NEW CONNECTION ID ISSUED: {event.connection_id.hex()}")
-        logger.debug(f"🆔 NEW CONNECTION ID ISSUED: {event.connection_id.hex()}")
+        logger.debug("🆔 NEW CONNECTION ID ISSUED: %s", event.connection_id.hex())
 
         # Add to available connection IDs
         self._available_connection_ids.add(event.connection_id)
@@ -1023,8 +1029,9 @@ async def _handle_connection_id_issued(
         # Update statistics
         self._stats["connection_ids_issued"] += 1
 
-        logger.debug(f"Available connection IDs: {len(self._available_connection_ids)}")
-        logger.debug(f"Available connection IDs: {len(self._available_connection_ids)}")
+        logger.debug(
+            "Available connection IDs: %d", len(self._available_connection_ids)
+        )
 
     async def _handle_connection_id_retired(
         self, event: events.ConnectionIdRetired
     ) -> None:
@@ -1034,7 +1044,7 @@ async def _handle_connection_id_retired(
 
         This handles when the peer tells us to stop using a connection ID.
         """
-        logger.debug(f"🗑️ CONNECTION ID RETIRED: {event.connection_id.hex()}")
+        logger.debug("🗑️ CONNECTION ID RETIRED: %s", event.connection_id.hex())
 
         # Remove from available IDs and add to retired set
         self._available_connection_ids.discard(event.connection_id)
@@ -1061,13 +1071,13 @@ async def _handle_connection_id_retired(
 
     async def _handle_ping_acknowledged(self, event: events.PingAcknowledged) -> None:
         """Handle ping acknowledgment."""
-        logger.debug(f"Ping acknowledged: uid={event.uid}")
+        logger.debug("Ping acknowledged: uid=%s", event.uid)
 
     async def _handle_protocol_negotiated(
         self, event: events.ProtocolNegotiated
     ) -> None:
         """Handle protocol negotiation completion."""
-        logger.debug(f"Protocol negotiated: {event.alpn_protocol}")
+        logger.debug("Protocol negotiated: %s", event.alpn_protocol)
 
     async def _handle_stop_sending_received(
         self, event: events.StopSendingReceived
@@ -1104,7 +1114,7 @@ async def _handle_connection_terminated(
         self, event: events.ConnectionTerminated
     ) -> None:
         """Handle connection termination."""
-        logger.debug(f"QUIC connection terminated: {event.reason_phrase}")
+        logger.debug("QUIC connection terminated: %s", event.reason_phrase)
 
         # Close all streams
         for stream in list(self._streams.values()):
@@ -1119,7 +1129,7 @@ async def _handle_connection_terminated(
         self._closed_event.set()
         self._stream_accept_event.set()
 
-        logger.debug(f"Woke up pending accept_stream() calls, {id(self)}")
+        logger.debug("Woke up pending accept_stream() calls, %s", id(self))
 
         await self._notify_parent_of_termination()
 
@@ -1134,7 +1144,7 @@ async def _handle_stream_data(self, event: events.StreamDataReceived) -> None:
 
             if not stream:
                 if self._is_incoming_stream(stream_id):
-                    logger.debug(f"Creating new incoming stream {stream_id}")
+                    logger.debug("Creating new incoming stream %d", stream_id)
                     stream = await self._create_inbound_stream(stream_id)
                 else:
                     logger.error(
@@ -1146,7 +1156,7 @@ async def _handle_stream_data(self, event: events.StreamDataReceived) -> None:
 
         except Exception as e:
             logger.error(f"Error handling stream data for stream {stream_id}: {e}")
-            logger.debug(f"❌ STREAM_DATA: Error: {e}")
+            logger.debug("❌ STREAM_DATA: Error: %s", e)
 
     async def _get_or_create_stream(self, stream_id: int) -> QUICStream:
         """Get existing stream or create new inbound stream."""
@@ -1201,13 +1211,13 @@ async def _handle_stream_reset(self, event: events.StreamReset) -> None:
             # Force remove the stream
             self._remove_stream(stream_id)
         else:
-            logger.debug(f"Received reset for unknown stream {stream_id}")
+            logger.debug("Received reset for unknown stream %d", stream_id)
 
     async def _handle_datagram_received(
         self, event: events.DatagramFrameReceived
     ) -> None:
         """Handle datagram frame (if using QUIC datagrams)."""
-        logger.debug(f"Datagram frame received: size={len(event.data)}")
+        logger.debug("Datagram frame received: size=%d", len(event.data))
         # For now, just log. Could be extended for custom datagram handling
 
     async def _handle_timer_events(self) -> None:
@@ -1278,7 +1288,7 @@ async def close(self) -> None:
             return
 
         self._closed = True
-        logger.debug(f"Closing QUIC connection to {self._remote_peer_id}")
+        logger.debug("Closing QUIC connection to %s", self._remote_peer_id)
 
         try:
             # Close all streams gracefully
@@ -1321,7 +1331,7 @@ async def close(self) -> None:
                 self._stream_cache.clear()  # Clear cache
             self._closed_event.set()
 
-            logger.debug(f"QUIC connection to {self._remote_peer_id} closed")
+            logger.debug("QUIC connection to %s closed", self._remote_peer_id)
 
         except Exception as e:
             logger.error(f"Error during connection close: {e}")
@@ -1369,7 +1379,7 @@ async def _cleanup_by_connection_id(self, connection_id: bytes) -> None:
             for tracked_cid, tracked_conn in list(listener._connections.items()):
                 if tracked_conn is self:
                     await listener._remove_connection(tracked_cid)
-                    logger.debug(f"Removed connection {tracked_cid.hex()}")
+                    logger.debug("Removed connection %s", tracked_cid.hex())
                     return
 
         logger.debug("Fallback cleanup by connection ID completed")
diff --git a/libp2p/transport/quic/listener.py b/libp2p/transport/quic/listener.py
index 42c8c6625..66d931fde 100644
--- a/libp2p/transport/quic/listener.py
+++ b/libp2p/transport/quic/listener.py
@@ -432,13 +432,13 @@ async def _handle_new_connection(
                 if ext.oid == LIBP2P_TLS_EXTENSION_OID:
                     has_libp2p_ext = True
                     break
-            logger.debug(f"Certificate has libp2p extension: {has_libp2p_ext}")
+            logger.debug("Certificate has libp2p extension: %s", has_libp2p_ext)
 
             if not has_libp2p_ext:
                 logger.error("Certificate missing libp2p extension!")
 
             logger.debug(
-                f"Original destination CID: {packet_info.destination_cid.hex()}"
+                "Original destination CID: %s", packet_info.destination_cid.hex()
             )
 
             quic_conn = QuicConnection(
@@ -450,18 +450,19 @@ async def _handle_new_connection(
             # Use the first host CID as our routing CID
             if quic_conn._host_cids:
                 destination_cid = quic_conn._host_cids[0].cid
-                logger.debug(f"Using host CID as routing CID: {destination_cid.hex()}")
+                logger.debug("Using host CID as routing CID: %s", destination_cid.hex())
             else:
                 # Fallback to random if no host CIDs generated
                 import secrets
 
                 destination_cid = secrets.token_bytes(8)
-                logger.debug(f"Fallback to random CID: {destination_cid.hex()}")
+                logger.debug("Fallback to random CID: %s", destination_cid.hex())
 
-            logger.debug(f"Generated {len(quic_conn._host_cids)} host CIDs for client")
+            logger.debug("Generated %d host CIDs for client", len(quic_conn._host_cids))
             logger.debug(
-                f"QUIC connection created for destination CID {destination_cid.hex()}"
+                "QUIC connection created for destination CID %s",
+                destination_cid.hex(),
             )
 
             # Store connection mapping using our generated CID
@@ -479,21 +480,26 @@ async def _handle_new_connection(
                         "request_client_certificate set to True in server TLS"
                     )
                 except Exception as e:
-                    logger.error(f"FAILED to apply request_client_certificate: {e}")
+                    logger.error(
+                        "FAILED to apply request_client_certificate: %s", e
+                    )
 
             # Process events and send response
             await self._process_quic_events(quic_conn, addr, destination_cid)
             await self._transmit_for_connection(quic_conn, addr)
 
             logger.debug(
-                f"Started handshake for new connection from {addr} "
-                f"(version: 0x{packet_info.version:08x}, cid: {destination_cid.hex()})"
+                "Started handshake for new connection from %s "
+                "(version: 0x%08x, cid: %s)",
+                addr,
+                packet_info.version,
+                destination_cid.hex(),
             )
 
             return quic_conn
 
         except Exception as e:
-            logger.error(f"Error handling new connection from {addr}: {e}")
+            logger.error("Error handling new connection from %s: %s", addr, e)
             self._stats["connections_rejected"] += 1
             return None
 
@@ -1013,7 +1019,7 @@ async def _handle_new_established_connection(
             logger.error(f"Error adding QUIC connection to swarm: {e}")
             await connection.close()
 
-    def get_addrs(self) -> tuple[Multiaddr]:
+    def get_addrs(self) -> tuple[Multiaddr, ...]:
         return tuple(self.get_addresses())
 
     def is_listening(self) -> bool:
diff --git a/libp2p/transport/quic/security.py b/libp2p/transport/quic/security.py
index 43ebfa37f..980645856 100644
--- a/libp2p/transport/quic/security.py
+++ b/libp2p/transport/quic/security.py
@@ -182,8 +182,8 @@ def parse_signed_key_extension(
     Handles both ASN.1 DER format (from go-libp2p) and simple binary format.
     """
     try:
-        logger.debug(f"🔍 Extension type: {type(extension)}")
-        logger.debug(f"🔍 Extension.value type: {type(extension.value)}")
+        logger.debug("🔍 Extension type: %s", type(extension))
+        logger.debug("🔍 Extension.value type: %s", type(extension.value))
 
         # Extract the raw bytes from the extension
         if isinstance(extension.value, UnrecognizedExtension):
@@ -195,8 +195,11 @@ def parse_signed_key_extension(
             raw_bytes = extension.value
             logger.debug("🔍 Extension.value is already bytes")
 
-        logger.debug(f"🔍 Total extension length: {len(raw_bytes)} bytes")
-        logger.debug(f"🔍 Extension hex (first 50 bytes): {raw_bytes[:50].hex()}")
+        logger.debug("🔍 Total extension length: %d bytes", len(raw_bytes))
+        logger.debug(
+            "🔍 Extension hex (first 50 bytes): %s",
+            raw_bytes[:50].hex(),
+        )
 
         if not isinstance(raw_bytes, bytes):
             raise QUICCertificateError(f"Expected bytes, got {type(raw_bytes)}")
@@ -740,8 +743,8 @@ def generate_certificate(
             .sign(cert_private_key, hashes.SHA256())
         )
 
-        logger.info(f"Generated libp2p TLS certificate for peer {peer_id}")
-        logger.debug(f"Certificate valid from {not_before} to {not_after}")
+        logger.info("Generated libp2p TLS certificate for peer %s", peer_id)
+        logger.debug("Certificate valid from %s to %s", not_before, not_after)
 
         return TLSConfig(
             certificate=certificate, private_key=cert_private_key, peer_id=peer_id
@@ -799,11 +802,11 @@ def verify_peer_certificate(
                 raise QUICPeerVerificationError("Certificate missing libp2p extension")
 
             assert libp2p_extension.value is not None
-            logger.debug(f"Extension type: {type(libp2p_extension)}")
-            logger.debug(f"Extension value type: {type(libp2p_extension.value)}")
+            logger.debug("Extension type: %s", type(libp2p_extension))
+            logger.debug("Extension value type: %s", type(libp2p_extension.value))
             if hasattr(libp2p_extension.value, "__len__"):
-                logger.debug(f"Extension value length: {len(libp2p_extension.value)}")
-                logger.debug(f"Extension value: {libp2p_extension.value}")
+                logger.debug("Extension value length: %d", len(libp2p_extension.value))
+                logger.debug("Extension value: %s", libp2p_extension.value)
 
             # Parse the extension to get public key and signature
             public_key, signature = self.extension_handler.parse_signed_key_extension(
                 libp2p_extension
@@ -830,15 +833,16 @@ def verify_peer_certificate(
 
             # Verify against expected peer ID if provided
             if expected_peer_id and derived_peer_id != expected_peer_id:
-                logger.debug(f"Expected Peer id: {expected_peer_id}")
-                logger.debug(f"Derived Peer ID: {derived_peer_id}")
+                logger.debug("Expected Peer id: %s", expected_peer_id)
+                logger.debug("Derived Peer ID: %s", derived_peer_id)
                 raise QUICPeerVerificationError(
                     f"Peer ID mismatch: expected {expected_peer_id}, "
                     f"got {derived_peer_id}"
                 )
 
             logger.debug(
-                f"Successfully verified peer certificate for {derived_peer_id}"
+                "Successfully verified peer certificate for %s",
+                derived_peer_id,
             )
 
             return derived_peer_id
diff --git a/libp2p/transport/transport_registry.py b/libp2p/transport/transport_registry.py
index 2f6a4c8bc..c6da78ee4 100644
--- a/libp2p/transport/transport_registry.py
+++ b/libp2p/transport/transport_registry.py
@@ -115,7 +115,9 @@ def register_transport(
         """
         self._transports[protocol] = transport_class
         logger.debug(
-            f"Registered transport {transport_class.__name__} for protocol {protocol}"
+            "Registered transport %s for protocol %s",
+            transport_class.__name__,
+            protocol,
         )
 
     def get_transport(self, protocol: str) -> type[ITransport] | None:
@@ -151,7 +153,8 @@ def create_transport(
                 # WebSocket transport requires upgrader
                 if upgrader is None:
                     logger.warning(
-                        f"WebSocket transport '{protocol}' requires upgrader"
+                        "WebSocket transport '%s' requires upgrader",
+                        protocol,
                     )
                     return None
                 # Use explicit WebsocketTransport to avoid type issues
@@ -166,7 +169,10 @@ def create_transport(
                 # QUIC transport requires private_key
                 private_key = kwargs.get("private_key")
                 if private_key is None:
-                    logger.warning(f"QUIC transport '{protocol}' requires private_key")
+                    logger.warning(
+                        "QUIC transport '%s' requires private_key",
+                        protocol,
+                    )
                     return None
                 # Use explicit QUICTransport to avoid type issues
                 QUICTransport = _get_quic_transport()
@@ -176,7 +182,7 @@ def create_transport(
             # TCP transport doesn't require upgrader
             return transport_class()
         except Exception as e:
-            logger.error(f"Failed to create transport for protocol {protocol}: {e}")
+            logger.error("Failed to create transport for protocol %s: %s", protocol, e)
             return None
logger.debug(f"WebSocket wrote {len(data)} bytes successfully") + logger.debug("WebSocket wrote %d bytes successfully", len(data)) except Exception as e: - logger.error(f"WebSocket write failed: {e}") + logger.error("WebSocket write failed: %s", str(e)) self._closed = True raise IOException from e @@ -137,7 +137,7 @@ async def read(self, n: int | None = None) -> bytes: return result except Exception as e: - logger.error(f"WebSocket read failed: {e}") + logger.error("WebSocket read failed: %s", str(e)) raise IOException from e async def close(self) -> None: @@ -157,7 +157,7 @@ async def close(self) -> None: if self._ws_context is not None: await self._ws_context.__aexit__(None, None, None) except Exception as e: - logger.error(f"WebSocket close error: {e}") + logger.error("WebSocket close error: %s", str(e)) # Don't raise here, as close() should be idempotent finally: logger.debug("WebSocket connection closed") diff --git a/libp2p/transport/websocket/listener.py b/libp2p/transport/websocket/listener.py index 1ea3bc9b6..978f1c67f 100644 --- a/libp2p/transport/websocket/listener.py +++ b/libp2p/transport/websocket/listener.py @@ -41,8 +41,7 @@ def __init__( self._is_wss = False # Track whether this is a WSS listener async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> bool: - logger.debug(f"WebsocketListener.listen called with {maddr}") - + logger.debug("WebsocketListener.listen called with %s", maddr) # Parse the WebSocket multiaddr to determine if it's secure try: parsed = parse_websocket_multiaddr(maddr) @@ -73,7 +72,10 @@ async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> bool: port = int(port_str) logger.debug( - f"WebsocketListener: host={host}, port={port}, secure={parsed.is_wss}" + "WebsocketListener: host=%s, port=%s, secure=%s", + host, + port, + parsed.is_wss, ) async def serve_websocket_tcp( @@ -115,18 +117,19 @@ async def websocket_handler(request: Any) -> None: except trio.TooSlowError: logger.debug( - f"WebSocket handshake timeout after {self._handshake_timeout}s" + "WebSocket handshake timeout after %s s", + self._handshake_timeout, ) try: await request.reject(408) # Request Timeout except Exception: pass except Exception as e: - logger.debug(f"WebSocket connection error: {e}") - logger.debug(f"Error type: {type(e)}") + logger.debug("WebSocket connection error: %s", str(e)) + logger.debug("Error type: %s", type(e)) import traceback - logger.debug(f"Traceback: {traceback.format_exc()}") + logger.debug("Traceback: %s", traceback.format_exc()) # Reject the connection try: await request.reject(400) @@ -150,10 +153,10 @@ async def websocket_handler(request: Any) -> None: port, host, ) - logger.debug(f"nursery.start() returned: {started_listeners}") + logger.debug("nursery.start() returned: %s", started_listeners) if started_listeners is None: - logger.error(f"Failed to start WebSocket listener for {maddr}") + logger.error("Failed to start WebSocket listener for %s", maddr) return False # Store the listeners for get_addrs() and close() - these are real @@ -181,7 +184,7 @@ def get_addrs(self) -> tuple[Multiaddr, ...]: listeners = self._listeners # Get addresses from listeners like TCP does return tuple( - _multiaddr_from_socket(listener.socket, self._is_wss) + self._multiaddr_from_socket(listener.socket, self._is_wss) for listener in listeners ) @@ -215,11 +218,11 @@ async def close(self) -> None: self._listeners = None logger.debug("WebsocketListener.close completed") - -def _multiaddr_from_socket( - socket: trio.socket.SocketType, is_wss: bool = False 
-) -> Multiaddr: - """Convert socket to multiaddr""" - ip, port = socket.getsockname() - protocol = "wss" if is_wss else "ws" - return Multiaddr(f"/ip4/{ip}/tcp/{port}/{protocol}") + @staticmethod + def _multiaddr_from_socket( + socket: trio.socket.SocketType, is_wss: bool = False + ) -> Multiaddr: + """Convert socket to multiaddr""" + ip, port = socket.getsockname() + protocol = "wss" if is_wss else "ws" + return Multiaddr(f"/ip4/{ip}/tcp/{port}/{protocol}") diff --git a/libp2p/transport/websocket/transport.py b/libp2p/transport/websocket/transport.py index 30da59426..ec7f50308 100644 --- a/libp2p/transport/websocket/transport.py +++ b/libp2p/transport/websocket/transport.py @@ -2,6 +2,7 @@ import ssl from multiaddr import Multiaddr +import trio from libp2p.abc import IListener, ITransport from libp2p.custom_types import THandler @@ -46,13 +47,12 @@ def __init__( async def dial(self, maddr: Multiaddr) -> RawConnection: """Dial a WebSocket connection to the given multiaddr.""" - logger.debug(f"WebsocketTransport.dial called with {maddr}") - + logger.debug("WebsocketTransport.dial called with %s", maddr) # Parse the WebSocket multiaddr to determine if it's secure try: parsed = parse_websocket_multiaddr(maddr) except ValueError as e: - raise ValueError(f"Invalid WebSocket multiaddr: {e}") from e + raise ValueError("Invalid WebSocket multiaddr: %s" % e) from e # Extract host and port from the base multiaddr host = ( @@ -64,14 +64,14 @@ async def dial(self, maddr: Multiaddr) -> RawConnection: ) port_str = parsed.rest_multiaddr.value_for_protocol("tcp") if port_str is None: - raise ValueError(f"No TCP port found in multiaddr: {maddr}") + raise ValueError("No TCP port found in multiaddr: %s" % maddr) port = int(port_str) # Build WebSocket URL based on security if parsed.is_wss: - ws_url = f"wss://{host}:{port}/" + ws_url = "wss://%s:%d/" % (host, port) else: - ws_url = f"ws://{host}:{port}/" + ws_url = "ws://%s:%d/" % (host, port) logger.debug( f"WebsocketTransport.dial connecting to {ws_url} (secure={parsed.is_wss})" @@ -113,17 +113,17 @@ async def dial(self, maddr: Multiaddr) -> RawConnection: ) logger.debug( - f"WebsocketTransport.dial parsed URL: host={ws_host}, " - f"port={ws_port}, resource={ws_resource}" + "WebsocketTransport.dial parsed URL: host=%s, port=%s, resource=%s", + ws_host, + ws_port, + ws_resource, ) # Create a background task manager for this connection - import trio - nursery_manager = trio.lowlevel.current_task().parent_nursery if nursery_manager is None: raise OpenConnectionError( - f"No parent nursery available for WebSocket connection to {maddr}" + "No parent nursery available for WebSocket connection to %s" % maddr ) # Apply timeout to the connection process @@ -150,17 +150,18 @@ async def dial(self, maddr: Multiaddr) -> RawConnection: logger.debug("WebsocketTransport.dial created P2PWebSocketConnection") self._connection_count += 1 - logger.debug(f"Total connections: {self._connection_count}") + logger.debug("Total connections: %d", self._connection_count) return RawConnection(conn, initiator=True) except trio.TooSlowError as e: raise OpenConnectionError( - f"WebSocket handshake timeout after {self._handshake_timeout}s " - f"for {maddr}" + "WebSocket handshake timeout after %s s for %s" + % (self._handshake_timeout, maddr) ) from e except Exception as e: - logger.error(f"Failed to dial WebSocket {maddr}: {e}") - raise OpenConnectionError(f"Failed to dial WebSocket {maddr}: {e}") from e + logger.error("Failed to dial WebSocket %s: %s", maddr, e) + msg = f"Failed to dial 
WebSocket {maddr}: {e}" + raise OpenConnectionError(msg) from e def create_listener(self, handler: THandler) -> IListener: # type: ignore[override] """ @@ -182,11 +183,14 @@ def resolve(self, maddr: Multiaddr) -> list[Multiaddr]: try: parsed = parse_websocket_multiaddr(maddr) except ValueError as e: - logger.debug(f"Invalid WebSocket multiaddr for resolution: {e}") + logger.debug("Invalid WebSocket multiaddr for resolution: %s", e) return [maddr] # Return original if not a valid WebSocket multiaddr logger.debug( - f"Parsed multiaddr {maddr}: is_wss={parsed.is_wss}, sni={parsed.sni}" + "Parsed multiaddr %s: is_wss=%s, sni=%s", + maddr, + parsed.is_wss, + parsed.sni, ) if not parsed.is_wss: @@ -216,14 +220,14 @@ def resolve(self, maddr: Multiaddr) -> list[Multiaddr]: try: # Remove /wss and add /tls/sni/example.com/ws without_wss = maddr.decapsulate(Multiaddr("/wss")) - sni_component = Multiaddr(f"/sni/{dns_name}") + sni_component = Multiaddr("/sni/%s" % dns_name) resolved = ( without_wss.encapsulate(Multiaddr("/tls")) .encapsulate(sni_component) .encapsulate(Multiaddr("/ws")) ) - logger.debug(f"Resolved {maddr} to {resolved}") + logger.debug("Resolved %s to %s", maddr, resolved) return [resolved] except Exception as e: - logger.debug(f"Failed to resolve multiaddr {maddr}: {e}") + logger.debug("Failed to resolve multiaddr %s: %s", maddr, e) return [maddr] diff --git a/newsfragments/713.feature.rst b/newsfragments/713.feature.rst new file mode 100644 index 000000000..6c0bb3bc0 --- /dev/null +++ b/newsfragments/713.feature.rst @@ -0,0 +1 @@ +Added flood publishing. diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index 5c341d0bf..6dece10f2 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -537,8 +537,8 @@ async def test_dense_connect_fallback(): connected_peers = len(pubsub.peers) expected_connections = len(hosts) - 1 assert connected_peers == expected_connections, ( - f"Host {i} has {connected_peers} connections, " - f"expected {expected_connections} in dense mode" + "Host %d has %d connections, expected %d in dense mode" + % (i, connected_peers, expected_connections) ) @@ -560,8 +560,8 @@ async def test_sparse_connect(): for i, pubsub in enumerate(pubsubs_gsub): connected_peers = len(pubsub.peers) assert degree <= connected_peers < len(hosts) - 1, ( - f"Host {i} has {connected_peers} connections, " - f"expected between {degree} and {len(hosts) - 1} in sparse mode" + "Host %d has %d connections, expected between %d and %d in sparse mode" + % (i, connected_peers, degree, len(hosts) - 1) ) # Test message propagation @@ -593,13 +593,126 @@ async def test_sparse_connect(): # require more than half for acceptable scalability min_required = (total_nodes + 1) // 2 assert received_count >= min_required, ( - f"Message propagation insufficient: " - f"{received_count}/{total_nodes} nodes " - f"received the message. Ideally all nodes should receive it, but at " - f"minimum {min_required} required for sparse network scalability." + "Message propagation insufficient: %d/%d nodes received the message. " + "Ideally all nodes should receive it, but at minimum %d required " + "for sparse network scalability." + % (received_count, total_nodes, min_required) ) +@pytest.mark.trio +async def test_flood_publish(): + """ + Test that with flood_publish disabled, message propagation still works + in a fully connected network topology. 
+ """ + async with PubsubFactory.create_batch_with_gossipsub( + 6, + degree=2, + degree_low=1, + degree_high=3, + flood_publish=False, + ) as pubsubs_gsub: + routers: list[GossipSub] = [] + for pubsub in pubsubs_gsub: + assert isinstance(pubsub.router, GossipSub) + routers.append(pubsub.router) + hosts = [ps.host for ps in pubsubs_gsub] + + topic = "flood_test_topic" + queues = [await pubsub.subscribe(topic) for pubsub in pubsubs_gsub] + + # connect host 0 to all other hosts + await one_to_all_connect(hosts, 0) + + # wait for connections to be established + await trio.sleep(1) + + # publish a message from the first host + msg_content = b"flood_msg" + await pubsubs_gsub[0].publish(topic, msg_content) + + # wait for messages to propagate + await trio.sleep(1) + + # Debug info - only log if needed + # print(f"Mesh for topic: {routers[0].mesh[topic]}") + # if routers[0].pubsub: + # print(f"Peer topics: {routers[0].pubsub.peer_topics}") + + # verify all nodes received the message with timeout + for i, queue in enumerate(queues): + try: + with trio.fail_after(5): + msg = await queue.get() + assert msg.data == msg_content, ( + f"node {i} received wrong message: {msg.data}" + ) + except trio.TooSlowError: + pytest.fail(f"Node {i} did not receive the message (timeout)") + + # Test passed if all nodes received the message + print("Basic flood test passed - all nodes received the message") + + +@pytest.mark.trio +async def test_flood_publish_enabled(): + """ + Test that with flood_publish enabled, all nodes receive the message + even with a sparse network topology. + """ + # Create a network with flood_publish enabled + async with PubsubFactory.create_batch_with_gossipsub( + 6, + degree=2, + degree_low=1, + degree_high=3, + flood_publish=True, # Enable flood_publish + ) as pubsubs_gsub: + routers: list[GossipSub] = [] + for pubsub in pubsubs_gsub: + assert isinstance(pubsub.router, GossipSub) + routers.append(pubsub.router) + hosts = [ps.host for ps in pubsubs_gsub] + + topic = "flood_test_topic" + queues = [await pubsub.subscribe(topic) for pubsub in pubsubs_gsub] + + # Create a sparse topology - only connect to a few nodes + # We only connect nodes in a chain, which would normally + # prevent complete message propagation without flood_publish + await connect(hosts[0], hosts[1]) + await connect(hosts[1], hosts[2]) + await connect(hosts[2], hosts[3]) + await connect(hosts[3], hosts[4]) + await connect(hosts[4], hosts[5]) + + # wait for connections to be established + await trio.sleep(1) + + # publish a message from the first host + msg_content = b"flood_publish_msg" + await pubsubs_gsub[0].publish(topic, msg_content) + + # wait for messages to propagate + await trio.sleep(2) + + # verify all nodes received the message with timeout + for i, queue in enumerate(queues): + try: + with trio.fail_after(5): + msg = await queue.get() + assert msg.data == msg_content, ( + f"node {i} received wrong message: {msg.data}" + ) + print(f"Node {i} received message correctly") + except trio.TooSlowError: + pytest.fail(f"Node {i} did not receive the message (timeout)") + + # Test passed if all nodes received the message + print("Flood publish test passed - all nodes received the message") + + @pytest.mark.trio async def test_connect_some_with_fewer_hosts_than_degree(): """Test connect_some when there are fewer hosts than degree.""" diff --git a/tests/utils/factories.py b/tests/utils/factories.py index d94cc83e5..09604c5af 100644 --- a/tests/utils/factories.py +++ b/tests/utils/factories.py @@ -622,6 +622,7 @@ async 
diff --git a/tests/utils/factories.py b/tests/utils/factories.py
index d94cc83e5..09604c5af 100644
--- a/tests/utils/factories.py
+++ b/tests/utils/factories.py
@@ -622,6 +622,7 @@ async def create_batch_with_gossipsub(
         px_peers_count: int = GOSSIPSUB_PARAMS.px_peers_count,
         prune_back_off: int = GOSSIPSUB_PARAMS.prune_back_off,
         unsubscribe_back_off: int = GOSSIPSUB_PARAMS.unsubscribe_back_off,
+        flood_publish: bool = GOSSIPSUB_PARAMS.flood_publish,
         security_protocol: TProtocol | None = None,
         muxer_opt: TMuxerOptions | None = None,
         msg_id_constructor: None
@@ -646,6 +647,7 @@ async def create_batch_with_gossipsub(
                 px_peers_count=px_peers_count,
                 prune_back_off=prune_back_off,
                 unsubscribe_back_off=unsubscribe_back_off,
+                flood_publish=flood_publish,
             )
         else:
             gossipsubs = GossipsubFactory.create_batch(
@@ -664,6 +666,7 @@ async def create_batch_with_gossipsub(
                 px_peers_count=px_peers_count,
                 prune_back_off=prune_back_off,
                 unsubscribe_back_off=unsubscribe_back_off,
+                flood_publish=flood_publish,
             )
 
         async with cls._create_batch_with_router(