Skip to content

Conversation

SuchitraSwain
Copy link

@SuchitraSwain SuchitraSwain commented Sep 17, 2025

What was wrong?

Issue #

How was it fixed?

Summary of approach.

🔍 Technical Details
The FloodSub publishing flow works as follows:

  • User calls pubsub.publish(topic, data)
  • Message creation: Creates a rpc_pb2.Message with data, topic, sequence number, and signature
  • Validation: Passes through push_msg() which validates the message
  • Routing: Calls router.publish() which floods the message to all peers subscribed to the topic
  • Delivery: Messages are delivered to subscribers

To-Do

  • Clean up commit history
  • Add or update documentation related to these changes
  • Add entry to the release notes

Cute Animal Picture

image

@SuchitraSwain
Copy link
Author

@seetadev The FloodSub publishing functionality is already working correctly in py-libp2p. The examples and documentation I created demonstrate this functionality and provide guidance for users who want to use FloodSub.

@seetadev
Copy link
Contributor

@SuchitraSwain : HI Suchitra, wish if you could resolve the merge conflicts.

Re-ran the CI/Cd pipeline. Waiting for the test results.

@SuchitraSwain
Copy link
Author

@seetadev please Re-ran the CI/Cd pipeline.

@bomanaps
Copy link
Contributor

Can you add me to this branch?

@seetadev
Copy link
Contributor

@bomanaps and @SuchitraSwain : Lets resolve the CI/CD issues that are failing.

For the kad-dht issue, we investigated it and documented it at #949 .

@yashksaini-coder , @sumanjeet0012 and @acul71 are fixing it. We will soon have the kad-dht test issue resolved.

@seetadev
Copy link
Contributor

@bomanaps and @SuchitraSwain : Please resolve the other CI/Cd issues in the meantime.

@seetadev
Copy link
Contributor

@bomanaps and @SuchitraSwain : Please resolve the other CI/Cd issues in the meantime. Kindly take help from @acul71, @Winter-Soren and @sumanjeet0012. They are working on gossipsub/floodsub stack.

@yashksaini-coder
Copy link
Contributor

@seetadev @SuchitraSwain I'm working on resolving the CI/CD failures

@seetadev seetadev changed the title Added flooding publishing Add Floodsub module in py-libp2p Sep 24, 2025
@seetadev seetadev changed the title Add Floodsub module in py-libp2p Add Floods publishing in py-libp2p Sep 24, 2025
@seetadev seetadev mentioned this pull request Sep 24, 2025
3 tasks
@seetadev seetadev changed the title Add Floods publishing in py-libp2p Add flood publishing in py-libp2p Sep 24, 2025
@SuchitraSwain
Copy link
Author

@seetadev could you please run CI/CD

@acul71
Copy link
Contributor

acul71 commented Oct 4, 2025

could you please run CI/CD

@SuchitraSwain

I can see that this tests are hanging (I did set a manual timeout)
Can you tell why ?
when you run make test what is the output ?

FAILED tests/interop/go_libp2p/test_floodsub_interop.py::test_floodsub_basic_functionality - Failed: Timeout (>80.0s) from pytest-timeout.
FAILED tests/interop/js_libp2p/test_floodsub_interop.py::test_floodsub_js_compatibility - Failed: Timeout (>80.0s) from pytest-timeout.

@SuchitraSwain
Copy link
Author

@seetadev could you re-run the CI/CD please

@Winter-Soren
Copy link
Contributor

Hi @SuchitraSwain

I’ve reviewed the PR and appreciate the comprehensive approach you’ve taken with the documentation, examples, and tests. The FLOODSUB_IMPLEMENTATION.md document is particularly well-structured and informative. You can remove this document and instead create a discussion page. Also, remove the PR_DISCUSSION_TEMPLATE.md file, as it’s not required.

The examples (basic_example.py, simple_pubsub.py, and multi_node_pubsub.py) are well-organized and effectively demonstrate the functionality.

The addition of the flood_publish option to GossipSub is a valuable enhancement that provides flexibility for users who want GossipSub’s features while retaining FloodSub’s broadcasting behavior in specific scenarios.

Here are a few points for further improvement:

  1. Interoperability Tests: The interop tests with the JS implementation (test_floodsub_interop.py) appear to hang indefinitely. Could you please investigate the cause and consider adding timeouts to prevent this?
  2. Code Comments: The implementation of flood_publish in GossipSub would benefit from additional inline comments explaining how it interacts with the existing mesh selection logic.
  3. Example Issue: The basic_example.py example also hangs indefinitely. I’ve included logs below for reference:
2025-10-08 03:58:40,473 [INFO] [floodsub_basic] Starting basic FloodSub example...
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Decoded protocol code: 4
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Protocol name: ip4
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG] bytes_to_string: proto=ip4, value='127.0.0.1'
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Decoded protocol code: 6
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Protocol name: tcp
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG] bytes_to_string: proto=tcp, value='0'
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Decoded protocol code: 4
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Protocol name: ip4
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG] bytes_to_string: proto=ip4, value='127.0.0.1'
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Decoded protocol code: 6
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Protocol name: tcp
2025-10-08 03:58:40,473 [DEBUG] [multiaddr.transforms] [DEBUG] bytes_to_string: proto=tcp, value='0'
2025-10-08 03:58:40,475 [DEBUG] [multiaddr.codecs.cid] [DEBUG CID to_bytes] Input value: 16Uiu2HAm7gpGgvKFVoZgR4zT7EFFXdAWVd6g1VVjMp9dETrRqgLU
2025-10-08 03:58:40,475 [DEBUG] [multiaddr.codecs.cid] [DEBUG CID to_bytes] Parsed as CIDv0: 00250802122102b62d548bceed68fa133c8b69696c8160ae0c3da9bb914cd69e5e953b7419c485
2025-10-08 03:58:40,475 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Decoded protocol code: 4
2025-10-08 03:58:40,475 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Protocol name: ip4
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.transforms] [DEBUG] bytes_to_string: proto=ip4, value='127.0.0.1'
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Decoded protocol code: 6
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Protocol name: tcp
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.transforms] [DEBUG] bytes_to_string: proto=tcp, value='0'
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Decoded protocol code: 4
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Protocol name: ip4
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.transforms] [DEBUG] bytes_to_string: proto=ip4, value='127.0.0.1'
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Decoded protocol code: 6
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.transforms] [DEBUG bytes_to_string] Protocol name: tcp
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.transforms] [DEBUG] bytes_to_string: proto=tcp, value='0'
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.codecs.cid] [DEBUG CID to_bytes] Input value: 16Uiu2HAm1inSssjHtYUY1eGSmG892HHyxZSL3Xa1v97mpYmxga4z
2025-10-08 03:58:40,476 [DEBUG] [multiaddr.codecs.cid] [DEBUG CID to_bytes] Parsed as CIDv0: 002508021221025d884ebb187a5fad95cc1b96342597bd1c610720a43c193f2ecf2e51d1b033d3
2025-10-08 03:58:40,476 [DEBUG] [async_service.Manager] <Manager[Pubsub] flags=SRcfe>: _handle_cancelled waiting for cancellation
2025-10-08 03:58:40,476 [DEBUG] [async_service.Manager] <Manager[Pubsub] flags=SRcfe>: _handle_cancelled waiting for cancellation
2025-10-08 03:58:40,477 [INFO] [floodsub_basic] Host 1 ID: 16Uiu2HAm7gpGgvKFVoZgR4zT7EFFXdAWVd6g1VVjMp9dETrRqgLU
2025-10-08 03:58:40,477 [INFO] [floodsub_basic] Host 2 ID: 16Uiu2HAm1inSssjHtYUY1eGSmG892HHyxZSL3Xa1v97mpYmxga4z
2025-10-08 03:58:40,477 [INFO] [floodsub_basic] Starting hosts...

Overall, this is a valuable addition to py-libp2p that will enhance the flexibility and usability. With the interoperability and example issues addressed, I believe this PR would be ready for merging.

@SuchitraSwain
Copy link
Author

@seetadev @Winter-Soren I have addressed all the comments., please review the PR again

@yashksaini-coder
Copy link
Contributor

@SuchitraSwain A quick review that I would like to add here, Please run the make command pr before giving the PR ready to merge status.

There are still some issue to be resolved in this PR.

(suchitra-lib) [yks@archlinux suchitra-lib]$ make pr
rm -fr build/
rm -fr dist/
rm -fr *.egg-info
find . -name '*.pyc' -exec rm -f {} +
find . -name '*.pyo' -exec rm -f {} +
find . -name '*~' -exec rm -f {} +
find . -name '__pycache__' -exec rm -rf {} +
python -m ruff check --fix
W293 Blank line contains whitespace
  --> examples/floodsub/basic_example.py:70:1
   |
68 |         nursery.start_soon(pubsub1.run)
69 |         nursery.start_soon(pubsub2.run)
70 |         
   | ^^^^^^^^
71 |         # Give services time to start
72 |         await trio.sleep(0.5)
   |
help: Remove whitespace from blank line

W293 Blank line contains whitespace
  --> examples/floodsub/basic_example.py:96:1
   |
94 |             nursery.cancel_scope.cancel()
95 |             return
96 |         
   | ^^^^^^^^
97 |         await trio.sleep(1)  # Wait for connection to establish
   |
help: Remove whitespace from blank line

invalid-syntax: Expected ',', found 'await'
   --> examples/floodsub/basic_example.py:103:9
    |
101 |         logger.info(f"Subscribing to topic: {topic}")
102 |         subscription = await pubsub2.subscribe(t
103 |         await trio.sleep(0.5)  # Wait for subscription to propagate
    |         ^^^^^
104 |
105 |         # Publish messages from host1
    |

invalid-syntax: Expected ',', found name
   --> examples/floodsub/basic_example.py:106:9
    |
105 |         # Publish messages from host1
106 |         messages = [
    |         ^^^^^^^^
107 |             "Hello from FloodSub!",
108 |             "This is message number 2",
    |

invalid-syntax: Expected ',', found name
   --> examples/floodsub/basic_example.py:112:9
    |
110 |         ]
111 |
112 |         logger.info("Publishing messages...")
    |         ^^^^^^
113 |         for i, message in enumerate(messages):
114 |             logger.info(f"Publishing message {i + 1}: {message}")
    |

invalid-syntax: Unparenthesized generator expression cannot be used here
   --> examples/floodsub/basic_example.py:112:9
    |
110 |           ]
111 |
112 | /         logger.info("Publishing messages...")
113 | |         for i, message in enumerate(messages):
    | |_____________________________________________^
114 |               logger.info(f"Publishing message {i + 1}: {message}")
115 |               await pubsub1.publish(topic, message.encode())
    |

invalid-syntax: Expected ',', found ':'
   --> examples/floodsub/basic_example.py:113:46
    |
112 |         logger.info("Publishing messages...")
113 |         for i, message in enumerate(messages):
    |                                              ^
114 |             logger.info(f"Publishing message {i + 1}: {message}")
115 |             await pubsub1.publish(topic, message.encode())
    |

invalid-syntax: Positional argument cannot follow keyword argument
   --> examples/floodsub/basic_example.py:114:13
    |
112 |         logger.info("Publishing messages...")
113 |         for i, message in enumerate(messages):
114 |             logger.info(f"Publishing message {i + 1}: {message}")
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
115 |             await pubsub1.publish(topic, message.encode())
116 |             await trio.sleep(0.5)
    |

invalid-syntax: Expected ',', found 'await'
   --> examples/floodsub/basic_example.py:115:13
    |
113 |         for i, message in enumerate(messages):
114 |             logger.info(f"Publishing message {i + 1}: {message}")
115 |             await pubsub1.publish(topic, message.encode())
    |             ^^^^^
116 |             await trio.sleep(0.5)
    |

invalid-syntax: Expected ',', found 'await'
   --> examples/floodsub/basic_example.py:116:13
    |
114 |             logger.info(f"Publishing message {i + 1}: {message}")
115 |             await pubsub1.publish(topic, message.encode())
116 |             await trio.sleep(0.5)
    |             ^^^^^
117 |
118 |         # Receive messages on host2 with timeout
    |

invalid-syntax: Expected ',', found name
   --> examples/floodsub/basic_example.py:119:9
    |
118 |         # Receive messages on host2 with timeout
119 |         logger.info("Receiving messages...")
    |         ^^^^^^
120 |         received_count = 0
121 |         for i in range(len(messages)):
    |

invalid-syntax: Expected ',', found name
   --> examples/floodsub/basic_example.py:120:9
    |
118 |         # Receive messages on host2 with timeout
119 |         logger.info("Receiving messages...")
120 |         received_count = 0
    |         ^^^^^^^^^^^^^^
121 |         for i in range(len(messages)):
122 |             # Use a timeout to prevent hanging
    |

invalid-syntax: Expected ')', found newline
   --> examples/floodsub/basic_example.py:120:27
    |
118 |         # Receive messages on host2 with timeout
119 |         logger.info("Receiving messages...")
120 |         received_count = 0
    |                           ^
121 |         for i in range(len(messages)):
122 |             # Use a timeout to prevent hanging
    |

W293 Blank line contains whitespace
   --> examples/floodsub/basic_example.py:129:1
    |
127 |                 logger.info(f"  Topics: {message.topicIDs}")
128 |                 received_count += 1
129 |                 
    | ^^^^^^^^^^^^^^^^
130 |             if cancel_scope.cancelled_caught:
131 |                 logger.warning(f"Timed out waiting for message {i + 1}")
    |
help: Remove whitespace from blank line

W293 Blank line contains whitespace
   --> examples/floodsub/basic_example.py:133:1
    |
131 |                 logger.warning(f"Timed out waiting for message {i + 1}")
132 |                 break
133 |         
    | ^^^^^^^^
134 |         logger.info(f"Successfully received {received_count} out of {len(messages)} messages")
    |
help: Remove whitespace from blank line

E501 Line too long (94 > 88)
   --> examples/floodsub/basic_example.py:134:89
    |
132 |                 break
133 |         
134 |         logger.info(f"Successfully received {received_count} out of {len(messages)} messages")
    |                                                                                         ^^^^^^
135 |         
136 |         # Clean up by cancelling the nursery
    |

W293 Blank line contains whitespace
   --> examples/floodsub/basic_example.py:135:1
    |
134 |         logger.info(f"Successfully received {received_count} out of {len(messages)} messages")
135 |         
    | ^^^^^^^^
136 |         # Clean up by cancelling the nursery
137 |         nursery.cancel_scope.cancel()
    |
help: Remove whitespace from blank line

E501 Line too long (157 > 88)
   --> libp2p/pubsub/gossipsub.py:121:89
    |
119 | …
120 | …
121 | …e: flood initial publishes to all topic peers, then use normal GossipSub mesh routing for forwarding
    |                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
122 | …
123 | …
    |

E501 Line too long (91 > 88)
   --> libp2p/pubsub/gossipsub.py:309:89
    |
308 |             # FLOOD_PUBLISH MODE: Hybrid GossipSub/FloodSub behavior
309 |             # When flood_publish is enabled and we are the original publisher of a message,
    |                                                                                         ^^^
310 |             # we send the message to ALL peers subscribed to the topic, similar to FloodSub.
311 |             # This provides a hybrid approach that combines GossipSub's efficiency with
    |

E501 Line too long (92 > 88)
   --> libp2p/pubsub/gossipsub.py:310:89
    |
308 |             # FLOOD_PUBLISH MODE: Hybrid GossipSub/FloodSub behavior
309 |             # When flood_publish is enabled and we are the original publisher of a message,
310 |             # we send the message to ALL peers subscribed to the topic, similar to FloodSub.
    |                                                                                         ^^^^
311 |             # This provides a hybrid approach that combines GossipSub's efficiency with
312 |             # FloodSub's reliability for initial message propagation.
    |

E501 Line too long (89 > 88)
   --> libp2p/pubsub/gossipsub.py:315:89
    |
313 |             if self.flood_publish and msg_forwarder == self.pubsub.my_id:
314 |                 # CRITICAL: Only apply flooding to messages we originally published
315 |                 # (msg_forwarder == self.pubsub.my_id), not to messages we're forwarding.
    |                                                                                         ^
316 |                 # This ensures that:
317 |                 # 1. Our own messages get maximum initial propagation (flooding behavior)
    |

E501 Line too long (89 > 88)
   --> libp2p/pubsub/gossipsub.py:317:89
    |
315 |                 # (msg_forwarder == self.pubsub.my_id), not to messages we're forwarding.
316 |                 # This ensures that:
317 |                 # 1. Our own messages get maximum initial propagation (flooding behavior)
    |                                                                                         ^
318 |                 # 2. Forwarded messages still use efficient GossipSub mesh routing
319 |                 # 3. We don't create excessive network traffic for relayed messages
    |

E501 Line too long (93 > 88)
   --> libp2p/pubsub/gossipsub.py:321:89
    |
319 |                 # 3. We don't create excessive network traffic for relayed messages
320 |
321 |                 # Send to all peers subscribed to this topic, bypassing normal mesh selection
    |                                                                                         ^^^^^
322 |                 # This is the core of the flood_publish feature - it temporarily switches
323 |                 # from selective GossipSub routing to FloodSub-style broadcasting for
    |

E501 Line too long (89 > 88)
   --> libp2p/pubsub/gossipsub.py:322:89
    |
321 |                 # Send to all peers subscribed to this topic, bypassing normal mesh selection
322 |                 # This is the core of the flood_publish feature - it temporarily switches
    |                                                                                         ^
323 |                 # from selective GossipSub routing to FloodSub-style broadcasting for
324 |                 # messages originating from this node.
    |

E501 Line too long (92 > 88)
   --> libp2p/pubsub/gossipsub.py:326:89
    |
324 |                 # messages originating from this node.
325 |                 for peer in self.pubsub.peer_topics[topic]:
326 |                     # TODO: When peer scoring is implemented, add score threshold check here
    |                                                                                         ^^^^
327 |                     #       Direct peers should skip score checks as they're trusted connections
328 |                     send_to.add(peer)
    |

E501 Line too long (96 > 88)
   --> libp2p/pubsub/gossipsub.py:327:89
    |
325 |                 for peer in self.pubsub.peer_topics[topic]:
326 |                     # TODO: When peer scoring is implemented, add score threshold check here
327 |                     #       Direct peers should skip score checks as they're trusted connections
    |                                                                                         ^^^^^^^^
328 |                     send_to.add(peer)
329 |             else:
    |

E501 Line too long (110 > 88)
   --> libp2p/pubsub/gossipsub.py:330:89
    |
328 |                     send_to.add(peer)
329 |             else:
330 |                 # Normal GossipSub behavior when flood_publish is disabled or we're not the original publisher
    |                                                                                         ^^^^^^^^^^^^^^^^^^^^^^
331 |
332 |                 # direct peers
    |

E501 Line too long (92 > 88)
   --> libp2p/pubsub/gossipsub.py:354:89
    |
352 |                     #  to the topic and add them as our `fanout` peers.
353 |                     topic_in_fanout: bool = topic in self.fanout
354 |                     fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
    |                                                                                         ^^^^
355 |                     fanout_size = len(fanout_peers)
356 |                     if not topic_in_fanout or (
    |

E501 Line too long (98 > 88)
   --> tests/core/pubsub/test_gossipsub.py:648:89
    |
646 |                 with trio.fail_after(5):
647 |                     msg = await queue.get()
648 |                     assert msg.data == msg_content, f"node {i} received wrong message: {msg.data}"
    |                                                                                         ^^^^^^^^^^
649 |             except trio.TooSlowError:
650 |                 pytest.fail(f"Node {i} did not receive the message (timeout)")
    |

E501 Line too long (98 > 88)
   --> tests/core/pubsub/test_gossipsub.py:703:89
    |
701 |                 with trio.fail_after(5):
702 |                     msg = await queue.get()
703 |                     assert msg.data == msg_content, f"node {i} received wrong message: {msg.data}"
    |                                                                                         ^^^^^^^^^^
704 |                     print(f"Node {i} received message correctly")
705 |             except trio.TooSlowError:
    |

E501 Line too long (92 > 88)
   --> tests/core/pubsub/test_gossipsub.py:715:89
    |
713 |             msg = await queue.get()
714 |             assert msg.data == msg_content, (
715 |                 f"Node did not receive expected message with flood_publish=True: {msg.data}"
    |                                                                                         ^^^^
716 |             )
    |

E501 Line too long (95 > 88)
   --> tests/interop/js_libp2p/test_floodsub_interop.py:323:89
    |
322 |     try:
323 |         # Simple approach without nested trio.open_nursery to avoid complex cancellation issues
    |                                                                                         ^^^^^^^
324 |         async with background_trio_service(pubsub1):
325 |             async with background_trio_service(pubsub2):
    |

Found 32 errors.
make: *** [Makefile:43: fix] Error 1
(suchitra-lib) [yks@archlinux suchitra-lib]$ 


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants