Skip to content
Draft
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"io"
"net"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"


"github.com/learning-at-home/go-libp2p-daemon/internal/utils"
pb "github.com/learning-at-home/go-libp2p-daemon/pb"


ggio "github.com/gogo/protobuf/io"
ma "github.com/multiformats/go-multiaddr"
)
Expand Down Expand Up @@ -123,6 +124,14 @@ func (d *Daemon) handleConn(c net.Conn) {
log.Debugw("error writing response", "error", err)
return
}

case pb.Request_BANDWIDTH_METRICS:
res := d.doBandwidthMetrics(&req)
err := w.WriteMsg(res)
if err != nil {
log.Debugw("error writing response", "error", err)
return
}

case pb.Request_CONNMANAGER:
res := d.doConnManager(&req)
Expand Down Expand Up @@ -329,6 +338,63 @@ func (d *Daemon) doRemoveStreamHandler(req *pb.Request) *pb.Response {
return okResponse()
}

func (d *Daemon) doBandwidthMetrics(req *pb.Request) *pb.Response {
if d.bandwidth_metrics == nil {
log.Debugw("error getting bandwidth metrics: daemon option is off")
return errorResponseString("error getting bandwidth metrics: daemon option is off")
}
selfRateIn := 0.0
selfRateOut := 0.0
if req.Bwr.GetForSelf() {
stats := d.bandwidth_metrics.GetBandwidthTotals()
selfRateIn = stats.RateIn
selfRateOut = stats.RateOut
}
res := okResponse()
res.Bwr = &pb.BandwidthMetricsResponse {
SelfRateIn: &selfRateIn,
SelfRateOut: &selfRateOut,
}

if req.Bwr.GetForAllPeers() {
peerStats := d.bandwidth_metrics.GetBandwidthByPeer()
peers := make([]*pb.PeerInfo, len(peerStats))
i := 0
for id, stats := range peerStats {
rateIn := stats.RateIn
rateOut := stats.RateOut
peers[i] = &pb.PeerInfo{
Id: []byte(id),
Ratein: &rateIn,
Rateout: &rateOut,
}
i++
}
res.Bwr.Peers = peers
} else {
peers := make([]*pb.PeerInfo, len(req.Bwr.Ids))
i := 0
for _, id := range req.Bwr.Ids {
peer_id, err := peer.IDFromBytes([]byte(id))
if err != nil {
log.Debugw("error parsing peer ID", "error", err)
return errorResponse(err)
}
stats := d.bandwidth_metrics.GetBandwidthForPeer(peer_id)

peers[i] = &pb.PeerInfo{
Id: []byte(id),
Ratein: &stats.RateIn,
Rateout: &stats.RateOut,
}
i++
}
res.Bwr.Peers = peers
}

return res
}

func (d *Daemon) doListPeers(req *pb.Request) *pb.Response {
conns := d.host.Network().Conns()
peers := make([]*pb.PeerInfo, len(conns))
Expand Down
1 change: 1 addition & 0 deletions connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/libp2p/go-libp2p/core/peer"


pb "github.com/learning-at-home/go-libp2p-daemon/pb"
)

Expand Down
12 changes: 12 additions & 0 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/learning-at-home/go-libp2p-daemon/config"
"github.com/learning-at-home/go-libp2p-daemon/internal/utils"


"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
Expand Down Expand Up @@ -64,13 +66,16 @@ type Daemon struct {
cancelTerminateTimer context.CancelFunc

persistentConnMsgMaxSize int

bandwidth_metrics *metrics.BandwidthCounter
}

func NewDaemon(
ctx context.Context,
maddr ma.Multiaddr,
dhtMode string,
relayDiscovery bool,
bandwidthMetricsEnabled bool,
trustedRelays []string,
persistentConnMsgMaxSize int,
opts ...libp2p.Option,
Expand Down Expand Up @@ -101,6 +106,13 @@ func NewDaemon(
opts = append(opts, libp2p.Routing(d.DHTRoutingFactory(dhtOpts)))
}

if bandwidthMetricsEnabled {
d.bandwidth_metrics = metrics.NewBandwidthCounter()
opts = append(opts, libp2p.BandwidthReporter(d.bandwidth_metrics))
} else {
d.bandwidth_metrics = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: d.bandwidth_metrics are nil be default anyway.
ignore this comment if you wish

}

h, err := libp2p.New(opts...)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.4.0
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs/go-cid v0.4.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down Expand Up @@ -549,6 +550,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
3 changes: 2 additions & 1 deletion p2pd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func main() {
persistentConnMaxMsgSize := flag.Int("persistentConnMaxMsgSize", 4*1024*1024,
"Max size for persistent connection messages (bytes). Default: 4 MiB")
muxer := flag.String("muxer", "yamux", "muxer to use for connections")
bandwidthMetricsEnabled := flag.Bool("bandwidthMetrics", true, "Enables collection of bandwidth rate metrics")

flag.Parse()

Expand Down Expand Up @@ -388,7 +389,7 @@ func main() {
// start daemon
d, err := p2pd.NewDaemon(
defaultCtx, &c.ListenAddr, c.DHT.Mode,
c.Relay.Discovery, trustedRelays, *persistentConnMaxMsgSize,
c.Relay.Discovery, *bandwidthMetricsEnabled, trustedRelays, *persistentConnMaxMsgSize,
opts...)
if err != nil {
log.Fatal(err)
Expand Down
8 changes: 8 additions & 0 deletions pb/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,11 @@ all: $(pbgos)

%.pb.go: %.proto
protoc --gogofast_out=. --proto_path=$(GOPATH)/src:. $<

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please resolve the merge conflict and make sure it works

<<<<<<< HEAD
# Actually, I compiled it manually! Run "protoc -I=. --go_out=. ./p2pd.proto" in pb folder
# protoc --gofast_out=. p2pd.proto
=======
# Update:
# protoc -I=. --go_out=. ./p2pd.proto --go_opt=paths=source_relative
>>>>>>> 0b29642bd58095978b8261de0896021bec038ccd
Loading