diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index e63c554171b5..9336f054de50 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -79,11 +79,6 @@ func (p subscribeParameters) logFields() logrus.Fields { } } -// fullTopic is the fully qualified topic string, given to gossipsub. -func (p subscribeParameters) fullTopic(subnet uint64, suffix string) string { - return fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, subnet) + suffix -} - // subnetTracker keeps track of which subnets we are subscribed to, out of the set of // possible subnets described by a `subscribeParameters`. type subnetTracker struct { @@ -210,19 +205,20 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { return false } s.spawn(func() { - s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse) + s.subscribe(p2p.BlockSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.BlockSubnetTopicFormat, + nse.ForkDigest), s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse) }) s.spawn(func() { - s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse) + s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.AggregateAndProofSubnetTopicFormat, nse.ForkDigest), s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse) }) s.spawn(func() { - s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse) + s.subscribe(p2p.ExitSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.ExitSubnetTopicFormat, nse.ForkDigest), s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse) }) s.spawn(func() { - s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, nse) + s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.ProposerSlashingSubnetTopicFormat, nse.ForkDigest), s.validateProposerSlashing, s.proposerSlashingSubscriber, nse) }) s.spawn(func() { - s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse) + s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.AttesterSlashingSubnetTopicFormat, nse.ForkDigest), s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse) }) s.spawn(func() { s.subscribeWithParameters(subscribeParameters{ @@ -240,6 +236,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { s.spawn(func() { s.subscribe( p2p.SyncContributionAndProofSubnetTopicFormat, + s.buildTopicWithoutSubnet(p2p.SyncContributionAndProofSubnetTopicFormat, nse.ForkDigest), s.validateSyncContributionAndProof, s.syncContributionAndProofSubscriber, nse, @@ -259,6 +256,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { s.spawn(func() { s.subscribe( p2p.LightClientOptimisticUpdateTopicFormat, + s.buildTopicWithoutSubnet(p2p.LightClientOptimisticUpdateTopicFormat, nse.ForkDigest), s.validateLightClientOptimisticUpdate, noopHandler, nse, @@ -267,6 +265,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { s.spawn(func() { s.subscribe( p2p.LightClientFinalityUpdateTopicFormat, + s.buildTopicWithoutSubnet(p2p.LightClientFinalityUpdateTopicFormat, nse.ForkDigest), s.validateLightClientFinalityUpdate, noopHandler, nse, @@ -280,6 +279,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { s.spawn(func() { s.subscribe( p2p.BlsToExecutionChangeSubnetTopicFormat, + s.buildTopicWithoutSubnet(p2p.BlsToExecutionChangeSubnetTopicFormat, nse.ForkDigest), s.validateBlsToExecutionChange, s.blsToExecutionChangeSubscriber, nse, @@ -289,32 +289,36 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool { // New gossip topic in Deneb, removed in Electra if params.BeaconConfig().DenebForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().ElectraForkEpoch { - s.spawn(func() { - s.subscribeWithParameters(subscribeParameters{ - topicFormat: p2p.BlobSubnetTopicFormat, - validate: s.validateBlob, - handle: s.blobSubscriber, - nse: nse, - getSubnetsToJoin: func(primitives.Slot) map[uint64]bool { - return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount) - }, + for i := uint64(0); i < params.BeaconConfig().BlobsidecarSubnetCount; i++ { + subnet := i + s.spawn(func() { + topic := s.buildTopicWithSubnet(p2p.BlobSubnetTopicFormat, nse.ForkDigest, subnet) + s.subscribe( + p2p.BlobSubnetTopicFormat, + topic, + s.validateBlob, + s.blobSubscriber, + nse, + ) }) - }) + } } // New gossip topic in Electra, removed in Fulu if params.BeaconConfig().ElectraForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().FuluForkEpoch { - s.spawn(func() { - s.subscribeWithParameters(subscribeParameters{ - topicFormat: p2p.BlobSubnetTopicFormat, - validate: s.validateBlob, - handle: s.blobSubscriber, - nse: nse, - getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool { - return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra) - }, + for i := uint64(0); i < params.BeaconConfig().BlobsidecarSubnetCountElectra; i++ { + subnet := i + s.spawn(func() { + topic := s.buildTopicWithSubnet(p2p.BlobSubnetTopicFormat, nse.ForkDigest, subnet) + s.subscribe( + p2p.BlobSubnetTopicFormat, + topic, + s.validateBlob, + s.blobSubscriber, + nse, + ) }) - }) + } } // New gossip topic in Fulu. @@ -349,27 +353,40 @@ func (s *Service) subscribeLogFields(topic string, nse params.NetworkScheduleEnt // subscribe to a given topic with a given validator and subscription handler. // The base protobuf message is used to initialize new messages for decoding. -func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) { +func (s *Service) subscribe(topicFormat string, fullTopic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) { if err := s.waitForInitialSync(s.ctx); err != nil { - log.WithFields(s.subscribeLogFields(topic, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic") + log.WithFields(s.subscribeLogFields(topicFormat, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic") return } // Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync. if s.subscriptionRequestExpired(nse) { // If we are already past the next fork epoch, do not subscribe to this topic. - log.WithFields(s.subscribeLogFields(topic, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch") + log.WithFields(s.subscribeLogFields(topicFormat, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch") return } - base := p2p.GossipTopicMappings(topic, nse.Epoch) + base := p2p.GossipTopicMappings(topicFormat, nse.Epoch) if base == nil { // Impossible condition as it would mean topic does not exist. - panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition. + panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) // lint:nopanic -- Impossible condition. + } + s.subscribeWithBase(fullTopic, validator, handle) +} + +func (s *Service) buildTopicWithoutSubnet(topicFormat string, digest [4]byte) string { + if !strings.Contains(topicFormat, "%x") { + log.Error("Topic does not have appropriate formatter for digest") + } + return (fmt.Sprintf(topicFormat, digest)) + s.cfg.p2p.Encoding().ProtocolSuffix() +} + +func (s *Service) buildTopicWithSubnet(topicFormat string, digest [4]byte, subnet uint64) string { + if !strings.Contains(topicFormat, "%x") { + log.Error("Topic does not have appropriate formatter for digest") } - s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest), validator, handle) + return (fmt.Sprintf(topicFormat, digest, subnet)) + s.cfg.p2p.Encoding().ProtocolSuffix() } func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription { - topic += s.cfg.p2p.Encoding().ProtocolSuffix() log := log.WithField("topic", topic) // Do not resubscribe already seen subscriptions. @@ -532,7 +549,8 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) { for _, subnet := range t.unwanted(wantedSubnets) { t.cancelSubscription(subnet) - s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())) + fullTopic := s.buildTopicWithSubnet(t.topicFormat, t.nse.ForkDigest, subnet) + s.unSubscribeFromTopic(fullTopic) } } @@ -579,8 +597,7 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) { subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot()) s.pruneNotWanted(t, subnetsToJoin) for _, subnet := range t.missing(subnetsToJoin) { - // TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent. - topic := t.fullTopic(subnet, "") + topic := s.buildTopicWithSubnet(t.topicFormat, t.nse.ForkDigest, subnet) t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle)) } } @@ -782,22 +799,6 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID { return newPeers } -// Add fork digest to topic. -func (*Service) addDigestToTopic(topic string, digest [4]byte) string { - if !strings.Contains(topic, "%x") { - log.Error("Topic does not have appropriate formatter for digest") - } - return fmt.Sprintf(topic, digest) -} - -// Add the digest and index to subnet topic. -func (*Service) addDigestAndIndexToTopic(topic string, digest [4]byte, idx uint64) string { - if !strings.Contains(topic, "%x") { - log.Error("Topic does not have appropriate formatter for digest") - } - return fmt.Sprintf(topic, digest, idx) -} - func (s *Service) currentForkDigest() ([4]byte, error) { return params.ForkDigest(s.cfg.clock.CurrentEpoch()), nil } diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index fd6ac72e85b2..c316e71bf6f6 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -64,7 +64,8 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error { + fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest) + r.subscribe(topic, fullTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error { m, ok := msg.(*pb.SignedVoluntaryExit) assert.Equal(t, true, ok, "Object is not of type *pb.SignedVoluntaryExit") if m.Exit == nil || m.Exit.Epoch != 55 { @@ -110,12 +111,12 @@ func TestSubscribe_UnsubscribeTopic(t *testing.T) { p2pService.Digest = nse.ForkDigest topic := "/eth2/%x/voluntary_exit" - r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error { + fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest) + r.subscribe(topic, fullTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error { return nil }, nse) r.markForChainStart() - fullTopic := fmt.Sprintf(topic, p2pService.Digest) + p2pService.Encoding().ProtocolSuffix() assert.Equal(t, true, r.subHandler.topicExists(fullTopic)) topics := p2pService.PubSub().GetTopics() assert.Equal(t, fullTopic, topics[0]) @@ -162,7 +163,8 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) { wg.Add(1) nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) p2pService.Digest = nse.ForkDigest - r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error { + fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest) + r.subscribe(topic, fullTopic, r.noopValidator, func(ctx context.Context, msg proto.Message) error { require.NoError(t, r.attesterSlashingSubscriber(ctx, msg)) wg.Done() return nil @@ -217,7 +219,8 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) { params.OverrideBeaconConfig(params.MainnetConfig()) nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch()) p2pService.Digest = nse.ForkDigest - r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error { + fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest) + r.subscribe(topic, fullTopic, r.noopValidator, func(ctx context.Context, msg proto.Message) error { require.NoError(t, r.proposerSlashingSubscriber(ctx, msg)) wg.Done() return nil @@ -266,7 +269,8 @@ func TestSubscribe_HandlesPanic(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error { + fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest) + r.subscribe(topic, fullTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error { defer wg.Done() panic("bad") }, nse) @@ -305,7 +309,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) { // committee index 1 c1 := uint64(1) - fullTopic := params.fullTopic(c1, r.cfg.p2p.Encoding().ProtocolSuffix()) + fullTopic := r.buildTopicWithSubnet(params.topicFormat, params.nse.ForkDigest, c1) _, topVal := r.wrapAndReportValidation(fullTopic, r.noopValidator) require.NoError(t, r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal)) sub1, err := r.cfg.p2p.SubscribeToTopic(fullTopic) @@ -314,7 +318,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) { // committee index 2 c2 := uint64(2) - fullTopic = params.fullTopic(c2, r.cfg.p2p.Encoding().ProtocolSuffix()) + fullTopic = r.buildTopicWithSubnet(params.topicFormat, params.nse.ForkDigest, c2) _, topVal = r.wrapAndReportValidation(fullTopic, r.noopValidator) err = r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal) require.NoError(t, err) @@ -484,11 +488,11 @@ func TestFilterSubnetPeers(t *testing.T) { defer cache.SubnetIDs.EmptyAllCaches() digest, err := r.currentForkDigest() assert.NoError(t, err) - defaultTopic := "/eth2/%x/beacon_attestation_%d" + r.cfg.p2p.Encoding().ProtocolSuffix() - subnet10 := r.addDigestAndIndexToTopic(defaultTopic, digest, 10) + defaultTopic := "/eth2/%x/beacon_attestation_%d" + subnet10 := r.buildTopicWithSubnet(defaultTopic, digest, 10) cache.SubnetIDs.AddAggregatorSubnetID(currSlot, 10) - subnet20 := r.addDigestAndIndexToTopic(defaultTopic, digest, 20) + subnet20 := r.buildTopicWithSubnet(defaultTopic, digest, 20) cache.SubnetIDs.AddAttesterSubnetID(currSlot, 20) p1 := createPeer(t, subnet10) diff --git a/beacon-chain/sync/sync_fuzz_test.go b/beacon-chain/sync/sync_fuzz_test.go index 0703d8dd67f7..a24c558a5a39 100644 --- a/beacon-chain/sync/sync_fuzz_test.go +++ b/beacon-chain/sync/sync_fuzz_test.go @@ -82,7 +82,7 @@ func FuzzValidateBeaconBlockPubSub_Phase0(f *testing.F) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(f, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) f.Add("junk", []byte("junk"), buf.Bytes(), []byte(topic)) f.Fuzz(func(t *testing.T, pid string, from, data, topic []byte) { @@ -166,7 +166,7 @@ func FuzzValidateBeaconBlockPubSub_Altair(f *testing.F) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(f, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) f.Add("junk", []byte("junk"), buf.Bytes(), []byte(topic)) f.Fuzz(func(t *testing.T, pid string, from, data, topic []byte) { @@ -250,7 +250,7 @@ func FuzzValidateBeaconBlockPubSub_Bellatrix(f *testing.F) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(f, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) f.Add("junk", []byte("junk"), buf.Bytes(), []byte(topic)) f.Fuzz(func(t *testing.T, pid string, from, data, topic []byte) { diff --git a/beacon-chain/sync/validate_aggregate_proof_test.go b/beacon-chain/sync/validate_aggregate_proof_test.go index 035f1d81e232..fab05346936b 100644 --- a/beacon-chain/sync/validate_aggregate_proof_test.go +++ b/beacon-chain/sync/validate_aggregate_proof_test.go @@ -488,7 +488,7 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)] d, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, d) + topic = r.buildTopicWithoutSubnet(topic, d) msg := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -591,7 +591,7 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)] d, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, d) + topic = r.buildTopicWithoutSubnet(topic, d) msg := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), diff --git a/beacon-chain/sync/validate_attester_slashing_test.go b/beacon-chain/sync/validate_attester_slashing_test.go index 0814a4024012..a024a0d44cf2 100644 --- a/beacon-chain/sync/validate_attester_slashing_test.go +++ b/beacon-chain/sync/validate_attester_slashing_test.go @@ -101,7 +101,7 @@ func TestValidateAttesterSlashing_ValidSlashing(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)] d, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, d) + topic = r.buildTopicWithoutSubnet(topic, d) msg := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -146,7 +146,7 @@ func TestValidateAttesterSlashing_ValidOldSlashing(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)] d, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, d) + topic = r.buildTopicWithoutSubnet(topic, d) msg := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -191,7 +191,7 @@ func TestValidateAttesterSlashing_InvalidSlashing_WithdrawableEpoch(t *testing.T topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)] d, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, d) + topic = r.buildTopicWithoutSubnet(topic, d) msg := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -240,7 +240,7 @@ func TestValidateAttesterSlashing_CanFilter(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.AttesterSlashing{})] d, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, d) + topic = r.buildTopicWithoutSubnet(topic, d) buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, ðpb.AttesterSlashing{ Attestation_1: util.HydrateIndexedAttestation(ðpb.IndexedAttestation{ diff --git a/beacon-chain/sync/validate_beacon_blocks_test.go b/beacon-chain/sync/validate_beacon_blocks_test.go index a64aea417e99..5eaf898a2c4a 100644 --- a/beacon-chain/sync/validate_beacon_blocks_test.go +++ b/beacon-chain/sync/validate_beacon_blocks_test.go @@ -95,7 +95,7 @@ func TestValidateBeaconBlockPubSub_InvalidSignature(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -165,7 +165,7 @@ func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -212,7 +212,7 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -275,7 +275,7 @@ func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -341,7 +341,7 @@ func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -407,7 +407,7 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -476,7 +476,7 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -544,7 +544,7 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -653,7 +653,7 @@ func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -704,7 +704,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -755,7 +755,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -834,7 +834,7 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msgClone)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -975,7 +975,7 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -1041,7 +1041,7 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -1135,7 +1135,7 @@ func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -1220,7 +1220,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, digest) + topic = r.buildTopicWithoutSubnet(topic, digest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -1323,7 +1323,7 @@ func TestValidateBeaconBlockPubSub_ValidExecutionPayload(t *testing.T) { genesisValidatorsRoot := r.cfg.clock.GenesisValidatorsRoot() BellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot[:]) require.NoError(t, err) - topic = r.addDigestToTopic(topic, BellatrixDigest) + topic = r.buildTopicWithoutSubnet(topic, BellatrixDigest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -1395,7 +1395,7 @@ func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) { genesisValidatorsRoot := r.cfg.clock.GenesisValidatorsRoot() BellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot[:]) assert.NoError(t, err) - topic = r.addDigestToTopic(topic, BellatrixDigest) + topic = r.buildTopicWithoutSubnet(topic, BellatrixDigest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -1558,7 +1558,7 @@ func Test_validateBeaconBlockProcessingWhenParentIsOptimistic(t *testing.T) { genesisValidatorsRoot := r.cfg.clock.GenesisValidatorsRoot() BellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot[:]) require.NoError(t, err) - topic = r.addDigestToTopic(topic, BellatrixDigest) + topic = r.buildTopicWithoutSubnet(topic, BellatrixDigest) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), diff --git a/beacon-chain/sync/validate_blob_test.go b/beacon-chain/sync/validate_blob_test.go index 8a30aebb1d22..e9140616af42 100644 --- a/beacon-chain/sync/validate_blob_test.go +++ b/beacon-chain/sync/validate_blob_test.go @@ -70,7 +70,7 @@ func TestValidateBlob_InvalidMessageType(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := s.currentForkDigest() require.NoError(t, err) - topic = s.addDigestToTopic(topic, digest) + topic = s.buildTopicWithoutSubnet(topic, digest) result, err := s.validateBlob(ctx, "", &pubsub.Message{ Message: &pb.Message{ Data: buf.Bytes(), @@ -129,7 +129,7 @@ func TestValidateBlob_AlreadySeenInCache(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(b)] digest, err := s.currentForkDigest() require.NoError(t, err) - topic = s.addDigestAndIndexToTopic(topic, digest, 0) + topic = s.buildTopicWithSubnet(topic, digest, 0) s.setSeenBlobIndex(sc.Slot(), sc.SignedBlockHeader.Header.ProposerIndex, 0) result, err := s.validateBlob(ctx, "", &pubsub.Message{ @@ -159,7 +159,7 @@ func TestValidateBlob_InvalidTopicIndex(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := s.currentForkDigest() require.NoError(t, err) - topic = s.addDigestAndIndexToTopic(topic, digest, 1) + topic = s.buildTopicWithSubnet(topic, digest, 1) result, err := s.validateBlob(ctx, "", &pubsub.Message{ Message: &pb.Message{ Data: buf.Bytes(), @@ -274,7 +274,7 @@ func TestValidateBlob_ErrorPathsWithMock(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] digest, err := s.currentForkDigest() require.NoError(t, err) - topic = s.addDigestAndIndexToTopic(topic, digest, 0) + topic = s.buildTopicWithSubnet(topic, digest, 0) result, err := s.validateBlob(ctx, "", &pubsub.Message{ Message: &pb.Message{ Data: buf.Bytes(), diff --git a/beacon-chain/sync/validate_data_column_test.go b/beacon-chain/sync/validate_data_column_test.go index 6f396159a259..e389039baf97 100644 --- a/beacon-chain/sync/validate_data_column_test.go +++ b/beacon-chain/sync/validate_data_column_test.go @@ -85,7 +85,7 @@ func TestValidateDataColumn(t *testing.T) { digest, err := service.currentForkDigest() require.NoError(t, err) - topic = service.addDigestToTopic(topic, digest) + topic = service.buildTopicWithoutSubnet(topic, digest) message := &pubsub.Message{Message: &pb.Message{Data: buf.Bytes(), Topic: &topic}} diff --git a/beacon-chain/sync/validate_light_client_test.go b/beacon-chain/sync/validate_light_client_test.go index 6e74234e6fca..4d6c4d91cafb 100644 --- a/beacon-chain/sync/validate_light_client_test.go +++ b/beacon-chain/sync/validate_light_client_test.go @@ -133,7 +133,7 @@ func TestValidateLightClientOptimisticUpdate(t *testing.T) { topic := p2p.LightClientOptimisticUpdateTopicFormat digest, err := s.currentForkDigest() require.NoError(t, err) - topic = s.addDigestToTopic(topic, digest) + topic = s.buildTopicWithoutSubnet(topic, digest) r, err := s.validateLightClientOptimisticUpdate(ctx, "", &pubsub.Message{ Message: &pb.Message{ @@ -259,7 +259,7 @@ func TestValidateLightClientFinalityUpdate(t *testing.T) { topic := p2p.LightClientFinalityUpdateTopicFormat digest, err := s.currentForkDigest() require.NoError(t, err) - topic = s.addDigestToTopic(topic, digest) + topic = s.buildTopicWithoutSubnet(topic, digest) r, err := s.validateLightClientFinalityUpdate(ctx, "", &pubsub.Message{ Message: &pb.Message{ diff --git a/beacon-chain/sync/validate_proposer_slashing_test.go b/beacon-chain/sync/validate_proposer_slashing_test.go index 7ca4ea572533..a5eef887db91 100644 --- a/beacon-chain/sync/validate_proposer_slashing_test.go +++ b/beacon-chain/sync/validate_proposer_slashing_test.go @@ -132,7 +132,7 @@ func TestValidateProposerSlashing_ValidSlashing(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)] d, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, d) + topic = r.buildTopicWithoutSubnet(topic, d) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), @@ -175,7 +175,7 @@ func TestValidateProposerSlashing_ValidOldSlashing(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)] d, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, d) + topic = r.buildTopicWithoutSubnet(topic, d) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), diff --git a/beacon-chain/sync/validate_voluntary_exit_test.go b/beacon-chain/sync/validate_voluntary_exit_test.go index ce44f273ed18..16f0c5ad67cc 100644 --- a/beacon-chain/sync/validate_voluntary_exit_test.go +++ b/beacon-chain/sync/validate_voluntary_exit_test.go @@ -104,7 +104,7 @@ func TestValidateVoluntaryExit_ValidExit(t *testing.T) { topic := p2p.GossipTypeMapping[reflect.TypeOf(exit)] d, err := r.currentForkDigest() assert.NoError(t, err) - topic = r.addDigestToTopic(topic, d) + topic = r.buildTopicWithoutSubnet(topic, d) m := &pubsub.Message{ Message: &pubsubpb.Message{ Data: buf.Bytes(), diff --git a/changelog/aarshkshah1992-use-static-subscribe-for-blob-sidecar-topics.md b/changelog/aarshkshah1992-use-static-subscribe-for-blob-sidecar-topics.md new file mode 100644 index 000000000000..09dbefa747ea --- /dev/null +++ b/changelog/aarshkshah1992-use-static-subscribe-for-blob-sidecar-topics.md @@ -0,0 +1,2 @@ +### Changed +- Use static subscription for blob sidecar topics instead of dynamic subnet management.