Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions backend/cmd/singer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

var (
serverConfigPath string
serverHttpAddr string
)

var serverCmd = &cobra.Command{
Expand Down Expand Up @@ -67,12 +66,12 @@ var serverCmd = &cobra.Command{
r := api.SetupRouter(*cfg, publisher, db)

srv := &http.Server{
Addr: serverHttpAddr,
Addr: cfg.ServerAddr,
Handler: r,
}

logs.Info("Starting SingerOS backend service...")
logs.Infof("Listening on %s", serverHttpAddr)
logs.Infof("Listening on %s", cfg.ServerAddr)

go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
Expand All @@ -96,7 +95,6 @@ var serverCmd = &cobra.Command{

func init() {
serverCmd.Flags().StringVar(&serverConfigPath, "config", "", "Configuration file path")
serverCmd.Flags().StringVar(&serverHttpAddr, "addr", ":8080", "HTTP server address")
rootCmd.AddCommand(serverCmd)
}

Expand Down
26 changes: 13 additions & 13 deletions backend/cmd/singer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
)

var (
workerConfigPath string
workerServerAddr string
workerListenAddr string
workerAssistantCode string
workerConfigPath string
workerServerAddr string
workerListenAddr string
workerWorkerID string
)

var workerCmd = &cobra.Command{
Expand All @@ -52,21 +52,21 @@ func init() {
workerCmd.Flags().StringVar(&workerConfigPath, "config", "", "Configuration file path")
workerCmd.Flags().StringVar(&workerServerAddr, "server-addr", "127.0.0.1:8080", "Server address for WebSocket connection")
workerCmd.Flags().StringVar(&workerListenAddr, "listen-addr", ":8081", "Worker MCP server listen address for runtime bootstrap")
workerCmd.Flags().StringVar(&workerAssistantCode, "assistant-code", "", "Assistant code for configuration retrieval")
workerCmd.Flags().StringVar(&workerWorkerID, "worker-id", "", "Worker ID for configuration retrieval")
rootCmd.AddCommand(workerCmd)
}

func createWorker(ctx context.Context) (*client.Worker, error) {
func createWorker(ctx context.Context) (*client.WorkerClient, error) {
_, err := loadWorkerConfig()
if err != nil {
return nil, fmt.Errorf("load config: %w", err)
}

return client.NewWorker(ctx, &client.WorkerConfig{
ServerAddr: workerServerAddr,
AssistantCode: workerAssistantCode,
SkillsDir: "",
ToolsEnabled: true,
ServerAddr: workerServerAddr,
WorkerID: workerWorkerID,
SkillsDir: "",
ToolsEnabled: true,
})
}

Expand All @@ -78,9 +78,9 @@ func loadWorkerConfig() (*config.WorkerConfig, error) {
return nil, fmt.Errorf("failed to load config from %s: %w", workerConfigPath, err)
}
}
if strings.TrimSpace(workerAssistantCode) != "" {
cfg.AssistantCode = workerAssistantCode
logs.Infof("Using assistant code from flag: %s", workerAssistantCode)
if strings.TrimSpace(workerWorkerID) != "" {
cfg.WorkerID = workerWorkerID
logs.Infof("Using worker ID from flag: %s", workerWorkerID)
}
if strings.TrimSpace(workerServerAddr) != "" {
cfg.ServerAddr = workerServerAddr
Expand Down
5 changes: 2 additions & 3 deletions backend/cmd/singer/worker_simplechat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ var (
simpleChatAPIKey string
simpleChatModel string
simpleChatBaseURL string

)

var simpleChatCmd = &cobra.Command{
Expand Down Expand Up @@ -53,7 +52,7 @@ var simpleChatCmd = &cobra.Command{

workerCfg := &worker.WorkerConfig{
Runtime: scRuntime,
ServerAddr: simpleChatServer,
ServerAddr: workerServerAddr,
}

w, err := worker.NewWorker(ctx, workerCfg)
Expand All @@ -76,7 +75,7 @@ func init() {
simpleChatCmd.Flags().StringVar(&simpleChatAPIKey, "api-key", "", "OpenAI API key (or set OPENAI_API_KEY env)")
simpleChatCmd.Flags().StringVar(&simpleChatModel, "model", "gpt-4", "LLM model to use")
simpleChatCmd.Flags().StringVar(&simpleChatBaseURL, "base-url", "", "Custom API base URL")

workerCmd.AddCommand(simpleChatCmd)
}

Expand Down
12 changes: 7 additions & 5 deletions backend/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ type LLMConfig struct {

// Config 是 SingerOS 的主配置结构,包含所有子系统的配置
type Config struct {
Github *GithubAppConfig `yaml:"github,omitempty"`
Gitlab *GitlabAppConfig `yaml:"gitlab,omitempty"`
NATS *NATSConfig `yaml:"nats,omitempty"`
Database *DatabaseConfig `yaml:"database,omitempty"`
LLM *LLMConfig `yaml:"llm,omitempty"`
ServerAddr string `yaml:"server_addr,omitempty"` // 服务器地址
Github *GithubAppConfig `yaml:"github,omitempty"`
Gitlab *GitlabAppConfig `yaml:"gitlab,omitempty"`
NATS *NATSConfig `yaml:"nats,omitempty"`
Database *DatabaseConfig `yaml:"database,omitempty"`
LLM *LLMConfig `yaml:"llm,omitempty"`
Scheduler *SchedulerConfig `yaml:"scheduler,omitempty"`
}

// DatabaseConfig 是数据库的配置结构
Expand Down
9 changes: 9 additions & 0 deletions backend/config/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package config

type SchedulerConfig struct {
Mode string `yaml:"mode,omitempty" json:"mode,omitempty"` // 调度模式,支持 "process","docker-cli","docker-api","k8s"
WorkerBinary string `yaml:"worker_binary,omitempty" json:"worker_binary,omitempty"`
WorkingDir string `yaml:"working_dir,omitempty" json:"working_dir,omitempty"`
Env map[string]string `yaml:"env,omitempty" json:"env,omitempty"`
ServerAddr string `yaml:"server_addr,omitempty" json:"server_addr,omitempty"`
}
12 changes: 6 additions & 6 deletions backend/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ func NewAgent(ctx context.Context, llmConfig *config.LLMConfig, runtimeConfig Co
return nil, fmt.Errorf("tool registry is required")
}

chatModel, err := einoadapter.NewOpenAIChatModel(ctx, llmConfig)
chatModel, err := einoadapter.NewOpenAIChatModel(ctx, llmConfig)
if err != nil {
return nil, err
}

return &Agent{
chatModel: chatModel,
toolAdapter: einoadapter.NewToolAdapter(runtimeConfig.ToolRegistry),
toolAdapter: einoadapter.NewToolAdapter(runtimeConfig.ToolRegistry),
skillsCatalog: runtimeConfig.SkillsCatalog,
systemPrompt: defaultAgentSystemPrompt,
}, nil
Expand Down Expand Up @@ -156,8 +156,8 @@ func (a *Agent) Run(ctx context.Context, req *RequestContext) (*RunResult, error
}

if usage != nil {
_ = state.emitter.Emit(ctx, & agentevents.RunEvent{
Type: agentevents.RunEventUsage,
_ = state.emitter.Emit(ctx, &agentevents.RunEvent{
Type: agentevents.RunEventUsage,
Content: eventContentJSON(usage),
})
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (a *Agent) buildRunState(req *RequestContext) (*runState, error) {
return nil, err
}

emitter := agentevents.NewEmitter(req.RunID, req.TraceID, sinkForRequest(req))
emitter := agentevents.NewEmitter(req.RunID, req.TraceID, sinkForRequest(req))
toolCtx := tools.ToolContext{
RunID: req.RunID,
TraceID: req.TraceID,
Expand All @@ -205,7 +205,7 @@ func (a *Agent) buildRunState(req *RequestContext) (*runState, error) {
emitter: emitter,
userInput: userInput,
systemPrompt: systemPrompt,
toolBinding: einoadapter.ToolBinding{
toolBinding: einoadapter.ToolBinding{
ToolContext: toolCtx,
AllowedTools: req.Capability.AllowedTools,
},
Expand Down
18 changes: 9 additions & 9 deletions backend/internal/agent/eino/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type FlowConfig struct {
Model einomodel.ToolCallingChatModel
ToolAdapter *ToolAdapter
Binding ToolBinding
Emitter * agentevents.Emitter
Emitter *agentevents.Emitter
SystemPrompt string
MaxStep int
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (f *Flow) Generate(ctx context.Context, userInput string) (*einoschema.Mess
}

// Stream runs one user request through the Eino agent loop and emits runtime events.
func (f *Flow) Stream(ctx context.Context, userInput string, emitter * agentevents.Emitter) (*einoschema.Message, error) {
func (f *Flow) Stream(ctx context.Context, userInput string, emitter *agentevents.Emitter) (*einoschema.Message, error) {
if f == nil || f.agent == nil {
return nil, fmt.Errorf("flow is not initialized")
}
Expand Down Expand Up @@ -141,21 +141,21 @@ func (f *Flow) Stream(ctx context.Context, userInput string, emitter * agenteven
return einoschema.ConcatMessages(chunks)
}

func emitMessageChunk(ctx context.Context, emitter * agentevents.Emitter, chunk *einoschema.Message, contentSnapshot *strings.Builder, reasoningSnapshot *strings.Builder) error {
func emitMessageChunk(ctx context.Context, emitter *agentevents.Emitter, chunk *einoschema.Message, contentSnapshot *strings.Builder, reasoningSnapshot *strings.Builder) error {
if emitter == nil || chunk == nil {
return nil
}
if chunk.Content != "" {
contentSnapshot.WriteString(chunk.Content)
_ = emitter.Emit(ctx, & agentevents.RunEvent{
Type: agentevents.RunEventMessageDelta,
_ = emitter.Emit(ctx, &agentevents.RunEvent{
Type: agentevents.RunEventMessageDelta,
Content: chunk.Content,
})
}
if chunk.ReasoningContent != "" {
reasoningSnapshot.WriteString(chunk.ReasoningContent)
_ = emitter.Emit(ctx, & agentevents.RunEvent{
Type: agentevents.RunEventReasoningDelta,
_ = emitter.Emit(ctx, &agentevents.RunEvent{
Type: agentevents.RunEventReasoningDelta,
Content: chunk.ReasoningContent,
})
}
Expand All @@ -164,8 +164,8 @@ func emitMessageChunk(ctx context.Context, emitter * agentevents.Emitter, chunk
if strings.TrimSpace(toolCall.Function.Arguments) != "" {
args["json"] = toolCall.Function.Arguments
}
_ = emitter.Emit(ctx, & agentevents.RunEvent{
Type: agentevents.RunEventToolCallArguments,
_ = emitter.Emit(ctx, &agentevents.RunEvent{
Type: agentevents.RunEventToolCallArguments,
Content: eventContentJSON(map[string]any{
"call_id": toolCall.ID,
"name": toolCall.Function.Name,
Expand Down
6 changes: 3 additions & 3 deletions backend/internal/agent/eino/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func TestFlowStreamEmitsMessageEvents(t *testing.T) {
t.Fatalf("new flow: %v", err)
}

var emitted []* agentevents.RunEvent
emitter := agentevents.NewEmitter("run_stream", "trace_stream", agentevents.SinkFunc(func(ctx context.Context, event * agentevents.RunEvent) error {
var emitted []*agentevents.RunEvent
emitter := agentevents.NewEmitter("run_stream", "trace_stream", agentevents.SinkFunc(func(ctx context.Context, event *agentevents.RunEvent) error {
emitted = append(emitted, event)
return nil
}))
Expand All @@ -96,7 +96,7 @@ func TestFlowStreamEmitsMessageEvents(t *testing.T) {
var deltaCount int
for _, event := range emitted {
switch event.Type {
case agentevents.RunEventMessageDelta:
case agentevents.RunEventMessageDelta:
deltaCount++
}
}
Expand Down
Loading