diff --git a/.gitignore b/.gitignore index 5f4d0d2..449b833 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ example/ # Prevent accidental Go source files in root directory (except go.mod/go.sum) # Note: Actual Go test files should be in backend/tests or appropriate package directories +docs/superpowers diff --git a/backend/cmd/singer/worker.go b/backend/cmd/singer/worker.go index ebf402a..a3d4b87 100644 --- a/backend/cmd/singer/worker.go +++ b/backend/cmd/singer/worker.go @@ -6,27 +6,21 @@ import ( "net" "net/http" "strings" - "time" "github.com/gin-gonic/gin" "github.com/insmtx/SingerOS/backend/config" - "github.com/insmtx/SingerOS/backend/internal/agent" - "github.com/insmtx/SingerOS/backend/internal/agent/externalcli" - "github.com/insmtx/SingerOS/backend/internal/eventengine" - "github.com/insmtx/SingerOS/backend/internal/infra/mq" + "github.com/insmtx/SingerOS/backend/internal/worker/client" singerMCP "github.com/insmtx/SingerOS/backend/mcp" - "github.com/insmtx/SingerOS/backend/runtime/engines" - "github.com/insmtx/SingerOS/backend/runtime/engines/builtin" - "github.com/insmtx/SingerOS/backend/tools" - skilltools "github.com/insmtx/SingerOS/backend/tools/skill" "github.com/spf13/cobra" - "github.com/ygpkg/yg-go/lifecycle" + ygconfig "github.com/ygpkg/yg-go/config" "github.com/ygpkg/yg-go/logs" ) var ( - workerConfigPath string - workerServerAddr string + workerConfigPath string + workerServerAddr string + workerListenAddr string + workerAssistantCode string ) var workerCmd = &cobra.Command{ @@ -34,77 +28,62 @@ var workerCmd = &cobra.Command{ Short: "Start the SingerOS background worker", Long: `Start the background worker service for processing asynchronous tasks and events.`, Run: func(cmd *cobra.Command, args []string) { - mcpServer, err := startWorkerMCPServer(workerServerAddr) - if err != nil { - logs.Fatalf("Failed to start worker MCP server: %v", err) - return - } - - cfg, err := loadWorkerConfig(workerConfigPath, workerServerAddr) - if err != nil { - logs.Fatalf("Failed to load config: %v", err) - return - } - - natsUrl := "nats://nats:4222" - if cfg.NATS != nil && cfg.NATS.URL != "" { - natsUrl = cfg.NATS.URL - } + ctx := cmd.Context() - subscriber, err := mq.NewPublisher(natsUrl) + worker, err := createWorker(ctx) if err != nil { - logs.Fatalf("Failed to create event subscriber: %v", err) + logs.Fatalf("Failed to create worker: %v", err) return } - runtimeConfig, err := buildRuntimeConfig() - if err != nil { - logs.Fatalf("Failed to build runtime config: %v", err) - return + if err := worker.Start(ctx); err != nil { + logs.Fatalf("Failed to start worker: %v", err) } - - ctx, cancel := context.WithCancel(context.Background()) - runner, err := buildRuntimeRunner(ctx, cfg, runtimeConfig) - if err != nil { - cancel() - logs.Fatalf("Failed to create agent runtime: %v", err) - return - } - - orchestratorInstance := eventengine.NewOrchestrator(subscriber, runner) - if err := orchestratorInstance.Start(ctx); err != nil { - cancel() - logs.Fatalf("Failed to start orchestrator: %v", err) - return - } - logs.Info("Orchestrator started successfully") - - lifecycle.Std().AddCloseFunc(func() error { - cancel() - return nil - }) - lifecycle.Std().AddCloseFunc(func() error { - shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer shutdownCancel() - return mcpServer.Shutdown(shutdownCtx) - }) - lifecycle.Std().AddCloseFunc(subscriber.Close) - - logs.Info("Worker runtime initialized successfully") - logs.Info("Worker service started") - - lifecycle.Std().WaitExit() - - logs.Info("Worker exited") }, } func init() { workerCmd.Flags().StringVar(&workerConfigPath, "config", "", "Configuration file path") - workerCmd.Flags().StringVar(&workerServerAddr, "server-addr", ":8081", "Worker MCP server listen address for runtime CLI bootstrap") + workerCmd.Flags().StringVar(&workerServerAddr, "server-addr", "127.0.0.1:8080", "Server address for WebSocket connection") + workerCmd.Flags().StringVar(&workerListenAddr, "listen-addr", ":8081", "Worker MCP server listen address for runtime bootstrap") + workerCmd.Flags().StringVar(&workerAssistantCode, "assistant-code", "", "Assistant code for configuration retrieval") rootCmd.AddCommand(workerCmd) } +func createWorker(ctx context.Context) (*client.Worker, error) { + _, err := loadWorkerConfig() + if err != nil { + return nil, fmt.Errorf("load config: %w", err) + } + + return client.NewWorker(ctx, &client.WorkerConfig{ + ServerAddr: workerServerAddr, + AssistantCode: workerAssistantCode, + SkillsDir: "", + ToolsEnabled: true, + }) +} + +func loadWorkerConfig() (*config.WorkerConfig, error) { + cfg := &config.WorkerConfig{} + if workerConfigPath != "" { + err := ygconfig.LoadYamlLocalFile(workerConfigPath, cfg) + if err != nil { + return nil, fmt.Errorf("failed to load config from %s: %w", workerConfigPath, err) + } + } + if strings.TrimSpace(workerAssistantCode) != "" { + cfg.AssistantCode = workerAssistantCode + logs.Infof("Using assistant code from flag: %s", workerAssistantCode) + } + if strings.TrimSpace(workerServerAddr) != "" { + cfg.ServerAddr = workerServerAddr + logs.Infof("Using server address from flag: %s", workerServerAddr) + } + + return cfg, nil +} + func startWorkerMCPServer(addr string) (*http.Server, error) { if strings.TrimSpace(addr) == "" { addr = ":8081" @@ -133,144 +112,3 @@ func startWorkerMCPServer(addr string) (*http.Server, error) { return server, nil } - -func loadWorkerConfig(configPath string, bootstrapAddr string) (*config.Config, error) { - cfg, err := loadConfig(configPath) - if err != nil { - return nil, err - } - - bootstrapped, err := builtin.BootstrapCLIEngines(context.Background(), cfg.CLI, defaultCLIBootstrapOptions(bootstrapAddr)) - if err != nil { - logs.Warnf("CLI bootstrap failed: %v", err) - } - if bootstrapped != nil { - cfg.CLI = bootstrapped - } - - return cfg, nil -} - -func defaultCLIBootstrapOptions(addr string) builtin.BootstrapOptions { - return builtin.BootstrapOptions{ - MCP: engines.MCPServerConfig{ - Name: "singeros", - URL: mcpURLFromAddr(addr), - BearerToken: singerMCP.DefaultAuthToken(), - }, - } -} - -func mcpURLFromAddr(addr string) string { - host := "localhost" - port := "8081" - - if strings.TrimSpace(addr) != "" { - if splitHost, splitPort, err := net.SplitHostPort(addr); err == nil { - if splitHost != "" && splitHost != "0.0.0.0" && splitHost != "::" && splitHost != "[::]" { - host = splitHost - } - if splitPort != "" { - port = splitPort - } - } else if strings.HasPrefix(addr, ":") { - port = strings.TrimPrefix(addr, ":") - } else { - host = addr - } - } - - return fmt.Sprintf("http://%s:%s/v1/mcp", host, port) -} - -func buildRuntimeConfig() (agent.Config, error) { - catalog, skillDir, err := skilltools.LoadDefaultCatalog() - if err != nil { - return agent.Config{}, fmt.Errorf("load skills: %w", err) - } - - logs.Infof("Loaded %d skills from %s for runtime", len(catalog.List()), skillDir) - - toolRegistry, err := buildTooling(catalog) - if err != nil { - return agent.Config{}, err - } - - return agent.Config{ - SkillsCatalog: catalog, - ToolRegistry: toolRegistry, - }, nil -} - -func buildTooling(catalog *skilltools.Catalog) (*tools.Registry, error) { - registry := tools.NewRegistry() - - if err := skilltools.Register(registry, catalog); err != nil { - return nil, fmt.Errorf("register skill use tool: %w", err) - } - - logs.Infof("Loaded %d tools for runtime", len(registry.List())) - - return registry, nil -} - -func buildRuntimeRunner(ctx context.Context, cfg *config.Config, runtimeConfig agent.Config) (agent.Runner, error) { - if cfg == nil { - return nil, fmt.Errorf("config is required") - } - - router := agent.NewRuntimeRouter(agent.RuntimeKindSingerOS) - registered := 0 - - if cfg.LLM != nil && cfg.LLM.APIKey != "" { - switch cfg.LLM.Provider { - case "", "openai": - logs.Info("Registering SingerOS agent runtime") - singerRunner, err := agent.NewAgent(ctx, cfg.LLM, runtimeConfig) - if err != nil { - return nil, err - } - if err := router.Register(agent.RuntimeKindSingerOS, singerRunner); err != nil { - return nil, err - } - registered++ - default: - logs.Warnf("Skipping SingerOS agent runtime for unsupported Eino chat model provider: %s", cfg.LLM.Provider) - } - } - - cliRegistry, err := builtin.NewRegistryFromConfig(cfg.CLI) - if err != nil { - return nil, fmt.Errorf("create CLI engine registry: %w", err) - } - cliNames := cliRegistry.Names() - for _, name := range cliNames { - engine, ok := cliRegistry.Get(name) - if !ok { - continue - } - runner, err := externalcli.NewRunner(name, engine, cfg.LLM) - if err != nil { - return nil, err - } - if err := router.Register(name, runner); err != nil { - return nil, err - } - registered++ - logs.Infof("Registering external agent CLI runtime: %s", name) - } - - if registered == 0 { - return nil, fmt.Errorf("no agent runtime is available") - } - if cfg.LLM == nil || cfg.LLM.APIKey == "" { - if cfg.CLI != nil && cfg.CLI.Default != "" { - router.SetDefault(cfg.CLI.Default) - } else if len(cliNames) > 0 { - router.SetDefault(cliNames[0]) - } - } else { - router.SetDefault(agent.RuntimeKindSingerOS) - } - return router, nil -} diff --git a/backend/cmd/singer/claude_worker.go b/backend/cmd/singer/worker_claudecode.go similarity index 75% rename from backend/cmd/singer/claude_worker.go rename to backend/cmd/singer/worker_claudecode.go index 7ea57c0..c464855 100644 --- a/backend/cmd/singer/claude_worker.go +++ b/backend/cmd/singer/worker_claudecode.go @@ -18,23 +18,16 @@ import ( "github.com/ygpkg/yg-go/logs" ) -var ( - taskWorkerConfigPath string - taskWorkerServerAddr string -) +func init() { + workerCmd.AddCommand(claudeWorkerCmd) +} var claudeWorkerCmd = &cobra.Command{ - Use: "claude-worker", + Use: "claude-code", Short: "Start a standalone task worker backed by Claude Code", Long: `Start a standalone SingerOS worker that subscribes to org.{org_id}.worker.{worker_id}.task and executes agent.run tasks through Claude Code.`, Run: func(cmd *cobra.Command, args []string) { - mcpServer, err := startWorkerMCPServer(taskWorkerServerAddr) - if err != nil { - logs.Fatalf("Failed to start worker MCP server: %v", err) - return - } - - cfg, err := loadWorkerConfig(taskWorkerConfigPath, taskWorkerServerAddr) + cfg, err := loadWorkerConfig() if err != nil { logs.Fatalf("Failed to load config: %v", err) return @@ -44,6 +37,12 @@ var claudeWorkerCmd = &cobra.Command{ return } + mcpServer, err := startWorkerMCPServer(workerListenAddr) + if err != nil { + logs.Fatalf("Failed to start worker MCP server: %v", err) + return + } + natsURL := "nats://nats:4222" if cfg.NATS != nil && strings.TrimSpace(cfg.NATS.URL) != "" { natsURL = cfg.NATS.URL @@ -64,8 +63,8 @@ var claudeWorkerCmd = &cobra.Command{ ctx, cancel := context.WithCancel(context.Background()) consumer, err := taskconsumer.New(taskconsumer.Config{ - OrgID: cfg.Worker.OrgID, - WorkerID: cfg.Worker.WorkerID, + OrgID: cfg.OrgID, + WorkerID: cfg.WorkerID, }, bus, bus, runner) if err != nil { cancel() @@ -91,35 +90,23 @@ var claudeWorkerCmd = &cobra.Command{ }) lifecycle.Std().AddCloseFunc(bus.Close) - logs.Infof("Claude worker started: org_id=%s worker_id=%s topic=%s", cfg.Worker.OrgID, cfg.Worker.WorkerID, consumer.TaskTopic()) + logs.Infof("Claude worker started: org_id=%s worker_id=%s topic=%s", cfg.OrgID, cfg.WorkerID, consumer.TaskTopic()) lifecycle.Std().WaitExit() logs.Info("Claude worker exited") }, } -func init() { - claudeWorkerCmd.Flags().StringVar(&taskWorkerConfigPath, "config", "", "Configuration file path") - claudeWorkerCmd.Flags().StringVar(&taskWorkerServerAddr, "server-addr", ":8081", "Worker MCP server listen address for runtime bootstrap") - rootCmd.AddCommand(claudeWorkerCmd) -} - -func validateTaskWorkerConfig(cfg *config.Config) error { +func validateTaskWorkerConfig(cfg *config.WorkerConfig) error { if cfg == nil { return fmt.Errorf("config is required") } - if cfg.Worker == nil { - return fmt.Errorf("worker config is required") - } - if strings.TrimSpace(cfg.Worker.OrgID) == "" { - return fmt.Errorf("worker.org_id is required") - } - if strings.TrimSpace(cfg.Worker.WorkerID) == "" { - return fmt.Errorf("worker.worker_id is required") + if strings.TrimSpace(cfg.AssistantCode) == "" { + return fmt.Errorf("worker.assistant_code is required") } return nil } -func buildClaudeCodeRunner(cfg *config.Config) (agent.Runner, error) { +func buildClaudeCodeRunner(cfg *config.WorkerConfig) (agent.Runner, error) { cliRegistry, err := builtin.NewRegistryFromConfig(cfg.CLI) if err != nil { return nil, fmt.Errorf("create CLI engine registry: %w", err) diff --git a/backend/cmd/singer/simplechat.go b/backend/cmd/singer/worker_simplechat.go similarity index 100% rename from backend/cmd/singer/simplechat.go rename to backend/cmd/singer/worker_simplechat.go diff --git a/backend/config/config.go b/backend/config/config.go index f5bd622..4671281 100644 --- a/backend/config/config.go +++ b/backend/config/config.go @@ -17,28 +17,13 @@ type LLMConfig struct { BaseURL string `yaml:"base_url,omitempty"` // Custom base URL } -// CLIEnginesConfig is the configuration for external AI coding CLIs. -type CLIEnginesConfig struct { - Default string `yaml:"default,omitempty" json:"default,omitempty"` -} - -// WorkerConfig is the configuration for a standalone task worker process. -type WorkerConfig struct { - OrgID string `yaml:"org_id" json:"org_id"` - WorkerID string `yaml:"worker_id" json:"worker_id"` - Concurrency int `yaml:"concurrency,omitempty" json:"concurrency,omitempty"` - AckWait string `yaml:"ack_wait,omitempty" json:"ack_wait,omitempty"` -} - // Config 是 SingerOS 的主配置结构,包含所有子系统的配置 type Config struct { - Github *GithubAppConfig `yaml:"github,omitempty"` - Gitlab *GitlabAppConfig `yaml:"gitlab,omitempty"` - NATS *NATSConfig `yaml:"nats,omitempty"` - Database *DatabaseConfig `yaml:"database,omitempty"` - LLM *LLMConfig `yaml:"llm,omitempty"` - CLI *CLIEnginesConfig `yaml:"cli,omitempty"` - Worker *WorkerConfig `yaml:"worker,omitempty"` + Github *GithubAppConfig `yaml:"github,omitempty"` + Gitlab *GitlabAppConfig `yaml:"gitlab,omitempty"` + NATS *NATSConfig `yaml:"nats,omitempty"` + Database *DatabaseConfig `yaml:"database,omitempty"` + LLM *LLMConfig `yaml:"llm,omitempty"` } // DatabaseConfig 是数据库的配置结构 diff --git a/backend/config/worker.go b/backend/config/worker.go new file mode 100644 index 0000000..5c5064b --- /dev/null +++ b/backend/config/worker.go @@ -0,0 +1,19 @@ +package config + +type WorkerConfig struct { + OrgID string `yaml:"org_id" json:"org_id"` + WorkerID string `yaml:"worker_id" json:"worker_id"` + AssistantCode string `yaml:"assistant_code,omitempty" json:"assistant_code,omitempty"` + ServerAddr string `yaml:"server_addr,omitempty" json:"server_addr,omitempty"` + SkillsDir string `yaml:"skills_dir,omitempty" json:"skills_dir,omitempty"` + ToolsEnabled bool `yaml:"tools_enabled,omitempty" json:"tools_enabled,omitempty"` + + NATS *NATSConfig `yaml:"nats,omitempty"` + LLM *LLMConfig `yaml:"llm,omitempty" json:"llm,omitempty"` + CLI *CLIEnginesConfig `yaml:"cli,omitempty"` +} + +// CLIEnginesConfig is the configuration for external AI coding CLIs. +type CLIEnginesConfig struct { + Default string `yaml:"default,omitempty" json:"default,omitempty"` +} diff --git a/backend/internal/api/router.go b/backend/internal/api/router.go index 5e89d88..073b2d6 100644 --- a/backend/internal/api/router.go +++ b/backend/internal/api/router.go @@ -16,6 +16,7 @@ import ( githubprovider "github.com/insmtx/SingerOS/backend/internal/infra/providers/github" "github.com/insmtx/SingerOS/backend/internal/infra/websocket" "github.com/insmtx/SingerOS/backend/internal/service" + "github.com/insmtx/SingerOS/backend/internal/worker/scheduler" workerserver "github.com/insmtx/SingerOS/backend/internal/worker/server" singerMCP "github.com/insmtx/SingerOS/backend/mcp" ygmiddleware "github.com/ygpkg/yg-go/apis/runtime/middleware" @@ -61,14 +62,18 @@ func SetupRouter(cfg config.Config, publisher eventbus.Publisher, db *gorm.DB) * websocket.RegisterWebSocketRoutes(v1, publisher) logs.Info("WebSocket connector registered successfully") - digitalAssistantService := service.NewDigitalAssistantService(db) - handler.RegisterDigitalAssistantRoutes(v1, digitalAssistantService) - logs.Info("Digital assistant routes registered successfully") + workerScheduler := scheduler.NewProcessScheduler(&scheduler.ProcessConfig{ + ServerAddr: ":8080", + }) - workerServer := workerserver.NewServer() + workerServer := workerserver.NewServer(workerScheduler, db) workerServer.RegisterRoutes(v1) logs.Info("Worker server routes registered successfully") + digitalAssistantService := service.NewDigitalAssistantService(db, workerScheduler) + handler.RegisterDigitalAssistantRoutes(v1, digitalAssistantService) + logs.Info("Digital assistant routes registered successfully") + singerMCP.RegisterRoutes(v1, singerMCP.NewServer()) logs.Info("MCP routes registered successfully") diff --git a/backend/internal/service/digital_assistant_service.go b/backend/internal/service/digital_assistant_service.go index 87eaa7b..598deae 100644 --- a/backend/internal/service/digital_assistant_service.go +++ b/backend/internal/service/digital_assistant_service.go @@ -10,18 +10,22 @@ import ( "github.com/insmtx/SingerOS/backend/internal/api/auth" "github.com/insmtx/SingerOS/backend/internal/api/contract" "github.com/insmtx/SingerOS/backend/internal/infra/db" + "github.com/insmtx/SingerOS/backend/internal/worker" "github.com/insmtx/SingerOS/backend/types" + "github.com/ygpkg/yg-go/logs" ) var _ contract.DigitalAssistantService = (*digitalAssistantService)(nil) type digitalAssistantService struct { - db *gorm.DB + db *gorm.DB + workerScheduler worker.WorkerScheduler } -func NewDigitalAssistantService(db *gorm.DB) contract.DigitalAssistantService { +func NewDigitalAssistantService(db *gorm.DB, workerScheduler worker.WorkerScheduler) contract.DigitalAssistantService { return &digitalAssistantService{ - db: db, + db: db, + workerScheduler: workerScheduler, } } @@ -70,6 +74,17 @@ func (s *digitalAssistantService) CreateDigitalAssistant(ctx context.Context, re return nil, err } + if s.workerScheduler != nil && da.Status == string(contract.DigitalAssistantStatusActive) { + spec := &worker.WorkerSpec{ + ID: da.Code, + Name: da.Name, + EnvType: worker.WorkerEnvProcess, + } + if _, err := s.workerScheduler.Start(ctx, spec); err != nil { + logs.Warnf("Failed to start worker for assistant %s: %v", da.Code, err) + } + } + return convertToContractDigitalAssistant(da), nil } diff --git a/backend/internal/worker/client/worker.go b/backend/internal/worker/client/worker.go index 55c4933..e18bc7e 100644 --- a/backend/internal/worker/client/worker.go +++ b/backend/internal/worker/client/worker.go @@ -16,12 +16,13 @@ import ( ) type Worker struct { - runtime agent.AgentRuntime - config *WorkerConfig - workerID string - startedAt time.Time - status string - wsClient *WSClient + runtime agent.AgentRuntime + config *WorkerConfig + workerID string + assistantCode string + startedAt time.Time + status string + wsClient *WSClient } type WorkerConfig struct { @@ -30,6 +31,7 @@ type WorkerConfig struct { SkillsDir string ToolsEnabled bool ServerAddr string + AssistantCode string } func NewWorker(ctx context.Context, cfg *WorkerConfig) (*Worker, error) { @@ -37,49 +39,91 @@ func NewWorker(ctx context.Context, cfg *WorkerConfig) (*Worker, error) { return nil, fmt.Errorf("worker config is required") } - runtime := cfg.Runtime - if runtime == nil { - runtime = buildDefaultRuntime(ctx, cfg) - } - - if runtime == nil { - return nil, fmt.Errorf("either Runtime or LLMConfig must be provided") - } - workerID := fmt.Sprintf("worker_%d", time.Now().UnixNano()) w := &Worker{ - runtime: runtime, - config: cfg, - workerID: workerID, - startedAt: time.Now(), - status: "initialized", + config: cfg, + workerID: workerID, + assistantCode: cfg.AssistantCode, + startedAt: time.Now(), + status: "initialized", } if cfg.ServerAddr != "" { - w.wsClient = NewWSClient(cfg.ServerAddr, workerID) + w.wsClient = NewWSClient(cfg.ServerAddr, workerID, + WithAssistantCode(cfg.AssistantCode), + WithOnConfigReady(func(assistantConfig map[string]interface{}) { + w.handleAssistantConfig(ctx, assistantConfig) + }), + ) } return w, nil } -func buildDefaultRuntime(ctx context.Context, cfg *WorkerConfig) agent.AgentRuntime { +func (w *Worker) handleAssistantConfig(ctx context.Context, assistantConfig map[string]interface{}) { + logs.Info("Processing assistant configuration from server") + + llmConfigRaw, ok := assistantConfig["llm_config"] + if !ok { + logs.Warn("llm_config not found in assistant config, using default") + return + } + + llmConfigMap, ok := llmConfigRaw.(map[string]interface{}) + if !ok { + logs.Warn("llm_config is not a valid object") + return + } + + llmConfig := &config.LLMConfig{} + if provider, ok := llmConfigMap["type"].(string); ok { + llmConfig.Provider = provider + } + if apiKey, ok := llmConfigMap["api_key"].(string); ok { + llmConfig.APIKey = apiKey + } + if model, ok := llmConfigMap["model"].(string); ok { + llmConfig.Model = model + } + if baseURL, ok := llmConfigMap["base_url"].(string); ok { + llmConfig.BaseURL = baseURL + } + + if llmConfig.Provider == "" || llmConfig.APIKey == "" { + logs.Warn("incomplete llm_config, skipping runtime initialization") + return + } + + runtime, err := buildDefaultRuntime(ctx, &WorkerConfig{ + LLMConfig: llmConfig, + SkillsDir: w.config.SkillsDir, + ToolsEnabled: w.config.ToolsEnabled, + }) + if err != nil { + logs.Errorf("Failed to build runtime: %v", err) + return + } + + w.runtime = runtime + w.status = "ready" + logs.Infof("Worker %s initialized with assistant config", w.workerID) +} +func buildDefaultRuntime(ctx context.Context, cfg *WorkerConfig) (agent.AgentRuntime, error) { if cfg.LLMConfig == nil { - return nil + return nil, fmt.Errorf("llm config is required") } catalog, err := loadSkillsCatalog(cfg.SkillsDir) if err != nil { - logs.Errorf("load skills catalog: %v", err) - return nil + return nil, fmt.Errorf("load skills catalog: %w", err) } toolRegistry := tools.NewRegistry() if cfg.ToolsEnabled { if err := skilltools.Register(toolRegistry, catalog); err != nil { - logs.Errorf("register tools: %v", err) - return nil + return nil, fmt.Errorf("register tools: %w", err) } } @@ -90,26 +134,25 @@ func buildDefaultRuntime(ctx context.Context, cfg *WorkerConfig) agent.AgentRunt agentInstance, err := agent.NewAgent(ctx, cfg.LLMConfig, agentConfig) if err != nil { - logs.Errorf("create agent: %v", err) - return nil + return nil, fmt.Errorf("create agent: %w", err) } - return agentInstance + return agentInstance, nil } func (w *Worker) Run(ctx context.Context, req *agent.RequestContext) (*agent.RunResult, error) { if w == nil || w.runtime == nil { return nil, fmt.Errorf("worker runtime is not initialized") } - + w.status = "processing" result, err := w.runtime.Run(ctx, req) if err != nil { w.status = "error" return nil, err } - - w.status = "idle" + + w.status = "ready" return result, nil } diff --git a/backend/internal/worker/client/ws_client.go b/backend/internal/worker/client/ws_client.go index 2a9d8e5..df5ed2f 100644 --- a/backend/internal/worker/client/ws_client.go +++ b/backend/internal/worker/client/ws_client.go @@ -11,23 +11,43 @@ import ( ) type WSClient struct { - conn *websocket.Conn - workerID string - serverAddr string - send chan map[string]interface{} - ctx context.Context - cancel context.CancelFunc + conn *websocket.Conn + workerID string + assistantCode string + serverAddr string + send chan map[string]interface{} + ctx context.Context + cancel context.CancelFunc + onConfigReady func(config map[string]interface{}) } -func NewWSClient(serverAddr, workerID string) *WSClient { +type WSClientOption func(*WSClient) + +func WithAssistantCode(assistantCode string) WSClientOption { + return func(c *WSClient) { + c.assistantCode = assistantCode + } +} + +func WithOnConfigReady(handler func(map[string]interface{})) WSClientOption { + return func(c *WSClient) { + c.onConfigReady = handler + } +} + +func NewWSClient(serverAddr, workerID string, opts ...WSClientOption) *WSClient { ctx, cancel := context.WithCancel(context.Background()) - return &WSClient{ + c := &WSClient{ workerID: workerID, serverAddr: serverAddr, send: make(chan map[string]interface{}, 256), ctx: ctx, cancel: cancel, } + for _, opt := range opts { + opt(c) + } + return c } func (c *WSClient) Connect(ctx context.Context) error { @@ -118,6 +138,11 @@ func (c *WSClient) handleMessage(msg map[string]interface{}) { switch msgType { case "welcome": logs.Infof("Received welcome from server") + if c.assistantCode != "" { + c.requestConfig() + } + case "configResponse": + c.handleConfigResponse(msg) case "config_update": logs.Infof("Received config update") default: @@ -125,6 +150,46 @@ func (c *WSClient) handleMessage(msg map[string]interface{}) { } } +func (c *WSClient) requestConfig() { + reqMsg := map[string]interface{}{ + "type": "getConfig", + "payload": map[string]interface{}{ + "assistant_code": c.assistantCode, + }, + } + if err := c.sendJSON(reqMsg); err != nil { + logs.Errorf("Failed to request config: %v", err) + } else { + logs.Infof("Requested config for assistant %s", c.assistantCode) + } +} + +func (c *WSClient) handleConfigResponse(msg map[string]interface{}) { + payload, ok := msg["payload"].(map[string]interface{}) + if !ok { + logs.Errorf("Invalid configResponse payload") + return + } + + if errMsg, ok := payload["error"].(string); ok && errMsg != "" { + logs.Errorf("Config response error: %s", errMsg) + return + } + + config, ok := payload["config"].(map[string]interface{}) + if !ok { + logs.Errorf("Config not found in response") + return + } + + if c.onConfigReady != nil { + c.onConfigReady(config) + logs.Info("Config processed successfully") + } else { + logs.Warn("No config handler registered") + } +} + func (c *WSClient) sendJSON(msg map[string]interface{}) error { data, err := json.Marshal(msg) if err != nil { diff --git a/backend/internal/worker/scheduler.go b/backend/internal/worker/scheduler.go new file mode 100644 index 0000000..5d4940d --- /dev/null +++ b/backend/internal/worker/scheduler.go @@ -0,0 +1,43 @@ +package worker + +import ( + "context" + "time" +) + +type WorkerEnvType string + +const ( + WorkerEnvProcess WorkerEnvType = "process" + WorkerEnvDocker WorkerEnvType = "docker" + WorkerEnvKubeVirt WorkerEnvType = "kubevirt" +) + +type WorkerScheduler interface { + Start(ctx context.Context, spec *WorkerSpec) (*WorkerInstance, error) + Stop(ctx context.Context, workerID string) error + Health(ctx context.Context, workerID string) error + List(ctx context.Context) ([]*WorkerInstance, error) +} + +type WorkerSpec struct { + ID string + Name string + Labels map[string]string + Annotations map[string]string + EnvType WorkerEnvType + Image string + Command []string + Args []string + Env map[string]string + WorkingDir string +} + +type WorkerInstance struct { + ID string + WorkerID string + Status string + PID int + StartedAt time.Time + Endpoint string +} diff --git a/backend/internal/worker/scheduler/process_scheduler.go b/backend/internal/worker/scheduler/process_scheduler.go new file mode 100644 index 0000000..d08480f --- /dev/null +++ b/backend/internal/worker/scheduler/process_scheduler.go @@ -0,0 +1,251 @@ +package scheduler + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "sync" + "time" + + "github.com/insmtx/SingerOS/backend/internal/worker" + "github.com/ygpkg/yg-go/logs" +) + +type ProcessScheduler struct { + config *ProcessConfig + instances map[string]*ProcessInstance + mu sync.RWMutex +} + +type ProcessConfig struct { + WorkerBinary string + WorkingDir string + Env map[string]string + ServerAddr string +} + +type ProcessInstance struct { + ID string + WorkerID string + Cmd *exec.Cmd + Process *os.Process + Status string + StartedAt time.Time + LastSeen time.Time + mu sync.RWMutex +} + +func NewProcessScheduler(config *ProcessConfig) worker.WorkerScheduler { + return &ProcessScheduler{ + config: config, + instances: make(map[string]*ProcessInstance), + } +} + +func (ps *ProcessScheduler) Start(ctx context.Context, spec *worker.WorkerSpec) (*worker.WorkerInstance, error) { + if spec.EnvType != "" && spec.EnvType != worker.WorkerEnvProcess { + return nil, fmt.Errorf("unsupported env type: %s, ProcessScheduler only supports process runtime", spec.EnvType) + } + + ps.mu.Lock() + defer ps.mu.Unlock() + + workerID := spec.ID + if workerID == "" { + workerID = fmt.Sprintf("worker_%d", time.Now().UnixNano()) + } + + instance := &ProcessInstance{ + ID: workerID, + WorkerID: workerID, + Status: "initializing", + StartedAt: time.Now(), + LastSeen: time.Now(), + } + + if err := ps.startProcess(instance, spec); err != nil { + return nil, fmt.Errorf("failed to start process: %w", err) + } + + ps.instances[workerID] = instance + + return &worker.WorkerInstance{ + ID: instance.ID, + WorkerID: instance.WorkerID, + Status: instance.Status, + PID: instance.Process.Pid, + StartedAt: instance.StartedAt, + }, nil +} + +func (ps *ProcessScheduler) Stop(ctx context.Context, workerID string) error { + ps.mu.Lock() + defer ps.mu.Unlock() + + instance, ok := ps.instances[workerID] + if !ok { + return fmt.Errorf("worker %s not found", workerID) + } + + if err := ps.stopProcess(instance); err != nil { + return fmt.Errorf("failed to stop process: %w", err) + } + + instance.Status = "stopped" + delete(ps.instances, workerID) + return nil +} + +func (ps *ProcessScheduler) Health(ctx context.Context, workerID string) error { + ps.mu.RLock() + instance, ok := ps.instances[workerID] + ps.mu.RUnlock() + + if !ok { + return fmt.Errorf("worker %s not found", workerID) + } + + return ps.healthCheck(instance) +} + +func (ps *ProcessScheduler) List(ctx context.Context) ([]*worker.WorkerInstance, error) { + ps.mu.RLock() + defer ps.mu.RUnlock() + + result := make([]*worker.WorkerInstance, 0, len(ps.instances)) + for _, instance := range ps.instances { + instance.mu.RLock() + result = append(result, &worker.WorkerInstance{ + ID: instance.ID, + WorkerID: instance.WorkerID, + Status: instance.Status, + PID: instance.Process.Pid, + StartedAt: instance.StartedAt, + }) + instance.mu.RUnlock() + } + return result, nil +} + +func (ps *ProcessScheduler) startProcess(instance *ProcessInstance, spec *worker.WorkerSpec) error { + cmdPath := ps.config.WorkerBinary + if cmdPath == "" { + cmdPath = "./bundles/singer" + } + + if _, err := os.Stat(cmdPath); os.IsNotExist(err) { + return fmt.Errorf("worker binary not found: %s", cmdPath) + } + + env := os.Environ() + for key, value := range ps.config.Env { + env = append(env, fmt.Sprintf("%s=%s", key, value)) + } + + workDir := ps.config.WorkingDir + if workDir == "" { + workDir = filepath.Dir(cmdPath) + } + + args := []string{cmdPath, "worker"} + if spec.ID != "" { + args = append(args, "--assistant-code", spec.ID) + } + args = append(args, "--server-addr", ps.config.ServerAddr) + + cmd := exec.Command(args[0], args[1:]...) + cmd.Dir = workDir + cmd.Env = env + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + return fmt.Errorf("failed to start command: %w", err) + } + + instance.mu.Lock() + instance.Cmd = cmd + instance.Process = cmd.Process + instance.Status = "running" + instance.LastSeen = time.Now() + instance.mu.Unlock() + + logs.Infof("Worker process for assistant %s started with PID %d", spec.ID, cmd.Process.Pid) + + go ps.monitorProcess(instance) + + return nil +} + +func (ps *ProcessScheduler) stopProcess(instance *ProcessInstance) error { + instance.mu.RLock() + defer instance.mu.RUnlock() + + if instance.Process == nil { + return nil + } + + if err := instance.Process.Signal(os.Interrupt); err != nil { + logs.Warnf("Failed to send interrupt signal to %s: %v", instance.WorkerID, err) + if err := instance.Process.Kill(); err != nil { + return fmt.Errorf("failed to kill process: %w", err) + } + } + + logs.Infof("Sent interrupt signal to worker %s", instance.WorkerID) + return nil +} + +func (ps *ProcessScheduler) healthCheck(instance *ProcessInstance) error { + instance.mu.RLock() + defer instance.mu.RUnlock() + + if instance.Process == nil { + return fmt.Errorf("process not started") + } + + process, err := os.FindProcess(instance.Process.Pid) + if err != nil { + return fmt.Errorf("process not found: %w", err) + } + + if err := process.Signal(os.Interrupt); err != nil { + return fmt.Errorf("process is not responding: %w", err) + } + + instance.LastSeen = time.Now() + return nil +} + +func (ps *ProcessScheduler) monitorProcess(instance *ProcessInstance) { + instance.mu.RLock() + cmd := instance.Cmd + instance.mu.RUnlock() + + if cmd == nil { + return + } + + err := cmd.Wait() + + instance.mu.Lock() + defer instance.mu.Unlock() + + if err != nil { + logs.Errorf("Worker process %s exited with error: %v", instance.WorkerID, err) + instance.Status = "error" + } else { + logs.Infof("Worker process %s exited normally", instance.WorkerID) + instance.Status = "stopped" + } + + ps.removeInstance(instance.WorkerID) +} + +func (ps *ProcessScheduler) removeInstance(workerID string) { + ps.mu.Lock() + defer ps.mu.Unlock() + delete(ps.instances, workerID) +} diff --git a/backend/internal/worker/server/server.go b/backend/internal/worker/server/server.go index 288b58a..0090719 100644 --- a/backend/internal/worker/server/server.go +++ b/backend/internal/worker/server/server.go @@ -1,6 +1,7 @@ package server import ( + "context" "encoding/json" "net/http" "sync" @@ -8,7 +9,10 @@ import ( "github.com/gin-gonic/gin" "github.com/gorilla/websocket" + infradb "github.com/insmtx/SingerOS/backend/internal/infra/db" + "github.com/insmtx/SingerOS/backend/internal/worker" "github.com/ygpkg/yg-go/logs" + "gorm.io/gorm" ) var upgrader = websocket.Upgrader{ @@ -20,8 +24,18 @@ var upgrader = websocket.Upgrader{ } type Server struct { - workers map[string]*WorkerConnection - mu sync.RWMutex + workers map[string]*WorkerConnection + mu sync.RWMutex + scheduler worker.WorkerScheduler + db *gorm.DB +} + +func NewServer(scheduler worker.WorkerScheduler, db *gorm.DB) *Server { + return &Server{ + workers: make(map[string]*WorkerConnection), + scheduler: scheduler, + db: db, + } } type WorkerConnection struct { @@ -34,17 +48,12 @@ type WorkerConnection struct { mu sync.RWMutex } -func NewServer() *Server { - return &Server{ - workers: make(map[string]*WorkerConnection), - } -} - func (s *Server) RegisterRoutes(r gin.IRouter) { r.GET("/ws/worker", s.handleWorkerWebSocket) r.POST("/ListWorkers", s.listWorkers) r.POST("/GetWorkerInfo", s.getWorkerInfo) r.POST("/ShutdownWorker", s.shutdownWorker) + r.POST("/CreateWorker", s.createWorker) } func (s *Server) handleWorkerWebSocket(c *gin.Context) { @@ -213,6 +222,99 @@ func (s *Server) handleWorkerMessage(worker *WorkerConnection, msg map[string]in worker.mu.Unlock() } } + + case "getConfig": + s.handleGetConfig(worker, msg) + } +} + +func (s *Server) handleGetConfig(worker *WorkerConnection, msg map[string]interface{}) { + assistantCode := "" + if payload, ok := msg["payload"].(map[string]interface{}); ok { + if code, ok := payload["assistant_code"].(string); ok { + assistantCode = code + } + } + + if assistantCode == "" { + resp := map[string]interface{}{ + "type": "configResponse", + "payload": map[string]interface{}{ + "config": nil, + "error": "assistant_code is required", + }, + } + select { + case worker.Send <- resp: + default: + logs.Warnf("Config response dropped for worker %s", worker.ID) + } + return + } + + if s.db == nil { + resp := map[string]interface{}{ + "type": "configResponse", + "payload": map[string]interface{}{ + "config": nil, + "error": "database not available", + }, + } + select { + case worker.Send <- resp: + default: + logs.Warnf("Config response dropped for worker %s", worker.ID) + } + return + } + + da, err := infradb.GetDigitalAssistantByCode(context.Background(), s.db, assistantCode) + if err != nil { + logs.Errorf("Failed to get digital assistant %s: %v", assistantCode, err) + resp := map[string]interface{}{ + "type": "configResponse", + "payload": map[string]interface{}{ + "config": nil, + "error": err.Error(), + }, + } + select { + case worker.Send <- resp: + default: + logs.Warnf("Config response dropped for worker %s", worker.ID) + } + return + } + + if da == nil { + logs.Warnf("Digital assistant %s not found", assistantCode) + resp := map[string]interface{}{ + "type": "configResponse", + "payload": map[string]interface{}{ + "config": nil, + "error": "digital assistant not found", + }, + } + select { + case worker.Send <- resp: + default: + logs.Warnf("Config response dropped for worker %s", worker.ID) + } + return + } + + resp := map[string]interface{}{ + "type": "configResponse", + "payload": map[string]interface{}{ + "config": da.Config, + "error": nil, + }, + } + select { + case worker.Send <- resp: + logs.Infof("Config sent to worker %s for assistant %s", worker.ID, assistantCode) + default: + logs.Warnf("Config response dropped for worker %s", worker.ID) } } @@ -351,6 +453,56 @@ func (s *Server) shutdownWorker(c *gin.Context) { } } +type CreateWorkerRequest struct { + ID string `json:"id"` + Name string `json:"name"` + Labels map[string]string `json:"labels"` + Annotations map[string]string `json:"annotations"` + EnvType string `json:"env_type"` + Image string `json:"image"` + Command []string `json:"command"` + Args []string `json:"args"` + Env map[string]string `json:"env"` + WorkingDir string `json:"working_dir"` +} + +func (s *Server) createWorker(c *gin.Context) { + var req CreateWorkerRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + if s.scheduler == nil { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "worker scheduler not initialized"}) + return + } + + spec := &worker.WorkerSpec{ + ID: req.ID, + Name: req.Name, + Labels: req.Labels, + Annotations: req.Annotations, + EnvType: worker.WorkerEnvType(req.EnvType), + Image: req.Image, + Command: req.Command, + Args: req.Args, + Env: req.Env, + WorkingDir: req.WorkingDir, + } + + spec.EnvType = worker.WorkerEnvProcess + + instance, err := s.scheduler.Start(c.Request.Context(), spec) + if err != nil { + logs.Errorf("Failed to create worker: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, instance) +} + func (wc *WorkerConnection) SendJSON(msg map[string]interface{}) error { wc.mu.RLock() defer wc.mu.RUnlock()