Skip to content

Commit 4fb75d6

Browse files
nalepaejames-prysm
andauthored
Add some metrics improvements (#15922)
* Define TCP and QUIC as `InternetProtocol` (no functional change). * Group types. (No functional changes) * Rename variables and use range syntax. * Add `p2pMaxPeers` and `p2pPeerCountDirectionType` metrics * `p2p_subscribed_topic_peer_total`: Reset to avoid dangling values. * `validateConfig`: - Use `Warning` with fields instead of `Warnf`. - Avoid to both modify in place the input value and return it. * Add `p2p_minimum_peers_per_subnet` metric. * `beaconConfig` => `cfg`. #15880 (comment) * Add changelog --------- Co-authored-by: james-prysm <[email protected]>
1 parent 6d596ed commit 4fb75d6

28 files changed

+181
-139
lines changed

beacon-chain/blockchain/service.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -472,8 +472,8 @@ func (s *Service) removeStartupState() {
472472
func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot, uint64, error) {
473473
isSubscribedToAllDataSubnets := flags.Get().SubscribeAllDataSubnets
474474

475-
beaconConfig := params.BeaconConfig()
476-
custodyRequirement := beaconConfig.CustodyRequirement
475+
cfg := params.BeaconConfig()
476+
custodyRequirement := cfg.CustodyRequirement
477477

478478
// Check if the node was previously subscribed to all data subnets, and if so,
479479
// store the new status accordingly.
@@ -493,7 +493,7 @@ func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot,
493493
// Compute the custody group count.
494494
custodyGroupCount := custodyRequirement
495495
if isSubscribedToAllDataSubnets {
496-
custodyGroupCount = beaconConfig.NumberOfCustodyGroups
496+
custodyGroupCount = cfg.NumberOfCustodyGroups
497497
}
498498

499499
// Safely compute the fulu fork slot.
@@ -536,11 +536,11 @@ func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db d
536536
}
537537

538538
func fuluForkSlot() (primitives.Slot, error) {
539-
beaconConfig := params.BeaconConfig()
539+
cfg := params.BeaconConfig()
540540

541-
fuluForkEpoch := beaconConfig.FuluForkEpoch
542-
if fuluForkEpoch == beaconConfig.FarFutureEpoch {
543-
return beaconConfig.FarFutureSlot, nil
541+
fuluForkEpoch := cfg.FuluForkEpoch
542+
if fuluForkEpoch == cfg.FarFutureEpoch {
543+
return cfg.FarFutureSlot, nil
544544
}
545545

546546
forkFuluSlot, err := slots.EpochStart(fuluForkEpoch)

beacon-chain/core/helpers/validators.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ func ComputeProposerIndex(bState state.ReadOnlyBeaconState, activeIndices []prim
401401
return 0, errors.New("empty active indices list")
402402
}
403403
hashFunc := hash.CustomSHA256Hasher()
404-
beaconConfig := params.BeaconConfig()
404+
cfg := params.BeaconConfig()
405405
seedBuffer := make([]byte, len(seed)+8)
406406
copy(seedBuffer, seed[:])
407407

@@ -426,14 +426,14 @@ func ComputeProposerIndex(bState state.ReadOnlyBeaconState, activeIndices []prim
426426
offset := (i % 16) * 2
427427
randomValue := uint64(randomBytes[offset]) | uint64(randomBytes[offset+1])<<8
428428

429-
if effectiveBal*fieldparams.MaxRandomValueElectra >= beaconConfig.MaxEffectiveBalanceElectra*randomValue {
429+
if effectiveBal*fieldparams.MaxRandomValueElectra >= cfg.MaxEffectiveBalanceElectra*randomValue {
430430
return candidateIndex, nil
431431
}
432432
} else {
433433
binary.LittleEndian.PutUint64(seedBuffer[len(seed):], i/32)
434434
randomByte := hashFunc(seedBuffer)[i%32]
435435

436-
if effectiveBal*fieldparams.MaxRandomByte >= beaconConfig.MaxEffectiveBalance*uint64(randomByte) {
436+
if effectiveBal*fieldparams.MaxRandomByte >= cfg.MaxEffectiveBalance*uint64(randomByte) {
437437
return candidateIndex, nil
438438
}
439439
}

beacon-chain/core/peerdas/das_core.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,14 @@ func CustodyGroups(nodeId enode.ID, custodyGroupCount uint64) ([]uint64, error)
8989
// ComputeColumnsForCustodyGroup computes the columns for a given custody group.
9090
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#compute_columns_for_custody_group
9191
func ComputeColumnsForCustodyGroup(custodyGroup uint64) ([]uint64, error) {
92-
beaconConfig := params.BeaconConfig()
93-
numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups
92+
cfg := params.BeaconConfig()
93+
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
9494

9595
if custodyGroup >= numberOfCustodyGroups {
9696
return nil, ErrCustodyGroupTooLarge
9797
}
9898

99-
numberOfColumns := beaconConfig.NumberOfColumns
99+
numberOfColumns := cfg.NumberOfColumns
100100

101101
columnsPerGroup := numberOfColumns / numberOfCustodyGroups
102102

@@ -112,9 +112,9 @@ func ComputeColumnsForCustodyGroup(custodyGroup uint64) ([]uint64, error) {
112112
// ComputeCustodyGroupForColumn computes the custody group for a given column.
113113
// It is the reciprocal function of ComputeColumnsForCustodyGroup.
114114
func ComputeCustodyGroupForColumn(columnIndex uint64) (uint64, error) {
115-
beaconConfig := params.BeaconConfig()
116-
numberOfColumns := beaconConfig.NumberOfColumns
117-
numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups
115+
cfg := params.BeaconConfig()
116+
numberOfColumns := cfg.NumberOfColumns
117+
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
118118

119119
if columnIndex >= numberOfColumns {
120120
return 0, ErrIndexTooLarge

beacon-chain/core/peerdas/validator.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validat
8484
totalNodeBalance += validator.EffectiveBalance()
8585
}
8686

87-
beaconConfig := params.BeaconConfig()
88-
numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups
89-
validatorCustodyRequirement := beaconConfig.ValidatorCustodyRequirement
90-
balancePerAdditionalCustodyGroup := beaconConfig.BalancePerAdditionalCustodyGroup
87+
cfg := params.BeaconConfig()
88+
numberOfCustodyGroups := cfg.NumberOfCustodyGroups
89+
validatorCustodyRequirement := cfg.ValidatorCustodyRequirement
90+
balancePerAdditionalCustodyGroup := cfg.BalancePerAdditionalCustodyGroup
9191

9292
count := totalNodeBalance / balancePerAdditionalCustodyGroup
9393
return min(max(count, validatorCustodyRequirement), numberOfCustodyGroups), nil

beacon-chain/core/time/slot_epoch_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func TestAltairCompatible(t *testing.T) {
196196
}
197197

198198
func TestCanUpgradeTo(t *testing.T) {
199-
beaconConfig := params.BeaconConfig()
199+
cfg := params.BeaconConfig()
200200

201201
outerTestCases := []struct {
202202
name string
@@ -205,40 +205,40 @@ func TestCanUpgradeTo(t *testing.T) {
205205
}{
206206
{
207207
name: "Altair",
208-
forkEpoch: &beaconConfig.AltairForkEpoch,
208+
forkEpoch: &cfg.AltairForkEpoch,
209209
upgradeFunc: time.CanUpgradeToAltair,
210210
},
211211
{
212212
name: "Bellatrix",
213-
forkEpoch: &beaconConfig.BellatrixForkEpoch,
213+
forkEpoch: &cfg.BellatrixForkEpoch,
214214
upgradeFunc: time.CanUpgradeToBellatrix,
215215
},
216216
{
217217
name: "Capella",
218-
forkEpoch: &beaconConfig.CapellaForkEpoch,
218+
forkEpoch: &cfg.CapellaForkEpoch,
219219
upgradeFunc: time.CanUpgradeToCapella,
220220
},
221221
{
222222
name: "Deneb",
223-
forkEpoch: &beaconConfig.DenebForkEpoch,
223+
forkEpoch: &cfg.DenebForkEpoch,
224224
upgradeFunc: time.CanUpgradeToDeneb,
225225
},
226226
{
227227
name: "Electra",
228-
forkEpoch: &beaconConfig.ElectraForkEpoch,
228+
forkEpoch: &cfg.ElectraForkEpoch,
229229
upgradeFunc: time.CanUpgradeToElectra,
230230
},
231231
{
232232
name: "Fulu",
233-
forkEpoch: &beaconConfig.FuluForkEpoch,
233+
forkEpoch: &cfg.FuluForkEpoch,
234234
upgradeFunc: time.CanUpgradeToFulu,
235235
},
236236
}
237237

238238
for _, otc := range outerTestCases {
239239
params.SetupTestConfigCleanup(t)
240240
*otc.forkEpoch = 5
241-
params.OverrideBeaconConfig(beaconConfig)
241+
params.OverrideBeaconConfig(cfg)
242242

243243
innerTestCases := []struct {
244244
name string

beacon-chain/p2p/config.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
88
"github.com/OffchainLabs/prysm/v6/beacon-chain/db"
99
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
10+
"github.com/sirupsen/logrus"
1011
)
1112

1213
// This is the default queue size used if we have specified an invalid one.
@@ -63,12 +64,17 @@ func (cfg *Config) connManagerLowHigh() (int, int) {
6364
return low, high
6465
}
6566

66-
// validateConfig validates whether the values provided are accurate and will set
67-
// the appropriate values for those that are invalid.
68-
func validateConfig(cfg *Config) *Config {
69-
if cfg.QueueSize == 0 {
70-
log.Warnf("Invalid pubsub queue size of %d initialized, setting the quese size as %d instead", cfg.QueueSize, defaultPubsubQueueSize)
71-
cfg.QueueSize = defaultPubsubQueueSize
67+
// validateConfig validates whether the provided config has valid values and sets
68+
// the invalid ones to default.
69+
func validateConfig(cfg *Config) {
70+
if cfg.QueueSize > 0 {
71+
return
7272
}
73-
return cfg
73+
74+
log.WithFields(logrus.Fields{
75+
"queueSize": cfg.QueueSize,
76+
"default": defaultPubsubQueueSize,
77+
}).Warning("Invalid pubsub queue size, setting the queue size to the default value")
78+
79+
cfg.QueueSize = defaultPubsubQueueSize
7480
}

beacon-chain/p2p/custody.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,11 @@ func (s *Service) custodyGroupCountFromPeerENR(pid peer.ID) uint64 {
259259
}
260260

261261
func fuluForkSlot() (primitives.Slot, error) {
262-
beaconConfig := params.BeaconConfig()
262+
cfg := params.BeaconConfig()
263263

264-
fuluForkEpoch := beaconConfig.FuluForkEpoch
265-
if fuluForkEpoch == beaconConfig.FarFutureEpoch {
266-
return beaconConfig.FarFutureSlot, nil
264+
fuluForkEpoch := cfg.FuluForkEpoch
265+
if fuluForkEpoch == cfg.FarFutureEpoch {
266+
return cfg.FarFutureSlot, nil
267267
}
268268

269269
forkFuluSlot, err := slots.EpochStart(fuluForkEpoch)

beacon-chain/p2p/monitoring.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package p2p
33
import (
44
"strings"
55

6+
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
67
"github.com/libp2p/go-libp2p/core/peer"
78
"github.com/libp2p/go-libp2p/core/peerstore"
89
"github.com/prometheus/client_golang/prometheus"
@@ -26,12 +27,25 @@ var (
2627
Help: "The number of peers in a given state.",
2728
},
2829
[]string{"state"})
30+
p2pMaxPeers = promauto.NewGauge(prometheus.GaugeOpts{
31+
Name: "p2p_max_peers",
32+
Help: "The target maximum number of peers.",
33+
})
34+
p2pPeerCountDirectionType = promauto.NewGaugeVec(prometheus.GaugeOpts{
35+
Name: "p2p_peer_count_direction_type",
36+
Help: "The number of peers in a given direction and type.",
37+
},
38+
[]string{"direction", "type"})
2939
connectedPeersCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
3040
Name: "connected_libp2p_peers",
3141
Help: "Tracks the total number of connected libp2p peers by agent string",
3242
},
3343
[]string{"agent"},
3444
)
45+
minimumPeersPerSubnet = promauto.NewGauge(prometheus.GaugeOpts{
46+
Name: "p2p_minimum_peers_per_subnet",
47+
Help: "The minimum number of peers to connect to per subnet",
48+
})
3549
avgScoreConnectedClients = promauto.NewGaugeVec(prometheus.GaugeOpts{
3650
Name: "connected_libp2p_peers_average_scores",
3751
Help: "Tracks the overall p2p scores of connected libp2p peers by agent string",
@@ -174,35 +188,45 @@ var (
174188
)
175189

176190
func (s *Service) updateMetrics() {
191+
store := s.Host().Peerstore()
177192
connectedPeers := s.peers.Connected()
193+
178194
p2pPeerCount.WithLabelValues("Connected").Set(float64(len(connectedPeers)))
179195
p2pPeerCount.WithLabelValues("Disconnected").Set(float64(len(s.peers.Disconnected())))
180196
p2pPeerCount.WithLabelValues("Connecting").Set(float64(len(s.peers.Connecting())))
181197
p2pPeerCount.WithLabelValues("Disconnecting").Set(float64(len(s.peers.Disconnecting())))
182198
p2pPeerCount.WithLabelValues("Bad").Set(float64(len(s.peers.Bad())))
183199

184-
store := s.Host().Peerstore()
185-
numConnectedPeersByClient := make(map[string]float64)
200+
upperTCP := strings.ToUpper(string(peers.TCP))
201+
upperQUIC := strings.ToUpper(string(peers.QUIC))
202+
203+
p2pPeerCountDirectionType.WithLabelValues("inbound", upperTCP).Set(float64(len(s.peers.InboundConnectedWithProtocol(peers.TCP))))
204+
p2pPeerCountDirectionType.WithLabelValues("inbound", upperQUIC).Set(float64(len(s.peers.InboundConnectedWithProtocol(peers.QUIC))))
205+
p2pPeerCountDirectionType.WithLabelValues("outbound", upperTCP).Set(float64(len(s.peers.OutboundConnectedWithProtocol(peers.TCP))))
206+
p2pPeerCountDirectionType.WithLabelValues("outbound", upperQUIC).Set(float64(len(s.peers.OutboundConnectedWithProtocol(peers.QUIC))))
207+
208+
connectedPeersCountByClient := make(map[string]float64)
186209
peerScoresByClient := make(map[string][]float64)
187-
for i := 0; i < len(connectedPeers); i++ {
188-
p := connectedPeers[i]
210+
for _, p := range connectedPeers {
189211
pid, err := peer.Decode(p.String())
190212
if err != nil {
191213
log.WithError(err).Debug("Could not decode peer string")
192214
continue
193215
}
194216

195217
foundName := agentFromPid(pid, store)
196-
numConnectedPeersByClient[foundName] += 1
218+
connectedPeersCountByClient[foundName] += 1
197219

198220
// Get peer scoring data.
199221
overallScore := s.peers.Scorers().Score(pid)
200222
peerScoresByClient[foundName] = append(peerScoresByClient[foundName], overallScore)
201223
}
224+
202225
connectedPeersCount.Reset() // Clear out previous results.
203-
for agent, total := range numConnectedPeersByClient {
226+
for agent, total := range connectedPeersCountByClient {
204227
connectedPeersCount.WithLabelValues(agent).Set(total)
205228
}
229+
206230
avgScoreConnectedClients.Reset() // Clear out previous results.
207231
for agent, scoringData := range peerScoresByClient {
208232
avgScore := average(scoringData)

beacon-chain/p2p/peers/status.go

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -81,29 +81,31 @@ const (
8181
type InternetProtocol string
8282

8383
const (
84-
TCP = "tcp"
85-
QUIC = "quic"
84+
TCP = InternetProtocol("tcp")
85+
QUIC = InternetProtocol("quic")
8686
)
8787

88-
// Status is the structure holding the peer status information.
89-
type Status struct {
90-
ctx context.Context
91-
scorers *scorers.Service
92-
store *peerdata.Store
93-
ipTracker map[string]uint64
94-
rand *rand.Rand
95-
ipColocationWhitelist []*net.IPNet
96-
}
97-
98-
// StatusConfig represents peer status service params.
99-
type StatusConfig struct {
100-
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
101-
PeerLimit int
102-
// ScorerParams holds peer scorer configuration params.
103-
ScorerParams *scorers.Config
104-
// IPColocationWhitelist contains CIDR ranges that are exempt from IP colocation limits.
105-
IPColocationWhitelist []*net.IPNet
106-
}
88+
type (
89+
// Status is the structure holding the peer status information.
90+
Status struct {
91+
ctx context.Context
92+
scorers *scorers.Service
93+
store *peerdata.Store
94+
ipTracker map[string]uint64
95+
rand *rand.Rand
96+
ipColocationWhitelist []*net.IPNet
97+
}
98+
99+
// StatusConfig represents peer status service params.
100+
StatusConfig struct {
101+
// PeerLimit specifies maximum amount of concurrent peers that are expected to be connect to the node.
102+
PeerLimit int
103+
// ScorerParams holds peer scorer configuration params.
104+
ScorerParams *scorers.Config
105+
// IPColocationWhitelist contains CIDR ranges that are exempt from IP colocation limits.
106+
IPColocationWhitelist []*net.IPNet
107+
}
108+
)
107109

108110
// NewStatus creates a new status entity.
109111
func NewStatus(ctx context.Context, config *StatusConfig) *Status {

beacon-chain/p2p/rpc_topic_mappings.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -345,17 +345,17 @@ func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error) {
345345
return "", errors.Errorf("%s: %s", invalidRPCMessageType, msg)
346346
}
347347

348-
beaconConfig := params.BeaconConfig()
348+
cfg := params.BeaconConfig()
349349

350350
// Check if the message is to be updated in fulu.
351-
if epoch >= beaconConfig.FuluForkEpoch {
351+
if epoch >= cfg.FuluForkEpoch {
352352
if version, ok := fuluMapping[msg]; ok {
353353
return protocolPrefix + msg + version, nil
354354
}
355355
}
356356

357357
// Check if the message is to be updated in altair.
358-
if epoch >= beaconConfig.AltairForkEpoch {
358+
if epoch >= cfg.AltairForkEpoch {
359359
if version, ok := altairMapping[msg]; ok {
360360
return protocolPrefix + msg + version, nil
361361
}

0 commit comments

Comments
 (0)