11package pubsub
22
33import (
4- "bufio"
54 "bytes"
65 "context"
76 crand "crypto/rand"
@@ -20,10 +19,8 @@ import (
2019 "testing/quick"
2120 "time"
2221
23- "github.com/gogo/protobuf/proto"
2422 pb "github.com/libp2p/go-libp2p-pubsub/pb"
2523 "github.com/libp2p/go-msgio"
26- "github.com/multiformats/go-varint"
2724
2825 "github.com/libp2p/go-libp2p/core/host"
2926 "github.com/libp2p/go-libp2p/core/network"
@@ -3891,104 +3888,6 @@ func BenchmarkSplitRPCLargeMessages(b *testing.B) {
38913888 })
38923889}
38933890
3894- func TestExtensionsControlMessage (t * testing.T ) {
3895- for _ , wellBehaved := range []bool {true , false } {
3896- t .Run (fmt .Sprintf ("wellBehaved=%t" , wellBehaved ), func (t * testing.T ) {
3897-
3898- ctx , cancel := context .WithCancel (context .Background ())
3899- defer cancel ()
3900- hosts := getDefaultHosts (t , 2 )
3901- psub0 := getGossipsub (ctx , hosts [0 ],
3902- WithPeerScore (
3903- & PeerScoreParams {
3904- AppSpecificScore : func (peer.ID ) float64 { return 0 },
3905- BehaviourPenaltyWeight : - 1 ,
3906- BehaviourPenaltyDecay : ScoreParameterDecay (time .Minute ),
3907- DecayInterval : DefaultDecayInterval ,
3908- DecayToZero : DefaultDecayToZero ,
3909- },
3910- & PeerScoreThresholds {
3911- GossipThreshold : - 100 ,
3912- PublishThreshold : - 500 ,
3913- GraylistThreshold : - 1000 ,
3914- }),
3915- WithMessageIdFn (func (msg * pb.Message ) string {
3916- return string (msg .Data )
3917- }))
3918-
3919- hosts1Msgs := make (chan * pb.RPC , 10 )
3920- hosts [1 ].SetStreamHandler (GossipSubDefaultProtocols [0 ], func (s network.Stream ) {
3921- defer s .Close ()
3922- rdr := bufio .NewReader (s )
3923- for {
3924- msgSize , err := varint .ReadUvarint (rdr )
3925- if err != nil {
3926- return
3927- }
3928- msgBytes := make ([]byte , msgSize )
3929- if _ , err := io .ReadFull (rdr , msgBytes ); err != nil {
3930- t .Fatal (err )
3931- }
3932- var rpc pb.RPC
3933- err = proto .Unmarshal (msgBytes , & rpc )
3934- if err != nil {
3935- t .Fatal (err )
3936- }
3937- hosts1Msgs <- & rpc
3938- }
3939- })
3940-
3941- connect (t , hosts [0 ], hosts [1 ])
3942- time .Sleep (time .Second )
3943-
3944- go func () {
3945- s , err := hosts [1 ].NewStream (ctx , hosts [0 ].ID (), GossipSubDefaultProtocols [0 ])
3946- if err != nil {
3947- panic (err )
3948- }
3949- defer s .Close ()
3950-
3951- loopTimes := 3
3952-
3953- for i := range loopTimes {
3954- rpcToSend := & pb.RPC {
3955- Control : & pb.ControlMessage {
3956- Extensions : & pb.ControlExtensions {},
3957- },
3958- }
3959- if wellBehaved && i > 0 {
3960- // A well behaved node does not repeat the control
3961- // extension message
3962- rpcToSend .Control .Extensions = nil
3963- }
3964- toSendBytes , err := proto .Marshal (rpcToSend )
3965- if err != nil {
3966- panic (err )
3967- }
3968- varintBuf := make ([]byte , 4 )
3969- n := varint .PutUvarint (varintBuf , uint64 (len (toSendBytes )))
3970- s .Write (varintBuf [:n ])
3971- s .Write (toSendBytes )
3972- }
3973- }()
3974-
3975- time .Sleep (time .Second )
3976-
3977- peerScore := psub0 .rt .(* GossipSubRouter ).score .Score (hosts [1 ].ID ())
3978- t .Log ("Peer score:" , peerScore )
3979- if wellBehaved {
3980- if peerScore < 0 {
3981- t .Fatal ("Peer score should not be negative" )
3982- }
3983- } else {
3984- if peerScore >= 0 {
3985- t .Fatal ("Peer score should not be positive" )
3986- }
3987- }
3988- })
3989- }
3990- }
3991-
39923891type skeletonGossipsub struct {
39933892 outRPC <- chan * pb.RPC
39943893 inRPC chan <- * pb.RPC
@@ -4063,6 +3962,69 @@ func newSkeletonGossipsub(ctx context.Context, h host.Host) *skeletonGossipsub {
40633962 }
40643963}
40653964
3965+ func TestExtensionsControlMessage (t * testing.T ) {
3966+ for _ , wellBehaved := range []bool {true , false } {
3967+ t .Run (fmt .Sprintf ("wellBehaved=%t" , wellBehaved ), func (t * testing.T ) {
3968+
3969+ ctx , cancel := context .WithCancel (context .Background ())
3970+ defer cancel ()
3971+ hosts := getDefaultHosts (t , 2 )
3972+ psub0 := getGossipsub (ctx , hosts [0 ],
3973+ WithPeerScore (
3974+ & PeerScoreParams {
3975+ AppSpecificScore : func (peer.ID ) float64 { return 0 },
3976+ BehaviourPenaltyWeight : - 1 ,
3977+ BehaviourPenaltyDecay : ScoreParameterDecay (time .Minute ),
3978+ DecayInterval : DefaultDecayInterval ,
3979+ DecayToZero : DefaultDecayToZero ,
3980+ },
3981+ & PeerScoreThresholds {
3982+ GossipThreshold : - 100 ,
3983+ PublishThreshold : - 500 ,
3984+ GraylistThreshold : - 1000 ,
3985+ }),
3986+ WithMessageIdFn (func (msg * pb.Message ) string {
3987+ return string (msg .Data )
3988+ }))
3989+
3990+ psub1 := newSkeletonGossipsub (ctx , hosts [1 ])
3991+
3992+ connect (t , hosts [0 ], hosts [1 ])
3993+ time .Sleep (time .Second )
3994+
3995+ loopTimes := 3
3996+
3997+ for i := range loopTimes {
3998+ rpcToSend := & pb.RPC {
3999+ Control : & pb.ControlMessage {
4000+ Extensions : & pb.ControlExtensions {},
4001+ },
4002+ }
4003+ if wellBehaved && i > 0 {
4004+ // A well behaved node does not repeat the control
4005+ // extension message
4006+ rpcToSend .Control .Extensions = nil
4007+ }
4008+ psub1 .inRPC <- rpcToSend
4009+ }
4010+
4011+ time .Sleep (time .Second )
4012+
4013+ peerScore := psub0 .rt .(* GossipSubRouter ).score .Score (hosts [1 ].ID ())
4014+ t .Log ("Peer score:" , peerScore )
4015+ if wellBehaved {
4016+ if peerScore < 0 {
4017+ t .Fatal ("Peer score should not be negative" )
4018+ }
4019+ } else {
4020+ if peerScore >= 0 {
4021+ t .Fatal ("Peer score should not be positive" )
4022+ }
4023+ }
4024+ })
4025+ }
4026+ }
4027+
40664028func TestTestExtension (t * testing.T ) {
40674029 hosts := getDefaultHosts (t , 2 )
40684030 psub := getGossipsub (context .Background (), hosts [0 ], WithTestExtension ())
0 commit comments