diff --git a/cmd/p2psim/main.go b/cmd/p2psim/main.go index 451b0d942d..ae170c381c 100644 --- a/cmd/p2psim/main.go +++ b/cmd/p2psim/main.go @@ -40,10 +40,15 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "os" + "os/signal" "strings" + "syscall" "text/tabwriter" + "time" + "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -53,6 +58,15 @@ import ( "github.com/urfave/cli/v2" ) +type NodeType int + +const ( + DefaultNode NodeType = iota + OutboundNode + DirtyNode + BootNode +) + var client *simulations.Client func main() { @@ -102,6 +116,27 @@ func main() { Usage: "load a network snapshot from stdin", Action: loadSnapshot, }, + { + Name: "network-stats", + Usage: "show statistics of the simulation network", + Subcommands: []*cli.Command{ + { + Name: "peer-stats", + Usage: "show peer stats", + Action: getNetworkPeerStats, + }, + { + Name: "dht", + Usage: "Get all nodes in the DHT of all nodes", + Action: getAllDHT, + }, + { + Name: "peers", + Usage: "Get all peers of all nodes", + Action: getAllNodePeersInfo, + }, + }, + }, { Name: "node", Usage: "manage simulation nodes", @@ -132,6 +167,89 @@ func main() { Value: "", Usage: "node private key (hex encoded)", }, + &cli.BoolFlag{ + Name: "autofill.bootnodes", + Value: true, + Usage: "autofill bootnodes with existing bootnodes from manager", + }, + &cli.StringFlag{ + Name: "node.type", + Value: "default", + Usage: "Set node type (default, outbound, dirty, bootnode)", + }, + &cli.BoolFlag{ + Name: "enable.enrfilter", + Value: true, + Usage: "Enable ENR filter when adding nodes to the DHT", + }, + &cli.BoolFlag{ + Name: "only.outbound", + Usage: "Only allow outbound connections", + }, + utils.NoDiscoverFlag, + utils.BootnodesFlag, + utils.MaxPeersFlag, + }, + }, + { + Name: "create-multi", + Usage: "create multiple nodes", + Action: createMultiNode, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "name", + Value: "", + 
Usage: "node name", + }, + &cli.IntFlag{ + Name: "count", + Value: 1, + Usage: "number of nodes to create", + }, + &cli.StringFlag{ + Name: "services", + Value: "", + Usage: "node services (comma separated)", + }, + &cli.BoolFlag{ + Name: "start", + Value: true, + Usage: "start the node after creating successfully", + }, + &cli.BoolFlag{ + Name: "autofill.bootnodes", + Value: true, + Usage: "autofill bootnodes with existing bootnodes from manager", + }, + &cli.StringFlag{ + Name: "node.type", + Value: "default", + Usage: "Set node type (default, outbound, dirty, bootnode)", + }, + &cli.BoolFlag{ + Name: "enable.enrfilter", + Value: true, + Usage: "Enable ENR filter when adding nodes to the DHT", + }, + &cli.DurationFlag{ + Name: "interval", + Usage: "create interval", + }, + &cli.IntFlag{ + Name: "dirty.rate", + Usage: "Rate of dirty nodes", + }, + &cli.IntFlag{ + Name: "only.outbound.rate", + Usage: "Rate of nodes that only allow outbound connections", + }, + &cli.BoolFlag{ + Name: "only.outbound", + Usage: "Only allow outbound connections", + }, + utils.NoDiscoverFlag, + utils.BootnodesFlag, + utils.MaxPeersFlag, }, }, { @@ -176,6 +294,29 @@ func main() { }, }, }, + { + Name: "peer-stats", + Usage: "show peer stats", + ArgsUsage: "<node>", + Action: getNodePeerStats, + }, + }, + }, + { + Name: "log-stats", + Usage: "log peer stats to a CSV file", + Action: startLogStats, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "file", + Usage: "output file", + Value: "stats.csv", + }, + &cli.DurationFlag{ + Name: "interval", + Usage: "log interval", + Value: 15 * time.Second, + }, }, }, } @@ -278,6 +419,7 @@ func createNode(ctx *cli.Context) error { return cli.ShowCommandHelp(ctx, ctx.Command.Name) } config := adapters.RandomNodeConfig() + config.UseTCPDialer = true config.Name = ctx.String("name") if key := ctx.String("key"); key != "" { privKey, err := crypto.HexToECDSA(key) @@ -287,6 +429,22 @@ func createNode(ctx *cli.Context) error { config.ID = 
enode.PubkeyToIDV4(&privKey.PublicKey) config.PrivateKey = privKey } + if ctx.Bool(utils.NoDiscoverFlag.Name) { + config.NoDiscovery = true + } + config.BootstrapNodeURLs = ctx.String(utils.BootnodesFlag.Name) + if ctx.Bool("autofill.bootnodes") { + bootnodeURLs, err := getBootnodes() + if err != nil { + return err + } + if bootnodeURLs != "" { + config.BootstrapNodeURLs += "," + bootnodeURLs + } + } + config.MaxPeers = ctx.Int(utils.MaxPeersFlag.Name) + config.DisableTCPListener = ctx.Bool("only.outbound") + config.EnableENRFilter = ctx.Bool("enable.enrfilter") if services := ctx.String("services"); services != "" { config.Lifecycles = strings.Split(services, ",") } @@ -298,6 +456,131 @@ func createNode(ctx *cli.Context) error { return nil } +func getBootnodes() (string, error) { + nodes, err := client.GetNodes() + if err != nil { + return "", err + } + + bootnodes := make([]string, 0) + for _, node := range nodes { + if strings.HasPrefix(node.Name, "bootnode") { + bootnodes = append(bootnodes, node.Enode) + } + } + + return strings.Join(bootnodes, ","), nil +} + +func fillArray(array []NodeType, value NodeType, count int) []NodeType { + for i := 0; i < count; i++ { + array = append(array, value) + } + return array +} + +func createMultiNode(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + + t := time.Now() + + createInterval := ctx.Duration("interval") + bootNodeURLs := ctx.String(utils.BootnodesFlag.Name) + if ctx.Bool("autofill.bootnodes") { + existedBootnodeURLs, err := getBootnodes() + if err != nil { + return err + } + if existedBootnodeURLs != "" { + bootNodeURLs += "," + existedBootnodeURLs + } + } + + // Create array of node types based on the ratio of outbound, dirty and default nodes + count := ctx.Int("count") + if count < 1 { + return fmt.Errorf("count must be greater than 0") + } + + outboundRate := ctx.Int("only.outbound.rate") + if outboundRate < 0 || outboundRate > 100 { + return 
fmt.Errorf("outbound rate must be between 0 and 100") + } + + dirtyRate := ctx.Int("dirty.rate") + if dirtyRate < 0 || dirtyRate > 100 { + return fmt.Errorf("dirty rate must be between 0 and 100") + } + + numOutboundNode := count * outboundRate / 100 + numDirtyNode := count * dirtyRate / 100 + numDefaultNode := count - numOutboundNode - numDirtyNode + nodeTypes := make([]NodeType, 0) + nodeTypes = fillArray(nodeTypes, OutboundNode, numOutboundNode) + nodeTypes = fillArray(nodeTypes, DirtyNode, numDirtyNode) + nodeTypes = fillArray(nodeTypes, DefaultNode, numDefaultNode) + rand.Shuffle(len(nodeTypes), func(i, j int) { nodeTypes[i], nodeTypes[j] = nodeTypes[j], nodeTypes[i] }) + + // If bootnode flag is set, create all nodes as bootnodes + isBootnode := ctx.String("node.type") == "bootnode" + + // Create nodes + for i, nodeType := range nodeTypes { + var nodeName string + if isBootnode { + nodeName = fmt.Sprintf("bootnode-%d-%d", t.Unix(), i) + ctx.Set(utils.BootnodesFlag.Name, "") + } else { + switch nodeType { + case OutboundNode: + ctx.Set("only.outbound", "true") + ctx.Set("node.type", "outbound") + ctx.Set("services", "valid") + nodeName = fmt.Sprintf("outbound-%d-%d", t.Unix(), i) + case DirtyNode: + ctx.Set("only.outbound", "false") + ctx.Set("node.type", "dirty") + ctx.Set("services", "invalid") + nodeName = fmt.Sprintf("dirty-%d-%d", t.Unix(), i) + default: + ctx.Set("only.outbound", "false") + ctx.Set("node.type", "default") + ctx.Set("services", "valid") + nodeName = fmt.Sprintf("node-%d-%d", t.Unix(), i) + } + } + ctx.Set("name", nodeName) + for { + err := createNode(ctx) + if err == nil { + // Start node if needed + if ctx.Bool("start") { + err = client.StartNode(nodeName) + if err == nil { + fmt.Fprintln(ctx.App.Writer, "Started", nodeName) + break + } + } else { + break + } + } + + fmt.Fprintln(ctx.App.Writer, "Failed to create node, retrying...", nodeName, err) + // Try to create the node again + client.DeleteNode(nodeName) + time.Sleep(500 * 
time.Millisecond) + + } + if createInterval > 0 { + time.Sleep(createInterval) + } + } + + return nil +} + func showNode(ctx *cli.Context) error { if ctx.Args().Len() != 1 { @@ -429,3 +712,141 @@ func rpcSubscribe(client *rpc.Client, out io.Writer, method string, args ...stri } } } + +// getNodePeerStats prints the peer stats of the node named by the single positional argument. +func getNodePeerStats(ctx *cli.Context) error { + if ctx.Args().Len() != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + nodeName := ctx.Args().Get(0) + stats, err := client.GetNodePeerStats(nodeName) + if err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Peer stats of", nodeName) + fmt.Fprintln(ctx.App.Writer, "Peer count: ", stats.PeerCount) + fmt.Fprintln(ctx.App.Writer, "Tried: ", stats.Tried) + fmt.Fprintln(ctx.App.Writer, "Failed: ", stats.Failed) + fmt.Fprintln(ctx.App.Writer, "Nodes count: ", stats.DifferentNodesDiscovered) + fmt.Fprintln(ctx.App.Writer, "DHT: ", stats.DHTBuckets) + return nil +} + +func getNetworkPeerStats(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + stats, err := client.GetAllNodePeerStats() + if err != nil { + return err + } + for nodeID, stats := range stats { + fmt.Fprintln(ctx.App.Writer, "Peer stats of", nodeID) + fmt.Fprintln(ctx.App.Writer, "Peer count: ", stats.PeerCount) + fmt.Fprintln(ctx.App.Writer, "Tried: ", stats.Tried) + fmt.Fprintln(ctx.App.Writer, "Failed: ", stats.Failed) + fmt.Fprintln(ctx.App.Writer, "Nodes count: ", stats.DifferentNodesDiscovered) + fmt.Fprintln(ctx.App.Writer, "DHT: ", stats.DHTBuckets) + } + return nil +} + +func getAllDHT(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + + nodes, err := client.GetNodes() + if err != nil { + return err + } + nodeID2Name := make(map[string]string) + for _, node := range nodes { + nodeID2Name[node.ID] = node.Name + } + + dht, err := client.GetAllNodeDHT() + if err != nil { + return err + } + for nodeName, buckets := range 
dht { + fmt.Fprintf(ctx.App.Writer, "%s: ", nodeName) + for _, bucket := range buckets { + fmt.Fprintf(ctx.App.Writer, "[") + for _, node := range bucket { + fmt.Fprintf(ctx.App.Writer, "%s ", nodeID2Name[node.ID().String()]) + } + fmt.Fprintf(ctx.App.Writer, "],") + } + fmt.Fprintf(ctx.App.Writer, "\n") + } + return nil +} + +func getAllNodePeersInfo(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + + nodes, err := client.GetNodes() + if err != nil { + return err + } + nodeID2Name := make(map[string]string) + for _, node := range nodes { + nodeID2Name[node.ID] = node.Name + } + + peers, err := client.GetAllNodePeersInfo() + if err != nil { + return err + } + for nodeName, peerInfos := range peers { + fmt.Fprintf(ctx.App.Writer, "%s: ", nodeName) + for _, peerInfo := range peerInfos { + fmt.Fprintf(ctx.App.Writer, "(%s %v), ", nodeID2Name[peerInfo.ID], peerInfo.Network.Inbound) + } + fmt.Fprintf(ctx.App.Writer, "\n") + } + return nil +} + +func startLogStats(ctx *cli.Context) error { + if ctx.Args().Len() != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + csvFile := ctx.String("file") + f, err := os.OpenFile(csvFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } + defer f.Close() + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + timer := time.NewTicker(ctx.Duration("interval")) + + f.WriteString("node,timestamp,type,value\n") + +loop: + for { + select { + case <-sig: + return nil + case <-timer.C: + stats, err := client.GetAllNodePeerStats() + if err != nil { + fmt.Fprintln(ctx.App.Writer, err) + goto loop + } + for nodeID, stats := range stats { + t := time.Now() + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "PeerCount", stats.PeerCount)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "Tried", stats.Tried)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "Failed", 
stats.Failed)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "DifferentNodesDiscovered", stats.DifferentNodesDiscovered)) + f.WriteString(fmt.Sprintf("%s,%d,%s,%d\n", nodeID, t.Unix(), "DHTBuckets", stats.DHTBuckets)) + } + } + } +} diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 267a0ffd60..3fb61fd83a 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -155,6 +155,22 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { return t, nil } +// NodesInDHT returns all nodes in the DHT. +// For testing only. +func (t *UDPv4) NodesInDHT() [][]enode.Node { + if t == nil || t.tab == nil { + return nil + } + nodes := make([][]enode.Node, len(t.tab.buckets)) + for i, bucket := range t.tab.buckets { + nodes[i] = make([]enode.Node, len(bucket.entries)) + for j, entry := range bucket.entries { + nodes[i][j] = entry.Node + } + } + return nodes +} + // Self returns the local node. func (t *UDPv4) Self() *enode.Node { return t.localNode.Node() diff --git a/p2p/server.go b/p2p/server.go index 4b437dc3b7..e0d5686282 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -305,6 +305,18 @@ func (c *conn) set(f connFlag, val bool) { } } +// SetListenFunc sets the function used to accept inbound connections. +// For testing only. +func (srv *Server) SetListenFunc(f func(network, addr string) (net.Listener, error)) { + srv.listenFunc = f +} + +// UDPv4 returns the UDPv4 discovery table. +// For testing only. +func (srv *Server) UDPv4() *discover.UDPv4 { + return srv.ntab +} + // LocalNode returns the local node record. 
func (srv *Server) LocalNode() *enode.LocalNode { return srv.localnode diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 35ccdfb068..e4d80cb2ae 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -364,6 +364,21 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) { return snapshots, n.client.Call(&snapshots, "simulation_snapshot") } +// Empty PeerStats +func (n *ExecNode) PeerStats() *PeerStats { + return &PeerStats{} +} + +// Empty DHT +func (n *ExecNode) NodesInDHT() [][]enode.Node { + return nil +} + +// Empty PeersInfo +func (n *ExecNode) PeersInfo() []*p2p.PeerInfo { + return nil +} + // execNodeConfig is used to serialize the node configuration so it can be // passed to the child process as a JSON encoded environment variable type execNodeConfig struct { diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index 1cb26a8ea0..a221004c8a 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -20,16 +20,20 @@ import ( "context" "errors" "fmt" - "math" "net" + "strings" "sync" + "time" + "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/simulations/pipes" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/gorilla/websocket" ) @@ -91,16 +95,35 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { return nil, err } + p2pCfg := p2p.Config{ + PrivateKey: config.PrivateKey, + MaxPeers: config.MaxPeers, + NoDiscovery: config.NoDiscovery, + EnableMsgEvents: config.EnableMsgEvents, + } + if !config.DisableTCPListener { + p2pCfg.ListenAddr = fmt.Sprintf(":%d", config.Port) + } else { + p2pCfg.ListenAddr 
= "" + } + if len(config.BootstrapNodeURLs) > 0 { + for _, url := range strings.Split(config.BootstrapNodeURLs, ",") { + if len(url) == 0 { + continue + } + n, err := enode.Parse(enode.ValidSchemes, url) + if err != nil { + log.Warn("invalid bootstrap node URL", "url", url, "err", err) + continue + } + p2pCfg.BootstrapNodes = append(p2pCfg.BootstrapNodes, n) + } + } + n, err := node.New(&node.Config{ - P2P: p2p.Config{ - PrivateKey: config.PrivateKey, - MaxPeers: math.MaxInt32, - NoDiscovery: true, - Dialer: s, - EnableMsgEvents: config.EnableMsgEvents, - }, + P2P: p2pCfg, ExternalSigner: config.ExternalSigner, - Logger: log.New("node.id", id.String()), + Logger: log.New("node.name", config.Name), }) if err != nil { return nil, err @@ -113,6 +136,34 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { adapter: s, running: make(map[string]node.Lifecycle), } + if !config.UseTCPDialer { + n.Server().Dialer = s + } else { + simNode.dialer = &wrapTCPDialerStats{ + d: &net.Dialer{Timeout: 15 * time.Second}, + resultCh: make(chan resultDial, 10000), + } + n.Server().Dialer = simNode.dialer + } + + if config.EnableENRFilter { + n.Server().SetFilter(func(id forkid.ID) error { + var eth struct { + ForkID forkid.ID + Rest []rlp.RawValue `rlp:"tail"` + } + if err := n.Server().Self().Record().Load(enr.WithEntry("eth", &eth)); err != nil { + log.Warn("failed to load eth entry", "err", err) + return err + } + + if id == eth.ForkID { + return nil + } + return forkid.ErrLocalIncompatibleOrStale + }) + } + s.nodes[id] = simNode return simNode, nil } @@ -162,6 +213,8 @@ func (s *SimAdapter) GetNode(id enode.ID) (*SimNode, bool) { // net.Pipe (see SimAdapter.Dial), running devp2p protocols directly over that // pipe type SimNode struct { + ctx context.Context + cancel context.CancelFunc lock sync.RWMutex ID enode.ID config *NodeConfig @@ -170,6 +223,11 @@ type SimNode struct { running map[string]node.Lifecycle client *rpc.Client registerOnce sync.Once + dialer 
*wrapTCPDialerStats + + // Track different nodes discovered by the node + discoveredNodes sync.Map + differentNodeCount int } // Close closes the underlaying node.Node to release @@ -240,6 +298,15 @@ func (sn *SimNode) Snapshots() (map[string][]byte, error) { // Start registers the services and starts the underlying devp2p node func (sn *SimNode) Start(snapshots map[string][]byte) error { + sn.lock.Lock() + if sn.cancel != nil { + sn.lock.Unlock() + return errors.New("node already started") + } + + sn.ctx, sn.cancel = context.WithCancel(context.Background()) + sn.lock.Unlock() + // ensure we only register the services once in the case of the node // being stopped and then started again var regErr error @@ -282,6 +349,8 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error { sn.client = client sn.lock.Unlock() + go sn.trackDiscoveredNode() + return nil } @@ -292,6 +361,10 @@ func (sn *SimNode) Stop() error { sn.client.Close() sn.client = nil } + if sn.cancel != nil { + sn.cancel() + sn.cancel = nil + } sn.lock.Unlock() return sn.node.Close() } @@ -351,3 +424,88 @@ func (sn *SimNode) NodeInfo() *p2p.NodeInfo { } return server.NodeInfo() } + +// PeerStats returns statistics about the node's peers +func (sn *SimNode) PeerStats() *PeerStats { + if sn.dialer == nil || sn.node.Server() == nil || sn.node.Server().UDPv4() == nil { + return &PeerStats{} + } + + nodesCount := 0 + sn.discoveredNodes.Range(func(_, _ interface{}) bool { + nodesCount++ + return true + }) + buckets := sn.node.Server().UDPv4().NodesInDHT() + bucketSizes := make([]int, len(buckets)) + for i, bucket := range buckets { + bucketSizes[i] = len(bucket) + } + return &PeerStats{ + PeerCount: sn.node.Server().PeerCount(), + Failed: sn.dialer.failed, + Tried: sn.dialer.tried, + DifferentNodesDiscovered: nodesCount, + DHTBuckets: bucketSizes, + } +} + +// NodesInDHT returns the nodes in the DHT buckets +func (sn *SimNode) NodesInDHT() [][]enode.Node { + if sn.node.Server() == nil || 
sn.node.Server().UDPv4() == nil { + return nil + } + return sn.node.Server().UDPv4().NodesInDHT() +} + +// PeersInfo returns information about the node's peers +func (sn *SimNode) PeersInfo() []*p2p.PeerInfo { + if sn.node.Server() == nil { + return nil + } + return sn.node.Server().PeersInfo() +} + +// trackDiscoveredNodes tracks all nodes discovered by the node and dial by wrapTCPDialerStats +func (sn *SimNode) trackDiscoveredNode() { + if sn.dialer == nil { + return + } + + for { + select { + case <-sn.ctx.Done(): + return + case r := <-sn.dialer.resultCh: + if _, ok := sn.discoveredNodes.LoadOrStore(r.node, struct{}{}); !ok { + sn.differentNodeCount++ + } + if r.err != nil { + log.Info("dial failed", "node", r.node, "err", r.err) + sn.dialer.failed++ + } + log.Info("dial tried", "from", sn.ID, "to", r.node) + sn.dialer.tried++ + } + } +} + +// wrapTCPDialerStats is a wrapper around the net.Dialer which tracks nodes that have been tried to dial +type wrapTCPDialerStats struct { + d *net.Dialer + failed int + tried int + resultCh chan resultDial +} + +type resultDial struct { + err error + node enode.ID +} + +func (d wrapTCPDialerStats) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) { + nodeAddr := &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()} + conn, err := d.d.DialContext(ctx, "tcp", nodeAddr.String()) + d.resultCh <- resultDial{err, dest.ID()} + return conn, err +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index aeb8ef7772..7ae5cb2169 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -42,7 +42,6 @@ import ( // * SimNode - An in-memory node // * ExecNode - A child process node // * DockerNode - A Docker container node -// type Node interface { // Addr returns the node's address (e.g. 
an Enode URL) Addr() []byte @@ -65,6 +64,15 @@ type Node interface { // Snapshots creates snapshots of the running services Snapshots() (map[string][]byte, error) + + // PeerStats returns the node's peer statistics + PeerStats() *PeerStats + + // NodesInDHT returns all nodes in the DHT + NodesInDHT() [][]enode.Node + + // PeersInfo returns information about the node's peers + PeersInfo() []*p2p.PeerInfo } // NodeAdapter is used to create Nodes in a simulation network @@ -119,7 +127,8 @@ type NodeConfig struct { // function to sanction or prevent suggesting a peer Reachable func(id enode.ID) bool - Port uint16 + Port uint16 + DisableTCPListener bool // LogFile is the log file name of the p2p node at runtime. // @@ -131,34 +140,61 @@ type NodeConfig struct { // // The default verbosity is INFO. LogVerbosity log.Lvl + + // NoDiscovery disables the peer discovery mechanism (manual peer addition) + NoDiscovery bool + + // Use default TCP dialer + UseTCPDialer bool + + // BootstrapNodes is the list of bootstrap nodes + BootstrapNodeURLs string + + // MaxPeers is the maximum number of peers + MaxPeers int + + // EnableENRFilter enables the ENR filter when adding node into the DHT + EnableENRFilter bool } // nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding // all fields as strings type nodeConfigJSON struct { - ID string `json:"id"` - PrivateKey string `json:"private_key"` - Name string `json:"name"` - Lifecycles []string `json:"lifecycles"` - Properties []string `json:"properties"` - EnableMsgEvents bool `json:"enable_msg_events"` - Port uint16 `json:"port"` - LogFile string `json:"logfile"` - LogVerbosity int `json:"log_verbosity"` + ID string `json:"id"` + PrivateKey string `json:"private_key"` + Name string `json:"name"` + Lifecycles []string `json:"lifecycles"` + Properties []string `json:"properties"` + EnableMsgEvents bool `json:"enable_msg_events"` + Port uint16 `json:"port"` + DisableTCPListener bool `json:"disable_tcp_listener"` + 
LogFile string `json:"logfile"` + LogVerbosity int `json:"log_verbosity"` + NoDiscovery bool `json:"no_discovery"` + UseTCPDialer bool `json:"use_tcp_dialer"` + BootstrapNodeURLs string `json:"bootstrap_node_urls"` + MaxPeers int `json:"max_peers"` + EnableENRFilter bool `json:"enable_enr_filter"` } // MarshalJSON implements the json.Marshaler interface by encoding the config // fields as strings func (n *NodeConfig) MarshalJSON() ([]byte, error) { confJSON := nodeConfigJSON{ - ID: n.ID.String(), - Name: n.Name, - Lifecycles: n.Lifecycles, - Properties: n.Properties, - Port: n.Port, - EnableMsgEvents: n.EnableMsgEvents, - LogFile: n.LogFile, - LogVerbosity: int(n.LogVerbosity), + ID: n.ID.String(), + Name: n.Name, + Lifecycles: n.Lifecycles, + Properties: n.Properties, + Port: n.Port, + DisableTCPListener: n.DisableTCPListener, + EnableMsgEvents: n.EnableMsgEvents, + LogFile: n.LogFile, + LogVerbosity: int(n.LogVerbosity), + NoDiscovery: n.NoDiscovery, + UseTCPDialer: n.UseTCPDialer, + BootstrapNodeURLs: n.BootstrapNodeURLs, + MaxPeers: n.MaxPeers, + EnableENRFilter: n.EnableENRFilter, } if n.PrivateKey != nil { confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) @@ -196,9 +232,15 @@ func (n *NodeConfig) UnmarshalJSON(data []byte) error { n.Lifecycles = confJSON.Lifecycles n.Properties = confJSON.Properties n.Port = confJSON.Port + n.DisableTCPListener = confJSON.DisableTCPListener n.EnableMsgEvents = confJSON.EnableMsgEvents n.LogFile = confJSON.LogFile n.LogVerbosity = log.Lvl(confJSON.LogVerbosity) + n.NoDiscovery = confJSON.NoDiscovery + n.UseTCPDialer = confJSON.UseTCPDialer + n.BootstrapNodeURLs = confJSON.BootstrapNodeURLs + n.MaxPeers = confJSON.MaxPeers + n.EnableENRFilter = confJSON.EnableENRFilter return nil } @@ -223,12 +265,15 @@ func RandomNodeConfig() *NodeConfig { enodId := enode.PubkeyToIDV4(&prvkey.PublicKey) return &NodeConfig{ - PrivateKey: prvkey, - ID: enodId, - Name: fmt.Sprintf("node_%s", enodId.String()), - Port: port, 
- EnableMsgEvents: true, - LogVerbosity: log.LvlInfo, + PrivateKey: prvkey, + ID: enodId, + Name: fmt.Sprintf("node_%s", enodId.String()), + Port: port, + EnableMsgEvents: true, + LogVerbosity: log.LvlInfo, + DisableTCPListener: true, + UseTCPDialer: false, + MaxPeers: node.DefaultConfig.P2P.MaxPeers, } } @@ -324,3 +369,12 @@ func (n *NodeConfig) initEnode(ip net.IP, tcpport int, udpport int) error { func (n *NodeConfig) initDummyEnode() error { return n.initEnode(net.IPv4(127, 0, 0, 1), int(n.Port), 0) } + +// PeerStats is a struct that holds the statistics of a node's peers +type PeerStats struct { + PeerCount int `json:"peer_count"` + Tried int `json:"tried"` + DifferentNodesDiscovered int `json:"different_nodes_discovered"` + Failed int `json:"failed"` + DHTBuckets []int `json:"dht_buckets"` +} diff --git a/p2p/simulations/examples/discovery/README.md b/p2p/simulations/examples/discovery/README.md new file mode 100644 index 0000000000..8af0e5511e --- /dev/null +++ b/p2p/simulations/examples/discovery/README.md @@ -0,0 +1,177 @@ +# devp2p simulation for discovery benchmark + +## Overview + +In this simulation, we will focus on benchmarking the discovery process by simulating the network with a number of nodes and bootnodes. We aim to measure peer quality when bypassing and not bypassing the ENR filter when an ENR request fails, as well as adjusting the DHT bucket size from 16 to 256. + + +## Setup dirty node config + +In the base code, we don't have any dirty node config, so if you start a dirty node, the simulation network will treat it as a valid node. A dirty node does not respond when it receives an ENR request from another node in the network; this simulates a node that is not compatible with the network but is still added to the DHT of other nodes. Below is how to modify the source code to enable the dirty node config: + +1. 
Add a `dirty` field to `UDPv4` to mark the node as a dirty node, and modify `handleENRRequest` so that it does not respond if the node is dirty + +```go +// p2p/discover/v4_udp.go + +type UDPv4 struct { + ... + dirty bool // for testing +} + +// SetDirty sets the dirty flag for testing purposes. +func (t *UDPv4) SetDirty(dirty bool) { + t.dirty = dirty +} + +func (t *UDPv4) handleENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, mac []byte) { + if t.dirty { // simulate dirty node, for testing purposes only + return + } + ... +} +``` + +2. Add a `Dirty` field to the config of `p2p.Server` to mark the node as a dirty node + +```go +// p2p/server.go + +type Config struct { + ... + Dirty bool +} + +func (srv *Server) setupDiscovery() { + ... + if !srv.NoDiscovery { + ... + + // Mark the node as dirty (for testing purposes). + if srv.Dirty { + srv.ntab.SetDirty(true) + } + } + ... +} +``` + +3. Finally, set the `Dirty` field when creating the simulation node if the node is dirty +```go +// p2p/simulations/adapters/inproc.go + +func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { + ... + p2pCfg := p2p.Config{ + ... + Dirty: strings.HasPrefix(config.Name, "dirty"), + } + ... +} +``` + +## Manual run + +Start the p2psim server with `go run main.go` in `p2p/simulations/examples/discovery`; then, in another terminal, use the `p2psim` CLI to start and manage new nodes in the simulation network. 
Example: + +``` bash +$ cd p2p/simulations/examples/discovery +$ go run main.go +INFO [12-24|14:46:39.132] starting simulation server port=8888 +``` + +``` bash +$ p2psim node create-multi --count 2 --node.type bootnode --autofill.bootnodes=false +Created bootnode-1735026417-0 +Started bootnode-1735026417-0 +Created bootnode-1735026417-1 +Started bootnode-1735026417-1 +``` + +``` bash +$ p2psim node create-multi --count 16 --dirty.rate 50 +Created node-1735026508-0 +Started node-1735026508-0 +Created node-1735026508-1 +Started node-1735026508-1 +Created dirty-1735026508-2 +Started dirty-1735026508-2 +Created dirty-1735026508-3 +Started dirty-1735026508-3 +Created dirty-1735026508-4 +Started dirty-1735026508-4 +Created node-1735026508-5 +Started node-1735026508-5 +Created node-1735026508-6 +Started node-1735026508-6 +Created node-1735026508-7 +Started node-1735026508-7 +Created node-1735026508-8 +Started node-1735026508-8 +Created dirty-1735026508-9 +Started dirty-1735026508-9 +Created node-1735026508-10 +Started node-1735026508-10 +Created dirty-1735026508-11 +Started dirty-1735026508-11 +Created dirty-1735026508-12 +Started dirty-1735026508-12 +Created node-1735026508-13 +Started node-1735026508-13 +Created dirty-1735026508-14 +Started dirty-1735026508-14 +Created dirty-1735026508-15 +Started dirty-1735026508-15 +``` + +## Strategy + +There are three types of nodes: +1. Dirty nodes: Nodes that are not compatible with the valid nodes +2. Valid nodes +3. Valid nodes that only make outbound connections (they do not accept inbound ones) + +By default, the benchmark runs with 350 nodes and 2 bootnodes (adjustable in the configuration); the nodes are rolled out in 3 batches, following the steps below: +1. Start the simulation server and 2 bootnodes, roll out the nodes in batch 1, and sleep for a while +2. Roll out the nodes in batch 2 and sleep for a while +3. Roll out the nodes in batch 3 and sleep for a while +4. 
Export the DHT and peers info + +## Run benchmark + +To run the simulation, run `./discovery.sh`; it starts the p2psim server and runs the benchmark with the default parameters. + +### Configuration + +To show the help message, run `./discovery.sh --help`. Besides the configurable parameters, we can modify the source code to change the behavior of the simulation: +- To change the DHT bucket size, modify the const `bucketSize` in `p2p/discover/table.go` +- To filter out a node when its ENR request fails, modify the function `Table::filterNode` in `p2p/discover/table.go` to: + +```go +func (tab *Table) filterNode(n *node) bool { + ... + if node, err := tab.net.RequestENR(unwrapNode(n)); err != nil { + return true // modify here + } else if !tab.enrFilter(node.Record()) { + ... + } + ... +} +``` + +### Export data + +After running the simulation, some files will be generated in the `results_dir` folder: +- `$test_name.log`: Log of all nodes in the simulation network +- `stats_$test_name.csv`: Statistics of the simulation, including the number of peers, distribution of nodes in the DHT, ... +- `peers_$test_name.log`: List of the peers of each node in the network +- `dht_$test_name.log`: List of the nodes in the DHT of each node + +### Visualization + +To visualize the data, we can use the `gen_chart.py` script to plot the data. +Supported types: +- `dht_peer`: Ratio between the number of peers (outbound) and the number of nodes in the DHT +- `PeerCount`: Number of peers of each node +- `DHTBuckets`: Size of DHT +- More types can be found in the stats file diff --git a/p2p/simulations/examples/discovery/discovery.sh b/p2p/simulations/examples/discovery/discovery.sh new file mode 100755 index 0000000000..e515e5f46b --- /dev/null +++ b/p2p/simulations/examples/discovery/discovery.sh @@ -0,0 +1,146 @@ +#!/bin/bash +# +# Boot a simulation network and start benchmarking test +# Export the logs, peers info, and DHT info to files for visualization. + +main_cmd="go run ." 
+p2psim_cmd="p2psim" + +if ! which p2psim &>/dev/null; then + fail "missing p2psim binary (you need to build cmd/p2psim and put it in \$PATH)" +fi + +# Number of nodes to start for each batch +distribution=(150 100 100) + +# Rate of dirty node that not compatible with the valid node +dirty_rate=60 + +# Rate of valid node but only accept outbound connection +only_outbound_rate=20 + +# Interval between each node creation +node_creation_interval=1s + +# Sleep time between each batch +sleep_time=1200 + +# Number of bootnodes +num_bootnodes=2 + +# Other flags +other="" + +# Test name +test_name="discovery_benchmark" + +# Directory to store results +results_dir="./results" + +# Parse the arguments +while [[ $# -gt 0 ]]; do + case $1 in + --dirty.rate) + dirty_rate=$2 + shift 2 + ;; + --only.outbound.rate) + only_outbound_rate=$2 + shift 2 + ;; + --interval) + node_creation_interval=$2 + shift 2 + ;; + --distribution) + IFS=',' read -r -a distribution <<< "$2" + shift 2 + ;; + --sleep) + sleep_time=$2 + shift 2 + ;; + --num.bootnodes) + num_bootnodes=$2 + shift 2 + ;; + --disable.enrfilter) + other+=" --enable.enrfilter=false" + shift + ;; + --testname) + test_name=$2 + shift 2 + ;; + --results.dir) + results_dir=$2 + shift 2 + ;; + --help) + echo "USAGE: $0 [OPTIONS]" + echo "OPTIONS:" + echo " --dirty.rate Rate of dirty node that not compatible with the valid node (default 60 means 60% dirty nodes)" + echo " --only.outbound.rate Rate of valid node but only accept outbound connection (default 20 means 20% nodes only accept outbound connection)" + echo " --interval Interval between each node creation (default 1s)" + echo " --distribution Number of nodes to start for each batch (default 150,100,100)" + echo " --sleep