Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add write-coalescing to atx handling #6239

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions activation/e2e/builds_atx_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func TestBuilder_SwitchesToBuildV2(t *testing.T) {
logger,
activation.WithAtxVersions(atxVersions),
)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
go atxHdlr.Start(ctx)

var previous *types.ActivationTx
var publishedAtxs atomic.Uint32
Expand Down
5 changes: 5 additions & 0 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func NewHandler(
beacon: beacon,
tortoise: tortoise,
signers: make(map[types.NodeID]*signing.EdSigner),
atxBatchResult: nil,
},

v2: &HandlerV2{
Expand Down Expand Up @@ -169,6 +170,10 @@ func (h *Handler) Register(sig *signing.EdSigner) {
h.v1.Register(sig)
}

// Start runs the V1 handler's ATX flush loop with the given context.
// The call returns only when that loop returns, so callers that must not
// block invoke it in its own goroutine.
func (h *Handler) Start(ctx context.Context) {
	h.v1.flushAtxLoop(ctx)
}

// HandleSyncedAtx handles atxs received by sync.
func (h *Handler) HandleSyncedAtx(ctx context.Context, expHash types.Hash32, peer p2p.Peer, data []byte) error {
_, err := h.handleAtx(ctx, expHash, peer, data)
Expand Down
3 changes: 3 additions & 0 deletions activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ func newTestHandler(tb testing.TB, goldenATXID types.ATXID, opts ...HandlerOptio
lg,
opts...,
)
ctx, cancel := context.WithCancel(context.Background())
go atxHdlr.Start(ctx)
tb.Cleanup(func() { cancel() })
return &testHandler{
Handler: atxHdlr,
cdb: cdb,
Expand Down
156 changes: 143 additions & 13 deletions activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.uber.org/zap"
"golang.org/x/exp/maps"

"github.com/spacemeshos/go-spacemesh/activation/metrics"
"github.com/spacemeshos/go-spacemesh/activation/wire"
"github.com/spacemeshos/go-spacemesh/atxsdata"
"github.com/spacemeshos/go-spacemesh/codec"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/spacemeshos/go-spacemesh/system"
)

// sqlWriterSleep is the period of the ticker in flushAtxLoop, i.e. how often
// the queued ATX batch is written to the database.
var sqlWriterSleep = 50 * time.Millisecond

type nipostValidatorV1 interface {
InitialNIPostChallengeV1(challenge *wire.NIPostChallengeV1, atxs atxProvider, goldenATXID types.ATXID) error
NIPostChallengeV1(challenge *wire.NIPostChallengeV1, previous *types.ActivationTx, nodeID types.NodeID) error
Expand Down Expand Up @@ -83,6 +86,20 @@ type HandlerV1 struct {

signerMtx sync.Mutex
signers map[types.NodeID]*signing.EdSigner

atxMu sync.Mutex
atxBatch []atxBatchItem
atxBatchResult *batchResult
}

// batchResult carries the outcome of one ATX batch to the callers waiting for
// that batch to be flushed.
type batchResult struct {
	// doneC is closed by flushAtxLoop once the batch has been processed.
	doneC chan struct{}
	// err holds the error of the batch write; it is set before doneC is closed
	// and stays nil when the write succeeded.
	err error
}

// atxBatchItem is a single entry of the ATX write batch: the ATX together with
// the wire representation it was built from.
type atxBatchItem struct {
	// atx is the ATX that gets added to the database.
	atx *types.ActivationTx
	// watx is the V1 wire form; flushAtxLoop reads its blob, PrevATXID and NumUnits.
	watx *wire.ActivationTxV1
}

func (h *HandlerV1) Register(sig *signing.EdSigner) {
Expand All @@ -97,6 +114,75 @@ func (h *HandlerV1) Register(sig *signing.EdSigner) {
h.signers[sig.NodeID()] = sig
}

// poolItemMinSize is the capacity every freshly created batch slice starts
// with (pre-sizing saves on allocations while the batch fills up).
const poolItemMinSize = 1000

// pool recycles the slices used for ATX batches. It stores *[]atxBatchItem.
var pool = &sync.Pool{
	New: func() any {
		batch := make([]atxBatchItem, 0, poolItemMinSize)
		return &batch
	},
}

// getBatch takes a batch slice out of pool: either one previously handed back
// through putBatch or a new one created by the pool's New function.
func getBatch() []atxBatchItem {
	return *pool.Get().(*[]atxBatchItem)
}

// putBatch hands a batch slice back to pool for reuse.
//
// The elements are zeroed before the slice is truncated: truncating alone
// would leave the old items in the backing array, so the pooled slice would
// keep the referenced ATXs reachable for as long as it sits in the pool.
func putBatch(v []atxBatchItem) {
	for i := range v {
		v[i] = atxBatchItem{}
	}
	v = v[:0]
	pool.Put(&v)
}

// flushAtxLoop periodically writes the queued ATX batch to the database.
//
// Every sqlWriterSleep it swaps the current batch and its batchResult for
// fresh ones while holding atxMu, writes the swapped-out batch in a single
// transaction and then closes the result's doneC so that the callers waiting
// on that batch can proceed (res.err carries the write error, if any).
//
// The loop runs until ctx is done. On shutdown the pending batchResult is
// completed with ctx.Err(), so callers waiting on it are released instead of
// blocking on a channel that would never be closed.
func (h *HandlerV1) flushAtxLoop(ctx context.Context) {
	t := time.NewTicker(sqlWriterSleep)
	defer t.Stop() // release the ticker once the loop exits

	// initialize the first batch
	h.atxMu.Lock()
	h.atxBatchResult = &batchResult{doneC: make(chan struct{})}
	h.atxMu.Unlock()

	for {
		select {
		case <-ctx.Done():
			// Nothing will be flushed anymore: fail the pending batch so that
			// its waiters do not hang. err is written before doneC is closed,
			// which makes it visible to everybody who observes the close.
			h.atxMu.Lock()
			pending := h.atxBatchResult
			pending.err = ctx.Err()
			h.atxMu.Unlock()
			close(pending.doneC)
			return
		case <-t.C:
		}

		// Swap out the batch and its result under the lock; the write itself
		// happens without holding atxMu so new ATXs can keep queueing.
		h.atxMu.Lock()
		if len(h.atxBatch) == 0 {
			h.atxMu.Unlock()
			continue
		}
		batch := h.atxBatch
		h.atxBatch = getBatch()
		res := h.atxBatchResult
		h.atxBatchResult = &batchResult{doneC: make(chan struct{})}
		h.atxMu.Unlock()
		metrics.FlushBatchSize.Add(float64(len(batch)))

		err := h.cdb.WithTx(ctx, func(tx *sql.Tx) error {
			for _, item := range batch {
				// ErrObjectExists is tolerated for both writes.
				err := atxs.Add(tx, item.atx, item.watx.Blob())
				if err != nil && !errors.Is(err, sql.ErrObjectExists) {
					metrics.WriteBatchErrorsCount.Inc()
					return fmt.Errorf("add atx to db: %w", err)
				}
				err = atxs.SetPost(tx, item.atx.ID(), item.watx.PrevATXID, 0,
					item.atx.SmesherID, item.watx.NumUnits)
				if err != nil && !errors.Is(err, sql.ErrObjectExists) {
					metrics.WriteBatchErrorsCount.Inc()
					return fmt.Errorf("set atx units: %w", err)
				}
			}
			return nil
		})
		if err != nil {
			res.err = err
			metrics.ErroredBatchCount.Inc()
			h.logger.Error("flush atxs to db", zap.Error(err))
		}
		putBatch(batch)
		close(res.doneC)
	}
}

func (h *HandlerV1) syntacticallyValidate(ctx context.Context, atx *wire.ActivationTxV1) error {
if atx.NIPost == nil {
return fmt.Errorf("nil nipost for atx %s", atx.ID())
Expand Down Expand Up @@ -489,37 +575,81 @@ func (h *HandlerV1) checkWrongPrevAtx(

// checkMalicious looks for a malfeasance proof against the smesher of watx,
// running its queries through the given executor.
//
// It returns (nil, nil) without further checks when identities.IsMalicious
// already reports the smesher as malicious. Otherwise the result of
// checkDoublePublish is returned if it yields a proof or an error, and the
// result of checkWrongPrevAtx in all remaining cases.
func (h *HandlerV1) checkMalicious(
	ctx context.Context,
	exec sql.Executor,
	watx *wire.ActivationTxV1,
) (*mwire.MalfeasanceProof, error) {
	malicious, err := identities.IsMalicious(exec, watx.SmesherID)
	if err != nil {
		return nil, fmt.Errorf("checking if node is malicious: %w", err)
	}
	if malicious {
		return nil, nil
	}
	proof, err := h.checkDoublePublish(ctx, exec, watx)
	if proof != nil || err != nil {
		return proof, err
	}
	return h.checkWrongPrevAtx(ctx, exec, watx)
}

// storeAtx queues an ATX for the batched database write done by flushAtxLoop,
// waits until the batch containing it has been flushed and then notifies
// subscribers of the ATXID.
//
// The malfeasance check runs against h.cdb before the ATX is queued. The
// returned proof is the one produced by checkMalicious (nil if there is none).
//
// An error is returned, and no cache update or notification happens, when
// the malfeasance check fails, the flush loop has not been started, the batch
// write fails, or ctx is done before the batch was flushed. In the last case
// the ATX stays queued and may still be written by a later flush.
func (h *HandlerV1) storeAtx(
	ctx context.Context,
	atx *types.ActivationTx,
	watx *wire.ActivationTxV1,
) (*mwire.MalfeasanceProof, error) {
	proof, err := h.checkMalicious(ctx, h.cdb, watx)
	if err != nil {
		return nil, fmt.Errorf("check malicious: %w", err)
	}

	h.atxMu.Lock()
	br := h.atxBatchResult
	if br == nil {
		// flushAtxLoop creates the first batchResult; without it nobody would
		// ever flush the batch or signal completion.
		h.atxMu.Unlock()
		return nil, errors.New("store atx: batch writer not started")
	}
	h.atxBatch = append(h.atxBatch, atxBatchItem{atx: atx, watx: watx})
	h.atxMu.Unlock()

	// wait for the batch that corresponds to the atx to be written
	select {
	case <-br.doneC:
		// br.err is set before doneC is closed, so it is safe to read here.
		if br.err != nil {
			return nil, fmt.Errorf("store atx: %w", br.err)
		}
	case <-ctx.Done():
		return nil, fmt.Errorf("store atx: %w", ctx.Err())
	}

	// The ATX is persisted: only now update caches and notify subscribers.
	atxs.AtxAdded(h.cdb, atx)
	if proof != nil {
		h.cdb.CacheMalfeasanceProof(atx.SmesherID, proof)
		h.tortoise.OnMalfeasance(atx.SmesherID)
	}

	added := h.cacheAtx(ctx, atx)
	h.beacon.OnAtx(atx)
	if added != nil {
		h.tortoise.OnAtx(atx.TargetEpoch(), atx.ID(), added)
	}

	h.logger.Debug("finished storing atx in epoch",
		zap.Stringer("atx_id", atx.ID()),
		zap.Uint32("epoch_id", atx.PublishEpoch.Uint32()),
	)
	return proof, nil
}

// storeAtxSync stores an ATX and notifies subscribers of the ATXID.
func (h *HandlerV1) storeAtxSync(
ctx context.Context,
atx *types.ActivationTx,
watx *wire.ActivationTxV1,
proof *mwire.MalfeasanceProof,
) error {
if err := h.cdb.WithTx(ctx, func(tx *sql.Tx) error {
var err error
proof, err = h.checkMalicious(ctx, tx, watx)
if err != nil {
return fmt.Errorf("check malicious: %w", err)
}

err = atxs.Add(tx, atx, watx.Blob())
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
return fmt.Errorf("add atx to db: %w", err)
Expand All @@ -531,7 +661,7 @@ func (h *HandlerV1) storeAtx(

return nil
}); err != nil {
return nil, fmt.Errorf("store atx: %w", err)
return fmt.Errorf("store atx: %w", err)
}

atxs.AtxAdded(h.cdb, atx)
Expand All @@ -550,7 +680,7 @@ func (h *HandlerV1) storeAtx(
zap.Stringer("atx_id", atx.ID()),
zap.Uint32("epoch_id", atx.PublishEpoch.Uint32()),
)
return proof, nil
return nil
}

func (h *HandlerV1) processATX(
Expand Down
Loading
Loading