Skip to content

Commit 9c972fe

Browse files
committed
gossipsub: implement extensions
1 parent 239b439 commit 9c972fe

File tree

4 files changed

+274
-3
lines changed

4 files changed

+274
-3
lines changed

extensions.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package pubsub
2+
3+
import (
4+
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
5+
"github.com/libp2p/go-libp2p/core/peer"
6+
)
7+
8+
type PeerExtensions struct {
9+
TestExtension bool
10+
}
11+
12+
func WithTestExtension() Option {
13+
return func(ps *PubSub) error {
14+
if rt, ok := ps.rt.(*GossipSubRouter); ok {
15+
rt.extensions.myExtensions.TestExtension = true
16+
}
17+
return nil
18+
}
19+
}
20+
21+
func hasPeerExtensions(rpc *RPC) bool {
22+
if rpc != nil && rpc.Control != nil && rpc.Control.Extensions != nil {
23+
return true
24+
}
25+
return false
26+
}
27+
28+
func peerExtensionsFromRPC(rpc *RPC) PeerExtensions {
29+
out := PeerExtensions{}
30+
if hasPeerExtensions(rpc) {
31+
out.TestExtension = rpc.Control.Extensions.GetTestExtension()
32+
}
33+
return out
34+
}
35+
36+
func (pe *PeerExtensions) ExtendRPC(rpc *RPC) {
37+
if pe.TestExtension {
38+
if rpc.Control == nil {
39+
rpc.Control = &pubsub_pb.ControlMessage{}
40+
}
41+
rpc.Control.Extensions = &pubsub_pb.ControlExtensions{
42+
TestExtension: &pe.TestExtension,
43+
}
44+
}
45+
}
46+
47+
type extensionsState struct {
48+
myExtensions PeerExtensions
49+
peerExtensions map[peer.ID]PeerExtensions // peer's extensions
50+
sentExtensions map[peer.ID]struct{}
51+
reportMisbehavior func(peer.ID)
52+
sendRPC func(p peer.ID, r *RPC, urgent bool)
53+
testExtension testExtension
54+
}
55+
56+
func newExtensionsState(myExtensions PeerExtensions, reportMisbehavior func(peer.ID), sendRPC func(peer.ID, *RPC, bool)) *extensionsState {
57+
return &extensionsState{
58+
myExtensions: myExtensions,
59+
peerExtensions: make(map[peer.ID]PeerExtensions),
60+
sentExtensions: make(map[peer.ID]struct{}),
61+
reportMisbehavior: reportMisbehavior,
62+
sendRPC: sendRPC,
63+
testExtension: testExtension{sendRPC: sendRPC},
64+
}
65+
}
66+
67+
func (es *extensionsState) HandleRPC(rpc *RPC) {
68+
if _, ok := es.peerExtensions[rpc.from]; !ok {
69+
// We know this is the first message because we didn't have extensions
70+
// for this peer, and we always set extensions on the first rpc.
71+
es.peerExtensions[rpc.from] = peerExtensionsFromRPC(rpc)
72+
if _, ok := es.sentExtensions[rpc.from]; ok {
73+
// We just finished both sending and receiving the extensions
74+
// control message.
75+
es.extensionsAddPeer(rpc.from)
76+
}
77+
} else {
78+
// We already have an extension for this peer. If they send us another
79+
// extensions control message, that is a protocol error. We should
80+
// down score them because they are misbehaving.
81+
if hasPeerExtensions(rpc) {
82+
es.reportMisbehavior(rpc.from)
83+
}
84+
}
85+
86+
es.extensionsHandleRPC(rpc)
87+
}
88+
89+
func (es *extensionsState) AddPeer(id peer.ID, helloPacket *RPC) {
90+
// Send our extensions as the first message.
91+
es.myExtensions.ExtendRPC(helloPacket)
92+
93+
es.sentExtensions[id] = struct{}{}
94+
if _, ok := es.peerExtensions[id]; ok {
95+
// We've just finished sending and receiving the extensions control
96+
// message.
97+
es.extensionsAddPeer(id)
98+
}
99+
}
100+
101+
func (es *extensionsState) RemovePeer(id peer.ID) {
102+
_, recvdExt := es.peerExtensions[id]
103+
_, sentExt := es.sentExtensions[id]
104+
if recvdExt && sentExt {
105+
// Add peer was previously called, so we need to call remove peer
106+
es.extensionsRemovePeer(id)
107+
}
108+
delete(es.peerExtensions, id)
109+
if len(es.peerExtensions) == 0 {
110+
es.peerExtensions = make(map[peer.ID]PeerExtensions)
111+
}
112+
delete(es.sentExtensions, id)
113+
if len(es.sentExtensions) == 0 {
114+
es.sentExtensions = make(map[peer.ID]struct{})
115+
}
116+
}
117+
118+
// extensionsAddPeer is only called once we've both sent and received the
119+
// extensions control message.
120+
func (es *extensionsState) extensionsAddPeer(id peer.ID) {
121+
if es.myExtensions.TestExtension && es.peerExtensions[id].TestExtension {
122+
es.testExtension.AddPeer(id)
123+
}
124+
}
125+
126+
// extensionsRemovePeer is always called after extensionsAddPeer.
127+
func (es *extensionsState) extensionsRemovePeer(id peer.ID) {
128+
}
129+
130+
func (es *extensionsState) extensionsHandleRPC(rpc *RPC) {
131+
}

gossipsub.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, o
260260
// DefaultGossipSubRouter returns a new GossipSubRouter with default parameters.
261261
func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
262262
params := DefaultGossipSubParams()
263-
return &GossipSubRouter{
263+
rt := &GossipSubRouter{
264264
peers: make(map[peer.ID]protocol.ID),
265265
mesh: make(map[string]map[peer.ID]struct{}),
266266
fanout: make(map[string]map[peer.ID]struct{}),
@@ -281,6 +281,14 @@ func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
281281
tagTracer: newTagTracer(h.ConnManager()),
282282
params: params,
283283
}
284+
285+
rt.extensions = newExtensionsState(PeerExtensions{}, func(p peer.ID) {
286+
if rt.score != nil {
287+
rt.score.AddPenalty(p, 10)
288+
}
289+
}, rt.sendRPC)
290+
291+
return rt
284292
}
285293

286294
// DefaultGossipSubParams returns the default gossip sub parameters
@@ -466,8 +474,10 @@ func WithGossipSubParams(cfg GossipSubParams) Option {
466474
// is the fanout map. Fanout peer lists are expired if we don't publish any
467475
// messages to their topic for GossipSubFanoutTTL.
468476
type GossipSubRouter struct {
469-
p *PubSub
470-
peers map[peer.ID]protocol.ID // peer protocols
477+
p *PubSub
478+
peers map[peer.ID]protocol.ID // peer protocols
479+
extensions *extensionsState
480+
471481
direct map[peer.ID]struct{} // direct peers
472482
mesh map[string]map[peer.ID]struct{} // topic meshes
473483
fanout map[string]map[peer.ID]struct{} // topic fanout
@@ -652,11 +662,18 @@ loop:
652662
}
653663
}
654664
gs.outbound[p] = outbound
665+
666+
if gs.feature(GossipSubFeatureExtensions, proto) {
667+
gs.extensions.AddPeer(p, helloPacket)
668+
}
655669
}
656670

657671
func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
658672
log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
659673
gs.tracer.RemovePeer(p)
674+
if gs.feature(GossipSubFeatureExtensions, gs.peers[p]) {
675+
gs.extensions.RemovePeer(p)
676+
}
660677
delete(gs.peers, p)
661678
for _, peers := range gs.mesh {
662679
delete(peers, p)
@@ -746,6 +763,8 @@ func (gs *GossipSubRouter) Preprocess(from peer.ID, msgs []*Message) {
746763
}
747764

748765
func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
766+
gs.extensions.HandleRPC(rpc)
767+
749768
ctl := rpc.GetControl()
750769
if ctl == nil {
751770
return

gossipsub_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3961,3 +3961,106 @@ func newSkeletonGossipsub(ctx context.Context, h host.Host) *skeletonGossipsub {
39613961
inRPC: sendRPC,
39623962
}
39633963
}
3964+
3965+
func TestExtensionsControlMessage(t *testing.T) {
3966+
for _, wellBehaved := range []bool{true, false} {
3967+
t.Run(fmt.Sprintf("wellBehaved=%t", wellBehaved), func(t *testing.T) {
3968+
3969+
ctx, cancel := context.WithCancel(context.Background())
3970+
defer cancel()
3971+
hosts := getDefaultHosts(t, 2)
3972+
psub0 := getGossipsub(ctx, hosts[0],
3973+
WithPeerScore(
3974+
&PeerScoreParams{
3975+
AppSpecificScore: func(peer.ID) float64 { return 0 },
3976+
BehaviourPenaltyWeight: -1,
3977+
BehaviourPenaltyDecay: ScoreParameterDecay(time.Minute),
3978+
DecayInterval: DefaultDecayInterval,
3979+
DecayToZero: DefaultDecayToZero,
3980+
},
3981+
&PeerScoreThresholds{
3982+
GossipThreshold: -100,
3983+
PublishThreshold: -500,
3984+
GraylistThreshold: -1000,
3985+
}),
3986+
WithMessageIdFn(func(msg *pb.Message) string {
3987+
return string(msg.Data)
3988+
}))
3989+
3990+
psub1 := newSkeletonGossipsub(ctx, hosts[1])
3991+
3992+
connect(t, hosts[0], hosts[1])
3993+
time.Sleep(time.Second)
3994+
3995+
loopTimes := 3
3996+
3997+
for i := range loopTimes {
3998+
rpcToSend := &pb.RPC{
3999+
Control: &pb.ControlMessage{
4000+
Extensions: &pb.ControlExtensions{},
4001+
},
4002+
}
4003+
if wellBehaved && i > 0 {
4004+
// A well behaved node does not repeat the control
4005+
// extension message
4006+
rpcToSend.Control.Extensions = nil
4007+
}
4008+
psub1.inRPC <- rpcToSend
4009+
}
4010+
4011+
time.Sleep(time.Second)
4012+
4013+
peerScore := psub0.rt.(*GossipSubRouter).score.Score(hosts[1].ID())
4014+
t.Log("Peer score:", peerScore)
4015+
if wellBehaved {
4016+
if peerScore < 0 {
4017+
t.Fatal("Peer score should not be negative")
4018+
}
4019+
} else {
4020+
if peerScore >= 0 {
4021+
t.Fatal("Peer score should not be positive")
4022+
}
4023+
}
4024+
})
4025+
}
4026+
}
4027+
4028+
func TestTestExtension(t *testing.T) {
4029+
hosts := getDefaultHosts(t, 2)
4030+
psub := getGossipsub(context.Background(), hosts[0], WithTestExtension())
4031+
_ = psub
4032+
4033+
ctx, cancel := context.WithCancel(context.Background())
4034+
defer cancel()
4035+
psub1 := newSkeletonGossipsub(ctx, hosts[1])
4036+
4037+
connect(t, hosts[0], hosts[1])
4038+
4039+
const timeout = 3 * time.Second
4040+
select {
4041+
case <-time.After(timeout):
4042+
t.Fatal("Timeout")
4043+
case r := <-psub1.outRPC:
4044+
if !*r.Control.Extensions.TestExtension {
4045+
t.Fatal("Unexpected RPC. First RPC should be the Extensions Control Message")
4046+
}
4047+
}
4048+
4049+
truePtr := true
4050+
psub1.inRPC <- &pb.RPC{
4051+
Control: &pb.ControlMessage{
4052+
Extensions: &pb.ControlExtensions{
4053+
TestExtension: &truePtr,
4054+
},
4055+
},
4056+
}
4057+
4058+
select {
4059+
case <-time.After(timeout):
4060+
t.Fatal("Timeout")
4061+
case r := <-psub1.outRPC:
4062+
if r.TestExtension == nil {
4063+
t.Fatal("Unexpected RPC. Next RPC should be the TestExtension Message")
4064+
}
4065+
}
4066+
}

testextension.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package pubsub
2+
3+
import (
4+
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
5+
"github.com/libp2p/go-libp2p/core/peer"
6+
)
7+
8+
type testExtension struct {
9+
sendRPC func(p peer.ID, r *RPC, urgent bool)
10+
}
11+
12+
func (e *testExtension) AddPeer(id peer.ID) {
13+
e.sendRPC(id, &RPC{
14+
RPC: pubsub_pb.RPC{
15+
TestExtension: &pubsub_pb.TestExtension{},
16+
},
17+
}, false)
18+
}

0 commit comments

Comments
 (0)