Skip to content
Open
16 changes: 16 additions & 0 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ const (
// ErrSchedulerClosed scheduler has been closed, cannot schedule new jobs
ErrSchedulerClosed uint16 = 20641

// GC sync protection errors
ErrGCIsRunning uint16 = 20642
ErrSyncProtectionNotFound uint16 = 20643
ErrSyncProtectionExists uint16 = 20644
ErrSyncProtectionMaxCount uint16 = 20645
ErrSyncProtectionSoftDelete uint16 = 20646
ErrSyncProtectionInvalid uint16 = 20647

// Group 7: lock service
// ErrDeadLockDetected lockservice has detected a deadlock and should abort the transaction if it receives this error
ErrDeadLockDetected uint16 = 20701
Expand Down Expand Up @@ -505,6 +513,14 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
ErrOfflineTxnWrite: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "write offline txn: %s"},
ErrSchedulerClosed: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "scheduler closed"},

// GC sync protection errors
ErrGCIsRunning: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "GC is running, please retry later"},
ErrSyncProtectionNotFound: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "sync protection not found: %s"},
ErrSyncProtectionExists: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "sync protection already exists: %s"},
ErrSyncProtectionMaxCount: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "sync protection max count reached: %d"},
ErrSyncProtectionSoftDelete: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "sync protection is soft deleted: %s"},
ErrSyncProtectionInvalid: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "invalid sync protection request"},

// Group 7: lock service
ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"},
ErrLockTableBindChanged: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "lock table bind changed"},
Expand Down
25 changes: 25 additions & 0 deletions pkg/common/moerr/error_no_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,28 @@ func NewCantCompileForPrepareNoCtx() *Error {
func NewSchedulerClosedNoCtx() *Error {
return newError(Context(), ErrSchedulerClosed)
}

// GC sync protection errors
func NewGCIsRunningNoCtx() *Error {
return newError(Context(), ErrGCIsRunning)
}

func NewSyncProtectionNotFoundNoCtx(jobID string) *Error {
return newError(Context(), ErrSyncProtectionNotFound, jobID)
}

func NewSyncProtectionExistsNoCtx(jobID string) *Error {
return newError(Context(), ErrSyncProtectionExists, jobID)
}

func NewSyncProtectionMaxCountNoCtx(maxCount int) *Error {
return newError(Context(), ErrSyncProtectionMaxCount, maxCount)
}

func NewSyncProtectionSoftDeleteNoCtx(jobID string) *Error {
return newError(Context(), ErrSyncProtectionSoftDelete, jobID)
}

func NewSyncProtectionInvalidNoCtx() *Error {
return newError(Context(), ErrSyncProtectionInvalid)
}
21 changes: 19 additions & 2 deletions pkg/sql/plan/function/ctl/cmd_disk_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ import (

func IsValidArg(parameter string, proc *process.Process) (*cmd_util.DiskCleaner, error) {
parameters := strings.Split(parameter, ".")
if len(parameters) > 3 || len(parameters) < 1 {
if len(parameters) < 1 {
return nil, moerr.NewInternalError(proc.Ctx, "handleDiskCleaner: invalid argument!")
}
op := parameters[0]
switch op {
case cmd_util.AddChecker, cmd_util.RemoveChecker:
break
// These operations need key validation, check parameter count later
if len(parameters) > 3 {
return nil, moerr.NewInternalError(proc.Ctx, "handleDiskCleaner: invalid argument!")
}
case cmd_util.StopGC, cmd_util.StartGC:
return &cmd_util.DiskCleaner{
Op: op,
Expand All @@ -52,6 +55,20 @@ func IsValidArg(parameter string, proc *process.Process) (*cmd_util.DiskCleaner,
Op: op,
Key: cmd_util.GCVerify,
}, nil
case cmd_util.RegisterSyncProtection, cmd_util.RenewSyncProtection, cmd_util.UnregisterSyncProtection:
// Sync protection operations expect JSON value in the second parameter
// Format: register_sync_protection.{"job_id":"xxx","objects":["obj1"],"valid_ts":123}
// Note: JSON may contain dots, so we join all remaining parts
value := ""
if len(parameters) > 1 {
// Join remaining parts as JSON value (in case JSON contains dots)
value = strings.Join(parameters[1:], ".")
}

return &cmd_util.DiskCleaner{
Op: op,
Value: value,
}, nil
default:
return nil, moerr.NewInternalError(proc.Ctx, "handleDiskCleaner: invalid operation!")
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/vm/engine/cmd_util/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,11 @@ func (f *FaultInjectReq) MarshalBinary() ([]byte, error) {
func (f *FaultInjectReq) UnmarshalBinary(data []byte) error {
return f.Unmarshal(data)
}

// SyncProtection is the request for sync protection operations
type SyncProtection struct {
JobID string `json:"job_id"` // Sync job ID
BF string `json:"bf"` // Base64 encoded BloomFilter data (for register)
ValidTS int64 `json:"valid_ts"` // Valid timestamp in nanoseconds (for register and renew)
TestObject string `json:"test_object"` // Test object name for debugging (optional)
}
Loading
Loading