Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ example/

# Prevent accidental Go source files in root directory (except go.mod/go.sum)
# Note: Actual Go test files should be in backend/tests or appropriate package directories
docs/superpowers
258 changes: 48 additions & 210 deletions backend/cmd/singer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,105 +6,84 @@ import (
"net"
"net/http"
"strings"
"time"

"github.com/gin-gonic/gin"
"github.com/insmtx/SingerOS/backend/config"
"github.com/insmtx/SingerOS/backend/internal/agent"
"github.com/insmtx/SingerOS/backend/internal/agent/externalcli"
"github.com/insmtx/SingerOS/backend/internal/eventengine"
"github.com/insmtx/SingerOS/backend/internal/infra/mq"
"github.com/insmtx/SingerOS/backend/internal/worker/client"
singerMCP "github.com/insmtx/SingerOS/backend/mcp"
"github.com/insmtx/SingerOS/backend/runtime/engines"
"github.com/insmtx/SingerOS/backend/runtime/engines/builtin"
"github.com/insmtx/SingerOS/backend/tools"
skilltools "github.com/insmtx/SingerOS/backend/tools/skill"
"github.com/spf13/cobra"
"github.com/ygpkg/yg-go/lifecycle"
ygconfig "github.com/ygpkg/yg-go/config"
"github.com/ygpkg/yg-go/logs"
)

var (
workerConfigPath string
workerServerAddr string
workerConfigPath string
workerServerAddr string
workerListenAddr string
workerAssistantCode string
)

var workerCmd = &cobra.Command{
Use: "worker",
Short: "Start the SingerOS background worker",
Long: `Start the background worker service for processing asynchronous tasks and events.`,
Run: func(cmd *cobra.Command, args []string) {
mcpServer, err := startWorkerMCPServer(workerServerAddr)
if err != nil {
logs.Fatalf("Failed to start worker MCP server: %v", err)
return
}

cfg, err := loadWorkerConfig(workerConfigPath, workerServerAddr)
if err != nil {
logs.Fatalf("Failed to load config: %v", err)
return
}

natsUrl := "nats://nats:4222"
if cfg.NATS != nil && cfg.NATS.URL != "" {
natsUrl = cfg.NATS.URL
}
ctx := cmd.Context()

subscriber, err := mq.NewPublisher(natsUrl)
worker, err := createWorker(ctx)
if err != nil {
logs.Fatalf("Failed to create event subscriber: %v", err)
logs.Fatalf("Failed to create worker: %v", err)
return
}

runtimeConfig, err := buildRuntimeConfig()
if err != nil {
logs.Fatalf("Failed to build runtime config: %v", err)
return
if err := worker.Start(ctx); err != nil {
logs.Fatalf("Failed to start worker: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
runner, err := buildRuntimeRunner(ctx, cfg, runtimeConfig)
if err != nil {
cancel()
logs.Fatalf("Failed to create agent runtime: %v", err)
return
}

orchestratorInstance := eventengine.NewOrchestrator(subscriber, runner)
if err := orchestratorInstance.Start(ctx); err != nil {
cancel()
logs.Fatalf("Failed to start orchestrator: %v", err)
return
}
logs.Info("Orchestrator started successfully")

lifecycle.Std().AddCloseFunc(func() error {
cancel()
return nil
})
lifecycle.Std().AddCloseFunc(func() error {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
return mcpServer.Shutdown(shutdownCtx)
})
lifecycle.Std().AddCloseFunc(subscriber.Close)

logs.Info("Worker runtime initialized successfully")
logs.Info("Worker service started")

lifecycle.Std().WaitExit()

logs.Info("Worker exited")
},
}

func init() {
workerCmd.Flags().StringVar(&workerConfigPath, "config", "", "Configuration file path")
workerCmd.Flags().StringVar(&workerServerAddr, "server-addr", ":8081", "Worker MCP server listen address for runtime CLI bootstrap")
workerCmd.Flags().StringVar(&workerServerAddr, "server-addr", "127.0.0.1:8080", "Server address for WebSocket connection")
workerCmd.Flags().StringVar(&workerListenAddr, "listen-addr", ":8081", "Worker MCP server listen address for runtime bootstrap")
workerCmd.Flags().StringVar(&workerAssistantCode, "assistant-code", "", "Assistant code for configuration retrieval")
rootCmd.AddCommand(workerCmd)
}

func createWorker(ctx context.Context) (*client.Worker, error) {
_, err := loadWorkerConfig()
if err != nil {
return nil, fmt.Errorf("load config: %w", err)
}

return client.NewWorker(ctx, &client.WorkerConfig{
ServerAddr: workerServerAddr,
AssistantCode: workerAssistantCode,
SkillsDir: "",
ToolsEnabled: true,
})
}

func loadWorkerConfig() (*config.WorkerConfig, error) {
cfg := &config.WorkerConfig{}
if workerConfigPath != "" {
err := ygconfig.LoadYamlLocalFile(workerConfigPath, cfg)
if err != nil {
return nil, fmt.Errorf("failed to load config from %s: %w", workerConfigPath, err)
}
}
if strings.TrimSpace(workerAssistantCode) != "" {
cfg.AssistantCode = workerAssistantCode
logs.Infof("Using assistant code from flag: %s", workerAssistantCode)
}
if strings.TrimSpace(workerServerAddr) != "" {
cfg.ServerAddr = workerServerAddr
logs.Infof("Using server address from flag: %s", workerServerAddr)
}

return cfg, nil
}

func startWorkerMCPServer(addr string) (*http.Server, error) {
if strings.TrimSpace(addr) == "" {
addr = ":8081"
Expand Down Expand Up @@ -133,144 +112,3 @@ func startWorkerMCPServer(addr string) (*http.Server, error) {

return server, nil
}

func loadWorkerConfig(configPath string, bootstrapAddr string) (*config.Config, error) {
cfg, err := loadConfig(configPath)
if err != nil {
return nil, err
}

bootstrapped, err := builtin.BootstrapCLIEngines(context.Background(), cfg.CLI, defaultCLIBootstrapOptions(bootstrapAddr))
if err != nil {
logs.Warnf("CLI bootstrap failed: %v", err)
}
if bootstrapped != nil {
cfg.CLI = bootstrapped
}

return cfg, nil
}

func defaultCLIBootstrapOptions(addr string) builtin.BootstrapOptions {
return builtin.BootstrapOptions{
MCP: engines.MCPServerConfig{
Name: "singeros",
URL: mcpURLFromAddr(addr),
BearerToken: singerMCP.DefaultAuthToken(),
},
}
}

func mcpURLFromAddr(addr string) string {
host := "localhost"
port := "8081"

if strings.TrimSpace(addr) != "" {
if splitHost, splitPort, err := net.SplitHostPort(addr); err == nil {
if splitHost != "" && splitHost != "0.0.0.0" && splitHost != "::" && splitHost != "[::]" {
host = splitHost
}
if splitPort != "" {
port = splitPort
}
} else if strings.HasPrefix(addr, ":") {
port = strings.TrimPrefix(addr, ":")
} else {
host = addr
}
}

return fmt.Sprintf("http://%s:%s/v1/mcp", host, port)
}

func buildRuntimeConfig() (agent.Config, error) {
catalog, skillDir, err := skilltools.LoadDefaultCatalog()
if err != nil {
return agent.Config{}, fmt.Errorf("load skills: %w", err)
}

logs.Infof("Loaded %d skills from %s for runtime", len(catalog.List()), skillDir)

toolRegistry, err := buildTooling(catalog)
if err != nil {
return agent.Config{}, err
}

return agent.Config{
SkillsCatalog: catalog,
ToolRegistry: toolRegistry,
}, nil
}

func buildTooling(catalog *skilltools.Catalog) (*tools.Registry, error) {
registry := tools.NewRegistry()

if err := skilltools.Register(registry, catalog); err != nil {
return nil, fmt.Errorf("register skill use tool: %w", err)
}

logs.Infof("Loaded %d tools for runtime", len(registry.List()))

return registry, nil
}

func buildRuntimeRunner(ctx context.Context, cfg *config.Config, runtimeConfig agent.Config) (agent.Runner, error) {
if cfg == nil {
return nil, fmt.Errorf("config is required")
}

router := agent.NewRuntimeRouter(agent.RuntimeKindSingerOS)
registered := 0

if cfg.LLM != nil && cfg.LLM.APIKey != "" {
switch cfg.LLM.Provider {
case "", "openai":
logs.Info("Registering SingerOS agent runtime")
singerRunner, err := agent.NewAgent(ctx, cfg.LLM, runtimeConfig)
if err != nil {
return nil, err
}
if err := router.Register(agent.RuntimeKindSingerOS, singerRunner); err != nil {
return nil, err
}
registered++
default:
logs.Warnf("Skipping SingerOS agent runtime for unsupported Eino chat model provider: %s", cfg.LLM.Provider)
}
}

cliRegistry, err := builtin.NewRegistryFromConfig(cfg.CLI)
if err != nil {
return nil, fmt.Errorf("create CLI engine registry: %w", err)
}
cliNames := cliRegistry.Names()
for _, name := range cliNames {
engine, ok := cliRegistry.Get(name)
if !ok {
continue
}
runner, err := externalcli.NewRunner(name, engine, cfg.LLM)
if err != nil {
return nil, err
}
if err := router.Register(name, runner); err != nil {
return nil, err
}
registered++
logs.Infof("Registering external agent CLI runtime: %s", name)
}

if registered == 0 {
return nil, fmt.Errorf("no agent runtime is available")
}
if cfg.LLM == nil || cfg.LLM.APIKey == "" {
if cfg.CLI != nil && cfg.CLI.Default != "" {
router.SetDefault(cfg.CLI.Default)
} else if len(cliNames) > 0 {
router.SetDefault(cliNames[0])
}
} else {
router.SetDefault(agent.RuntimeKindSingerOS)
}
return router, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,16 @@ import (
"github.com/ygpkg/yg-go/logs"
)

var (
taskWorkerConfigPath string
taskWorkerServerAddr string
)
func init() {
workerCmd.AddCommand(claudeWorkerCmd)
}

var claudeWorkerCmd = &cobra.Command{
Use: "claude-worker",
Use: "claude-code",
Short: "Start a standalone task worker backed by Claude Code",
Long: `Start a standalone SingerOS worker that subscribes to org.{org_id}.worker.{worker_id}.task and executes agent.run tasks through Claude Code.`,
Run: func(cmd *cobra.Command, args []string) {
mcpServer, err := startWorkerMCPServer(taskWorkerServerAddr)
if err != nil {
logs.Fatalf("Failed to start worker MCP server: %v", err)
return
}

cfg, err := loadWorkerConfig(taskWorkerConfigPath, taskWorkerServerAddr)
cfg, err := loadWorkerConfig()
if err != nil {
logs.Fatalf("Failed to load config: %v", err)
return
Expand All @@ -44,6 +37,12 @@ var claudeWorkerCmd = &cobra.Command{
return
}

mcpServer, err := startWorkerMCPServer(workerListenAddr)
if err != nil {
logs.Fatalf("Failed to start worker MCP server: %v", err)
return
}

natsURL := "nats://nats:4222"
if cfg.NATS != nil && strings.TrimSpace(cfg.NATS.URL) != "" {
natsURL = cfg.NATS.URL
Expand All @@ -64,8 +63,8 @@ var claudeWorkerCmd = &cobra.Command{

ctx, cancel := context.WithCancel(context.Background())
consumer, err := taskconsumer.New(taskconsumer.Config{
OrgID: cfg.Worker.OrgID,
WorkerID: cfg.Worker.WorkerID,
OrgID: cfg.OrgID,
WorkerID: cfg.WorkerID,
}, bus, bus, runner)
if err != nil {
cancel()
Expand All @@ -91,35 +90,23 @@ var claudeWorkerCmd = &cobra.Command{
})
lifecycle.Std().AddCloseFunc(bus.Close)

logs.Infof("Claude worker started: org_id=%s worker_id=%s topic=%s", cfg.Worker.OrgID, cfg.Worker.WorkerID, consumer.TaskTopic())
logs.Infof("Claude worker started: org_id=%s worker_id=%s topic=%s", cfg.OrgID, cfg.WorkerID, consumer.TaskTopic())
lifecycle.Std().WaitExit()
logs.Info("Claude worker exited")
},
}

func init() {
claudeWorkerCmd.Flags().StringVar(&taskWorkerConfigPath, "config", "", "Configuration file path")
claudeWorkerCmd.Flags().StringVar(&taskWorkerServerAddr, "server-addr", ":8081", "Worker MCP server listen address for runtime bootstrap")
rootCmd.AddCommand(claudeWorkerCmd)
}

func validateTaskWorkerConfig(cfg *config.Config) error {
func validateTaskWorkerConfig(cfg *config.WorkerConfig) error {
if cfg == nil {
return fmt.Errorf("config is required")
}
if cfg.Worker == nil {
return fmt.Errorf("worker config is required")
}
if strings.TrimSpace(cfg.Worker.OrgID) == "" {
return fmt.Errorf("worker.org_id is required")
}
if strings.TrimSpace(cfg.Worker.WorkerID) == "" {
return fmt.Errorf("worker.worker_id is required")
if strings.TrimSpace(cfg.AssistantCode) == "" {
return fmt.Errorf("worker.assistant_code is required")
}
return nil
}

func buildClaudeCodeRunner(cfg *config.Config) (agent.Runner, error) {
func buildClaudeCodeRunner(cfg *config.WorkerConfig) (agent.Runner, error) {
cliRegistry, err := builtin.NewRegistryFromConfig(cfg.CLI)
if err != nil {
return nil, fmt.Errorf("create CLI engine registry: %w", err)
Expand Down
Loading