diff --git a/backend/cmd/singer/server.go b/backend/cmd/singer/server.go index d723794..38618a8 100644 --- a/backend/cmd/singer/server.go +++ b/backend/cmd/singer/server.go @@ -23,7 +23,6 @@ import ( var ( serverConfigPath string - serverHttpAddr string ) var serverCmd = &cobra.Command{ @@ -67,12 +66,12 @@ var serverCmd = &cobra.Command{ r := api.SetupRouter(*cfg, publisher, db) srv := &http.Server{ - Addr: serverHttpAddr, + Addr: cfg.ServerAddr, Handler: r, } logs.Info("Starting SingerOS backend service...") - logs.Infof("Listening on %s", serverHttpAddr) + logs.Infof("Listening on %s", cfg.ServerAddr) go func() { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { @@ -96,7 +95,6 @@ var serverCmd = &cobra.Command{ func init() { serverCmd.Flags().StringVar(&serverConfigPath, "config", "", "Configuration file path") - serverCmd.Flags().StringVar(&serverHttpAddr, "addr", ":8080", "HTTP server address") rootCmd.AddCommand(serverCmd) } diff --git a/backend/cmd/singer/worker.go b/backend/cmd/singer/worker.go index e0c2215..2c50aa7 100644 --- a/backend/cmd/singer/worker.go +++ b/backend/cmd/singer/worker.go @@ -23,10 +23,10 @@ import ( ) var ( - workerConfigPath string - workerServerAddr string - workerListenAddr string - workerAssistantCode string + workerConfigPath string + workerServerAddr string + workerListenAddr string + workerWorkerID string ) var workerCmd = &cobra.Command{ @@ -52,21 +52,21 @@ func init() { workerCmd.Flags().StringVar(&workerConfigPath, "config", "", "Configuration file path") workerCmd.Flags().StringVar(&workerServerAddr, "server-addr", "127.0.0.1:8080", "Server address for WebSocket connection") workerCmd.Flags().StringVar(&workerListenAddr, "listen-addr", ":8081", "Worker MCP server listen address for runtime bootstrap") - workerCmd.Flags().StringVar(&workerAssistantCode, "assistant-code", "", "Assistant code for configuration retrieval") + workerCmd.Flags().StringVar(&workerWorkerID, "worker-id", "", "Worker ID for configuration retrieval") rootCmd.AddCommand(workerCmd) } -func createWorker(ctx context.Context) (*client.Worker, error) { +func createWorker(ctx context.Context) (*client.WorkerClient, error) { _, err := loadWorkerConfig() if err != nil { return nil, fmt.Errorf("load config: %w", err) } return client.NewWorker(ctx, &client.WorkerConfig{ - ServerAddr: workerServerAddr, - AssistantCode: workerAssistantCode, - SkillsDir: "", - ToolsEnabled: true, + ServerAddr: workerServerAddr, + WorkerID: workerWorkerID, + SkillsDir: "", + ToolsEnabled: true, }) } @@ -78,9 +78,9 @@ func loadWorkerConfig() (*config.WorkerConfig, error) { return nil, fmt.Errorf("failed to load config from %s: %w", workerConfigPath, err) } } - if strings.TrimSpace(workerAssistantCode) != "" { - cfg.AssistantCode = workerAssistantCode - logs.Infof("Using assistant code from flag: %s", workerAssistantCode) + if strings.TrimSpace(workerWorkerID) != "" { + cfg.WorkerID = workerWorkerID + logs.Infof("Using worker ID from flag: %s", workerWorkerID) } if strings.TrimSpace(workerServerAddr) != "" { cfg.ServerAddr = workerServerAddr diff --git a/backend/cmd/singer/worker_simplechat.go b/backend/cmd/singer/worker_simplechat.go index c94a073..d36943f 100644 --- a/backend/cmd/singer/worker_simplechat.go +++ b/backend/cmd/singer/worker_simplechat.go @@ -18,7 +18,6 @@ var ( simpleChatAPIKey string simpleChatModel string simpleChatBaseURL string - ) var simpleChatCmd = &cobra.Command{ @@ -53,7 +52,7 @@ var simpleChatCmd = &cobra.Command{ workerCfg := &worker.WorkerConfig{ Runtime: scRuntime, - ServerAddr: simpleChatServer, + ServerAddr: workerServerAddr, } w, err := worker.NewWorker(ctx, workerCfg) @@ -76,7 +75,7 @@ func init() { simpleChatCmd.Flags().StringVar(&simpleChatAPIKey, "api-key", "", "OpenAI API key (or set OPENAI_API_KEY env)") simpleChatCmd.Flags().StringVar(&simpleChatModel, "model", "gpt-4", "LLM model to use") simpleChatCmd.Flags().StringVar(&simpleChatBaseURL, "base-url", "", "Custom API base URL") - + workerCmd.AddCommand(simpleChatCmd) } diff --git a/backend/config/config.go b/backend/config/config.go index 4671281..343e9a9 100644 --- a/backend/config/config.go +++ b/backend/config/config.go @@ -19,11 +19,13 @@ type LLMConfig struct { // Config 是 SingerOS 的主配置结构,包含所有子系统的配置 type Config struct { - Github *GithubAppConfig `yaml:"github,omitempty"` - Gitlab *GitlabAppConfig `yaml:"gitlab,omitempty"` - NATS *NATSConfig `yaml:"nats,omitempty"` - Database *DatabaseConfig `yaml:"database,omitempty"` - LLM *LLMConfig `yaml:"llm,omitempty"` + ServerAddr string `yaml:"server_addr,omitempty"` // 服务器地址 + Github *GithubAppConfig `yaml:"github,omitempty"` + Gitlab *GitlabAppConfig `yaml:"gitlab,omitempty"` + NATS *NATSConfig `yaml:"nats,omitempty"` + Database *DatabaseConfig `yaml:"database,omitempty"` + LLM *LLMConfig `yaml:"llm,omitempty"` + Scheduler *SchedulerConfig `yaml:"scheduler,omitempty"` } // DatabaseConfig 是数据库的配置结构 diff --git a/backend/config/scheduler.go b/backend/config/scheduler.go new file mode 100644 index 0000000..a9304dd --- /dev/null +++ b/backend/config/scheduler.go @@ -0,0 +1,9 @@ +package config + +type SchedulerConfig struct { + Mode string `yaml:"mode,omitempty" json:"mode,omitempty"` // 调度模式,支持 "process","docker-cli","docker-api","k8s" + WorkerBinary string `yaml:"worker_binary,omitempty" json:"worker_binary,omitempty"` + WorkingDir string `yaml:"working_dir,omitempty" json:"working_dir,omitempty"` + Env map[string]string `yaml:"env,omitempty" json:"env,omitempty"` + ServerAddr string `yaml:"server_addr,omitempty" json:"server_addr,omitempty"` +} diff --git a/backend/internal/agent/agent.go b/backend/internal/agent/agent.go index 1651708..a5c7f0e 100644 --- a/backend/internal/agent/agent.go +++ b/backend/internal/agent/agent.go @@ -72,14 +72,14 @@ func NewAgent(ctx context.Context, llmConfig *config.LLMConfig, runtimeConfig Co return nil, fmt.Errorf("tool registry is required") } - chatModel, err := einoadapter.NewOpenAIChatModel(ctx, llmConfig) + chatModel, err := einoadapter.NewOpenAIChatModel(ctx, llmConfig) if err != nil { return nil, err } return &Agent{ chatModel: chatModel, - toolAdapter: einoadapter.NewToolAdapter(runtimeConfig.ToolRegistry), + toolAdapter: einoadapter.NewToolAdapter(runtimeConfig.ToolRegistry), skillsCatalog: runtimeConfig.SkillsCatalog, systemPrompt: defaultAgentSystemPrompt, }, nil @@ -156,8 +156,8 @@ func (a *Agent) Run(ctx context.Context, req *RequestContext) (*RunResult, error } if usage != nil { - _ = state.emitter.Emit(ctx, & agentevents.RunEvent{ - Type: agentevents.RunEventUsage, + _ = state.emitter.Emit(ctx, &agentevents.RunEvent{ + Type: agentevents.RunEventUsage, Content: eventContentJSON(usage), }) } @@ -187,7 +187,7 @@ func (a *Agent) buildRunState(req *RequestContext) (*runState, error) { return nil, err } - emitter := agentevents.NewEmitter(req.RunID, req.TraceID, sinkForRequest(req)) + emitter := agentevents.NewEmitter(req.RunID, req.TraceID, sinkForRequest(req)) toolCtx := tools.ToolContext{ RunID: req.RunID, TraceID: req.TraceID, @@ -205,7 +205,7 @@ func (a *Agent) buildRunState(req *RequestContext) (*runState, error) { emitter: emitter, userInput: userInput, systemPrompt: systemPrompt, - toolBinding: einoadapter.ToolBinding{ + toolBinding: einoadapter.ToolBinding{ ToolContext: toolCtx, AllowedTools: req.Capability.AllowedTools, }, diff --git a/backend/internal/agent/eino/flow.go b/backend/internal/agent/eino/flow.go index adf3d43..c1be687 100644 --- a/backend/internal/agent/eino/flow.go +++ b/backend/internal/agent/eino/flow.go @@ -25,7 +25,7 @@ type FlowConfig struct { Model einomodel.ToolCallingChatModel ToolAdapter *ToolAdapter Binding ToolBinding - Emitter * agentevents.Emitter + Emitter *agentevents.Emitter SystemPrompt string MaxStep int } @@ -100,7 +100,7 @@ func (f *Flow) Generate(ctx context.Context, userInput string) (*einoschema.Mess } // Stream runs one user request through the Eino agent loop and emits runtime events. -func (f *Flow) Stream(ctx context.Context, userInput string, emitter * agentevents.Emitter) (*einoschema.Message, error) { +func (f *Flow) Stream(ctx context.Context, userInput string, emitter *agentevents.Emitter) (*einoschema.Message, error) { if f == nil || f.agent == nil { return nil, fmt.Errorf("flow is not initialized") } @@ -141,21 +141,21 @@ func (f *Flow) Stream(ctx context.Context, userInput string, emitter * agenteven return einoschema.ConcatMessages(chunks) } -func emitMessageChunk(ctx context.Context, emitter * agentevents.Emitter, chunk *einoschema.Message, contentSnapshot *strings.Builder, reasoningSnapshot *strings.Builder) error { +func emitMessageChunk(ctx context.Context, emitter *agentevents.Emitter, chunk *einoschema.Message, contentSnapshot *strings.Builder, reasoningSnapshot *strings.Builder) error { if emitter == nil || chunk == nil { return nil } if chunk.Content != "" { contentSnapshot.WriteString(chunk.Content) - _ = emitter.Emit(ctx, & agentevents.RunEvent{ - Type: agentevents.RunEventMessageDelta, + _ = emitter.Emit(ctx, &agentevents.RunEvent{ + Type: agentevents.RunEventMessageDelta, Content: chunk.Content, }) } if chunk.ReasoningContent != "" { reasoningSnapshot.WriteString(chunk.ReasoningContent) - _ = emitter.Emit(ctx, & agentevents.RunEvent{ - Type: agentevents.RunEventReasoningDelta, + _ = emitter.Emit(ctx, &agentevents.RunEvent{ + Type: agentevents.RunEventReasoningDelta, Content: chunk.ReasoningContent, }) } @@ -164,8 +164,8 @@ func emitMessageChunk(ctx context.Context, emitter * agentevents.Emitter, chunk if strings.TrimSpace(toolCall.Function.Arguments) != "" { args["json"] = toolCall.Function.Arguments } - _ = emitter.Emit(ctx, & agentevents.RunEvent{ - Type: agentevents.RunEventToolCallArguments, + _ = emitter.Emit(ctx, &agentevents.RunEvent{ + Type: agentevents.RunEventToolCallArguments, Content: eventContentJSON(map[string]any{ "call_id": toolCall.ID, "name": toolCall.Function.Name, diff --git a/backend/internal/agent/eino/flow_test.go b/backend/internal/agent/eino/flow_test.go index 2a1d239..1801e2b 100644 --- a/backend/internal/agent/eino/flow_test.go +++ b/backend/internal/agent/eino/flow_test.go @@ -80,8 +80,8 @@ func TestFlowStreamEmitsMessageEvents(t *testing.T) { t.Fatalf("new flow: %v", err) } - var emitted []* agentevents.RunEvent - emitter := agentevents.NewEmitter("run_stream", "trace_stream", agentevents.SinkFunc(func(ctx context.Context, event * agentevents.RunEvent) error { + var emitted []*agentevents.RunEvent + emitter := agentevents.NewEmitter("run_stream", "trace_stream", agentevents.SinkFunc(func(ctx context.Context, event *agentevents.RunEvent) error { emitted = append(emitted, event) return nil })) @@ -96,7 +96,7 @@ func TestFlowStreamEmitsMessageEvents(t *testing.T) { var deltaCount int for _, event := range emitted { switch event.Type { - case agentevents.RunEventMessageDelta: + case agentevents.RunEventMessageDelta: deltaCount++ } } diff --git a/backend/internal/api/contract/digital_assistant_type.go b/backend/internal/api/contract/digital_assistant_type.go index d785e89..3c5f863 100644 --- a/backend/internal/api/contract/digital_assistant_type.go +++ b/backend/internal/api/contract/digital_assistant_type.go @@ -6,28 +6,28 @@ import "time" type DigitalAssistantStatus string const ( - DigitalAssistantStatusDraft DigitalAssistantStatus = "draft" - DigitalAssistantStatusActive DigitalAssistantStatus = "active" - DigitalAssistantStatusInactive DigitalAssistantStatus = "inactive" - DigitalAssistantStatusArchived DigitalAssistantStatus = "archived" + DigitalAssistantStatusDraft DigitalAssistantStatus = "draft" + DigitalAssistantStatusActive DigitalAssistantStatus = "active" + DigitalAssistantStatusInactive DigitalAssistantStatus = "inactive" + DigitalAssistantStatusArchived DigitalAssistantStatus = "archived" ) // RuntimeType 运行时类型常量 type RuntimeType string const ( - RuntimeTypeDocker RuntimeType = "docker" - RuntimeTypeProcess RuntimeType = "process" - RuntimeTypeK8s RuntimeType = "kubernetes" + RuntimeTypeDocker RuntimeType = "docker" + RuntimeTypeProcess RuntimeType = "process" + RuntimeTypeK8s RuntimeType = "kubernetes" ) // LLMProviderType LLM提供商类型常量 type LLMProviderType string const ( - LLMProviderOpenAI LLMProviderType = "openai" - LLMProviderClaude LLMProviderType = "claude" - LLMProviderDeepSeek LLMProviderType = "deepseek" + LLMProviderOpenAI LLMProviderType = "openai" + LLMProviderClaude LLMProviderType = "claude" + LLMProviderDeepSeek LLMProviderType = "deepseek" ) // MemoryType 记忆类型常量 @@ -42,46 +42,46 @@ const ( type ChannelType string const ( - ChannelTypeGitHub ChannelType = "github" - ChannelTypeGitLab ChannelType = "gitlab" - ChannelTypeWeChat ChannelType = "wechat" - ChannelTypeFeishu ChannelType = "feishu" + ChannelTypeGitHub ChannelType = "github" + ChannelTypeGitLab ChannelType = "gitlab" + ChannelTypeWeChat ChannelType = "wechat" + ChannelTypeFeishu ChannelType = "feishu" ) // KnowledgeType 知识库类型常量 type KnowledgeType string const ( - KnowledgeTypeVector KnowledgeType = "vector" - KnowledgeTypeFile KnowledgeType = "file" + KnowledgeTypeVector KnowledgeType = "vector" + KnowledgeTypeFile KnowledgeType = "file" KnowledgeTypeDatabase KnowledgeType = "database" ) // DigitalAssistant 数字助手信息 type DigitalAssistant struct { - ID uint `json:"id"` - Code string `json:"code"` - OrgID uint `json:"org_id"` - OwnerID uint `json:"owner_id"` - Name string `json:"name"` - Description string `json:"description"` - Avatar string `json:"avatar"` - Status string `json:"status"` - Version int `json:"version"` - Config AssistantConfig `json:"config"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID uint `json:"id"` + Code string `json:"code"` + OrgID uint `json:"org_id"` + OwnerID uint `json:"owner_id"` + Name string `json:"name"` + Description string `json:"description"` + Avatar string `json:"avatar"` + Status string `json:"status"` + Version int `json:"version"` + Config AssistantConfig `json:"config"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // AssistantConfig 数字助手配置 type AssistantConfig struct { - Runtime RuntimeConfig `json:"runtime_config"` - LLM LLMConfig `json:"llm_config"` - Skills []SkillRef `json:"skills"` - Channels []ChannelRef `json:"channels"` + Runtime RuntimeConfig `json:"runtime_config"` + LLM LLMConfig `json:"llm_config"` + Skills []SkillRef `json:"skills"` + Channels []ChannelRef `json:"channels"` Knowledge []KnowledgeRef `json:"knowledge"` - Memory MemoryConfig `json:"memory_config"` - Policies PolicyConfig `json:"policies_config"` + Memory MemoryConfig `json:"memory_config"` + Policies PolicyConfig `json:"policies_config"` } // SkillRef 技能引用 @@ -97,9 +97,9 @@ type ChannelRef struct { // KnowledgeRef 知识库引用 type KnowledgeRef struct { - Type string `json:"type"` + Type string `json:"type"` DatasetID string `json:"dataset_id"` - Repo string `json:"repo"` + Repo string `json:"repo"` } // RuntimeConfig 运行时配置 @@ -135,9 +135,9 @@ type CreateDigitalAssistantRequest struct { // UpdateDigitalAssistantRequest 更新数字助手请求 type UpdateDigitalAssistantRequest struct { - Name string `json:"name"` - Description string `json:"description"` - Avatar string `json:"avatar"` + Name string `json:"name"` + Description string `json:"description"` + Avatar string `json:"avatar"` Config *AssistantConfig `json:"config,omitempty"` } @@ -148,12 +148,12 @@ type UpdateDigitalAssistantStatusRequest struct { // ListDigitalAssistantRequest 查询数字助手列表请求 type ListDigitalAssistantRequest struct { - OrgID *uint `form:"org_id,omitempty"` - OwnerID *uint `form:"owner_id,omitempty"` - Status *string `form:"status,omitempty"` - Keyword *string `form:"keyword,omitempty"` - Page int `form:"page,default=1"` - PerPage int `form:"per_page,default=20"` + OrgID *uint `form:"org_id,omitempty"` + OwnerID *uint `form:"owner_id,omitempty"` + Status *string `form:"status,omitempty"` + Keyword *string `form:"keyword,omitempty"` + Page int `form:"page,default=1"` + PerPage int `form:"per_page,default=20"` } // DigitalAssistantList 数字助手列表响应 diff --git a/backend/internal/api/dto/digital_assistant.go b/backend/internal/api/dto/digital_assistant.go index e47753f..817e1de 100644 --- a/backend/internal/api/dto/digital_assistant.go +++ b/backend/internal/api/dto/digital_assistant.go @@ -4,9 +4,9 @@ import "github.com/insmtx/SingerOS/backend/internal/api/contract" // CreateDigitalAssistantResponse 创建数字助手响应 type CreateDigitalAssistantResponse struct { - Code int `json:"code"` - Message string `json:"message"` - Data *contract.DigitalAssistant `json:"data"` + Code int `json:"code"` + Message string `json:"message"` + Data *contract.DigitalAssistant `json:"data"` } // NewCreateDigitalAssistantResponse 创建成功响应 diff --git a/backend/internal/api/router.go b/backend/internal/api/router.go index 073b2d6..d07ad10 100644 --- a/backend/internal/api/router.go +++ b/backend/internal/api/router.go @@ -34,49 +34,49 @@ import ( // 同时设置客户端 WebSocket 连接器,并将所有连接器的路由注册到 HTTP 服务器。 func SetupRouter(cfg config.Config, publisher eventbus.Publisher, db *gorm.DB) *gin.Engine { r := gin.New() - { - r.Use(ygmiddleware.CORS()) - r.Use(middleware.CallerMiddleware()) - r.Use(middleware.Logger(".Ping", "metrics")) - r.Use(ygmiddleware.Recovery()) - } + r.Use(ygmiddleware.CORS()) + r.Use(middleware.CallerMiddleware()) + r.Use(middleware.Logger(".Ping", "metrics")) + r.Use(ygmiddleware.Recovery()) v1 := r.Group("/v1") + { + if cfg.Github != nil { + logs.Info("Setting up GitHub connector") + authService := initThirdPartyAuthService(&cfg) + github.RegisterGitHubRoutes(v1, *cfg.Github, publisher, db, authService) + logs.Info("GitHub connector registered successfully") + } else { + logs.Debug("No GitHub configuration provided, skipping GitHub connector setup") + } - if cfg.Github != nil { - logs.Info("Setting up GitHub connector") - authService := initThirdPartyAuthService(&cfg) - github.RegisterGitHubRoutes(v1, *cfg.Github, publisher, db, authService) - logs.Info("GitHub connector registered successfully") - } else { - logs.Debug("No GitHub configuration provided, skipping GitHub connector setup") + if cfg.Gitlab != nil { + logs.Info("Setting up GitLab connector") + gitlab.RegisterGitLabRoutes(v1, *cfg.Gitlab, publisher) + logs.Info("GitLab connector registered successfully") + } else { + logs.Debug("No GitLab configuration provided, skipping GitLab connector setup") + } } - - if cfg.Gitlab != nil { - logs.Info("Setting up GitLab connector") - gitlab.RegisterGitLabRoutes(v1, *cfg.Gitlab, publisher) - logs.Info("GitLab connector registered successfully") - } else { - logs.Debug("No GitLab configuration provided, skipping GitLab connector setup") + { + websocket.RegisterWebSocketRoutes(v1, publisher) + logs.Info("WebSocket connector registered successfully") } + { + workerScheduler := scheduler.NewProcessScheduler(cfg.Scheduler) - websocket.RegisterWebSocketRoutes(v1, publisher) - logs.Info("WebSocket connector registered successfully") - - workerScheduler := scheduler.NewProcessScheduler(&scheduler.ProcessConfig{ - ServerAddr: ":8080", - }) - - workerServer := workerserver.NewServer(workerScheduler, db) - workerServer.RegisterRoutes(v1) - logs.Info("Worker server routes registered successfully") - - digitalAssistantService := service.NewDigitalAssistantService(db, workerScheduler) - handler.RegisterDigitalAssistantRoutes(v1, digitalAssistantService) - logs.Info("Digital assistant routes registered successfully") + workerManager := workerserver.NewServer(workerScheduler, db) + workerManager.RegisterRoutes(v1) + logs.Info("Worker server routes registered successfully") - singerMCP.RegisterRoutes(v1, singerMCP.NewServer()) - logs.Info("MCP routes registered successfully") + digitalAssistantService := service.NewDigitalAssistantService(db, workerScheduler) + handler.RegisterDigitalAssistantRoutes(v1, digitalAssistantService) + logs.Info("Digital assistant routes registered successfully") + } + { + singerMCP.RegisterRoutes(v1, singerMCP.NewServer()) + logs.Info("MCP routes registered successfully") + } // Swagger UI 路由 v1.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) return r diff --git a/backend/internal/eventengine/orchestrator.go b/backend/internal/eventengine/orchestrator.go index 462b584..9359c34 100644 --- a/backend/internal/eventengine/orchestrator.go +++ b/backend/internal/eventengine/orchestrator.go @@ -10,9 +10,9 @@ import ( "fmt" "strings" + "github.com/insmtx/SingerOS/backend/internal/agent" eventbus "github.com/insmtx/SingerOS/backend/internal/infra/mq" interactionevent "github.com/insmtx/SingerOS/backend/pkg/event" - "github.com/insmtx/SingerOS/backend/internal/agent" "github.com/ygpkg/yg-go/logs" ) @@ -22,12 +22,12 @@ type EventHandlerFunc func(ctx context.Context, event *interactionevent.Event) e // Orchestrator 是事件编排器,负责事件的订阅、分发和处理 type Orchestrator struct { subscriber eventbus.Subscriber // 事件订阅者 - runner agent.Runner // 统一任务运行器 + runner agent.Runner // 统一任务运行器 handlers map[string]EventHandlerFunc // 事件主题到处理器的映射 } // NewOrchestrator 创建一个新的事件编排器实例 -func NewOrchestrator(subscriber eventbus.Subscriber, runner agent.Runner) *Orchestrator { +func NewOrchestrator(subscriber eventbus.Subscriber, runner agent.Runner) *Orchestrator { orchestrator := &Orchestrator{ subscriber: subscriber, runner: runner, @@ -65,7 +65,7 @@ func (o *Orchestrator) Start(ctx context.Context) error { return } - var interactionEvent interactionevent.Event + var interactionEvent interactionevent.Event if err := json.Unmarshal(jsonBytes, &interactionEvent); err != nil { logs.ErrorContextf(ctx, "Failed to unmarshal event: %v", err) return @@ -88,27 +88,27 @@ func (o *Orchestrator) Start(ctx context.Context) error { } // handleIssueComment 处理 GitHub Issue 评论事件 -func (o *Orchestrator) handleIssueComment(ctx context.Context, event * interactionevent.Event) error { +func (o *Orchestrator) handleIssueComment(ctx context.Context, event *interactionevent.Event) error { logs.InfoContextf(ctx, "Processing GitHub issue comment event with agent runtime: %+v", event) return o.runEvent(ctx, event) } // handlePullRequest 处理 GitHub Pull Request 事件 -func (o *Orchestrator) handlePullRequest(ctx context.Context, event * interactionevent.Event) error { +func (o *Orchestrator) handlePullRequest(ctx context.Context, event *interactionevent.Event) error { logs.InfoContextf(ctx, "Processing GitHub pull request event with agent runtime: %+v", event) return o.runEvent(ctx, event) } // handlePush 处理 GitHub Push 提交事件 -func (o *Orchestrator) handlePush(ctx context.Context, event * interactionevent.Event) error { +func (o *Orchestrator) handlePush(ctx context.Context, event *interactionevent.Event) error { logs.InfoContextf(ctx, "Processing GitHub push event with agent runtime: %+v", event) return o.runEvent(ctx, event) } -func (o *Orchestrator) runEvent(ctx context.Context, event * interactionevent.Event) error { +func (o *Orchestrator) runEvent(ctx context.Context, event *interactionevent.Event) error { if o.runner == nil { return fmt.Errorf("agent runtime runner is required") } @@ -123,25 +123,25 @@ func (o *Orchestrator) runEvent(ctx context.Context, event * interactionevent.Ev return nil } -func requestFromInteractionEvent(event * interactionevent.Event) * agent.RequestContext { +func requestFromInteractionEvent(event *interactionevent.Event) *agent.RequestContext { if event == nil { - return & agent.RequestContext{ - Input: agent.InputContext{ - Type: agent.InputTypeEvent, + return &agent.RequestContext{ + Input: agent.InputContext{ + Type: agent.InputTypeEvent, }, } } - return & agent.RequestContext{ + return &agent.RequestContext{ RunID: event.EventID, TraceID: event.TraceID, - Actor: agent.ActorContext{ + Actor: agent.ActorContext{ UserID: event.Actor, Channel: event.Channel, ExternalID: event.Actor, }, - Input: agent.InputContext{ - Type: agent.InputTypeEvent, + Input: agent.InputContext{ + Type: agent.InputTypeEvent, Text: buildInteractionEventInput(event), }, Metadata: map[string]any{ @@ -154,7 +154,7 @@ func requestFromInteractionEvent(event * interactionevent.Event) * agent.Request } } -func buildInteractionEventInput(event * interactionevent.Event) string { +func buildInteractionEventInput(event *interactionevent.Event) string { if event == nil { return "" } @@ -176,7 +176,7 @@ func buildInteractionEventInput(event * interactionevent.Event) string { return strings.Join(filterEmptyStrings(sections), "\n\n") } -func buildInteractionEventEnvelope(event * interactionevent.Event) string { +func buildInteractionEventEnvelope(event *interactionevent.Event) string { lines := []string{"Event envelope:"} if event.Channel != "" { lines = append(lines, "- channel: "+event.Channel) diff --git a/backend/internal/worker/client/worker.go b/backend/internal/worker/client/worker_client.go similarity index 81% rename from backend/internal/worker/client/worker.go rename to backend/internal/worker/client/worker_client.go index e18bc7e..0f5487b 100644 --- a/backend/internal/worker/client/worker.go +++ b/backend/internal/worker/client/worker_client.go @@ -15,14 +15,13 @@ import ( "github.com/ygpkg/yg-go/logs" ) -type Worker struct { - runtime agent.AgentRuntime - config *WorkerConfig - workerID string - assistantCode string - startedAt time.Time - status string - wsClient *WSClient +type WorkerClient struct { + runtime agent.AgentRuntime + config *WorkerConfig + workerID string + startedAt time.Time + status string + wsClient *WSClient } type WorkerConfig struct { @@ -31,27 +30,26 @@ type WorkerConfig struct { SkillsDir string ToolsEnabled bool ServerAddr string - AssistantCode string + WorkerID string } -func NewWorker(ctx context.Context, cfg *WorkerConfig) (*Worker, error) { +func NewWorker(ctx context.Context, cfg *WorkerConfig) (*WorkerClient, error) { if cfg == nil { return nil, fmt.Errorf("worker config is required") } workerID := fmt.Sprintf("worker_%d", time.Now().UnixNano()) - w := &Worker{ - config: cfg, - workerID: workerID, - assistantCode: cfg.AssistantCode, - startedAt: time.Now(), - status: "initialized", + w := &WorkerClient{ + config: cfg, + workerID: workerID, + startedAt: time.Now(), + status: "initialized", } if cfg.ServerAddr != "" { w.wsClient = NewWSClient(cfg.ServerAddr, workerID, - WithAssistantCode(cfg.AssistantCode), + WithWorkerID(workerID), WithOnConfigReady(func(assistantConfig map[string]interface{}) { w.handleAssistantConfig(ctx, assistantConfig) }), @@ -61,7 +59,7 @@ func NewWorker(ctx context.Context, cfg *WorkerConfig) (*Worker, error) { return w, nil } -func (w *Worker) handleAssistantConfig(ctx context.Context, assistantConfig map[string]interface{}) { +func (w *WorkerClient) handleAssistantConfig(ctx context.Context, assistantConfig map[string]interface{}) { logs.Info("Processing assistant configuration from server") llmConfigRaw, ok := assistantConfig["llm_config"] @@ -140,7 +138,7 @@ func buildDefaultRuntime(ctx context.Context, cfg *WorkerConfig) (agent.AgentRun return agentInstance, nil } -func (w *Worker) Run(ctx context.Context, req *agent.RequestContext) (*agent.RunResult, error) { +func (w *WorkerClient) Run(ctx context.Context, req *agent.RequestContext) (*agent.RunResult, error) { if w == nil || w.runtime == nil { return nil, fmt.Errorf("worker runtime is not initialized") } @@ -156,7 +154,7 @@ func (w *Worker) Run(ctx context.Context, req *agent.RequestContext) (*agent.Run return result, nil } -func (w *Worker) Start(ctx context.Context) error { +func (w *WorkerClient) Start(ctx context.Context) error { w.status = "running" logs.Infof("Worker %s started", w.workerID) @@ -181,7 +179,7 @@ func (w *Worker) Start(ctx context.Context) error { } } -func (w *Worker) Shutdown(ctx context.Context) error { +func (w *WorkerClient) Shutdown(ctx context.Context) error { logs.Info("Worker shutting down...") w.status = "stopping" @@ -192,15 +190,15 @@ func (w *Worker) Shutdown(ctx context.Context) error { return nil } -func (w *Worker) GetWorkerID() string { +func (w *WorkerClient) GetWorkerID() string { return w.workerID } -func (w *Worker) GetStartedAt() time.Time { +func (w *WorkerClient) GetStartedAt() time.Time { return w.startedAt } -func (w *Worker) GetStatus() string { +func (w *WorkerClient) GetStatus() string { return w.status } diff --git a/backend/internal/worker/client/ws_client.go b/backend/internal/worker/client/ws_client.go index df5ed2f..55f4bac 100644 --- a/backend/internal/worker/client/ws_client.go +++ b/backend/internal/worker/client/ws_client.go @@ -11,21 +11,20 @@ import ( ) type WSClient struct { - conn *websocket.Conn - workerID string - assistantCode string - serverAddr string - send chan map[string]interface{} - ctx context.Context - cancel context.CancelFunc - onConfigReady func(config map[string]interface{}) + conn *websocket.Conn + workerID string + serverAddr string + send chan map[string]interface{} + ctx context.Context + cancel context.CancelFunc + onConfigReady func(config map[string]interface{}) } type WSClientOption func(*WSClient) -func WithAssistantCode(assistantCode string) WSClientOption { +func WithWorkerID(workerID string) WSClientOption { return func(c *WSClient) { - c.assistantCode = assistantCode + c.workerID = workerID } } @@ -138,7 +137,7 @@ func (c *WSClient) handleMessage(msg map[string]interface{}) { switch msgType { case "welcome": logs.Infof("Received welcome from server") - if c.assistantCode != "" { + if c.workerID != "" { c.requestConfig() } case "configResponse": @@ -154,13 +153,13 @@ func (c *WSClient) requestConfig() { reqMsg := map[string]interface{}{ "type": "getConfig", "payload": map[string]interface{}{ - "assistant_code": c.assistantCode, + "worker_id": c.workerID, }, } if err := c.sendJSON(reqMsg); err != nil { logs.Errorf("Failed to request config: %v", err) } else { - logs.Infof("Requested config for assistant %s", c.assistantCode) + logs.Infof("Requested config for worker %s", c.workerID) } } diff --git a/backend/internal/worker/scheduler/dockercli_scheduler.go b/backend/internal/worker/scheduler/dockercli_scheduler.go new file mode 100644 index 0000000..8efcc77 --- /dev/null +++ b/backend/internal/worker/scheduler/dockercli_scheduler.go @@ -0,0 +1,341 @@ +package scheduler + +import ( + "context" + "fmt" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/insmtx/SingerOS/backend/config" + "github.com/insmtx/SingerOS/backend/internal/worker" + "github.com/ygpkg/yg-go/logs" +) + +// DockerCLIScheduler implements WorkerScheduler using Docker CLI commands. +// It manages containerized workers by spawning Docker containers and tracking their lifecycle. +type DockerCLIScheduler struct { + config *config.SchedulerConfig + instances map[string]*DockerInstance + mu sync.RWMutex +} + +// DockerInstance represents a running containerized worker. +type DockerInstance struct { + ID string + WorkerID string + ContainerID string + Status string + PID int + StartedAt time.Time + LastSeen time.Time + Image string + mu sync.RWMutex +} + +var _ worker.WorkerScheduler = (*DockerCLIScheduler)(nil) + +// NewDockerCLIScheduler creates a new Docker CLI-based scheduler. +func NewDockerCLIScheduler(config *config.SchedulerConfig) worker.WorkerScheduler { + return &DockerCLIScheduler{ + config: config, + instances: make(map[string]*DockerInstance), + } +} + +func containerName(workerID string) string { + return fmt.Sprintf("singeros-worker-%s", workerID) +} + +func (ds *DockerCLIScheduler) execDocker(ctx context.Context, args ...string) (string, string, error) { + cmd := exec.CommandContext(ctx, "docker", args...) + var stdout, stderr strings.Builder + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err := cmd.Run() + if err != nil { + return stdout.String(), stderr.String(), err + } + + return strings.TrimSpace(stdout.String()), stderr.String(), nil +} + +func (ds *DockerCLIScheduler) containerWorkingDir(workid string) string { + return filepath.Join(ds.config.WorkingDir, "workspace", workid) +} + +func (ds *DockerCLIScheduler) buildEnvVars(spec *worker.WorkerSpec) map[string]string { + env := make(map[string]string) + + for key, value := range ds.config.Env { + env[key] = value + } + + for key, value := range spec.Env { + env[key] = value + } + + if ds.config.ServerAddr != "" { + env["SINGEROS_SERVER_ADDR"] = ds.config.ServerAddr + } + env["SINGEROS_WORKER_ID"] = spec.ID + + return env +} + +func (ds *DockerCLIScheduler) createAndStartContainer(ctx context.Context, instance *DockerInstance, spec *worker.WorkerSpec, cName string) error { + args := []string{"create", "--name", cName} + args = append(args, "-v", ds.containerWorkingDir(spec.ID)+":/workspace") + if spec.WorkingDir != "" { + args = append(args, "-w", spec.WorkingDir) + } else { + args = append(args, "-w", "/workspace") + } + + env := ds.buildEnvVars(spec) + for key, value := range env { + args = append(args, "-e", fmt.Sprintf("%s=%s", key, value)) + } + + args = append(args, spec.Image) + + if len(spec.Command) > 0 { + args = append(args, spec.Command...) + } + if len(spec.Args) > 0 { + args = append(args, spec.Args...) + } + + stdout, stderr, err := ds.execDocker(ctx, args...) + if err != nil { + return fmt.Errorf("docker create failed: %w (stderr: %s)", err, stderr) + } + + containerID := strings.TrimSpace(stdout) + if containerID == "" { + return fmt.Errorf("no container ID returned from docker create") + } + + instance.mu.Lock() + instance.ContainerID = containerID + instance.Status = "created" + instance.mu.Unlock() + + logs.Infof("Container created: %s", containerID) + + startArgs := []string{"start", containerID} + _, stderr, err = ds.execDocker(ctx, startArgs...) + if err != nil { + return fmt.Errorf("docker start failed: %w (stderr: %s)", err, stderr) + } + + logs.Infof("Container started: %s", containerID) + return nil +} + +func (ds *DockerCLIScheduler) inspectContainer(ctx context.Context, instance *DockerInstance) error { + inspectArgs := []string{"inspect", "--format", "{{.State.Pid}}", instance.ContainerID} + stdout, _, err := ds.execDocker(ctx, inspectArgs...) + if err != nil { + return fmt.Errorf("failed to inspect container PID: %w", err) + } + + var pid int + if _, err := fmt.Sscanf(stdout, "%d", &pid); err != nil { + return fmt.Errorf("failed to parse PID: %w", err) + } + + instance.mu.Lock() + defer instance.mu.Unlock() + instance.PID = pid + instance.Status = "running" + instance.LastSeen = time.Now() + + return nil +} + +func (ds *DockerCLIScheduler) getContainerStatus(instance *DockerInstance) (string, error) { + ctx := context.Background() + inspectArgs := []string{"inspect", "--format", "{{.State.Status}}", instance.ContainerID} + stdout, _, err := ds.execDocker(ctx, inspectArgs...) + if err != nil { + return "", err + } + return strings.ToLower(strings.TrimSpace(stdout)), nil +} + +func (ds *DockerCLIScheduler) monitorContainer(instance *DockerInstance) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for range ticker.C { + status, err := ds.getContainerStatus(instance) + if err != nil { + logs.Errorf("Failed to get status for container %s: %v", instance.ContainerID, err) + ds.removeInstance(instance.WorkerID) + return + } + + if status != "running" { + logs.Infof("Container %s is no longer running (status: %s)", instance.ContainerID, status) + instance.mu.Lock() + instance.Status = status + instance.mu.Unlock() + ds.removeInstance(instance.WorkerID) + return + } + + instance.mu.Lock() + instance.LastSeen = time.Now() + instance.mu.Unlock() + } +} + +func (ds *DockerCLIScheduler) removeInstance(workerID string) { + ds.mu.Lock() + defer ds.mu.Unlock() + delete(ds.instances, workerID) +} + +// Start launches a new containerized worker and returns its instance information. +func (ds *DockerCLIScheduler) Start(ctx context.Context, spec *worker.WorkerSpec) (*worker.WorkerInstance, error) { + if spec.EnvType != "" && spec.EnvType != worker.WorkerEnvDocker { + return nil, fmt.Errorf("unsupported env type: %s, DockerCLIScheduler only supports docker runtime", spec.EnvType) + } + + ds.mu.Lock() + defer ds.mu.Unlock() + + workerID := spec.ID + if workerID == "" { + workerID = fmt.Sprintf("worker_%d", time.Now().UnixNano()) + } + + cName := containerName(workerID) + + instance := &DockerInstance{ + ID: workerID, + WorkerID: workerID, + Status: "initializing", + StartedAt: time.Now(), + LastSeen: time.Now(), + Image: spec.Image, + } + + if err := ds.createAndStartContainer(ctx, instance, spec, cName); err != nil { + return nil, fmt.Errorf("failed to create container: %w", err) + } + + if err := ds.inspectContainer(ctx, instance); err != nil { + logs.Warnf("Failed to inspect container %s: %v", instance.ContainerID, err) + } + + ds.instances[workerID] = instance + + logs.Infof("Docker container %s for worker %s started", instance.ContainerID, workerID) + + go ds.monitorContainer(instance) + + return &worker.WorkerInstance{ + ID: instance.ID, + WorkerID: instance.WorkerID, + Status: instance.Status, + PID: instance.PID, + StartedAt: instance.StartedAt, + }, nil +} + +// Stop terminates a running containerized worker. +func (ds *DockerCLIScheduler) Stop(ctx context.Context, workerID string) error { + ds.mu.Lock() + defer ds.mu.Unlock() + + instance, ok := ds.instances[workerID] + if !ok { + return fmt.Errorf("worker %s not found", workerID) + } + + if err := ds.stopContainer(ctx, instance); err != nil { + return fmt.Errorf("failed to stop container: %w", err) + } + + instance.Status = "stopped" + delete(ds.instances, workerID) + logs.Infof("Worker %s stopped", workerID) + return nil +} + +func (ds *DockerCLIScheduler) stopContainer(ctx context.Context, instance *DockerInstance) error { + instance.mu.RLock() + containerID := instance.ContainerID + instance.mu.RUnlock() + + if containerID == "" { + return nil + } + + stopArgs := []string{"stop", containerID} + _, stderr, err := ds.execDocker(ctx, stopArgs...) + if err != nil { + logs.Warnf("Failed to stop container %s: %v (stderr: %s)", containerID, err, stderr) + } + + rmArgs := []string{"rm", containerID} + _, stderr, err = ds.execDocker(ctx, rmArgs...) + if err != nil { + logs.Warnf("Failed to remove container %s: %v (stderr: %s)", containerID, err, stderr) + } + + logs.Infof("Container %s stopped and removed", containerID) + return nil +} + +// Health checks if a containerized worker is running and healthy. +func (ds *DockerCLIScheduler) Health(ctx context.Context, workerID string) error { + ds.mu.RLock() + instance, ok := ds.instances[workerID] + ds.mu.RUnlock() + + if !ok { + return fmt.Errorf("worker %s not found", workerID) + } + + status, err := ds.getContainerStatus(instance) + if err != nil { + return fmt.Errorf("container health check failed: %w", err) + } + + if status != "running" { + return fmt.Errorf("container is not running (status: %s)", status) + } + + instance.mu.Lock() + instance.LastSeen = time.Now() + instance.Status = "running" + instance.mu.Unlock() + + return nil +} + +// List returns all active containerized workers. +func (ds *DockerCLIScheduler) List(ctx context.Context) ([]*worker.WorkerInstance, error) { + ds.mu.RLock() + defer ds.mu.RUnlock() + + result := make([]*worker.WorkerInstance, 0, len(ds.instances)) + for _, instance := range ds.instances { + instance.mu.RLock() + result = append(result, &worker.WorkerInstance{ + ID: instance.ID, + WorkerID: instance.WorkerID, + Status: instance.Status, + PID: instance.PID, + StartedAt: instance.StartedAt, + }) + instance.mu.RUnlock() + } + return result, nil +} diff --git a/backend/internal/worker/scheduler/dockercli_scheduler_test.go b/backend/internal/worker/scheduler/dockercli_scheduler_test.go new file mode 100644 index 0000000..725e4d1 --- /dev/null +++ b/backend/internal/worker/scheduler/dockercli_scheduler_test.go @@ -0,0 +1,355 @@ +package scheduler + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/insmtx/SingerOS/backend/config" + "github.com/insmtx/SingerOS/backend/internal/worker" +) + +func TestNewDockerCLIScheduler(t *testing.T) { + cfg := &config.SchedulerConfig{ + Mode: "docker-cli", + WorkingDir: "/tmp/singer-os", + } + + scheduler := NewDockerCLIScheduler(cfg) + + if scheduler == nil { + t.Fatal("NewDockerCLIScheduler returned nil") + } + + dcs, ok := scheduler.(*DockerCLIScheduler) + if !ok { + t.Fatal("NewDockerCLIScheduler did not return *DockerCLIScheduler") + } + + if dcs.config != cfg { + t.Error("Config not set properly") + } + + if dcs.instances == nil { + t.Error("Instances map not initialized") + } + + if len(dcs.instances) != 0 { + t.Errorf("Expected empty instances map, got %d entries", len(dcs.instances)) + } +} + +func TestContainerName(t *testing.T) { + tests := []struct { + name string + workerID string + want string + }{ + { + name: "simple worker id", + workerID: "worker123", + want: "singeros-worker-worker123", + }, + { + name: "worker id with underscore", + workerID: "worker_456", + want: "singeros-worker-worker_456", + }, + { + name: "empty worker id", + workerID: "", + want: "singeros-worker-", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := containerName(tt.workerID) + if got != tt.want { + t.Errorf("containerName(%q) = %q, want %q", tt.workerID, got, tt.want) + } + }) + } +} + +func TestContainerWorkingDir(t *testing.T) { + cfg := &config.SchedulerConfig{ + WorkingDir: "/tmp/singer-os", + } + scheduler := NewDockerCLIScheduler(cfg).(*DockerCLIScheduler) + + tests := []struct { + name string + workerID string + want string + }{ + { + name: "simple worker id", + workerID: "worker123", + want: "/tmp/singer-os/workspace/worker123", + }, + { + name: "worker id with special chars", + workerID: "worker-456_test", + want: "/tmp/singer-os/workspace/worker-456_test", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := scheduler.containerWorkingDir(tt.workerID) + if got != tt.want { + t.Errorf("containerWorkingDir(%q) = %q, want %q", tt.workerID, got, tt.want) + } + }) + } +} + +func TestBuildEnvVars(t *testing.T) { + cfg := &config.SchedulerConfig{ + Env: map[string]string{ + "APP_ENV": "production", + "LOG_LEVEL": "info", + "COMMON_KEY": "config_value", + }, + ServerAddr: "localhost:8080", + } + + spec := &worker.WorkerSpec{ + ID: "worker-123", + Env: map[string]string{ + "WORKER_KEY": "worker_value", + "COMMON_KEY": "worker_override", + }, + } + + scheduler := NewDockerCLIScheduler(cfg).(*DockerCLIScheduler) + env := scheduler.buildEnvVars(spec) + + expected := map[string]string{ + "APP_ENV": "production", + "LOG_LEVEL": "info", + "COMMON_KEY": "worker_override", + "WORKER_KEY": "worker_value", + "SINGEROS_SERVER_ADDR": "localhost:8080", + "SINGEROS_WORKER_ID": "worker-123", + } + + if len(env) != len(expected) { + t.Errorf("Expected %d env vars, got %d", len(expected), len(env)) + } + + for key, want := range expected { + if got, ok := env[key]; !ok { + t.Errorf("Missing expected env var: %s", key) + } else if got != want { + t.Errorf("env[%s] = %q, want %q", key, got, want) + } + } +} + +func TestBuildEnvVarsNoServerAddr(t *testing.T) { + cfg := &config.SchedulerConfig{ + Env: map[string]string{ + "APP_ENV": "production", + }, + } + + spec := &worker.WorkerSpec{ + ID: "worker-456", + } + + scheduler := NewDockerCLIScheduler(cfg).(*DockerCLIScheduler) + env := scheduler.buildEnvVars(spec) + + if _, ok := env["SINGEROS_SERVER_ADDR"]; ok { + t.Error("SINGEROS_SERVER_ADDR should not be set when config has no server addr") + } + + if got, want := env["SINGEROS_WORKER_ID"], "worker-456"; got != want { + t.Errorf("SINGEROS_WORKER_ID = %q, want %q", got, want) + } +} + +func TestBuildEnvVarsEmptyEnv(t *testing.T) { + cfg := &config.SchedulerConfig{} + spec := &worker.WorkerSpec{ + ID: "worker-789", + } + + scheduler := NewDockerCLIScheduler(cfg).(*DockerCLIScheduler) + env := scheduler.buildEnvVars(spec) + + if len(env) != 1 { + t.Errorf("Expected 1 env var (SINGEROS_WORKER_ID), got %d", len(env)) + } + + if got, want := env["SINGEROS_WORKER_ID"], "worker-789"; got != want { + t.Errorf("SINGEROS_WORKER_ID = %q, want %q", got, want) + } +} + +func TestStartUnsupportedEnvType(t *testing.T) { + cfg := &config.SchedulerConfig{} + scheduler := NewDockerCLIScheduler(cfg) + + spec := &worker.WorkerSpec{ + ID: "worker-1", + EnvType: worker.WorkerEnvProcess, + } + + _, err := scheduler.Start(context.Background(), spec) + if err == nil { + t.Fatal("Start should return error for unsupported env type") + } + + if !strings.Contains(err.Error(), "unsupported env type") { + t.Errorf("Error should mention unsupported env type, got: %v", err) + } + + if !strings.Contains(err.Error(), "docker") { + t.Errorf("Error should mention docker, got: %v", err) + } +} + +func TestDockerInstanceThreadSafety(t *testing.T) { + instance := &DockerInstance{ + ID: "test-1", + WorkerID: "test-1", + ContainerID: "container-123", + Status: "running", + PID: 1234, + StartedAt: time.Now(), + LastSeen: time.Now(), + } + + done := make(chan bool, 10) + + for i := 0; i < 10; i++ { + go func() { + instance.mu.Lock() + instance.Status = "updating" + instance.LastSeen = time.Now() + instance.mu.Unlock() + done <- true + }() + } + + for i := 0; i < 10; i++ { + <-done + } + + instance.mu.RLock() + status := instance.Status + lastSeen := instance.LastSeen + instance.mu.RUnlock() + + if status != "updating" { + t.Errorf("Expected status 'updating', got %q", status) + } + + if lastSeen.IsZero() { + t.Error("LastSeen should not be zero") + } +} + +func TestSchedulerInstancesConcurrency(t *testing.T) { + cfg := &config.SchedulerConfig{ + WorkingDir: "/workspace", + } + scheduler := &DockerCLIScheduler{ + config: cfg, + instances: make(map[string]*DockerInstance), + } + + done := make(chan bool, 10) + + for i := 0; i < 10; i++ { + go func(id int) { + workerID := fmt.Sprintf("worker-%d", id) + scheduler.mu.Lock() + scheduler.instances[workerID] = &DockerInstance{ + ID: workerID, + WorkerID: workerID, + Status: "running", + StartedAt: time.Now(), + } + scheduler.mu.Unlock() + done <- true + }(i) + } + + for i := 0; i < 10; i++ { + <-done + } + + scheduler.mu.RLock() + count := len(scheduler.instances) + scheduler.mu.RUnlock() + + if count != 10 { + t.Errorf("Expected 10 instances, got %d", count) + } +} + +func TestInspectContainerParsePID(t *testing.T) { + instance := &DockerInstance{ + ID: "test-1", + WorkerID: "test-1", + ContainerID: "test-container", + } + + ctx := context.Background() + pidOutput := "12345" + + var pid int + _, err := fmt.Sscanf(pidOutput, "%d", &pid) + if err != nil { + t.Fatalf("Failed to parse PID: %v", err) + } + + if pid != 12345 { + t.Errorf("Expected PID 12345, got %d", pid) + } + + instance.mu.Lock() + instance.PID = pid + instance.Status = "running" + instance.LastSeen = time.Now() + instance.mu.Unlock() + + instance.mu.RLock() + gotPID := instance.PID + gotStatus := instance.Status + instance.mu.RUnlock() + + if gotPID != 12345 { + t.Errorf("PID = %d, want 12345", gotPID) + } + + if gotStatus != "running" { + t.Errorf("Status = %q, want running", gotStatus) + } + + if instance.LastSeen.IsZero() { + t.Error("LastSeen should be updated") + } + + _ = ctx +} + +func TestStopNonExistentWorker(t *testing.T) { + cfg := &config.SchedulerConfig{} + scheduler := NewDockerCLIScheduler(cfg) + + err := scheduler.Stop(context.Background(), "nonexistent") + if err == nil { + t.Fatal("Stop should return error for non-existent worker") + } + + if !strings.Contains(err.Error(), "not found") { + t.Errorf("Error should mention 'not found', got: %v", err) + } +} diff --git a/backend/internal/worker/scheduler/process_scheduler.go b/backend/internal/worker/scheduler/process_scheduler.go index d08480f..4c62b41 100644 --- a/backend/internal/worker/scheduler/process_scheduler.go +++ b/backend/internal/worker/scheduler/process_scheduler.go @@ -9,22 +9,18 @@ import ( "sync" "time" + "github.com/insmtx/SingerOS/backend/config" "github.com/insmtx/SingerOS/backend/internal/worker" "github.com/ygpkg/yg-go/logs" ) type ProcessScheduler struct { - config *ProcessConfig + config *config.SchedulerConfig instances map[string]*ProcessInstance mu sync.RWMutex } -type ProcessConfig struct { - WorkerBinary string - WorkingDir string - Env map[string]string - ServerAddr string -} +var _ worker.WorkerScheduler = (*ProcessScheduler)(nil) type ProcessInstance struct { ID string @@ -37,7 +33,7 @@ type ProcessInstance struct { mu sync.RWMutex } -func NewProcessScheduler(config *ProcessConfig) worker.WorkerScheduler { +func NewProcessScheduler(config *config.SchedulerConfig) worker.WorkerScheduler { return &ProcessScheduler{ config: config, instances: make(map[string]*ProcessInstance), diff --git a/backend/internal/worker/server/conn.go b/backend/internal/worker/server/conn.go new file mode 100644 index 0000000..e17f48a --- /dev/null +++ b/backend/internal/worker/server/conn.go @@ -0,0 +1,24 @@ +package server + +import ( + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type WorkerConnection struct { + ID string + Conn *websocket.Conn + Send chan map[string]interface{} + Status string + Registered time.Time + LastSeen time.Time + mu sync.RWMutex +} + +func (wc *WorkerConnection) SendJSON(msg map[string]interface{}) error { + wc.mu.RLock() + defer wc.mu.RUnlock() + return wc.Conn.WriteJSON(msg) +} diff --git a/backend/internal/worker/server/server.go b/backend/internal/worker/server/server.go index 0090719..ec82b30 100644 --- a/backend/internal/worker/server/server.go +++ b/backend/internal/worker/server/server.go @@ -23,32 +23,22 @@ var upgrader = websocket.Upgrader{ WriteBufferSize: 1024, } -type Server struct { +type WorkerManager struct { workers map[string]*WorkerConnection mu sync.RWMutex scheduler worker.WorkerScheduler db *gorm.DB } -func NewServer(scheduler worker.WorkerScheduler, db *gorm.DB) *Server { - return &Server{ +func NewServer(scheduler worker.WorkerScheduler, db *gorm.DB) *WorkerManager { + return &WorkerManager{ workers: make(map[string]*WorkerConnection), scheduler: scheduler, db: db, } } -type WorkerConnection struct { - ID string - Conn *websocket.Conn - Send chan map[string]interface{} - Status string - Registered time.Time - LastSeen time.Time - mu sync.RWMutex -} - -func (s *Server) RegisterRoutes(r gin.IRouter) { +func (s *WorkerManager) RegisterRoutes(r gin.IRouter) { r.GET("/ws/worker", s.handleWorkerWebSocket) r.POST("/ListWorkers", s.listWorkers) r.POST("/GetWorkerInfo", s.getWorkerInfo) @@ -56,7 +46,7 @@ func (s *Server) RegisterRoutes(r gin.IRouter) { r.POST("/CreateWorker", s.createWorker) } -func (s *Server) handleWorkerWebSocket(c *gin.Context) { +func (s *WorkerManager) handleWorkerWebSocket(c *gin.Context) { conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { logs.Errorf("Failed to upgrade WebSocket: %v", err) @@ -124,7 +114,7 @@ func (s *Server) handleWorkerWebSocket(c *gin.Context) { <-ctx.Done() } -func (s *Server) readPump(worker *WorkerConnection) { +func (s *WorkerManager) readPump(worker *WorkerConnection) { defer func() { s.unregisterWorker(worker.ID) worker.Conn.Close() @@ -159,7 +149,7 @@ func (s *Server) readPump(worker *WorkerConnection) { } } -func (s *Server) writePump(worker *WorkerConnection) { +func (s *WorkerManager) writePump(worker *WorkerConnection) { ticker := time.NewTicker(54 * time.Second) defer ticker.Stop() @@ -192,7 +182,7 @@ func (s *Server) writePump(worker *WorkerConnection) { } } -func (s *Server) handleWorkerMessage(worker *WorkerConnection, msg map[string]interface{}) { +func (s *WorkerManager) handleWorkerMessage(worker *WorkerConnection, msg map[string]interface{}) { msgType, _ := msg["type"].(string) switch msgType { @@ -228,7 +218,7 @@ func (s *Server) handleWorkerMessage(worker *WorkerConnection, msg map[string]in } } -func (s *Server) handleGetConfig(worker *WorkerConnection, msg map[string]interface{}) { +func (s *WorkerManager) handleGetConfig(worker *WorkerConnection, msg map[string]interface{}) { assistantCode := "" if payload, ok := msg["payload"].(map[string]interface{}); ok { if code, ok := payload["assistant_code"].(string); ok { @@ -318,7 +308,7 @@ func (s *Server) handleGetConfig(worker *WorkerConnection, msg map[string]interf } } -func (s *Server) unregisterWorker(workerID string) { +func (s *WorkerManager) unregisterWorker(workerID string) { s.mu.Lock() defer s.mu.Unlock() @@ -329,7 +319,7 @@ func (s *Server) unregisterWorker(workerID string) { } } -func (s *Server) heartbeatChecker(worker *WorkerConnection) { +func (s *WorkerManager) heartbeatChecker(worker *WorkerConnection) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() @@ -359,7 +349,7 @@ type WorkerInfo struct { LastSeen time.Time `json:"last_seen"` } -func (s *Server) listWorkers(c *gin.Context) { +func (s *WorkerManager) listWorkers(c *gin.Context) { var req struct{} if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) @@ -390,7 +380,7 @@ type GetWorkerInfoRequest struct { WorkerID string `json:"worker_id" binding:"required"` } -func (s *Server) getWorkerInfo(c *gin.Context) { +func (s *WorkerManager) getWorkerInfo(c *gin.Context) { var req GetWorkerInfoRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) @@ -422,7 +412,7 @@ type ShutdownWorkerRequest struct { WorkerID string `json:"worker_id" binding:"required"` } -func (s *Server) shutdownWorker(c *gin.Context) { +func (s *WorkerManager) shutdownWorker(c *gin.Context) { var req ShutdownWorkerRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) @@ -466,7 +456,7 @@ type CreateWorkerRequest struct { WorkingDir string `json:"working_dir"` } -func (s *Server) createWorker(c *gin.Context) { +func (s *WorkerManager) createWorker(c *gin.Context) { var req CreateWorkerRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) @@ -502,9 +492,3 @@ func (s *Server) createWorker(c *gin.Context) { c.JSON(http.StatusOK, instance) } - -func (wc *WorkerConnection) SendJSON(msg map[string]interface{}) error { - wc.mu.RLock() - defer wc.mu.RUnlock() - return wc.Conn.WriteJSON(msg) -} diff --git a/backend/internal/worker/worker.go b/backend/internal/worker/worker.go index ba14866..13f019a 100644 --- a/backend/internal/worker/worker.go +++ b/backend/internal/worker/worker.go @@ -4,7 +4,7 @@ import ( "github.com/insmtx/SingerOS/backend/internal/worker/client" ) -type Worker = client.Worker +type Worker = client.WorkerClient type WorkerConfig = client.WorkerConfig var NewWorker = client.NewWorker