Skip to content

Commit

Permalink
Batch commit zkHash data for sequencer (#253)
Browse files Browse the repository at this point in the history
Co-authored-by: Dumi Loghin <[email protected]>
  • Loading branch information
zjg555543 and dloghin authored Feb 14, 2025
1 parent 86fc53f commit 113b604
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 15 deletions.
11 changes: 7 additions & 4 deletions zk/l1infotree/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,13 @@ LOOP:
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
processed := 0

tree, err := InitialiseL1InfoTree(hermezDb)
if err != nil {
return nil, fmt.Errorf("InitialiseL1InfoTree: %w", err)
var tree *L1InfoTree
if len(allLogs) > 0 {
log.Info(fmt.Sprintf("[%s] Checking for L1 info tree updates, logs count:%v", logPrefix, len(allLogs)))
tree, err = InitialiseL1InfoTree(hermezDb)
if err != nil {
return nil, fmt.Errorf("InitialiseL1InfoTree: %w", err)
}
}

// process the logs in chunks
Expand Down
1 change: 1 addition & 0 deletions zk/metrics/statistics_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
FailTxResourceOverCounter logTag = "FailTxResourceOverCounter"
BatchGas logTag = "BatchGas"
ProcessingTxTiming logTag = "ProcessingTxTiming"
ProcessingPauseTiming logTag = "ProcessingPauseTiming"
ProcessingInvalidTxCounter logTag = "ProcessingInvalidTxCounter"
FinalizeBatchNumber logTag = "FinalizeBatchNumber"
BatchCommitDBTiming logTag = "BatchCommitDBTiming"
Expand Down
3 changes: 2 additions & 1 deletion zk/metrics/statisticsimpl_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ func (l *statisticsInstance) Summary() string {
zkOverflowBlock := "ZKOverflowBlock<" + strconv.Itoa(int(l.statistics[ZKOverflowBlockCounter])) + ">, "
invalidTx := "InvalidTx<" + strconv.Itoa(int(l.statistics[ProcessingInvalidTxCounter])) + ">, "
processTxTiming := "ProcessTx<" + strconv.Itoa(int(l.statistics[ProcessingTxTiming])) + "ms>, "
processTxPauseTiming := "ProcessTxPauseTiming<" + strconv.Itoa(int(l.statistics[ProcessingPauseTiming])) + "ms>, "
batchCommitDBTiming := "BatchCommitDBTiming<" + strconv.Itoa(int(l.statistics[BatchCommitDBTiming])) + "ms>, "
pbStateTiming := "PbStateTiming<" + strconv.Itoa(int(l.statistics[PbStateTiming])) + "ms>, "
zkIncIntermediateHashesTiming := "ZkIncIntermediateHashesTiming<" + strconv.Itoa(int(l.statistics[ZkIncIntermediateHashesTiming])) + "ms>, "
finaliseBlockWriteTiming := "FinaliseBlockWriteTiming<" + strconv.Itoa(int(l.statistics[FinaliseBlockWriteTiming])) + "ms>, "
batchCloseReason := "BatchCloseReason<" + l.tags[BatchCloseReason] + ">"

result := batch + totalDuration + gasUsed + blockCount + tx + getTx + getTxPause + getTxPauseTiming +
reprocessTx + resourceOverTx + zkOverflowBlock + invalidTx + processTxTiming + pbStateTiming +
reprocessTx + resourceOverTx + zkOverflowBlock + invalidTx + processTxTiming + processTxPauseTiming + pbStateTiming +
zkIncIntermediateHashesTiming + finaliseBlockWriteTiming + batchCommitDBTiming +
batchCloseReason
log.Info(result)
Expand Down
3 changes: 2 additions & 1 deletion zk/metrics/statisticsimpl_xlayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ func TestStatisticsInstanceSummary(t *testing.T) {
GetTxPauseTiming: time.Second.Milliseconds() * 30,
ReprocessingTxCounter: 3,
FailTxResourceOverCounter: 1,
ZKOverflowBlockCounter: 1,
ZKOverflowBlockCounter: 1,
ProcessingInvalidTxCounter: 2,
ProcessingTxTiming: time.Second.Milliseconds() * 30,
ProcessingPauseTiming: time.Second.Milliseconds() * 10,
BatchCommitDBTiming: time.Second.Milliseconds() * 10,
PbStateTiming: time.Second.Milliseconds() * 20,
ZkIncIntermediateHashesTiming: time.Second.Milliseconds() * 15,
Expand Down
17 changes: 9 additions & 8 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func sequencingBatchStep(

innerBreak := false
emptyBlockOverflow := false

start := time.Now()
OuterLoopTransactions:
for {
if innerBreak {
Expand Down Expand Up @@ -384,19 +384,22 @@ func sequencingBatchStep(
}

if len(batchState.blockState.transactionsForInclusion) == 0 {
pauseTime := time.Now()
if allConditionsOK {
time.Sleep(batchContext.cfg.zk.SequencerTimeoutOnEmptyTxPool)
} else {
time.Sleep(batchContext.cfg.zk.SequencerTimeoutOnEmptyTxPool / 5) // we do not need to sleep too long for txpool not ready
}
metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingPauseTiming, time.Since(pauseTime))
} else {
log.Trace(fmt.Sprintf("[%s] Yielded transactions from the pool", logPrefix), "txCount", len(batchState.blockState.transactionsForInclusion))
}
}

start := time.Now()
if len(batchState.blockState.transactionsForInclusion) == 0 {
pauseTime := time.Now()
time.Sleep(batchContext.cfg.zk.SequencerTimeoutOnEmptyTxPool)
metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingPauseTiming, time.Since(pauseTime))
} else {
log.Trace(fmt.Sprintf("[%s] Yielded transactions from the pool", logPrefix), "txCount", len(batchState.blockState.transactionsForInclusion))
}
Expand Down Expand Up @@ -559,10 +562,6 @@ func sequencingBatchStep(
}
}

// For X Layer
metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxTiming, time.Since(start))
metrics.SeqTxDuration.Observe(float64(time.Since(start).Milliseconds()))

// remove bad and mined transactions from the list for inclusion
for i := len(batchState.blockState.transactionsForInclusion) - 1; i >= 0; i-- {
tx := batchState.blockState.transactionsForInclusion[i]
Expand Down Expand Up @@ -599,7 +598,9 @@ func sequencingBatchStep(
}
}
}

// For X Layer
metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxTiming, time.Since(start))
metrics.SeqTxDuration.Observe(float64(time.Since(start).Milliseconds()))
// we do not want to commit this block if it has no transactions and we detected an overflow - essentially the batch is too
// full to get any more transactions in it and we don't want to commit an empty block
if emptyBlockOverflow {
Expand Down Expand Up @@ -646,7 +647,7 @@ func sequencingBatchStep(
// add a check to the verifier and also check for responses
batchState.onBuiltBlock(blockNumber)

start := time.Now()
start = time.Now()
if !batchState.isL1Recovery() {
// commit block data here so it is accessible in other threads
if errCommitAndStart := sdb.CommitAndStart(); errCommitAndStart != nil {
Expand Down
7 changes: 7 additions & 0 deletions zk/stages/stage_sequence_execute_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,16 @@ func finaliseBlock(

// For X Layer
zkIncStart := time.Now()
quit := batchContext.ctx.Done()
batchContext.sdb.eridb.OpenBatch(quit)
// this is actually the interhashes stage
newRoot, err := zkIncrementIntermediateHashes(batchContext.ctx, batchContext.s.LogPrefix(), batchContext.s, batchContext.sdb.tx, batchContext.sdb.eridb, batchContext.sdb.smt, newHeader.Number.Uint64()-1, newHeader.Number.Uint64())
if err != nil {
batchContext.sdb.eridb.RollbackBatch()
return nil, err
}

if err = batchContext.sdb.eridb.CommitBatch(); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion zk/txpool/pool_zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (p *TxPool) onSenderStateChange(senderID uint64, senderNonce uint64, sender
if dGp != nil {
newGpBig = newGpBig.Mul(dGp, big.NewInt(int64(gpMul)))
}
log.Debug(fmt.Sprintf("Free tx: nonce:%d, type %d. dGp:%v, factor:%d, newGp:%d", mt.Tx.Nonce, freeType, dGp, gpMul, newGpBig))
//log.Debug(fmt.Sprintf("Free tx: nonce:%d, type %d. dGp:%v, factor:%d, newGp:%d", mt.Tx.Nonce, freeType, dGp, gpMul, newGpBig))
}

mt.minTip = newGpBig.Uint64()
Expand Down

0 comments on commit 113b604

Please sign in to comment.