-
Couldn't load subscription status.
- Fork 1.2k
feat: use static subscribe instead of subscribeWithParameters for blob sidecar topics #15895
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,11 +79,6 @@ func (p subscribeParameters) logFields() logrus.Fields { | |
| } | ||
| } | ||
|
|
||
| // fullTopic is the fully qualified topic string, given to gossipsub. | ||
| func (p subscribeParameters) fullTopic(subnet uint64, suffix string) string { | ||
| return fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, subnet) + suffix | ||
| } | ||
|
|
||
| // subnetTracker keeps track of which subnets we are subscribed to, out of the set of | ||
| // possible subnets described by a `subscribeParameters`. | ||
| type subnetTracker struct { | ||
|
|
@@ -210,19 +205,20 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { | |
| return false | ||
| } | ||
| s.spawn(func() { | ||
| s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse) | ||
| s.subscribe(p2p.BlockSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.BlockSubnetTopicFormat, | ||
| nse.ForkDigest), s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse) | ||
| }) | ||
| s.spawn(func() { | ||
| s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse) | ||
| s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.AggregateAndProofSubnetTopicFormat, nse.ForkDigest), s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse) | ||
| }) | ||
| s.spawn(func() { | ||
| s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse) | ||
| s.subscribe(p2p.ExitSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.ExitSubnetTopicFormat, nse.ForkDigest), s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse) | ||
| }) | ||
| s.spawn(func() { | ||
| s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, nse) | ||
| s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.ProposerSlashingSubnetTopicFormat, nse.ForkDigest), s.validateProposerSlashing, s.proposerSlashingSubscriber, nse) | ||
| }) | ||
| s.spawn(func() { | ||
| s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse) | ||
| s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.AttesterSlashingSubnetTopicFormat, nse.ForkDigest), s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse) | ||
| }) | ||
| s.spawn(func() { | ||
| s.subscribeWithParameters(subscribeParameters{ | ||
|
|
@@ -240,6 +236,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { | |
| s.spawn(func() { | ||
| s.subscribe( | ||
| p2p.SyncContributionAndProofSubnetTopicFormat, | ||
| s.buildTopicWithoutSubnet(p2p.SyncContributionAndProofSubnetTopicFormat, nse.ForkDigest), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
==> |
||
| s.validateSyncContributionAndProof, | ||
| s.syncContributionAndProofSubscriber, | ||
| nse, | ||
|
|
@@ -259,6 +256,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { | |
| s.spawn(func() { | ||
| s.subscribe( | ||
| p2p.LightClientOptimisticUpdateTopicFormat, | ||
| s.buildTopicWithoutSubnet(p2p.LightClientOptimisticUpdateTopicFormat, nse.ForkDigest), | ||
| s.validateLightClientOptimisticUpdate, | ||
| noopHandler, | ||
| nse, | ||
|
|
@@ -267,6 +265,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { | |
| s.spawn(func() { | ||
| s.subscribe( | ||
| p2p.LightClientFinalityUpdateTopicFormat, | ||
| s.buildTopicWithoutSubnet(p2p.LightClientFinalityUpdateTopicFormat, nse.ForkDigest), | ||
| s.validateLightClientFinalityUpdate, | ||
| noopHandler, | ||
| nse, | ||
|
|
@@ -280,6 +279,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { | |
| s.spawn(func() { | ||
| s.subscribe( | ||
| p2p.BlsToExecutionChangeSubnetTopicFormat, | ||
| s.buildTopicWithoutSubnet(p2p.BlsToExecutionChangeSubnetTopicFormat, nse.ForkDigest), | ||
| s.validateBlsToExecutionChange, | ||
| s.blsToExecutionChangeSubscriber, | ||
| nse, | ||
|
|
@@ -289,32 +289,36 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { | |
|
|
||
| // New gossip topic in Deneb, removed in Electra | ||
| if params.BeaconConfig().DenebForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().ElectraForkEpoch { | ||
| s.spawn(func() { | ||
| s.subscribeWithParameters(subscribeParameters{ | ||
| topicFormat: p2p.BlobSubnetTopicFormat, | ||
| validate: s.validateBlob, | ||
| handle: s.blobSubscriber, | ||
| nse: nse, | ||
| getSubnetsToJoin: func(primitives.Slot) map[uint64]bool { | ||
| return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount) | ||
| }, | ||
| for i := uint64(0); i < params.BeaconConfig().BlobsidecarSubnetCount; i++ { | ||
| subnet := i | ||
| s.spawn(func() { | ||
| topic := s.buildTopicWithSubnet(p2p.BlobSubnetTopicFormat, nse.ForkDigest, subnet) | ||
| s.subscribe( | ||
| p2p.BlobSubnetTopicFormat, | ||
| topic, | ||
| s.validateBlob, | ||
| s.blobSubscriber, | ||
| nse, | ||
| ) | ||
| }) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // New gossip topic in Electra, removed in Fulu | ||
| if params.BeaconConfig().ElectraForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().FuluForkEpoch { | ||
| s.spawn(func() { | ||
| s.subscribeWithParameters(subscribeParameters{ | ||
| topicFormat: p2p.BlobSubnetTopicFormat, | ||
| validate: s.validateBlob, | ||
| handle: s.blobSubscriber, | ||
| nse: nse, | ||
| getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool { | ||
| return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra) | ||
| }, | ||
| for i := uint64(0); i < params.BeaconConfig().BlobsidecarSubnetCountElectra; i++ { | ||
| subnet := i | ||
| s.spawn(func() { | ||
| topic := s.buildTopicWithSubnet(p2p.BlobSubnetTopicFormat, nse.ForkDigest, subnet) | ||
| s.subscribe( | ||
| p2p.BlobSubnetTopicFormat, | ||
| topic, | ||
| s.validateBlob, | ||
| s.blobSubscriber, | ||
| nse, | ||
| ) | ||
| }) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // New gossip topic in Fulu. | ||
|
|
@@ -349,27 +353,40 @@ func (s *Service) subscribeLogFields(topic string, nse params.NetworkScheduleEnt | |
|
|
||
| // subscribe to a given topic with a given validator and subscription handler. | ||
| // The base protobuf message is used to initialize new messages for decoding. | ||
| func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) { | ||
| func (s *Service) subscribe(topicFormat string, fullTopic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why using both It seems |
||
| if err := s.waitForInitialSync(s.ctx); err != nil { | ||
| log.WithFields(s.subscribeLogFields(topic, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic") | ||
| log.WithFields(s.subscribeLogFields(topicFormat, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic") | ||
| return | ||
| } | ||
| // Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync. | ||
| if s.subscriptionRequestExpired(nse) { | ||
| // If we are already past the next fork epoch, do not subscribe to this topic. | ||
| log.WithFields(s.subscribeLogFields(topic, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch") | ||
| log.WithFields(s.subscribeLogFields(topicFormat, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch") | ||
| return | ||
| } | ||
| base := p2p.GossipTopicMappings(topic, nse.Epoch) | ||
| base := p2p.GossipTopicMappings(topicFormat, nse.Epoch) | ||
| if base == nil { | ||
| // Impossible condition as it would mean topic does not exist. | ||
| panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition. | ||
| panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) // lint:nopanic -- Impossible condition. | ||
| } | ||
| s.subscribeWithBase(fullTopic, validator, handle) | ||
| } | ||
|
|
||
| func (s *Service) buildTopicWithoutSubnet(topicFormat string, digest [4]byte) string { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without taking tests into account, |
||
| if !strings.Contains(topicFormat, "%x") { | ||
| log.Error("Topic does not have appropriate formatter for digest") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we are here, the user have no idea of which subnet was wrong. |
||
| } | ||
| return (fmt.Sprintf(topicFormat, digest)) + s.cfg.p2p.Encoding().ProtocolSuffix() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why the extra parenthesis around |
||
| } | ||
|
|
||
| func (s *Service) buildTopicWithSubnet(topicFormat string, digest [4]byte, subnet uint64) string { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comments as in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The names Also, please add a godoc with an example of suitable topic format, so we can understand what the function does without reading it. |
||
| if !strings.Contains(topicFormat, "%x") { | ||
| log.Error("Topic does not have appropriate formatter for digest") | ||
| } | ||
| s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest), validator, handle) | ||
| return (fmt.Sprintf(topicFormat, digest, subnet)) + s.cfg.p2p.Encoding().ProtocolSuffix() | ||
| } | ||
|
|
||
| func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription { | ||
| topic += s.cfg.p2p.Encoding().ProtocolSuffix() | ||
| log := log.WithField("topic", topic) | ||
|
|
||
| // Do not resubscribe already seen subscriptions. | ||
|
|
@@ -532,7 +549,8 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p | |
| func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) { | ||
| for _, subnet := range t.unwanted(wantedSubnets) { | ||
| t.cancelSubscription(subnet) | ||
| s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())) | ||
| fullTopic := s.buildTopicWithSubnet(t.topicFormat, t.nse.ForkDigest, subnet) | ||
| s.unSubscribeFromTopic(fullTopic) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -579,8 +597,7 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) { | |
| subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot()) | ||
| s.pruneNotWanted(t, subnetsToJoin) | ||
| for _, subnet := range t.missing(subnetsToJoin) { | ||
| // TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent. | ||
| topic := t.fullTopic(subnet, "") | ||
| topic := s.buildTopicWithSubnet(t.topicFormat, t.nse.ForkDigest, subnet) | ||
| t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle)) | ||
| } | ||
| } | ||
|
|
@@ -782,22 +799,6 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID { | |
| return newPeers | ||
| } | ||
|
|
||
| // Add fork digest to topic. | ||
| func (*Service) addDigestToTopic(topic string, digest [4]byte) string { | ||
| if !strings.Contains(topic, "%x") { | ||
| log.Error("Topic does not have appropriate formatter for digest") | ||
| } | ||
| return fmt.Sprintf(topic, digest) | ||
| } | ||
|
|
||
| // Add the digest and index to subnet topic. | ||
| func (*Service) addDigestAndIndexToTopic(topic string, digest [4]byte, idx uint64) string { | ||
| if !strings.Contains(topic, "%x") { | ||
| log.Error("Topic does not have appropriate formatter for digest") | ||
| } | ||
| return fmt.Sprintf(topic, digest, idx) | ||
| } | ||
|
|
||
| func (s *Service) currentForkDigest() ([4]byte, error) { | ||
| return params.ForkDigest(s.cfg.clock.CurrentEpoch()), nil | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s.buildTopicWithoutSubnetcould be called and the result extracted in a variable to avoid extra long lines of code.