Skip to content

Commit 4f30121

Browse files
committed
cancell enqueued rpcs on receiving IDontWant
1 parent ed53c17 commit 4f30121

File tree

7 files changed

+3802
-17
lines changed

7 files changed

+3802
-17
lines changed

comm.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (p *PubSub) handlePeerDead(s network.Stream) {
165165
}
166166

167167
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing *rpcQueue) {
168-
writeRpc := func(rpc *RPC) error {
168+
writeRpc := func(rpc *pb.RPC) error {
169169
size := uint64(rpc.Size())
170170

171171
buf := pool.Get(varint.UvarintSize(size) + int(size))
@@ -193,8 +193,11 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou
193193
p.logger.Debug("error popping message from the queue to send to peer", "peer", s.Conn().RemotePeer(), "err", err)
194194
return
195195
}
196+
if rpc.Size() == 0 {
197+
continue
198+
}
196199

197-
err = writeRpc(rpc)
200+
err = writeRpc(&rpc.RPC)
198201
if err != nil {
199202
s.Reset()
200203
p.logger.Debug("error writing message to peer", "peer", s.Conn().RemotePeer(), "err", err)
@@ -215,6 +218,10 @@ func rpcWithMessages(msgs ...*pb.Message) *RPC {
215218
return &RPC{RPC: pb.RPC{Publish: msgs}}
216219
}
217220

221+
func rpcWithMessageAndMsgID(msg *pb.Message, msgID string) *RPC {
222+
return &RPC{RPC: pb.RPC{Publish: []*pb.Message{msg}}, MsgIDs: []string{msgID}}
223+
}
224+
218225
func rpcWithControl(msgs []*pb.Message,
219226
ihave []*pb.ControlIHave,
220227
iwant []*pb.ControlIWant,

gossipsub.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,9 @@ func (gs *GossipSubRouter) handleIDontWant(p peer.ID, ctl *pb.ControlMessage) {
11631163
gs.peerdontwant[p]++
11641164

11651165
totalUnwantedIds := 0
1166+
// Collect message IDs for cancellation
1167+
var msgIDsToCancel []string
1168+
11661169
// Remember all the unwanted message ids
11671170
mainIDWLoop:
11681171
for _, idontwant := range ctl.GetIdontwant() {
@@ -1175,8 +1178,14 @@ mainIDWLoop:
11751178

11761179
totalUnwantedIds++
11771180
gs.unwanted[p][computeChecksum(mid)] = gs.params.IDontWantMessageTTL
1181+
msgIDsToCancel = append(msgIDsToCancel, mid)
11781182
}
11791183
}
1184+
1185+
// Cancel these messages in the RPC queue if it exists
1186+
if queue, ok := gs.p.peers[p]; ok && len(msgIDsToCancel) > 0 {
1187+
queue.CancelMessages(msgIDsToCancel)
1188+
}
11801189
}
11811190

11821191
func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string, isUnsubscribe bool) {
@@ -1370,7 +1379,7 @@ func (gs *GossipSubRouter) rpcs(msg *Message) iter.Seq2[peer.ID, *RPC] {
13701379
}
13711380
}
13721381

1373-
out := rpcWithMessages(msg.Message)
1382+
out := rpcWithMessageAndMsgID(msg.Message, gs.p.idGen.ID(msg))
13741383
for pid := range tosend {
13751384
if pid == from || pid == peer.ID(msg.GetFrom()) {
13761385
continue

gossipsub_spam_test.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -930,20 +930,25 @@ func TestGossipsubHandleIDontwantSpam(t *testing.T) {
930930
rPid := hosts[1].ID()
931931
ctrlMessage := &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: idwIds}}}
932932
grt := psubs[0].rt.(*GossipSubRouter)
933-
grt.handleIDontWant(rPid, ctrlMessage)
933+
completed := make(chan struct{})
934+
psubs[0].eval <- func() {
935+
grt.handleIDontWant(rPid, ctrlMessage)
934936

935-
if grt.peerdontwant[rPid] != 1 {
936-
t.Errorf("Wanted message count of %d but received %d", 1, grt.peerdontwant[rPid])
937-
}
938-
mid := fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength-1)
939-
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; !ok {
940-
t.Errorf("Desired message id was not stored in the unwanted map: %s", mid)
941-
}
937+
if grt.peerdontwant[rPid] != 1 {
938+
t.Errorf("Wanted message count of %d but received %d", 1, grt.peerdontwant[rPid])
939+
}
940+
mid := fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength-1)
941+
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; !ok {
942+
t.Errorf("Desired message id was not stored in the unwanted map: %s", mid)
943+
}
942944

943-
mid = fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength)
944-
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; ok {
945-
t.Errorf("Unwanted message id was stored in the unwanted map: %s", mid)
945+
mid = fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength)
946+
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; ok {
947+
t.Errorf("Unwanted message id was stored in the unwanted map: %s", mid)
948+
}
949+
close(completed)
946950
}
951+
<-completed
947952
}
948953

949954
type mockGSOnRead func(writeMsg func(*pb.RPC), irpc *pb.RPC)

0 commit comments

Comments
 (0)