Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: prover ci #1588

Closed
Closed
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f92029a
fix(rollup-relayer): update batch finalizing status and unify db time…
colinlyguo Dec 30, 2024
791fcaf
feat(coordinator): assign static prover first and avoid reassigning f…
yiweichi Dec 30, 2024
71acdb3
fix: lint
yiweichi Dec 30, 2024
8ce5121
fix: GetUnassignedBatchCount
yiweichi Dec 30, 2024
3e0589e
Merge branch 'develop' into feat-coordinator-assign-logic
yiweichi Dec 30, 2024
df92616
chore: remove extra files
yiweichi Dec 30, 2024
a75075d
fix: err log
yiweichi Dec 30, 2024
1c5d88d
fix: orm GetTaskOfProver
yiweichi Dec 30, 2024
e4c0779
fix: comments
yiweichi Dec 30, 2024
3da7567
feat(prover): support mutiple version of prover tasks
yiweichi Dec 31, 2024
37da7b8
chore: remove extra files
yiweichi Dec 31, 2024
4215b6b
fix: field is never read
yiweichi Dec 31, 2024
3e97105
refactor
yiweichi Dec 31, 2024
9e63761
fix: lint
yiweichi Dec 31, 2024
707267a
fix: lint
yiweichi Dec 31, 2024
6ba25dc
feat(prover): integrate proving-sdk
yiweichi Jan 5, 2025
f3cebe7
fix: lint
yiweichi Jan 5, 2025
1e2e253
chore: auto version bump [bot]
yiweichi Jan 5, 2025
6e2c29e
update dependencies
yiweichi Jan 6, 2025
e01d185
Revert "feat(coordinator): assign static prover first and avoid reass…
yiweichi Jan 6, 2025
fa87cfa
Merge branch 'feat-prover-integrate-proving-sdk' of https://github.co…
yiweichi Jan 6, 2025
da6be90
fix: comments
yiweichi Jan 6, 2025
fa0927c
optimize l2 GasPrice comparasion (#1581)
alexisdevilliers Jan 6, 2025
139fd6e
Merge branch 'develop' into feat-prover-integrate-proving-sdk
yiweichi Jan 6, 2025
ddbb06d
chore: auto version bump [bot]
yiweichi Jan 6, 2025
5d58a35
remove unused files
yiweichi Jan 6, 2025
4c4495d
Merge branch 'fix-ci-prover-fmt-timeout' into feat-prover-integrate-p…
yiweichi Jan 6, 2025
9cf9bc3
Merge branch 'fix-ci-prover-fmt-timeout' into feat-prover-integrate-p…
yiweichi Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(coordinator): assign static prover first and avoid reassigning f…
…ailing task to same prover
yiweichi committed Dec 30, 2024
commit 791fcafa6010269926503e40539f57dc8766079a
1 change: 1 addition & 0 deletions coordinator/conf/config.json
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
"prover_manager": {
"provers_per_session": 1,
"session_attempts": 5,
"external_prover_threshold": 32,
"bundle_collection_time_sec": 180,
"batch_collection_time_sec": 180,
"chunk_collection_time_sec": 180,
2 changes: 2 additions & 0 deletions coordinator/internal/config/config.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@ type ProverManager struct {
// Number of attempts that a session can be retried if previous attempts failed.
// Currently we only consider proving timeout as failure here.
SessionAttempts uint8 `json:"session_attempts"`
// Threshold for activating the external prover based on unassigned task count.
ExternalProverThreshold int64 `json:"external_prover_threshold"`
// Zk verifier config.
Verifier *VerifierConfig `json:"verifier"`
// BatchCollectionTimeSec batch Proof collection time (in seconds).
57 changes: 44 additions & 13 deletions coordinator/internal/logic/provertask/batch_prover_task.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gin-gonic/gin"
@@ -63,29 +64,59 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
unassignedBatchCount, err := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if err != nil {
log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
return nil, ErrCoordinatorInternalFailure
}
// Assign external prover if unassigned task number exceeds threshold
if unassignedBatchCount < bp.cfg.ProverManager.ExternalProverThreshold {
return nil, nil
}
}

var batchTask *orm.Batch
for i := 0; i < 5; i++ {
var getTaskError error
var tmpBatchTask *orm.Batch
tmpBatchTask, getTaskError = bp.batchOrm.GetAssignedBatch(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
var assignedOffset, unassignedOffset = 0, 0
tmpAssignedBatchTasks, getTaskError := bp.batchOrm.GetAssignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
if getTaskError != nil {
log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}

// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
// batch to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
if tmpBatchTask == nil {
tmpBatchTask, getTaskError = bp.batchOrm.GetUnassignedBatch(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil {
log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
// chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
tmpUnassignedBatchTask, getTaskError := bp.batchOrm.GetUnassignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
if getTaskError != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
for {
tmpBatchTask = nil
if assignedOffset < len(tmpAssignedBatchTasks) {
tmpBatchTask = tmpAssignedBatchTasks[assignedOffset]
assignedOffset++
} else if unassignedOffset < len(tmpUnassignedBatchTask) {
tmpBatchTask = tmpUnassignedBatchTask[unassignedOffset]
unassignedOffset++
}

if tmpBatchTask == nil {
log.Debug("get empty batch", "height", getTaskParameter.ProverHeight)
return nil, nil
}

if tmpBatchTask == nil {
log.Debug("get empty batch", "height", getTaskParameter.ProverHeight)
return nil, nil
// Don't dispatch the same failing job to the same prover
proverTask, err := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
if err != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err)
return nil, ErrCoordinatorInternalFailure
}
if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid {
break
}
}

rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts)
57 changes: 44 additions & 13 deletions coordinator/internal/logic/provertask/bundle_prover_task.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gin-gonic/gin"
@@ -63,29 +64,59 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat

maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
unassignedBundleCount, err := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if err != nil {
log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
return nil, ErrCoordinatorInternalFailure
}
// Assign external prover if unassigned task number exceeds threshold
if unassignedBundleCount < bp.cfg.ProverManager.ExternalProverThreshold {
return nil, nil
}
}

var bundleTask *orm.Bundle
for i := 0; i < 5; i++ {
var getTaskError error
var tmpBundleTask *orm.Bundle
tmpBundleTask, getTaskError = bp.bundleOrm.GetAssignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
var assignedOffset, unassignedOffset = 0, 0
tmpAssignedBundleTasks, getTaskError := bp.bundleOrm.GetAssignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
if getTaskError != nil {
log.Error("failed to get assigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}

// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
// bundle to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
if tmpBundleTask == nil {
tmpBundleTask, getTaskError = bp.bundleOrm.GetUnassignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil {
log.Error("failed to get unassigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
// chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
tmpUnassignedBundleTask, getTaskError := bp.bundleOrm.GetUnassignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
if getTaskError != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
for {
tmpBundleTask = nil
if assignedOffset < len(tmpAssignedBundleTasks) {
tmpBundleTask = tmpAssignedBundleTasks[assignedOffset]
assignedOffset++
} else if unassignedOffset < len(tmpUnassignedBundleTask) {
tmpBundleTask = tmpUnassignedBundleTask[unassignedOffset]
unassignedOffset++
}

if tmpBundleTask == nil {
log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight)
return nil, nil
}

if tmpBundleTask == nil {
log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight)
return nil, nil
// Don't dispatch the same failing job to the same prover
proverTask, err := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
if err != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err)
return nil, ErrCoordinatorInternalFailure
}
if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid {
break
}
}

rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts)
53 changes: 42 additions & 11 deletions coordinator/internal/logic/provertask/chunk_prover_task.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gin-gonic/gin"
@@ -61,29 +62,59 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
unassignedChunkCount, err := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
if err != nil {
log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
return nil, ErrCoordinatorInternalFailure
}
// Assign external prover if unassigned task number exceeds threshold
if unassignedChunkCount < cp.cfg.ProverManager.ExternalProverThreshold {
return nil, nil
}
}

var chunkTask *orm.Chunk
for i := 0; i < 5; i++ {
var getTaskError error
var tmpChunkTask *orm.Chunk
tmpChunkTask, getTaskError = cp.chunkOrm.GetAssignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
var assignedOffset, unassignedOffset = 0, 0
tmpAssignedChunkTasks, getTaskError := cp.chunkOrm.GetAssignedChunks(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50)
if getTaskError != nil {
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}

// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
// chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
if tmpChunkTask == nil {
tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
if getTaskError != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
tmpUnassignedChunkTask, getTaskError := cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50)
if getTaskError != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
for {
tmpChunkTask = nil
if assignedOffset < len(tmpAssignedChunkTasks) {
tmpChunkTask = tmpAssignedChunkTasks[assignedOffset]
assignedOffset++
} else if unassignedOffset < len(tmpUnassignedChunkTask) {
tmpChunkTask = tmpUnassignedChunkTask[unassignedOffset]
unassignedOffset++
}

if tmpChunkTask == nil {
log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight)
return nil, nil
}

if tmpChunkTask == nil {
log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight)
return nil, nil
// Don't dispatch the same failing job to the same prover
proverTask, err := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
if err != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err)
return nil, ErrCoordinatorInternalFailure
}
if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid {
break
}
}

rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
5 changes: 5 additions & 0 deletions coordinator/internal/logic/provertask/prover_task.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,11 @@ var (
getTaskCounterVec *prometheus.CounterVec = nil
)

var (
// ExternalProverNamePrefix prefix of prover name
ExternalProverNamePrefix = "external"
)

// ProverTask the interface of a collector who send data to prover
type ProverTask interface {
Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error)
60 changes: 28 additions & 32 deletions coordinator/internal/orm/batch.go
Original file line number Diff line number Diff line change
@@ -78,38 +78,47 @@ func (*Batch) TableName() string {
return "batch"
}

// GetUnassignedBatch retrieves unassigned batch based on the specified limit.
// GetUnassignedBatches retrieves unassigned batches based on the specified limit.
// The returned batches are sorted in ascending order by their index.
func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
var batch Batch
func (o *Batch) GetUnassignedBatches(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Batch, error) {
var batch []*Batch
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady))
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT %d;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady), limit)
err := db.Raw(sql).Scan(&batch).Error
if err != nil {
return nil, fmt.Errorf("Batch.GetUnassignedBatch error: %w", err)
return nil, fmt.Errorf("Batch.GetUnassignedBatches error: %w", err)
}
if batch.Hash == "" {
return nil, nil
return batch, nil
}

// GetUnassignedBatchCount retrieves unassigned batch count based on the specified limit.
func (o *Batch) GetUnassignedBatchCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) {
var count int64
db := o.db.WithContext(ctx)
db = db.Model(&Batch{})
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("batch.deleted_at IS NULL")
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("Batch.GetUnassignedBatchCount error: %w", err)
}
return &batch, nil
return count, nil
}

// GetAssignedBatch retrieves assigned batch based on the specified limit.
// GetAssignedBatches retrieves assigned batches based on the specified limit.
// The returned batches are sorted in ascending order by their index.
func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
var batch Batch
func (o *Batch) GetAssignedBatches(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Batch, error) {
var batch []*Batch
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady))
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT %d;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady), limit)
err := db.Raw(sql).Scan(&batch).Error
if err != nil {
return nil, fmt.Errorf("Batch.GetAssignedBatch error: %w", err)
}
if batch.Hash == "" {
return nil, nil
return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err)
}
return &batch, nil
return batch, nil
}

// GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready
@@ -132,19 +141,6 @@ func (o *Batch) GetUnassignedAndChunksUnreadyBatches(ctx context.Context, offset
return batches, nil
}

// GetAssignedBatches retrieves all batches whose proving_status is either types.ProvingTaskAssigned.
func (o *Batch) GetAssignedBatches(ctx context.Context) ([]*Batch, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Batch{})
db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))

var assignedBatches []*Batch
if err := db.Find(&assignedBatches).Error; err != nil {
return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err)
}
return assignedBatches, nil
}

// GetProvingStatusByHash retrieves the proving status of a batch given its hash.
func (o *Batch) GetProvingStatusByHash(ctx context.Context, hash string) (types.ProvingStatus, error) {
db := o.db.WithContext(ctx)
49 changes: 29 additions & 20 deletions coordinator/internal/orm/bundle.go
Original file line number Diff line number Diff line change
@@ -54,38 +54,47 @@ func (*Bundle) TableName() string {
return "bundle"
}

// GetUnassignedBundle retrieves unassigned bundle based on the specified limit.
// GetUnassignedBundles retrieves unassigned bundle based on the specified limit.
// The returned batch sorts in ascending order by their index.
func (o *Bundle) GetUnassignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) {
var bundle Bundle
func (o *Bundle) GetUnassignedBundles(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Bundle, error) {
var bundle []*Bundle
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT 1;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady))
sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT %d;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady), limit)
err := db.Raw(sql).Scan(&bundle).Error
if err != nil {
return nil, fmt.Errorf("Batch.GetUnassignedBundle error: %w", err)
}
if bundle.StartBatchHash == "" || bundle.EndBatchHash == "" {
return nil, nil
return nil, fmt.Errorf("Batch.GetUnassignedBundles error: %w", err)
}
return &bundle, nil
return bundle, nil
}

// GetUnassignedBundleCount retrieves unassigned bundle count based on the specified limit.
func (o *Bundle) GetUnassignedBundleCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) {
var count int64
db := o.db.WithContext(ctx)
db = db.Model(&Bundle{})
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("bundle.deleted_at IS NULL")
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("Bundle.GetUnassignedBundleCount error: %w", err)
}
return count, nil
}

// GetAssignedBundle retrieves assigned bundle based on the specified limit.
// GetAssignedBundles retrieves assigned bundles based on the specified limit.
// The returned bundle sorts in ascending order by their index.
func (o *Bundle) GetAssignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) {
var bundle Bundle
func (o *Bundle) GetAssignedBundles(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Bundle, error) {
var bundle []*Bundle
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT 1;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady))
sql := fmt.Sprintf("SELECT * FROM bundle WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND batch_proofs_status = %d AND bundle.deleted_at IS NULL ORDER BY bundle.index LIMIT %d;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.BatchProofsStatusReady), limit)
err := db.Raw(sql).Scan(&bundle).Error
if err != nil {
return nil, fmt.Errorf("Bundle.GetAssignedBatch error: %w", err)
return nil, fmt.Errorf("Bundle.GetAssignedBundles error: %w", err)
}
if bundle.StartBatchHash == "" || bundle.EndBatchHash == "" {
return nil, nil
}
return &bundle, nil
return bundle, nil
}

// GetProvingStatusByHash retrieves the proving status of a bundle given its hash.
48 changes: 29 additions & 19 deletions coordinator/internal/orm/chunk.go
Original file line number Diff line number Diff line change
@@ -73,36 +73,46 @@ func (*Chunk) TableName() string {

// GetUnassignedChunk retrieves unassigned chunk based on the specified limit.
// The returned chunks are sorted in ascending order by their index.
func (o *Chunk) GetUnassignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (*Chunk, error) {
var chunk Chunk
func (o *Chunk) GetUnassignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height, limit uint64) ([]*Chunk, error) {
var chunks []*Chunk
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, height)
err := db.Raw(sql).Scan(&chunk).Error
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT %d;",
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, height, limit)
err := db.Raw(sql).Scan(&chunks).Error
if err != nil {
return nil, fmt.Errorf("Chunk.GetUnassignedChunk error: %w", err)
}
if chunk.Hash == "" {
return nil, nil
return chunks, nil
}

// GetUnassignedChunkCount retrieves unassigned chunk count based on the specified limit.
func (o *Chunk) GetUnassignedChunkCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (int64, error) {
var count int64
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("end_block_number <= ?", height)
db = db.Where("chunk.deleted_at IS NULL")
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("Chunk.GetUnassignedChunkCount error: %w", err)
}
return &chunk, nil
return count, nil
}

// GetAssignedChunk retrieves assigned chunk based on the specified limit.
// GetAssignedChunks retrieves assigned chunks based on the specified limit.
// The returned chunks are sorted in ascending order by their index.
func (o *Chunk) GetAssignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (*Chunk, error) {
var chunk Chunk
func (o *Chunk) GetAssignedChunks(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64, limit uint64) ([]*Chunk, error) {
var chunks []*Chunk
db := o.db.WithContext(ctx)
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT 1;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, height)
err := db.Raw(sql).Scan(&chunk).Error
sql := fmt.Sprintf("SELECT * FROM chunk WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND end_block_number <= %d AND chunk.deleted_at IS NULL ORDER BY chunk.index LIMIT %d;",
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, height, limit)
err := db.Raw(sql).Scan(&chunks).Error
if err != nil {
return nil, fmt.Errorf("Chunk.GetAssignedChunk error: %w", err)
return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err)
}
if chunk.Hash == "" {
return nil, nil
}
return &chunk, nil
return chunks, nil
}

// GetChunksByBatchHash retrieves the chunks associated with a specific batch hash.
17 changes: 17 additions & 0 deletions coordinator/internal/orm/prover_task.go
Original file line number Diff line number Diff line change
@@ -148,6 +148,23 @@ func (o *ProverTask) GetAssignedTaskOfOtherProvers(ctx context.Context, taskType
return proverTasks, nil
}

// GetTaskOfOtherProvers get the chunk/batch task of prover
func (o *ProverTask) GetTaskOfProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) {
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_type", int(taskType))
db = db.Where("task_id", taskID)
db = db.Where("prover_public_key", proverPublicKey)
db = db.Where("prover_version", proverVersion)

var proverTask ProverTask
err := db.First(&proverTask).Error
if err != nil {
return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey:%s", err, taskID, proverPublicKey)
}
return &proverTask, nil
}

// GetProvingStatusByTaskID retrieves the proving status of a prover task
func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskType message.ProofType, taskID string) (types.ProverProveStatus, error) {
db := o.db.WithContext(ctx)
130 changes: 130 additions & 0 deletions rollup/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
{
"l1_config": {
"endpoint": "https://alien-flashy-arm.ethereum-sepolia.quiknode.pro/2aeb75414e5ee0e930b64c2e7feff59efb537f30",
"start_height": 0,
"relayer_config": {
"gas_price_oracle_contract_address": "0x5300000000000000000000000000000000000002",
"sender_config": {
"endpoint": "http://l2-rpc.scrollsdk",
"escalate_blocks": 100,
"escalate_multiple_num": 11,
"escalate_multiple_den": 10,
"min_gas_tip": 1,
"max_gas_price": 10000000000000,
"tx_type": "DynamicFeeTx",
"check_pending_time": 3,
"confirmations": "0x0"
},
"gas_oracle_config": {
"min_gas_price": 0,
"gas_price_diff": 50000,
"l1_base_fee_weight": 0.086,
"l1_blob_base_fee_weight": 0.030,
"check_committed_batches_window_minutes": 5,
"l1_base_fee_default": 15000000000,
"l1_blob_base_fee_default": 1,
"alternative_gas_token_config": {
"enabled": false,
"mode": "Fixed",
"fixed_exchange_rate": 0.001,
"token_symbol_pair": ""
}
},
"gas_oracle_sender_signer_config": {
"signer_type": "PrivateKey",
"private_key_signer_config": {
"private_key": "1313131313131313131313131313131313131313131313131313131313131313"
}
}
}
},
"l2_config": {
"confirmations": "0x10",
"endpoint": "http://l2-rpc.scrollsdk",
"l2_message_queue_address": "0x5300000000000000000000000000000000000000",
"relayer_config": {
"rollup_contract_address": "0xBAA5Cc4a4Ca1c596CbF33183A43148c832a53CC5",
"gas_price_oracle_contract_address": "0x30D802Ba5E7BF1145cA35E67de07388e4C5B8487",
"sender_config": {
"endpoint": "https://alien-flashy-arm.ethereum-sepolia.quiknode.pro/2aeb75414e5ee0e930b64c2e7feff59efb537f30",
"escalate_blocks": 4,
"escalate_multiple_num": 12,
"escalate_multiple_den": 10,
"min_gas_tip": 100000000,
"max_gas_price": 200000000000,
"max_blob_gas_price": 200000000000,
"tx_type": "DynamicFeeTx",
"check_pending_time": 10,
"confirmations": "0x0",
"max_pending_blob_txs": 3
},
"gas_oracle_config": {
"min_gas_price": 0,
"gas_price_diff": 50000,
"alternative_gas_token_config": {
"enabled": false,
"mode": "Fixed",
"fixed_exchange_rate": 0.001,
"token_symbol_pair": ""
}
},
"chain_monitor": {
"enabled": true,
"timeout": 3,
"try_times": 5,
"base_url": "http://chain-monitor:8080"
},
"enable_test_env_bypass_features": false,
"finalize_batch_without_proof_timeout_sec": 300,
"finalize_bundle_without_proof_timeout_sec": 300,
"gas_oracle_sender_signer_config": {
"signer_type": "PrivateKey",
"private_key_signer_config": {
"private_key": "1313131313131313131313131313131313131313131313131313131313131313"
}
},
"commit_sender_signer_config": {
"signer_type": "PrivateKey",
"private_key_signer_config": {
"private_key": "1414141414141414141414141414141414141414141414141414141414141414"
}
},
"finalize_sender_signer_config": {
"signer_type": "PrivateKey",
"private_key_signer_config": {
"private_key": "1515151515151515151515151515151515151515151515151515151515151515"
}
},
"l1_commit_gas_limit_multiplier": 1.2
},
"chunk_proposer_config": {
"propose_interval_milliseconds": 10000,
"max_block_num_per_chunk": 1000000,
"max_tx_num_per_chunk": 1000000,
"max_l1_commit_gas_per_chunk": 50000000,
"max_l1_commit_calldata_size_per_chunk": 1100000,
"chunk_timeout_sec": 27000,
"max_row_consumption_per_chunk": 10000000,
"gas_cost_increase_multiplier": 1.2,
"max_uncompressed_batch_bytes_size": 634880
},
"batch_proposer_config": {
"propose_interval_milliseconds": 1000,
"max_l1_commit_gas_per_batch": 5000000,
"max_l1_commit_calldata_size_per_batch": 110000,
"batch_timeout_sec": 2700,
"gas_cost_increase_multiplier": 1.2,
"max_uncompressed_batch_bytes_size": 634880
},
"bundle_proposer_config": {
"max_batch_num_per_bundle": 30,
"bundle_timeout_sec": 36000
}
},
"db_config": {
"driver_name": "postgres",
"dsn": "postgres://rollup_node:0.qfxlf8tgld@morty-11-28-do-user-9610937-0.i.db.ondigitalocean.com:25060/scroll_rollup?sslmode=require",
"maxOpenNum": 50,
"maxIdleNum": 20
}
}