From 4bd4e4025940b92f9328042507c5d9644b2064c2 Mon Sep 17 00:00:00 2001 From: acud <12988138+acud@users.noreply.github.com> Date: Tue, 13 Aug 2024 13:53:04 -0600 Subject: [PATCH] wip --- .../atxwriter.go} | 68 +++++-- activation/atxwriter/atxwriter_test.go | 191 ++++++++++++++++++ activation/atxwriter/metrics.go | 33 +++ activation/handler.go | 4 - activation/handler_v1.go | 56 +---- activation/write_coalescer/metrics.go | 34 ---- .../write_coalescer/write_coalscer_test.go | 34 ---- metrics/common.go | 8 +- 8 files changed, 285 insertions(+), 143 deletions(-) rename activation/{write_coalescer/write_coalescer.go => atxwriter/atxwriter.go} (72%) create mode 100644 activation/atxwriter/atxwriter_test.go create mode 100644 activation/atxwriter/metrics.go delete mode 100644 activation/write_coalescer/metrics.go delete mode 100644 activation/write_coalescer/write_coalscer_test.go diff --git a/activation/write_coalescer/write_coalescer.go b/activation/atxwriter/atxwriter.go similarity index 72% rename from activation/write_coalescer/write_coalescer.go rename to activation/atxwriter/atxwriter.go index 48898e15840..569687c8014 100644 --- a/activation/write_coalescer/write_coalescer.go +++ b/activation/atxwriter/atxwriter.go @@ -1,4 +1,4 @@ -package write_coalescer +package atxwriter import ( "context" @@ -14,23 +14,30 @@ import ( "go.uber.org/zap" ) -var sqlWriterSleep = 100 * time.Millisecond - -const poolItemMinSize = 1000 // minimum size of atx batch (to save on allocation) -var pool = &sync.Pool{ - New: func() any { - s := make([]atxBatchItem, 0, poolItemMinSize) - return &s - }, -} +var ( + writerDelay = 100 * time.Millisecond + pool = &sync.Pool{ + New: func() any { + s := make(map[types.ATXID]atxBatchItem) + return &s + }, + } + rmSlice = make([]types.ATXID, 0, 1000) +) -func getBatch() []atxBatchItem { - v := pool.Get().(*[]atxBatchItem) +func getBatch() map[types.ATXID]atxBatchItem { + v := pool.Get().(*map[types.ATXID]atxBatchItem) return *v } -func putBatch(v []atxBatchItem) { - v = v[:0] +func putBatch(v map[types.ATXID]atxBatchItem) { + for k := range v { + rmSlice = append(rmSlice, k) + } + for _, k := range rmSlice { + delete(v, k) + } + pool.Put(&v) } @@ -38,31 +45,43 @@ type AtxWriter struct { db db logger *zap.Logger - atxMu sync.Mutex - atxBatch []atxBatchItem + atxMu sync.Mutex + timer *time.Timer + running bool + deadline time.Time + + atxBatch map[types.ATXID]atxBatchItem atxBatchResult *batchResult } func New(db db, logger *zap.Logger) *AtxWriter { - return &AtxWriter{ + // create a stopped timer so we could reuse it later on + timer := time.NewTimer(writerDelay) + if !timer.Stop() { + <-timer.C + } + + writer := &AtxWriter{ db: db, logger: logger, + timer: timer, atxBatchResult: &batchResult{ doneC: make(chan struct{}), }, + atxBatch: getBatch(), } + return writer } // Start the forever-loop that flushes the atxs to the DB // at-least every `sqlWriterSleep`. The caller is responsible // to call Start in a different goroutine. func (w *AtxWriter) Start(ctx context.Context) { - t := time.NewTicker(sqlWriterSleep) for { select { case <-ctx.Done(): return - case <-t.C: + case <-w.timer.C: // copy-on-write w.atxMu.Lock() if len(w.atxBatch) == 0 { @@ -100,6 +119,12 @@ func (w *AtxWriter) Start(ctx context.Context) { } putBatch(batch) close(res.doneC) + w.atxMu.Lock() + if len(w.atxBatch) == 0 { + w.running = false + } + + w.atxMu.Unlock() } } } @@ -107,7 +132,10 @@ func (w *AtxWriter) Start(ctx context.Context) { func (w *AtxWriter) Store(atx *types.ActivationTx, watx *wire.ActivationTxV1) (<-chan struct{}, func() error) { w.atxMu.Lock() defer w.atxMu.Unlock() - w.atxBatch = append(w.atxBatch, atxBatchItem{atx: atx, watx: watx}) + if !w.running { + w.timer.Reset(writerDelay) + } + w.atxBatch[atx.ID()] = atxBatchItem{atx: atx, watx: watx} br := w.atxBatchResult c := br.doneC return c, br.Error diff --git a/activation/atxwriter/atxwriter_test.go b/activation/atxwriter/atxwriter_test.go new file mode 100644 index 00000000000..58c31751664 --- /dev/null +++ b/activation/atxwriter/atxwriter_test.go @@ -0,0 +1,191 @@ +package atxwriter_test + +import ( + "context" + "testing" + "time" + + "github.com/spacemeshos/go-spacemesh/activation/atxwriter" + "github.com/spacemeshos/go-spacemesh/activation/wire" + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/merkle-tree" + poetShared "github.com/spacemeshos/poet/shared" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +var ( + postGenesisEpoch types.EpochID = 2 + goldenATXID = types.RandomATXID() + wAtx = newInitialATXv1(goldenATXID) + atx = toAtx(wAtx) +) + +func TestWriteCoalesce_One(t *testing.T) { + w, db := newTestAtxWriter(t) + + ch, errfn := w.Store(atx, wAtx) + var err error + select { + case <-ch: + err = errfn() + case <-time.After(5 * time.Second): + t.Fatal("timeout") + } + require.NoError(t, err) + has, err := atxs.Has(db, atx.ID()) + require.True(t, has) + require.NoError(t, err) +} + +func TestWriteCoalesce_Duplicates(t *testing.T) { + w, db := newTestAtxWriter(t) + + ch, errfn := w.Store(atx, wAtx) + _, _ = w.Store(atx, wAtx) + var err error + select { + case <-ch: + err = errfn() + case <-time.After(5 * time.Second): + t.Fatal("timeout") + } + require.NoError(t, err) + has, err := atxs.Has(db, atx.ID()) + require.True(t, has) + require.NoError(t, err) +} + +func TestWriteCoalesce_MultipleBatches(t *testing.T) { + w, db := newTestAtxWriter(t) + + ch, errfn := w.Store(atx, wAtx) + var err error + select { + case <-ch: + err = errfn() + case <-time.After(5 * time.Second): + t.Fatal("timeout") + } + require.NoError(t, err) + has, err := atxs.Has(db, atx.ID()) + require.True(t, has) + require.NoError(t, err) + + wAtx2 := newInitialATXv1(types.RandomATXID()) + atx2 := toAtx(wAtx) + + ch, errfn = w.Store(atx2, wAtx2) + select { + case <-ch: + err = errfn() + case <-time.After(5 * time.Second): + t.Fatal("timeout") + } + require.NoError(t, err) + has, err = atxs.Has(db, atx2.ID()) + require.True(t, has) + require.NoError(t, err) +} + +func toAtx(watx *wire.ActivationTxV1) *types.ActivationTx { + atx := wire.ActivationTxFromWireV1(watx) + atx.SetReceived(time.Now()) + atx.BaseTickHeight = uint64(atx.PublishEpoch) + atx.TickCount = 1 + return atx +} + +func newTestAtxWriter(t *testing.T) (*atxwriter.AtxWriter, *sql.Database) { + t.Helper() + db := sql.InMemoryTest(t) + log := zaptest.NewLogger(t) + w := atxwriter.New(db, log) + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + t.Cleanup(func() { + cancel() + <-done + }) + go func() { + defer close(done) + w.Start(ctx) + }() + return w, db +} + +func newInitialATXv1( + goldenATXID types.ATXID, + opts ...func(*wire.ActivationTxV1), +) *wire.ActivationTxV1 { + nonce := uint64(999) + poetRef := types.RandomHash() + atx := &wire.ActivationTxV1{ + InnerActivationTxV1: wire.InnerActivationTxV1{ + NIPostChallengeV1: wire.NIPostChallengeV1{ + PrevATXID: types.EmptyATXID, + PublishEpoch: postGenesisEpoch, + PositioningATXID: goldenATXID, + CommitmentATXID: &goldenATXID, + InitialPost: &wire.PostV1{}, + }, + NIPost: newNIPostV1WithPoet(poetRef.Bytes()), + VRFNonce: &nonce, + Coinbase: types.GenerateAddress([]byte("aaaa")), + NumUnits: 100, + }, + } + for _, opt := range opts { + opt(atx) + } + return atx +} + +func newMerkleProof(leafs []types.Hash32) (types.MerkleProof, types.Hash32) { + tree, err := merkle.NewTreeBuilder(). + WithHashFunc(poetShared.HashMembershipTreeNode). + WithLeavesToProve(map[uint64]bool{0: true}). + Build() + if err != nil { + panic(err) + } + for _, m := range leafs { + if err := tree.AddLeaf(m[:]); err != nil { + panic(err) + } + } + root, nodes := tree.RootAndProof() + nodesH32 := make([]types.Hash32, 0, len(nodes)) + for _, n := range nodes { + nodesH32 = append(nodesH32, types.BytesToHash(n)) + } + return types.MerkleProof{ + Nodes: nodesH32, + }, types.BytesToHash(root) +} + +func newNIPostV1WithPoet(poetRef []byte) *wire.NIPostV1 { + proof, _ := newMerkleProof([]types.Hash32{ + types.BytesToHash([]byte("challenge")), + types.BytesToHash([]byte("leaf2")), + types.BytesToHash([]byte("leaf3")), + types.BytesToHash([]byte("leaf4")), + }) + + return &wire.NIPostV1{ + Membership: wire.MerkleProofV1{ + Nodes: proof.Nodes, + LeafIndex: 0, + }, + Post: &wire.PostV1{ + Nonce: 0, + Indices: []byte{1, 2, 3}, + Pow: 0, + }, + PostMetadata: &wire.PostMetadataV1{ + Challenge: poetRef, + }, + } +} diff --git a/activation/atxwriter/metrics.go b/activation/atxwriter/metrics.go new file mode 100644 index 00000000000..b2d85f4a7d4 --- /dev/null +++ b/activation/atxwriter/metrics.go @@ -0,0 +1,33 @@ +package atxwriter + +import ( + "github.com/spacemeshos/go-spacemesh/metrics" +) + +const ( + namespace = "activation_write_coalescer" +) + +var BatchWriteCount = metrics.NewSimpleCounter( + namespace, + "batch_write_count", + "number of errors when writing a batch", +) + +var WriteBatchErrorsCount = metrics.NewSimpleCounter( + namespace, + "write_batch_errors", + "number of errors when writing a batch", +) + +var ErroredBatchCount = metrics.NewSimpleCounter( + namespace, + "errored_batch", + "number of batches that errored", +) + +var FlushBatchSize = metrics.NewSimpleCounter( + namespace, + "flush_batch_size", + "size of flushed batch", +) diff --git a/activation/handler.go b/activation/handler.go index fcfd6181ebc..eec1038f385 100644 --- a/activation/handler.go +++ b/activation/handler.go @@ -170,10 +170,6 @@ func (h *Handler) Register(sig *signing.EdSigner) { h.v1.Register(sig) } -func (h *Handler) Start(ctx context.Context) { - h.v1.flushAtxLoop(ctx) -} - // HandleSyncedAtx handles atxs received by sync. func (h *Handler) HandleSyncedAtx(ctx context.Context, expHash types.Hash32, peer p2p.Peer, data []byte) error { _, err := h.handleAtx(ctx, expHash, peer, data) diff --git a/activation/handler_v1.go b/activation/handler_v1.go index 9410f13b387..859acec907f 100644 --- a/activation/handler_v1.go +++ b/activation/handler_v1.go @@ -13,8 +13,8 @@ import ( "go.uber.org/zap" "golang.org/x/exp/maps" + "github.com/spacemeshos/go-spacemesh/activation/atxwriter" "github.com/spacemeshos/go-spacemesh/activation/wire" - "github.com/spacemeshos/go-spacemesh/activation/write_coalescer" "github.com/spacemeshos/go-spacemesh/atxsdata" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" @@ -71,7 +71,7 @@ type nipostValidatorV1 interface { type HandlerV1 struct { local p2p.Peer cdb *datastore.CachedDB - writeCoalescer *write_coalescer.AtxWriter + writeCoalescer *atxwriter.AtxWriter atxsdata *atxsdata.Data edVerifier *signing.EdVerifier clock layerClock @@ -527,13 +527,7 @@ func (h *HandlerV1) storeAtx( atx *types.ActivationTx, watx *wire.ActivationTxV1, ) (*mwire.MalfeasanceProof, error) { - var ( - c chan struct{} - proof *mwire.MalfeasanceProof - br *batchResult - err error - ) - proof, err = h.checkMalicious(ctx, h.cdb, watx) + proof, err := h.checkMalicious(ctx, h.cdb, watx) if err != nil { return proof, fmt.Errorf("check malicious: %w", err) } @@ -547,46 +541,8 @@ func (h *HandlerV1) storeAtx( err = ctx.Err() } - atxs.AtxAdded(h.cdb, atx) - if proof != nil { - h.cdb.CacheMalfeasanceProof(atx.SmesherID, proof) - h.tortoise.OnMalfeasance(atx.SmesherID) - } - - added := h.cacheAtx(ctx, atx) - h.beacon.OnAtx(atx) - if added != nil { - h.tortoise.OnAtx(atx.TargetEpoch(), atx.ID(), added) - } - - h.logger.Debug("finished storing atx in epoch", - zap.Stringer("atx_id", atx.ID()), - zap.Uint32("epoch_id", atx.PublishEpoch.Uint32()), - ) - return proof, err -} - -// storeAtx stores an ATX and notifies subscribers of the ATXID. -func (h *HandlerV1) storeAtxSync( - ctx context.Context, - atx *types.ActivationTx, - watx *wire.ActivationTxV1, - proof *mwire.MalfeasanceProof, -) error { - if err := h.cdb.WithTx(ctx, func(tx *sql.Tx) error { - var err error - err = atxs.Add(tx, atx, watx.Blob()) - if err != nil && !errors.Is(err, sql.ErrObjectExists) { - return fmt.Errorf("add atx to db: %w", err) - } - err = atxs.SetPost(tx, atx.ID(), watx.PrevATXID, 0, atx.SmesherID, watx.NumUnits) - if err != nil && !errors.Is(err, sql.ErrObjectExists) { - return fmt.Errorf("set atx units: %w", err) - } - - return nil - }); err != nil { - return fmt.Errorf("store atx: %w", err) + if err != nil { + return nil, fmt.Errorf("atxwriter write: %w", err) } atxs.AtxAdded(h.cdb, atx) @@ -605,7 +561,7 @@ func (h *HandlerV1) storeAtxSync( zap.Stringer("atx_id", atx.ID()), zap.Uint32("epoch_id", atx.PublishEpoch.Uint32()), ) - return nil + return proof, err } func (h *HandlerV1) processATX( diff --git a/activation/write_coalescer/metrics.go b/activation/write_coalescer/metrics.go deleted file mode 100644 index aa1ea40e46d..00000000000 --- a/activation/write_coalescer/metrics.go +++ /dev/null @@ -1,34 +0,0 @@ -package write_coalescer - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/spacemeshos/go-spacemesh/metrics" -) - -const ( - namespace = "activation_write_coalescer" -) - -var BatchWriteCount = prometheus.NewCounter(metrics.NewCounterOpts( - namespace, - "write_batch_errors", - "number of errors when writing a batch", -)) - -var WriteBatchErrorsCount = prometheus.NewCounter(metrics.NewCounterOpts( - namespace, - "write_batch_errors", - "number of errors when writing a batch", -)) - -var ErroredBatchCount = prometheus.NewCounter(metrics.NewCounterOpts( - namespace, - "errored_batch", - "number of batches that errored", -)) - -var FlushBatchSize = prometheus.NewCounter(metrics.NewCounterOpts( - namespace, - "flush_batch_size", - "size of flushed batch", -)) diff --git a/activation/write_coalescer/write_coalscer_test.go b/activation/write_coalescer/write_coalscer_test.go deleted file mode 100644 index c5f7546074c..00000000000 --- a/activation/write_coalescer/write_coalscer_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package write_coalescer_test - -import ( - "context" - "fmt" - "testing" - - "github.com/spacemeshos/go-spacemesh/activation/write_coalescer" - "github.com/spacemeshos/go-spacemesh/sql" - "go.uber.org/zap/zaptest" -) - -func newTestAtxWriter(t *testing.T) *write_coalescer.AtxWriter { - t.Helper() - db := sql.InMemoryTest(t) - log := zaptest.NewLogger(t) - w := write_coalescer.New(db, log) - ctx, cancel := context.WithCancel(context.Background()) - done := make(chan struct{}) - t.Cleanup(func() { - cancel() - <-done - }) - go func() { - defer close(done) - w.Start(ctx) - }() - return w -} - -func TestWriteCoalesce(t *testing.T) { - w := newTestAtxWriter(t) - fmt.Println(w) -} diff --git a/metrics/common.go b/metrics/common.go index 46860e9fb3e..fc4da99b616 100644 --- a/metrics/common.go +++ b/metrics/common.go @@ -76,9 +76,15 @@ func ReportMessageLatency(protocol, msgType string, latency time.Duration) { receivedMessagesLatency.WithLabelValues(protocol, msgType, sign).Observe(seconds) } +// NewHistogram creates a Histogram metrics under the global namespace returns nop if metrics are disabled. +func NewSimpleCounter(subsystem, name, help string) prometheus.Counter { + return promauto.NewCounter(NewCounterOpts(subsystem, name, help)) +} + func NewCounterOpts(ns, name, help string) prometheus.CounterOpts { return prometheus.CounterOpts{ - Namespace: ns, + Namespace: Namespace, + Subsystem: ns, Name: name, Help: help, }