Skip to content

Commit 239b439

Browse files
committed
test: add skeleton gossipsub to drive a gossipsub peer
1 parent a5434b4 commit 239b439

File tree

1 file changed

+75
-0
lines changed

1 file changed

+75
-0
lines changed

gossipsub_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"time"
2121

2222
pb "github.com/libp2p/go-libp2p-pubsub/pb"
23+
"github.com/libp2p/go-msgio"
2324

2425
"github.com/libp2p/go-libp2p/core/host"
2526
"github.com/libp2p/go-libp2p/core/network"
@@ -3886,3 +3887,77 @@ func BenchmarkSplitRPCLargeMessages(b *testing.B) {
38863887
}
38873888
})
38883889
}
3890+
3891+
type skeletonGossipsub struct {
3892+
outRPC <-chan *pb.RPC
3893+
inRPC chan<- *pb.RPC
3894+
}
3895+
3896+
func newSkeletonGossipsub(ctx context.Context, h host.Host) *skeletonGossipsub {
3897+
recvRPC := make(chan *pb.RPC, 16)
3898+
sendRPC := make(chan *pb.RPC, 16)
3899+
3900+
h.SetStreamHandler(GossipSubID_v13, func(s network.Stream) {
3901+
// Open outbound stream to send writes too
3902+
outboundStream, err := h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v13)
3903+
if err != nil {
3904+
panic(err)
3905+
}
3906+
ctx, cancel := context.WithCancel(ctx)
3907+
defer cancel()
3908+
go func(ctx context.Context) {
3909+
defer outboundStream.Close()
3910+
w := msgio.NewVarintWriter(outboundStream)
3911+
for {
3912+
select {
3913+
case <-ctx.Done():
3914+
return
3915+
case r := <-sendRPC:
3916+
b, err := r.Marshal()
3917+
if err != nil {
3918+
panic(err)
3919+
}
3920+
err = w.WriteMsg(b)
3921+
if err != nil {
3922+
panic(err)
3923+
}
3924+
}
3925+
}
3926+
}(ctx)
3927+
3928+
r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize)
3929+
for {
3930+
msgbytes, err := r.ReadMsg()
3931+
if err != nil {
3932+
r.ReleaseMsg(msgbytes)
3933+
if err != io.EOF {
3934+
s.Reset()
3935+
log.Debugf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
3936+
} else {
3937+
// Just be nice. They probably won't read this
3938+
// but it doesn't hurt to send it.
3939+
s.Close()
3940+
}
3941+
3942+
return
3943+
}
3944+
if len(msgbytes) == 0 {
3945+
continue
3946+
}
3947+
3948+
rpc := new(pb.RPC)
3949+
err = rpc.Unmarshal(msgbytes)
3950+
r.ReleaseMsg(msgbytes)
3951+
if err != nil {
3952+
s.Reset()
3953+
panic(err)
3954+
}
3955+
recvRPC <- rpc
3956+
}
3957+
})
3958+
3959+
return &skeletonGossipsub{
3960+
outRPC: recvRPC,
3961+
inRPC: sendRPC,
3962+
}
3963+
}

0 commit comments

Comments
 (0)