diff --git a/example/broadcast/test_broadcast.go b/example/broadcast/test_broadcast.go index 410daf0..3be403e 100644 --- a/example/broadcast/test_broadcast.go +++ b/example/broadcast/test_broadcast.go @@ -26,7 +26,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ processor.BroadcastProcessor = &TestBroadcast{} +var _ processor.BroadcastProcessor = (*TestBroadcast)(nil) type TestBroadcast struct{} diff --git a/example/mapreduce/order_info.go b/example/mapreduce/order_info.go index d081757..873f6af 100644 --- a/example/mapreduce/order_info.go +++ b/example/mapreduce/order_info.go @@ -28,7 +28,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ processor.MapReduceJobProcessor = &TestMapReduceJob{} +var _ processor.MapReduceJobProcessor = (*TestMapReduceJob)(nil) type OrderInfo struct { Id string `json:"id"` diff --git a/example/standalone/helloworld.go b/example/standalone/helloworld.go index dba2309..8daac55 100644 --- a/example/standalone/helloworld.go +++ b/example/standalone/helloworld.go @@ -24,7 +24,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/jobcontext" ) -var _ processor.Processor = &HelloWorld{} +var _ processor.Processor = (*HelloWorld)(nil) type HelloWorld struct{} diff --git a/go.mod b/go.mod index 3045207..749e6fa 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/orcaman/concurrent-map v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -42,6 +43,8 @@ require ( github.com/prometheus/procfs v0.10.1 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect github.com/twmb/murmur3 v1.1.6 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/otel v1.16.0 // indirect diff --git a/go.sum b/go.sum index e98b262..4792a81 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -39,6 +40,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw7k08o4c= github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= @@ -85,6 +88,10 @@ github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhso github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg= github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= @@ -132,6 +139,8 @@ golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/internal/actor/container_actor.go b/internal/actor/container_actor.go index a00a396..98e5e12 100644 --- a/internal/actor/container_actor.go +++ b/internal/actor/container_actor.go @@ -40,7 +40,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ actor.Actor = &containerActor{} +var _ actor.Actor = (*containerActor)(nil) var defaultActorPool, _ = ants.NewPool( math.MaxInt32, diff --git a/internal/actor/heartbeat_actor.go b/internal/actor/heartbeat_actor.go index f950867..9d9b5f0 100644 --- a/internal/actor/heartbeat_actor.go +++ b/internal/actor/heartbeat_actor.go @@ -30,7 +30,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/logger" ) -var _ actor.Actor = &heartbeatActor{} +var _ actor.Actor = (*heartbeatActor)(nil) // heartbeatActor is the type heartbeatActor struct { diff --git a/internal/actor/heartbeat_processor.go b/internal/actor/heartbeat_processor.go index 8ee775b..94ec6e9 100644 --- a/internal/actor/heartbeat_processor.go +++ b/internal/actor/heartbeat_processor.go @@ -32,7 +32,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/logger" ) -var _ actor.Process = &heartbeatProcessor{} +var _ actor.Process = (*heartbeatProcessor)(nil) type heartbeatProcessor struct { connpool pool.ConnPool diff --git a/internal/actor/job_instance_actor.go b/internal/actor/job_instance_actor.go index ac58cf9..fc2bc03 100644 --- a/internal/actor/job_instance_actor.go +++ b/internal/actor/job_instance_actor.go @@ -39,7 +39,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/logger" ) -var _ actor.Actor = &jobInstanceActor{} +var _ actor.Actor = (*jobInstanceActor)(nil) type jobInstanceActor struct { connpool pool.ConnPool @@ -178,7 +178,7 @@ func (a *jobInstanceActor) handleSubmitJobInstance(actorCtx actor.Context, msg * if taskMaster != nil { masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), taskMaster) - if err := taskMaster.SubmitInstance(msg.Ctx, jobInstanceInfo); err != nil { + if err := taskMaster.SubmitInstance(jobInstanceInfo); err != nil { return err } logger.Infof("Submit jobInstanceId=%d succeed", req.GetJobInstanceId()) @@ -230,34 +230,31 @@ func convert2JobInstanceData(datas []*schedulerx.UpstreamData) []*common.JobInst } func (a *jobInstanceActor) handleKillJobInstance(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) error { - var ( - taskMasterPool = masterpool.GetTaskMasterPool() - req = msg.Msg.(*schedulerx.ServerKillJobInstanceRequest) - ) + req := msg.Msg.(*schedulerx.ServerKillJobInstanceRequest) + taskMasterPool := masterpool.GetTaskMasterPool() logger.Infof("handleKillJobInstance, jobInstanceId=%d ", req.GetJobInstanceId()) + + var resp *schedulerx.ServerKillJobInstanceResponse if !taskMasterPool.Contains(req.GetJobInstanceId()) { errMsg := fmt.Sprintf("%d is not exist", req.GetJobInstanceId()) logger.Infof(errMsg) - resp := &schedulerx.ServerKillJobInstanceResponse{ - Success: proto.Bool(true), + resp = &schedulerx.ServerKillJobInstanceResponse{ + Success: proto.Bool(false), Message: proto.String(errMsg), } - actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) } else { - if taskMaster := masterpool.GetTaskMasterPool().Get(req.GetJobInstanceId()); taskMaster != nil { + if taskMaster := taskMasterPool.Get(req.GetJobInstanceId()); taskMaster != nil { if err := taskMaster.KillInstance("killed from server"); err != nil { logger.Infof("%d killed from server failed, err=%s", req.GetJobInstanceId(), err.Error()) } } - errMsg := fmt.Sprintf("%d killed from server", req.GetJobInstanceId()) - logger.Infof(errMsg) - resp := &schedulerx.ServerKillJobInstanceResponse{ - Success: proto.Bool(false), // FIXME true or false - Message: proto.String(errMsg), + resp = &schedulerx.ServerKillJobInstanceResponse{ + Success: proto.Bool(true), + Message: proto.String("killed from server"), } - actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) logger.Infof("Kill jobInstanceId=%d succeed", req.GetJobInstanceId()) } + actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath)) return nil } diff --git a/internal/actor/job_instance_processor.go b/internal/actor/job_instance_processor.go index 4c9790c..2eb072f 100644 --- a/internal/actor/job_instance_processor.go +++ b/internal/actor/job_instance_processor.go @@ -32,7 +32,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/logger" ) -var _ actor.Process = &jobInstanceProcessor{} +var _ actor.Process = (*jobInstanceProcessor)(nil) type jobInstanceProcessor struct { connpool pool.ConnPool diff --git a/internal/actor/task_actor.go b/internal/actor/task_actor.go index f39324e..4ae2188 100644 --- a/internal/actor/task_actor.go +++ b/internal/actor/task_actor.go @@ -33,7 +33,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor" ) -var _ actor.Actor = &taskActor{} +var _ actor.Actor = (*taskActor)(nil) // taskActor is the type taskActor struct { diff --git a/internal/actor/task_actor_processor.go b/internal/actor/task_actor_processor.go index fa82a3f..221a5c6 100644 --- a/internal/actor/task_actor_processor.go +++ b/internal/actor/task_actor_processor.go @@ -31,7 +31,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/logger" ) -var _ actor.Process = &taskProcessor{} +var _ actor.Process = (*taskProcessor)(nil) type taskProcessor struct { connpool pool.ConnPool diff --git a/internal/batch/base_req_handler.go b/internal/batch/base_req_handler.go index ef01d13..6382df0 100644 --- a/internal/batch/base_req_handler.go +++ b/internal/batch/base_req_handler.go @@ -28,7 +28,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/logger" ) -var _ ReqHandler = &BaseReqHandler{} +var _ ReqHandler = (*BaseReqHandler)(nil) var globalPool, _ = ants.NewPool( math.MaxInt32, diff --git a/internal/batch/container_status_req_handler.go b/internal/batch/container_status_req_handler.go index fb9b063..7bcaa56 100644 --- a/internal/batch/container_status_req_handler.go +++ b/internal/batch/container_status_req_handler.go @@ -42,11 +42,6 @@ func (h *ContainerStatusReqHandler) GetTaskMasterAkkaPath() string { return h.taskMasterAkkaPath } -type Pair struct { - jobInstanceId int64 - serialNum int64 -} - func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []interface{}, workerAddr string) { reqs := make([]*schedulerx.ContainerReportTaskStatusRequest, 0, len(requests)) for _, req := range requests { diff --git a/internal/batch/task_pull_req_handler.go b/internal/batch/task_pull_req_handler.go index 0cd8464..a1d8885 100644 --- a/internal/batch/task_pull_req_handler.go +++ b/internal/batch/task_pull_req_handler.go @@ -22,7 +22,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/logger" ) -var _ TaskDispatchReqHandler = &TaskPullReqHandler{} +var _ TaskDispatchReqHandler = (*TaskPullReqHandler)(nil) type TaskPullReqHandler struct { *BaseTaskDispatchReqHandler diff --git a/internal/batch/task_push_req_handler.go b/internal/batch/task_push_req_handler.go index a9d33c9..e69e0e7 100644 --- a/internal/batch/task_push_req_handler.go +++ b/internal/batch/task_push_req_handler.go @@ -24,7 +24,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/logger" ) -var _ TaskDispatchReqHandler = &TaskPushReqHandler{} +var _ TaskDispatchReqHandler = (*TaskPushReqHandler)(nil) type TaskPushReqHandler struct { *BaseTaskDispatchReqHandler diff --git a/internal/batch/tms_status_req_handler.go b/internal/batch/tms_status_req_handler.go index 66de927..91f381d 100644 --- a/internal/batch/tms_status_req_handler.go +++ b/internal/batch/tms_status_req_handler.go @@ -24,7 +24,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/logger" ) -var _ ReqHandler = &TMStatusReqHandler{} +var _ ReqHandler = (*TMStatusReqHandler)(nil) type TMStatusReqHandler struct { *BaseReqHandler diff --git a/internal/container/thread_container.go b/internal/container/thread_container.go index 8679f80..312676e 100644 --- a/internal/container/thread_container.go +++ b/internal/container/thread_container.go @@ -37,7 +37,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ Container = &ThreadContainer{} +var _ Container = (*ThreadContainer)(nil) type ThreadContainer struct { jobCtx *jobcontext.JobContext diff --git a/internal/master/batch_task_master.go b/internal/master/batch_task_master.go index 155b2a7..d0a86cd 100644 --- a/internal/master/batch_task_master.go +++ b/internal/master/batch_task_master.go @@ -28,7 +28,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/internal/utils" ) -var _ taskmaster.MapTaskMaster = &BatchTaskMaster{} +var _ taskmaster.MapTaskMaster = (*BatchTaskMaster)(nil) type BatchTaskMaster struct { *GridTaskMaster diff --git a/internal/master/broadcast_task_master.go b/internal/master/broadcast_task_master.go index 42b9d2f..c8366c9 100644 --- a/internal/master/broadcast_task_master.go +++ b/internal/master/broadcast_task_master.go @@ -17,7 +17,6 @@ package master import ( - "context" "encoding/json" "fmt" "strconv" @@ -42,7 +41,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ taskmaster.TaskMaster = &BroadcastTaskMaster{} +var _ taskmaster.TaskMaster = (*BroadcastTaskMaster)(nil) type BroadcastTaskMaster struct { *TaskMaster @@ -76,7 +75,7 @@ func NewBroadcastTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx ac return broadcastTaskMaster } -func (m *BroadcastTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error { +func (m *BroadcastTaskMaster) SubmitInstance(jobInstanceInfo *common.JobInstanceInfo) error { if err := m.preProcess(jobInstanceInfo); err != nil { logger.Errorf("BroadcastTaskMaster.preProcess failed, jobInstanceId=%d, err=%s", jobInstanceInfo.GetJobInstanceId(), err.Error()) if e := m.TaskMaster.updateNewInstanceStatus(m.GetSerialNum(), m.jobInstanceInfo.GetJobInstanceId(), processor.InstanceStatusFailed, "Preprocess failed. "+err.Error()); e != nil { @@ -185,7 +184,7 @@ func (m *BroadcastTaskMaster) dispatchTask(jobInstanceInfo *common.JobInstanceIn } func (m *BroadcastTaskMaster) KillInstance(reason string) error { - m.TaskMaster.KillInstance(reason) + _ = m.TaskMaster.KillInstance(reason) for _, workerIdAddr := range m.allWorkers { uniqueId, ok := m.worker2uniqueIdMap.Load(workerIdAddr) @@ -362,7 +361,7 @@ func (m *BroadcastTaskMaster) checkWorkerAlive() { m.aliveCheckWorkerSet.Add(worker) } - for _, workerIdAddr := range m.aliveCheckWorkerSet.ToStringSlice() { + for _, workerIdAddr := range m.aliveCheckWorkerSet.Keys() { req := &schedulerx.MasterCheckWorkerAliveRequest{ JobInstanceId: proto.Int64(m.jobInstanceInfo.GetJobInstanceId()), } diff --git a/internal/master/common_update_instance_status_handler.go b/internal/master/common_update_instance_status_handler.go index 014349c..f723a88 100644 --- a/internal/master/common_update_instance_status_handler.go +++ b/internal/master/common_update_instance_status_handler.go @@ -29,7 +29,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor" ) -var _ UpdateInstanceStatusHandler = &commonUpdateInstanceStatusHandler{} +var _ UpdateInstanceStatusHandler = (*commonUpdateInstanceStatusHandler)(nil) type commonUpdateInstanceStatusHandler struct { *baseUpdateInstanceStatusHandler diff --git a/internal/master/grid_task_master.go b/internal/master/grid_task_master.go index 8ec89f4..d1c6f87 100644 --- a/internal/master/grid_task_master.go +++ b/internal/master/grid_task_master.go @@ -33,7 +33,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/jobcontext" ) -var _ taskmaster.MapTaskMaster = &GridTaskMaster{} +var _ taskmaster.MapTaskMaster = (*GridTaskMaster)(nil) type GridTaskMaster struct { *MapTaskMaster diff --git a/internal/master/map_task_master.go b/internal/master/map_task_master.go index 3fc9e29..dd38376 100644 --- a/internal/master/map_task_master.go +++ b/internal/master/map_task_master.go @@ -17,7 +17,6 @@ package master import ( - "context" "encoding/json" "errors" "fmt" @@ -293,7 +292,7 @@ func (m *MapTaskMaster) checkWorkerAlive() { m.taskPersistence.BatchUpdateTaskStatus(m.GetJobInstanceInfo().GetJobInstanceId(), taskstatus.TaskStatusFailed, "", "") break } else { - for _, workerIdAddr := range m.aliveCheckWorkerSet.ToStringSlice() { + for _, workerIdAddr := range m.aliveCheckWorkerSet.Keys() { workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr) times := 0 @@ -376,7 +375,7 @@ func (m *MapTaskMaster) notifyWorkerPull() { } } -func (m *MapTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error { +func (m *MapTaskMaster) SubmitInstance(jobInstanceInfo *common.JobInstanceInfo) error { var err error defer func() { if err != nil { @@ -790,7 +789,8 @@ func (m *MapTaskMaster) selectWorker() string { } func (m *MapTaskMaster) KillInstance(reason string) error { - m.TaskMaster.KillInstance(reason) + _ = m.TaskMaster.KillInstance(reason) + allWorkers := m.GetJobInstanceInfo().GetAllWorkers() for _, workerIdAddr := range allWorkers { request := &schedulerx.MasterKillContainerRequest{ diff --git a/internal/master/parallel_task_mater.go b/internal/master/parallel_task_mater.go index be3b701..9f4a805 100644 --- a/internal/master/parallel_task_mater.go +++ b/internal/master/parallel_task_mater.go @@ -40,7 +40,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ taskmaster.ParallelTaskMaster = &ParallelTaskMaster{} +var _ taskmaster.ParallelTaskMaster = (*ParallelTaskMaster)(nil) const batchSize = 256 diff --git a/internal/master/persistence/server_task_persistence.go b/internal/master/persistence/server_task_persistence.go index 791e195..2ff06ff 100644 --- a/internal/master/persistence/server_task_persistence.go +++ b/internal/master/persistence/server_task_persistence.go @@ -35,7 +35,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ TaskPersistence = &ServerTaskPersistence{} +var _ TaskPersistence = (*ServerTaskPersistence)(nil) type ServerTaskPersistence struct { serverDiscovery *discovery.ServiceDiscover diff --git a/internal/master/second_job_update_instance_status_handler.go b/internal/master/second_job_update_instance_status_handler.go index 096071e..f65c256 100644 --- a/internal/master/second_job_update_instance_status_handler.go +++ b/internal/master/second_job_update_instance_status_handler.go @@ -17,7 +17,6 @@ package master import ( - "context" "encoding/json" "fmt" "io" @@ -44,7 +43,7 @@ import ( const missServerKillTime = 30 // seconds -type secondJobUpdateInstanceStatusHandler struct { +type SecondJobUpdateInstanceStatusHandler struct { *baseUpdateInstanceStatusHandler actorCtx actor.Context secondProgressDetail *common.SecondProgressDetail @@ -56,7 +55,7 @@ type secondJobUpdateInstanceStatusHandler struct { } func NewSecondJobUpdateInstanceStatusHandler(actorCtx actor.Context, taskMaster taskmaster.TaskMaster, jobInstanceInfo *common.JobInstanceInfo) UpdateInstanceStatusHandler { - h := &secondJobUpdateInstanceStatusHandler{ + h := &SecondJobUpdateInstanceStatusHandler{ baseUpdateInstanceStatusHandler: NewBaseUpdateInstanceStatusHandler(jobInstanceInfo, taskMaster), actorCtx: actorCtx, cycleStartTime: time.Now().UnixMilli(), @@ -68,14 +67,14 @@ func NewSecondJobUpdateInstanceStatusHandler(actorCtx actor.Context, taskMaster return h } -func (h *secondJobUpdateInstanceStatusHandler) init() { +func (h *SecondJobUpdateInstanceStatusHandler) init() { GetTimeScheduler().init() // job instance progress report thread. go h.reportJobInstanceProgress() } -func (h *secondJobUpdateInstanceStatusHandler) reportJobInstanceProgress() { +func (h *SecondJobUpdateInstanceStatusHandler) reportJobInstanceProgress() { intervalTimes := 0 jobIdAndInstanceId := utils.GetUniqueIdWithoutTaskId(h.jobInstanceInfo.GetJobId(), h.jobInstanceInfo.GetJobInstanceId()) for !h.taskMaster.IsKilled() { @@ -113,7 +112,7 @@ func (h *secondJobUpdateInstanceStatusHandler) reportJobInstanceProgress() { // Kill self is required if any of the following conditions are met: // 1. Lost contact with the server for more than 30 seconds // 2. The grid task has no available worker -func (h *secondJobUpdateInstanceStatusHandler) need2KillSelf() error { +func (h *SecondJobUpdateInstanceStatusHandler) need2KillSelf() error { if !h.taskMaster.IsInited() { return nil } @@ -134,14 +133,14 @@ func (h *secondJobUpdateInstanceStatusHandler) need2KillSelf() error { return nil } -func (h *secondJobUpdateInstanceStatusHandler) getJobInstanceProgress() (string, error) { +func (h *SecondJobUpdateInstanceStatusHandler) getJobInstanceProgress() (string, error) { progress, err := h.taskMaster.GetJobInstanceProgress() if err != nil { return "", err } h.secondProgressDetail.SetRunningProgress(progress) h.secondProgressDetail.SetRunningStartTime(h.cycleStartTime) - h.secondProgressDetail.SetRecentProgressHistory(h.recentProgressHistory.Convert2Slice()) + h.secondProgressDetail.SetRecentProgressHistory(h.recentProgressHistory.ArrayList()) data, err := json.Marshal(h.secondProgressDetail) if err != nil { return "", err @@ -151,7 +150,7 @@ func (h *secondJobUpdateInstanceStatusHandler) getJobInstanceProgress() (string, } // Get the latest worker list -func (h *secondJobUpdateInstanceStatusHandler) getAllWorkers(appGroupId, jobId int64) (*utils.Set, error) { +func (h *SecondJobUpdateInstanceStatusHandler) getAllWorkers(appGroupId, jobId int64) (*utils.Set, error) { url := fmt.Sprintf("http://%s/app/getAllUsefulWorkerList.json?appGroupId=%d&jobId=%d", openapi.GetOpenAPIClient().Domain(), appGroupId, jobId) resp, err := openapi.GetOpenAPIClient().HttpClient().Get(url) if err != nil { @@ -182,7 +181,7 @@ func (h *secondJobUpdateInstanceStatusHandler) getAllWorkers(appGroupId, jobId i return set, nil } -func (h *secondJobUpdateInstanceStatusHandler) Handle(serialNum int64, instanceStatus processor.InstanceStatus, result string) error { +func (h *SecondJobUpdateInstanceStatusHandler) Handle(serialNum int64, instanceStatus processor.InstanceStatus, result string) error { cycleId := utils.GetUniqueId(h.jobInstanceInfo.GetJobId(), h.jobInstanceInfo.GetJobInstanceId(), h.taskMaster.GetSerialNum()) logger.Infof("cycleId:%s instanceStatus=%d cycle update status.", cycleId, instanceStatus) @@ -194,17 +193,15 @@ func (h *secondJobUpdateInstanceStatusHandler) Handle(serialNum int64, instanceS } // if instance is killed, need to report to server - // From a logical point of view, you only need to judge whether the master has been killed. - // There is no need to judge whether the result contains the specified information. - // However, history says that we should not delete it in the short term. - if h.taskMaster.IsKilled() && - (strings.Contains(result, "killed") || strings.Contains(result, "Worker master shutdown")) { + // 从逻辑看只需要判断master是否被kill即可,无需判断result是否包含指定信息,但历史这么写着短期不敢删减 + if h.taskMaster.IsKilled() && (strings.Contains(result, "killed") || + strings.Contains(result, "Worker master shutdown")) { h.taskMaster.SetInstanceStatus(processor.InstanceStatusFailed) h.taskMaster.Stop() h.masterPool.Remove(h.jobInstanceInfo.GetJobInstanceId()) if result != "killed from server" { - // There is no status feedback for the server-side forced stop operation. + // 对服务端强制停止操作不做状态反馈 req := &schedulerx.WorkerReportJobInstanceStatusRequest{ JobId: proto.Int64(h.jobInstanceInfo.GetJobId()), JobInstanceId: proto.Int64(h.jobInstanceInfo.GetJobInstanceId()), @@ -228,7 +225,7 @@ func (h *secondJobUpdateInstanceStatusHandler) Handle(serialNum int64, instanceS logger.Infof("report cycleId=%s, status=%d to AtLeastDeliveryRoutingActor", cycleId, instanceStatus) } - // If the instance terminates no further action is required + // 如果实例终止无需进行后续操作 return nil } @@ -239,7 +236,7 @@ func (h *secondJobUpdateInstanceStatusHandler) Handle(serialNum int64, instanceS return nil } -func (h *secondJobUpdateInstanceStatusHandler) triggerNextCycle(cycleId string, serialNum int64, instanceStatus processor.InstanceStatus) { +func (h *SecondJobUpdateInstanceStatusHandler) triggerNextCycle(cycleId string, serialNum int64, instanceStatus processor.InstanceStatus) { if serialNum != h.taskMaster.GetSerialNum() { logger.Infof("triggerNextCycle=%s ignore, current serialNum=%d, but trigger serialNum=%d, status=%d, killed=%v.", cycleId, h.taskMaster.GetSerialNum(), serialNum, instanceStatus, h.taskMaster.IsKilled()) @@ -285,7 +282,7 @@ func (h *secondJobUpdateInstanceStatusHandler) triggerNextCycle(cycleId string, } } -func (h *secondJobUpdateInstanceStatusHandler) setHistory(serialNum int64, loopStartTime int64, status processor.InstanceStatus) { +func (h *SecondJobUpdateInstanceStatusHandler) setHistory(serialNum int64, loopStartTime int64, status processor.InstanceStatus) { if status == processor.InstanceStatusSucceed { h.secondProgressDetail.GetTodayProgressCounter().IncrementOneSuccess() } else { @@ -359,7 +356,7 @@ func (h *secondJobUpdateInstanceStatusHandler) setHistory(serialNum int64, loopS } // Schedule a new iteration -func (h *secondJobUpdateInstanceStatusHandler) triggerNewCycle() { +func (h *SecondJobUpdateInstanceStatusHandler) triggerNewCycle() { cycleId := utils.GetUniqueId(h.jobInstanceInfo.GetJobId(), h.jobInstanceInfo.GetJobInstanceId(), h.taskMaster.AcquireSerialNum()) logger.Infof("cycleId:%s cycle begin.", cycleId) h.cycleStartTime = time.Now().UnixMilli() @@ -373,7 +370,7 @@ func (h *secondJobUpdateInstanceStatusHandler) triggerNewCycle() { } h.taskMaster.RestJobInstanceWorkerList(freeWorkers) } - h.taskMaster.SubmitInstance(context.Background(), h.jobInstanceInfo) + h.taskMaster.SubmitInstance(h.jobInstanceInfo) h.triggerTimes++ // If it is a standalone task, cu+1 diff --git a/internal/master/sharding_task_master.go b/internal/master/sharding_task_master.go index 8848c71..9cf5abf 100644 --- a/internal/master/sharding_task_master.go +++ b/internal/master/sharding_task_master.go @@ -17,7 +17,6 @@ package master import ( - "context" "encoding/json" "errors" "fmt" @@ -38,7 +37,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ taskmaster.MapTaskMaster = &ShardingTaskMaster{} +var _ taskmaster.MapTaskMaster = (*ShardingTaskMaster)(nil) type ShardingTaskMaster struct { *GridTaskMaster @@ -55,7 +54,7 @@ func NewShardingTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx act } } -func (m *ShardingTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error { +func (m *ShardingTaskMaster) SubmitInstance(jobInstanceInfo *common.JobInstanceInfo) error { if err := m.parseShardingParameters(jobInstanceInfo); err != nil { m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, err.Error()) return err diff --git a/internal/master/standalone_task_master.go b/internal/master/standalone_task_master.go index 600da12..f8e9c6c 100644 --- a/internal/master/standalone_task_master.go +++ b/internal/master/standalone_task_master.go @@ -17,7 +17,6 @@ package master import ( - "context" "fmt" "strings" "time" @@ -39,7 +38,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ taskmaster.TaskMaster = &StandaloneTaskMaster{} +var _ taskmaster.TaskMaster = (*StandaloneTaskMaster)(nil) type StandaloneTaskMaster struct { *TaskMaster @@ -79,7 +78,7 @@ func NewStandaloneTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx a return standaloneTaskMaster } -func (m *StandaloneTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error { +func (m *StandaloneTaskMaster) SubmitInstance(jobInstanceInfo *common.JobInstanceInfo) error { var ( err error uniqueId string @@ -162,6 +161,8 @@ func (m *StandaloneTaskMaster) selectWorker() string { } func (m *StandaloneTaskMaster) KillInstance(reason string) error { + _ = m.TaskMaster.KillInstance(reason) + uniqueId := utils.GetUniqueIdWithoutTaskId(m.jobInstanceInfo.GetJobId(), m.jobInstanceInfo.GetJobInstanceId()) req := &schedulerx.MasterKillContainerRequest{ JobId: proto.Int64(m.jobInstanceInfo.GetJobId()), diff --git a/internal/master/task_master.go b/internal/master/task_master.go index 51f7a0a..50e798b 100644 --- a/internal/master/task_master.go +++ b/internal/master/task_master.go @@ -17,7 +17,6 @@ package master import ( - "context" "encoding/json" "fmt" "sync" @@ -36,26 +35,26 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ taskmaster.TaskMaster = &TaskMaster{} +var _ taskmaster.TaskMaster = (*TaskMaster)(nil) type TaskMaster struct { - actorContext actor.Context `json:"actorContext,omitempty"` - instanceStatus processor.InstanceStatus `json:"instanceStatus,omitempty"` // WARNING: not concurrency-safe - taskStatusMap sync.Map `json:"taskStatusMap"` // key:string, val:TaskStatus - taskIdGenerator *atomic.Int64 `json:"taskIdGenerator"` // WARNING: not concurrency-safe - localWorkIdAddr string `json:"localWorkIdAddr,omitempty"` - localContainerRouterPath string `json:"localContainerRouterPath,omitempty"` - localTaskRouterPath string `json:"localTaskRouterPath,omitempty"` - localInstanceRouterPath string `json:"localInstanceRouterPath,omitempty"` - jobInstanceInfo *common.JobInstanceInfo `json:"jobInstanceInfo,omitempty"` - jobInstanceProgress string `json:"jobInstanceProgress,omitempty"` - statusHandler UpdateInstanceStatusHandler `json:"statusHandler,omitempty"` - killed bool `json:"killed,omitempty"` // WARNING: not concurrency-safe - inited bool `json:"inited,omitempty"` // WARNING: not concurrency-safe - aliveCheckWorkerSet *utils.ConcurrentSet `json:"aliveCheckWorkerSet,omitempty"` // string - serverDiscovery discovery.ServiceDiscover `json:"serverDiscovery"` - serialNum *atomic.Int64 `json:"serialNum,omitempty"` // 秒级任务使用,当前循环次数 - existInvalidWorker bool `json:"existInvalidWorker,omitempty"` // 是否存在失效Worker WARNING: not concurrency-safe + actorContext actor.Context `json:"actorContext,omitempty"` + instanceStatus processor.InstanceStatus `json:"instanceStatus,omitempty"` // WARNING: not concurrency-safe + taskStatusMap sync.Map `json:"taskStatusMap"` // key:string, val:TaskStatus + taskIdGenerator *atomic.Int64 `json:"taskIdGenerator"` // WARNING: not concurrency-safe + localWorkIdAddr string `json:"localWorkIdAddr,omitempty"` + localContainerRouterPath string `json:"localContainerRouterPath,omitempty"` + localTaskRouterPath string `json:"localTaskRouterPath,omitempty"` + localInstanceRouterPath string `json:"localInstanceRouterPath,omitempty"` + jobInstanceInfo *common.JobInstanceInfo `json:"jobInstanceInfo,omitempty"` + jobInstanceProgress string `json:"jobInstanceProgress,omitempty"` + statusHandler UpdateInstanceStatusHandler `json:"statusHandler,omitempty"` + killed bool `json:"killed,omitempty"` // WARNING: not concurrency-safe + inited bool `json:"inited,omitempty"` // WARNING: not concurrency-safe + aliveCheckWorkerSet *utils.ConcurrentSet[string] `json:"aliveCheckWorkerSet,omitempty"` // string + serverDiscovery discovery.ServiceDiscover `json:"serverDiscovery"` + serialNum *atomic.Int64 `json:"serialNum,omitempty"` // 秒级任务使用,当前循环次数 + existInvalidWorker bool `json:"existInvalidWorker,omitempty"` // 是否存在失效Worker WARNING: not concurrency-safe lock sync.RWMutex } @@ -67,7 +66,7 @@ func NewTaskMaster(actorCtx actor.Context, jobInstanceInfo *common.JobInstanceIn instanceStatus: processor.InstanceStatusRunning, taskStatusMap: sync.Map{}, taskIdGenerator: atomic.NewInt64(-1), - aliveCheckWorkerSet: utils.NewConcurrentSet(), + aliveCheckWorkerSet: utils.NewConcurrentSet[string](), jobInstanceInfo: jobInstanceInfo, actorContext: actorCtx, localWorkIdAddr: workerIdAddr, @@ -254,7 +253,7 @@ func (m *TaskMaster) RetryTasks(taskEntities []schedulerx.RetryTaskEntity) { return } -func (m *TaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error { +func (m *TaskMaster) SubmitInstance(jobInstanceInfo *common.JobInstanceInfo) error { // TODO Implement me return nil } @@ -311,7 +310,7 @@ func (m *TaskMaster) GetJobInstanceInfo() *common.JobInstanceInfo { } // GetAliveCheckWorkerSet return set -func (m *TaskMaster) GetAliveCheckWorkerSet() *utils.ConcurrentSet { +func (m *TaskMaster) GetAliveCheckWorkerSet() *utils.ConcurrentSet[string] { return m.aliveCheckWorkerSet } @@ -336,7 +335,7 @@ func (m *TaskMaster) ExistInvalidWorker() bool { func (m *TaskMaster) ResetJobInstanceWorkerList() { freeWorkersNum := m.aliveCheckWorkerSet.Len() if freeWorkersNum > 0 { - m.jobInstanceInfo.SetAllWorkers(m.aliveCheckWorkerSet.ToStringSlice()) + m.jobInstanceInfo.SetAllWorkers(m.aliveCheckWorkerSet.Keys()) m.existInvalidWorker = false logger.Infof("restJobInstanceWorkerList appGroupId=%d, instanceId=%d, workerSize=%d.", m.jobInstanceInfo.GetAppGroupId(), m.jobInstanceInfo.GetJobInstanceId(), freeWorkersNum) diff --git a/internal/master/taskmaster/taskmaster.go b/internal/master/taskmaster/taskmaster.go index d12edc3..424f97a 100644 --- a/internal/master/taskmaster/taskmaster.go +++ b/internal/master/taskmaster/taskmaster.go @@ -17,8 +17,6 @@ package taskmaster import ( - "context" - "github.com/alibaba/schedulerx-worker-go/internal/common" "github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx" "github.com/alibaba/schedulerx-worker-go/internal/utils" @@ -39,10 +37,10 @@ type TaskMaster interface { KillInstance(reason string) error GetInstanceStatus() processor.InstanceStatus GetJobInstanceProgress() (string, error) - GetAliveCheckWorkerSet() *utils.ConcurrentSet + GetAliveCheckWorkerSet() *utils.ConcurrentSet[string] GetJobInstanceInfo() *common.JobInstanceInfo RestJobInstanceWorkerList(freeWorkers *utils.Set) - SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error + SubmitInstance(jobInstanceInfo *common.JobInstanceInfo) error BatchUpdateTaskStatus(taskMaster TaskMaster, req *schedulerx.ContainerBatchReportTaskStatuesRequest) error UpdateTaskStatus(req *schedulerx.ContainerReportTaskStatusRequest) error SetInstanceStatus(instanceStatus processor.InstanceStatus) diff --git a/internal/master/time_plan_entry.go b/internal/master/time_plan_entry.go index 1d0783b..ab49b23 100644 --- a/internal/master/time_plan_entry.go +++ b/internal/master/time_plan_entry.go @@ -17,18 +17,20 @@ package master import ( + "fmt" + "github.com/alibaba/schedulerx-worker-go/internal/utils" ) -var _ utils.ComparatorItem = &TimePlanEntry{} +var _ utils.ComparatorItem = (*TimePlanEntry)(nil) type TimePlanEntry struct { jobInstanceId int64 scheduleTimeStamp int64 - handler *secondJobUpdateInstanceStatusHandler + handler *SecondJobUpdateInstanceStatusHandler } -func NewTimePlanEntry(jobInstanceId int64, scheduleTimeStamp int64, handler *secondJobUpdateInstanceStatusHandler) *TimePlanEntry { +func NewTimePlanEntry(jobInstanceId int64, scheduleTimeStamp int64, handler *SecondJobUpdateInstanceStatusHandler) *TimePlanEntry { return &TimePlanEntry{jobInstanceId: jobInstanceId, scheduleTimeStamp: scheduleTimeStamp, handler: handler} } @@ -36,28 +38,20 @@ func (t *TimePlanEntry) JobInstanceId() int64 { return t.jobInstanceId } -func (t *TimePlanEntry) SetJobInstanceId(jobInstanceId int64) { - t.jobInstanceId = jobInstanceId -} - func (t *TimePlanEntry) ScheduleTimeStamp() int64 { return t.scheduleTimeStamp } -func (t *TimePlanEntry) SetScheduleTimeStamp(scheduleTimeStamp int64) { - t.scheduleTimeStamp = scheduleTimeStamp -} - -func (t *TimePlanEntry) Handler() *secondJobUpdateInstanceStatusHandler { +func (t *TimePlanEntry) Handler() *SecondJobUpdateInstanceStatusHandler { return t.handler } -func (t *TimePlanEntry) SetHandler(handler *secondJobUpdateInstanceStatusHandler) { - t.handler = handler +func (t *TimePlanEntry) UniqueID() string { + return fmt.Sprintf("%d@%d", t.jobInstanceId, t.scheduleTimeStamp) } -func (t *TimePlanEntry) Value() interface{} { - return t +func (t *TimePlanEntry) String() string { + return fmt.Sprintf("TimePlanEntry [jobInstanceId=%d, scheduleTimeStamp=%d]", t.jobInstanceId, t.scheduleTimeStamp) } func (t *TimePlanEntry) Priority() int64 { diff --git a/internal/master/time_queue.go b/internal/master/time_queue.go index 0c9b65a..0d6dec1 100644 --- a/internal/master/time_queue.go +++ b/internal/master/time_queue.go @@ -23,36 +23,38 @@ import ( // TimeQueue Time queue sorted by scheduling time and task priority type TimeQueue struct { - timeSet *utils.ConcurrentSet // Set - timeQueue *utils.PriorityQueue // Queue priority queue, sorted from small to large by scheduling time + timeSet *utils.ConcurrentSet[string] // Set + timeQueue *utils.PriorityQueue // Queue priority queue, sorted from small to large by scheduling time } func NewTimeQueue() *TimeQueue { return &TimeQueue{ - timeSet: utils.NewConcurrentSet(), + timeSet: utils.NewConcurrentSet[string](), timeQueue: utils.NewPriorityQueue(100), } } func (q *TimeQueue) Add(timePlanEntry *TimePlanEntry) { - if !q.timeSet.Contains(timePlanEntry) { - q.timeSet.Add(timePlanEntry) + if !q.timeSet.Contains(timePlanEntry.UniqueID()) { + q.timeSet.Add(timePlanEntry.UniqueID()) q.timeQueue.PushItem(timePlanEntry) - logger.Infof("timeQueue add plan=%+v", timePlanEntry) + logger.Infof("timeQueue add plan=%s", timePlanEntry) } else { - logger.Warnf("plan=%+v is existed in timeQueue", timePlanEntry) + logger.Warnf("plan=%s is existed in timeQueue", timePlanEntry) } } func (q *TimeQueue) Remove(jobInstanceId int64) { - for q.timeQueue.Len() > 0 { - planEntry := q.timeQueue.Peek().(*TimePlanEntry) + q.timeQueue.RemoveBy(func(item utils.ComparatorItem) bool { + planEntry := item.(*TimePlanEntry) if jobInstanceId == planEntry.jobInstanceId { - q.timeQueue.Pop() - q.timeSet.Remove(planEntry) - logger.Infof("planEntry=%+v removed, event.getTriggerType() != null", planEntry) + // found item matched, remove from timeSet + q.timeSet.Remove(planEntry.UniqueID()) + logger.Infof("planEntry=%s removed, event.getTriggerType() != null", planEntry) + return true } - } + return false + }) } // Peek return the head of this queue, or returns null if this queue is empty. @@ -63,13 +65,10 @@ func (q *TimeQueue) Peek() *TimePlanEntry { return nil } -// RemoveHeader removes the head of this queue. -func (q *TimeQueue) RemoveHeader() *TimePlanEntry { - var planEntry *TimePlanEntry - if item := q.timeQueue.Pop(); item != nil { - planEntry = item.(*TimePlanEntry) - q.timeSet.Remove(planEntry) - } +// Pop removes the head of this queue. +func (q *TimeQueue) Pop() *TimePlanEntry { + planEntry := q.timeQueue.PopItem().(*TimePlanEntry) + q.timeSet.Remove(planEntry.UniqueID()) return planEntry } diff --git a/internal/master/time_scheduler.go b/internal/master/time_scheduler.go index 2798676..7c7687f 100644 --- a/internal/master/time_scheduler.go +++ b/internal/master/time_scheduler.go @@ -38,8 +38,7 @@ func GetTimeScheduler() *TimeScheduler { // TimeScheduler is the client time scheduler, mainly used for second-level task scheduling. type TimeScheduler struct { timeQueue *TimeQueue - inited bool // no concurrency safe - lock sync.RWMutex + once sync.Once } func newTimeScheduler() *TimeScheduler { @@ -49,21 +48,22 @@ func newTimeScheduler() *TimeScheduler { } func (s *TimeScheduler) init() { - if !s.isInited() { + s.once.Do(func() { go s.timeScan() logger.Infof("TimeScanThread started") - s.setInited(true) - } + }) } func (s *TimeScheduler) timeScan() { for { now := time.Now().UnixMilli() for !s.timeQueue.IsEmpty() { - if planEntry := s.timeQueue.Peek(); planEntry != nil && planEntry.ScheduleTimeStamp() <= now { - logger.Infof("%+v time ready", planEntry) - // 1. remove this from planQueue - s.timeQueue.RemoveHeader() + planEntry := s.timeQueue.Peek() + if planEntry != nil && planEntry.ScheduleTimeStamp() <= now { + // 1. pop this from timeQueue + planEntry = s.timeQueue.Pop() + logger.Infof("%s time ready", planEntry) + // 2. start this time based job s.submitPlan(planEntry) } else { @@ -75,18 +75,6 @@ func (s *TimeScheduler) timeScan() { } } -func (s *TimeScheduler) isInited() bool { - s.lock.RLock() - defer s.lock.RUnlock() - return s.inited -} - -func (s *TimeScheduler) setInited(flag bool) { - s.lock.Lock() - s.inited = flag - s.lock.Unlock() -} - func (s *TimeScheduler) add(planEntry *TimePlanEntry) { s.timeQueue.Add(planEntry) } diff --git a/internal/master/update_instance_status_handler.go b/internal/master/update_instance_status_handler.go index a7a2442..a519ea0 100644 --- a/internal/master/update_instance_status_handler.go +++ b/internal/master/update_instance_status_handler.go @@ -23,7 +23,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor" ) -var _ UpdateInstanceStatusHandler = &baseUpdateInstanceStatusHandler{} +var _ UpdateInstanceStatusHandler = (*baseUpdateInstanceStatusHandler)(nil) type UpdateInstanceStatusHandler interface { Handle(serialNum int64, newStatus processor.InstanceStatus, result string) error diff --git a/internal/utils/concurrent_set.go b/internal/utils/concurrent_set.go index a960e4a..38cd9ca 100644 --- a/internal/utils/concurrent_set.go +++ b/internal/utils/concurrent_set.go @@ -18,43 +18,43 @@ package utils import "sync" -type ConcurrentSet struct { - set sync.Map +type ConcurrentSet[T comparable] struct { + m sync.Map } -func NewConcurrentSet() *ConcurrentSet { - return &ConcurrentSet{ - set: sync.Map{}, +func NewConcurrentSet[T comparable]() *ConcurrentSet[T] { + return &ConcurrentSet[T]{ + m: sync.Map{}, } } -func (s *ConcurrentSet) Add(item interface{}) { - s.set.Store(item, struct{}{}) +func (s *ConcurrentSet[T]) Add(key T) { + s.m.Store(key, struct{}{}) } -func (s *ConcurrentSet) Remove(item interface{}) { - s.set.Delete(item) +func (s *ConcurrentSet[T]) Remove(key T) { + s.m.Delete(key) } -func (s *ConcurrentSet) Contains(item interface{}) bool { - _, ok := s.set.Load(item) +func (s *ConcurrentSet[T]) Contains(key T) bool { + _, ok := s.m.Load(key) return ok } -func (s *ConcurrentSet) ToStringSlice() []string { - slice := make([]string, 0) - s.set.Range(func(key, value interface{}) bool { - k := key.(string) - slice = append(slice, k) +func (s *ConcurrentSet[T]) Keys() []T { + keys := make([]T, 0) + s.m.Range(func(key, _ any) bool { + k := key.(T) + keys = append(keys, k) return true }) - return slice + return keys } -func (s *ConcurrentSet) Clear() { - s.set = sync.Map{} +func (s *ConcurrentSet[T]) Clear() { + s.m = sync.Map{} } -func (s *ConcurrentSet) Len() int { - return SyncMapLen(&s.set) +func (s *ConcurrentSet[T]) Len() int { + return SyncMapLen(&s.m) } diff --git a/internal/utils/concurrent_set_test.go b/internal/utils/concurrent_set_test.go index 9c3a0ce..07db895 100644 --- a/internal/utils/concurrent_set_test.go +++ b/internal/utils/concurrent_set_test.go @@ -22,7 +22,7 @@ import ( ) func TestConcurrentSet(t *testing.T) { - set := NewConcurrentSet() + set := NewConcurrentSet[int]() // Test add set.Add(1) @@ -63,13 +63,13 @@ func TestConcurrentSet(t *testing.T) { } func TestToStringSlice(t *testing.T) { - chs := &ConcurrentSet{} + chs := &ConcurrentSet[string]{} // Test add chs.Add("hello") chs.Add("world") - slice := chs.ToStringSlice() + slice := chs.Keys() if len(slice) != 2 { t.Fatalf("Expected length of 2, but got %d", len(slice)) diff --git a/internal/utils/limited_queue.go b/internal/utils/limited_queue.go index 36ed874..8473987 100644 --- a/internal/utils/limited_queue.go +++ b/internal/utils/limited_queue.go @@ -46,14 +46,11 @@ func (lq *LimitedQueue) Dequeue() *common.ProgressHistory { return item } -func (lq *LimitedQueue) Convert2Slice() []*common.ProgressHistory { - var ret []*common.ProgressHistory - for { - item := lq.Dequeue() - if item == nil { - break - } - ret = append(ret, item) +func (lq *LimitedQueue) ArrayList() []*common.ProgressHistory { + if len(lq.queue) == 0 { + return []*common.ProgressHistory{} } - return ret + result := make([]*common.ProgressHistory, len(lq.queue)) + copy(result, lq.queue) + return result } diff --git a/internal/utils/misc_test.go b/internal/utils/misc_test.go index 35bb433..e9dd7f9 100644 --- a/internal/utils/misc_test.go +++ b/internal/utils/misc_test.go @@ -24,9 +24,9 @@ import ( ) func TestSyncMapLen(t *testing.T) { - var m *sync.Map + var m sync.Map - if length := SyncMapLen(m); length != 0 { + if length := SyncMapLen(&m); length != 0 { t.Errorf("Expect=0,actual=%d", length) } @@ -34,13 +34,13 @@ func TestSyncMapLen(t *testing.T) { m.Store("key2", "value2") m.Store("key3", "value3") - if length := SyncMapLen(m); length != 3 { + if length := SyncMapLen(&m); length != 3 { t.Errorf("Expect=3,actual=%d", length) } m.Delete("key2") - if length := SyncMapLen(m); length != 2 { + if length := SyncMapLen(&m); length != 2 { t.Errorf("Expect=2,actual=%d", length) } } diff --git a/internal/utils/priority_queue.go b/internal/utils/priority_queue.go index 9ce1f56..f0a25ee 100644 --- a/internal/utils/priority_queue.go +++ b/internal/utils/priority_queue.go @@ -28,7 +28,6 @@ type PriorityQueue struct { type ComparatorItem interface { Priority() int64 - Value() interface{} } func NewPriorityQueue(initialCapacity int) *PriorityQueue { @@ -38,33 +37,28 @@ func NewPriorityQueue(initialCapacity int) *PriorityQueue { } func (pq *PriorityQueue) Len() int { - pq.mu.RLock() - defer pq.mu.RUnlock() + // Called by heap package, don't add lock return len(pq.items) } func (pq *PriorityQueue) Less(i, j int) bool { - pq.mu.RLock() - defer pq.mu.RUnlock() + // Called by heap package, don't add lock return pq.items[i].Priority() < pq.items[j].Priority() } func (pq *PriorityQueue) Swap(i, j int) { - pq.mu.Lock() - defer pq.mu.Unlock() + // Called by heap package, don't add lock pq.items[i], pq.items[j] = pq.items[j], pq.items[i] } func (pq *PriorityQueue) Push(x interface{}) { - pq.mu.Lock() - defer pq.mu.Unlock() + // Called by heap package, don't add lock item := x.(ComparatorItem) pq.items = append(pq.items, item) } func (pq *PriorityQueue) Pop() interface{} { - pq.mu.Lock() - defer pq.mu.Unlock() + // Called by heap package, don't add lock n := len(pq.items) item := pq.items[n-1] pq.items = pq.items[0 : n-1] @@ -72,24 +66,52 @@ func (pq *PriorityQueue) Pop() interface{} { } func (pq *PriorityQueue) PushItem(item ComparatorItem) { + pq.mu.Lock() + defer pq.mu.Unlock() heap.Push(pq, item) } func (pq *PriorityQueue) PopItem() ComparatorItem { + pq.mu.Lock() + defer pq.mu.Unlock() return heap.Pop(pq).(ComparatorItem) } func (pq *PriorityQueue) Peek() ComparatorItem { - var ret ComparatorItem - if item := heap.Pop(pq); item != nil { - ret = item.(ComparatorItem) - heap.Push(pq, item) + pq.mu.RLock() + defer pq.mu.RUnlock() + + if len(pq.items) > 0 { + return pq.items[0].(ComparatorItem) } - return ret + return nil } func (pq *PriorityQueue) Clear() { - for pq.Len() > 0 { - pq.PopItem() + pq.mu.Lock() + defer pq.mu.Unlock() + pq.items = make([]ComparatorItem, 0) +} + +// RemoveBy removes all items that satisfy the predicate function +// Returns the number of removed items +func (pq *PriorityQueue) RemoveBy(predicate func(ComparatorItem) bool) int { + pq.mu.Lock() + defer pq.mu.Unlock() + + originalLen := len(pq.items) + filtered := make([]ComparatorItem, 0, originalLen) + + // Iterate through all elements, keep those that don't match the removal condition + for _, item := range pq.items { + if !predicate(item) { + filtered = append(filtered, item) + } } + + // Rebuild the heap + pq.items = filtered + heap.Init(pq) + + return originalLen - len(filtered) } diff --git a/internal/utils/priority_queue_test.go b/internal/utils/priority_queue_test.go new file mode 100644 index 0000000..2cfac65 --- /dev/null +++ b/internal/utils/priority_queue_test.go @@ -0,0 +1,257 @@ +/* + * Copyright (c) 2023 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "sync" + "testing" +) + +// TestItem is a test implementation of ComparatorItem +type TestItem struct { + priority int64 + value string +} + +func (t *TestItem) Priority() int64 { + return t.priority +} + +func TestNewPriorityQueue(t *testing.T) { + pq := NewPriorityQueue(10) + if pq == nil { + t.Error("NewPriorityQueue should not return nil") + } + if pq.Len() != 0 { + t.Errorf("New queue should be empty, got length %d", pq.Len()) + } +} + +func TestPriorityQueue_PushAndPop(t *testing.T) { + pq := NewPriorityQueue(10) + + item1 := &TestItem{priority: 3, value: "item3"} + item2 := &TestItem{priority: 1, value: "item1"} + item3 := &TestItem{priority: 2, value: "item2"} + + pq.PushItem(item1) + pq.PushItem(item2) + pq.PushItem(item3) + + if pq.Len() != 3 { + t.Errorf("Expected length 3, got %d", pq.Len()) + } + + // Should pop in priority order (lowest first) + popped := pq.PopItem() + if popped.(*TestItem).priority != 1 { + t.Errorf("Expected priority 1, got %d", popped.(*TestItem).priority) + } + + popped = pq.PopItem() + if popped.(*TestItem).priority != 2 { + t.Errorf("Expected priority 2, got %d", popped.(*TestItem).priority) + } + + popped = pq.PopItem() + if popped.(*TestItem).priority != 3 { + t.Errorf("Expected priority 3, got %d", popped.(*TestItem).priority) + } + + if pq.Len() != 0 { + t.Errorf("Queue should be empty after popping all items, got length %d", pq.Len()) + } +} + +func TestPriorityQueue_Peek(t *testing.T) { + pq := NewPriorityQueue(10) + + item1 := &TestItem{priority: 5, value: "item5"} + item2 := &TestItem{priority: 2, value: "item2"} + item3 := &TestItem{priority: 8, value: "item8"} + + pq.PushItem(item1) + pq.PushItem(item2) + pq.PushItem(item3) + + // Peek should return the lowest priority item without removing it + peeked := pq.Peek() + if peeked == nil { + t.Error("Peek should not return nil for non-empty queue") + } + if peeked.(*TestItem).priority != 2 { + t.Errorf("Expected priority 2, got %d", peeked.(*TestItem).priority) + } + + // Queue length should remain the same after peek + if pq.Len() != 3 { + t.Errorf("Queue length should remain 3 after peek, got %d", pq.Len()) + } + + // Peek again should return the same item + peeked2 := pq.Peek() + if peeked2.(*TestItem).priority != 2 { + t.Errorf("Second peek should return same priority 2, got %d", peeked2.(*TestItem).priority) + } +} + +func TestPriorityQueue_PeekEmpty(t *testing.T) { + pq := NewPriorityQueue(10) + + peeked := pq.Peek() + if peeked != nil { + t.Error("Peek on empty queue should return nil") + } +} + +func TestPriorityQueue_Clear(t *testing.T) { + pq := NewPriorityQueue(10) + + pq.PushItem(&TestItem{priority: 1, value: "item1"}) + pq.PushItem(&TestItem{priority: 2, value: "item2"}) + pq.PushItem(&TestItem{priority: 3, value: "item3"}) + + if pq.Len() != 3 { + t.Errorf("Expected length 3 before clear, got %d", pq.Len()) + } + + pq.Clear() + + if pq.Len() != 0 { + t.Errorf("Expected length 0 after clear, got %d", pq.Len()) + } + + peeked := pq.Peek() + if peeked != nil { + t.Error("Peek on cleared queue should return nil") + } +} + +func TestPriorityQueue_PopItem(t *testing.T) { + pq := NewPriorityQueue(10) + + item1 := &TestItem{priority: 1, value: "item1"} + item2 := &TestItem{priority: 2, value: "item2"} + item3 := &TestItem{priority: 3, value: "item3"} + + pq.PushItem(item1) + pq.PushItem(item2) + pq.PushItem(item3) + + popped := pq.PopItem() + if popped != item1 { + t.Error("pop item should be item1") + } + + popped = pq.PopItem() + if popped != item2 { + t.Error("pop item should be item2") + } + + popped = pq.PopItem() + if popped != item3 { + t.Error("pop item should be item3") + } +} + +func TestPriorityQueue_RemoveBy(t *testing.T) { + pq := NewPriorityQueue(10) + + pq.PushItem(&TestItem{priority: 1, value: "remove"}) + pq.PushItem(&TestItem{priority: 2, value: "keep"}) + pq.PushItem(&TestItem{priority: 3, value: "remove"}) + pq.PushItem(&TestItem{priority: 4, value: "keep"}) + pq.PushItem(&TestItem{priority: 5, value: "remove"}) + + // Remove all items with value "remove" + count := pq.RemoveBy(func(item ComparatorItem) bool { + return item.(*TestItem).value == "remove" + }) + + if count != 3 { + t.Errorf("Expected to remove 3 items, got %d", count) + } + + if pq.Len() != 2 { + t.Errorf("Expected length 2 after RemoveBy, got %d", pq.Len()) + } + + // Verify remaining items + item1 := pq.PopItem().(*TestItem) + if item1.value != "keep" || item1.priority != 2 { + t.Errorf("First item should be priority 2 with value 'keep', got priority %d value %s", item1.priority, item1.value) + } + + item2 := pq.PopItem().(*TestItem) + if item2.value != "keep" || item2.priority != 4 { + t.Errorf("Second item should be priority 4 with value 'keep', got priority %d value %s", item2.priority, item2.value) + } +} + +func TestPriorityQueue_RemoveByNoMatch(t *testing.T) { + pq := NewPriorityQueue(10) + + pq.PushItem(&TestItem{priority: 1, value: "item1"}) + pq.PushItem(&TestItem{priority: 2, value: "item2"}) + + count := pq.RemoveBy(func(item ComparatorItem) bool { + return item.(*TestItem).value == "nonexistent" + }) + + if count != 0 { + t.Errorf("Expected to remove 0 items, got %d", count) + } + + if pq.Len() != 2 { + t.Errorf("Expected length 2 after RemoveBy with no match, got %d", pq.Len()) + } +} + +func TestPriorityQueue_Concurrent(t *testing.T) { + pq := NewPriorityQueue(100) + var wg sync.WaitGroup + + // Concurrent pushes + for i := 0; i < 50; i++ { + wg.Add(1) + go func(priority int64) { + defer wg.Done() + pq.PushItem(&TestItem{priority: priority, value: "concurrent"}) + }(int64(i)) + } + + wg.Wait() + + if pq.Len() != 50 { + t.Errorf("Expected length 50 after concurrent pushes, got %d", pq.Len()) + } + + // Concurrent pops + for i := 0; i < 25; i++ { + wg.Add(1) + go func() { + defer wg.Done() + pq.PopItem() + }() + } + + wg.Wait() + + if pq.Len() != 25 { + t.Errorf("Expected length 25 after concurrent pops, got %d", pq.Len()) + } +} diff --git a/processor/jobcontext/jobcontext.go b/processor/jobcontext/jobcontext.go index 681e687..9bd16a8 100644 --- a/processor/jobcontext/jobcontext.go +++ b/processor/jobcontext/jobcontext.go @@ -24,7 +24,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) -var _ context.Context = &JobContext{} +var _ context.Context = (*JobContext)(nil) type JobContext struct { context.Context diff --git a/processor/mapjob/map_job_processor.go b/processor/mapjob/map_job_processor.go index c801345..e991160 100644 --- a/processor/mapjob/map_job_processor.go +++ b/processor/mapjob/map_job_processor.go @@ -37,7 +37,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/mapjob/bizsubtask" ) -var _ processor.MapJobProcessor = &MapJobProcessor{} +var _ processor.MapJobProcessor = (*MapJobProcessor)(nil) const maxRetryCount = 3 diff --git a/processor/mapjob/map_reduce_job_processor.go b/processor/mapjob/map_reduce_job_processor.go index 03b5e60..b1f1143 100644 --- a/processor/mapjob/map_reduce_job_processor.go +++ b/processor/mapjob/map_reduce_job_processor.go @@ -21,7 +21,7 @@ import ( "github.com/alibaba/schedulerx-worker-go/processor/jobcontext" ) -var _ processor.MapReduceJobProcessor = &MapReduceJobProcessor{} +var _ processor.MapReduceJobProcessor = (*MapReduceJobProcessor)(nil) type MapReduceJobProcessor struct { *MapJobProcessor