From 6315fbbeabfa6d7fb47b6c52aaeeed846aa0ad15 Mon Sep 17 00:00:00 2001 From: Santosh Date: Thu, 11 Jun 2026 08:36:56 -0400 Subject: [PATCH 1/3] Add explicit realtime session DX --- control-plane/internal/cli/root.go | 1 + control-plane/internal/cli/session.go | 302 +++++++++++++++++ control-plane/internal/cli/session_test.go | 73 ++++ control-plane/internal/handlers/sessions.go | 311 ++++++++++++++++++ .../internal/handlers/sessions_test.go | 66 ++++ control-plane/internal/server/routes_core.go | 7 + control-plane/pkg/types/session_transport.go | 77 +++++ .../pkg/types/session_transport_test.go | 65 ++++ control-plane/pkg/types/types.go | 5 + sdk/go/agent/agent.go | 13 +- sdk/go/agent/agent_lifecycle.go | 9 +- sdk/go/agent/session.go | 78 +++++ sdk/go/agent/session_test.go | 43 +++ sdk/go/agent/session_transport.go | 77 +++++ sdk/go/agent/session_transport_test.go | 65 ++++ sdk/python/agentfield/__init__.py | 15 + sdk/python/agentfield/agent.py | 56 ++++ sdk/python/agentfield/execution_context.py | 4 +- sdk/python/agentfield/session_transport.py | 74 +++++ sdk/python/agentfield/sessions.py | 100 ++++++ sdk/python/tests/test_agent_session.py | 40 +++ .../tests/test_execution_context_core.py | 2 +- sdk/python/tests/test_session_transport.py | 33 ++ sdk/python/uv.lock | 2 +- sdk/typescript/src/agent/Agent.ts | 26 +- sdk/typescript/src/index.ts | 2 + sdk/typescript/src/session.ts | 59 ++++ sdk/typescript/src/sessionTransport.ts | 65 ++++ sdk/typescript/tests/agent.test.ts | 25 ++ .../tests/agent_runtime_paths.test.ts | 1 + .../tests/session_transport.test.ts | 31 ++ 31 files changed, 1712 insertions(+), 15 deletions(-) create mode 100644 control-plane/internal/cli/session.go create mode 100644 control-plane/internal/cli/session_test.go create mode 100644 control-plane/internal/handlers/sessions.go create mode 100644 control-plane/internal/handlers/sessions_test.go create mode 100644 control-plane/pkg/types/session_transport.go create mode 100644 control-plane/pkg/types/session_transport_test.go create mode 100644 sdk/go/agent/session.go create mode 100644 sdk/go/agent/session_test.go create mode 100644 sdk/go/agent/session_transport.go create mode 100644 sdk/go/agent/session_transport_test.go create mode 100644 sdk/python/agentfield/session_transport.py create mode 100644 sdk/python/agentfield/sessions.py create mode 100644 sdk/python/tests/test_agent_session.py create mode 100644 sdk/python/tests/test_session_transport.py create mode 100644 sdk/typescript/src/session.ts create mode 100644 sdk/typescript/src/sessionTransport.ts create mode 100644 sdk/typescript/tests/session_transport.test.ts diff --git a/control-plane/internal/cli/root.go b/control-plane/internal/cli/root.go index 5a3eb70be..f98cc2530 100644 --- a/control-plane/internal/cli/root.go +++ b/control-plane/internal/cli/root.go @@ -120,6 +120,7 @@ AI Agent? Run "af agent help" for structured JSON output optimized for programma RootCmd.AddCommand(NewVerifyAliasCommand()) RootCmd.AddCommand(NewNodesCommand()) RootCmd.AddCommand(NewExecutionCommand()) + RootCmd.AddCommand(NewSessionCommand()) RootCmd.AddCommand(NewCallCommand()) RootCmd.AddCommand(NewReasonerListCommand()) RootCmd.AddCommand(NewTailCommand()) diff --git a/control-plane/internal/cli/session.go b/control-plane/internal/cli/session.go new file mode 100644 index 000000000..c6e9b8eda --- /dev/null +++ b/control-plane/internal/cli/session.go @@ -0,0 +1,302 @@ +package cli + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strings" + + "github.com/spf13/cobra" +) + +type sessionStartOptions struct { + provider string + transport string + model string + voice string + outputFormat string + stdout io.Writer +} + +type sessionToolOptions struct { + target string + inputSource string + outputFormat string + stdin io.Reader + stdout io.Writer +} + +type sessionOfferOptions struct { + provider string + transport string + sdpSource string + outputFormat string + stdin io.Reader + stdout io.Writer +} + +func NewSessionCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "session", + Short: "Start and interact with AgentField realtime sessions", + } + cmd.AddCommand(newSessionStartCommand()) + cmd.AddCommand(newSessionOfferCommand()) + cmd.AddCommand(newSessionToolCommand()) + cmd.AddCommand(newSessionWorkflowsCommand()) + return cmd +} + +func newSessionStartCommand() *cobra.Command { + opts := &sessionStartOptions{} + cmd := &cobra.Command{ + Use: "start .", + Short: "Start a provider-backed AgentField session", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := commandContext() + defer cancel() + opts.stdout = os.Stdout + return runSessionStart(ctx, args[0], opts) + }, + } + cmd.Flags().StringVar(&opts.provider, "provider", "", "Explicit session provider, e.g. openai") + cmd.Flags().StringVar(&opts.transport, "transport", "", "Explicit session transport, e.g. webrtc") + cmd.Flags().StringVar(&opts.model, "model", "", "Provider model") + cmd.Flags().StringVar(&opts.voice, "voice", "", "Provider voice") + cmd.Flags().StringVarP(&opts.outputFormat, "output", "o", "json", "Output format: json, pretty, yaml") + return cmd +} + +func runSessionStart(ctx context.Context, target string, opts *sessionStartOptions) error { + if opts.stdout == nil { + opts.stdout = os.Stdout + } + payload := map[string]interface{}{ + "provider": opts.provider, + "transport": opts.transport, + "model": opts.model, + "voice": opts.voice, + } + resp, err := makeRequest(ctx, http.MethodPost, "/api/v1/sessions/"+target+"/start", payload, "application/json") + if err != nil { + return cliExitError{Code: 3, Err: err} + } + var decoded map[string]interface{} + body, err := readJSONResponse(resp, &decoded) + if err != nil { + return cliExitError{Code: 3, Err: err} + } + if resp.StatusCode >= http.StatusBadRequest { + return cliExitError{Code: httpExitCode(resp.StatusCode), Err: fmt.Errorf("session start failed with status %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))} + } + return writeValue(opts.stdout, decoded, autoOutputFormat(opts.outputFormat, false)) +} + +func newSessionOfferCommand() *cobra.Command { + opts := &sessionOfferOptions{} + cmd := &cobra.Command{ + Use: "offer ", + Short: "Create a realtime WebRTC offer through the control plane", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := commandContext() + defer cancel() + opts.stdin = os.Stdin + opts.stdout = os.Stdout + return runSessionOffer(ctx, args[0], opts) + }, + } + cmd.Flags().StringVar(&opts.provider, "provider", "", "Explicit session provider") + cmd.Flags().StringVar(&opts.transport, "transport", "", "Explicit session transport") + cmd.Flags().StringVar(&opts.sdpSource, "sdp", "", "SDP offer as inline text, @path, or - for stdin; defaults to stdin") + cmd.Flags().StringVarP(&opts.outputFormat, "output", "o", "raw", "Output format: raw, json, pretty, yaml") + return cmd +} + +func runSessionOffer(ctx context.Context, sessionID string, opts *sessionOfferOptions) error { + if opts.stdout == nil { + opts.stdout = os.Stdout + } + sdp, err := readSessionSDP(opts.sdpSource, opts.stdin) + if err != nil { + return cliExitError{Code: 2, Err: err} + } + values := url.Values{} + if strings.TrimSpace(opts.provider) != "" { + values.Set("provider", opts.provider) + } + if strings.TrimSpace(opts.transport) != "" { + values.Set("transport", opts.transport) + } + path := "/api/v1/sessions/" + url.PathEscape(sessionID) + "/realtime-offer" + if encoded := values.Encode(); encoded != "" { + path += "?" + encoded + } + resp, err := makeRawRequest(ctx, http.MethodPost, path, strings.NewReader(sdp), "application/sdp", "application/sdp") + if err != nil { + return cliExitError{Code: 3, Err: err} + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return cliExitError{Code: 3, Err: fmt.Errorf("read response: %w", err)} + } + if resp.StatusCode >= http.StatusBadRequest { + return cliExitError{Code: httpExitCode(resp.StatusCode), Err: fmt.Errorf("session offer failed with status %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))} + } + + format := strings.ToLower(strings.TrimSpace(opts.outputFormat)) + if format == "" || format == "raw" { + _, err := opts.stdout.Write(body) + return err + } + return writeValue(opts.stdout, map[string]interface{}{"answer_sdp": string(body)}, autoOutputFormat(format, false)) +} + +func readSessionSDP(source string, stdin io.Reader) (string, error) { + sourceToken := strings.TrimSpace(source) + switch { + case sourceToken == "" || sourceToken == "-": + if stdin == nil { + return "", fmt.Errorf("SDP offer required; pass --sdp, --sdp @path, or pipe SDP on stdin") + } + data, err := io.ReadAll(stdin) + if err != nil { + return "", fmt.Errorf("read SDP from stdin: %w", err) + } + if strings.TrimSpace(string(data)) == "" { + return "", fmt.Errorf("SDP offer required; pass --sdp, --sdp @path, or pipe SDP on stdin") + } + return string(data), nil + case strings.HasPrefix(sourceToken, "@"): + path := strings.TrimSpace(strings.TrimPrefix(sourceToken, "@")) + if path == "" { + return "", fmt.Errorf("SDP file path is required after @") + } + data, err := os.ReadFile(path) + if err != nil { + return "", fmt.Errorf("read SDP file %s: %w", path, err) + } + if strings.TrimSpace(string(data)) == "" { + return "", fmt.Errorf("SDP file %s is empty", path) + } + return string(data), nil + default: + if strings.TrimSpace(source) == "" { + return "", fmt.Errorf("SDP offer required; pass --sdp, --sdp @path, or pipe SDP on stdin") + } + return source, nil + } +} + +func makeRawRequest(ctx context.Context, method, path string, body io.Reader, contentType string, accept string) (*http.Response, error) { + server := strings.TrimRight(GetServerURL(), "/") + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + req, err := http.NewRequestWithContext(ctx, method, server+path, body) + if err != nil { + return nil, fmt.Errorf("build request: %w", err) + } + if accept == "" { + accept = "application/json" + } + req.Header.Set("Accept", accept) + req.Header.Set("User-Agent", "af-cli/session") + if strings.TrimSpace(contentType) != "" { + req.Header.Set("Content-Type", contentType) + } + if key := strings.TrimSpace(GetAPIKey()); key != "" { + req.Header.Set("X-API-Key", key) + } + client := triggerHTTPClient(accept) + return client.Do(req) +} + +func newSessionToolCommand() *cobra.Command { + opts := &sessionToolOptions{} + cmd := &cobra.Command{ + Use: "tool ", + Short: "Invoke a session tool through AgentField execute/async", + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := commandContext() + defer cancel() + opts.stdin = os.Stdin + opts.stdout = os.Stdout + return runSessionTool(ctx, args[0], args[1], opts) + }, + } + cmd.Flags().StringVar(&opts.target, "target", "", "Explicit . target") + cmd.Flags().StringVar(&opts.inputSource, "in", "", "Input payload as inline JSON or @path") + cmd.Flags().StringVarP(&opts.outputFormat, "output", "o", "json", "Output format: json, pretty, yaml") + return cmd +} + +func runSessionTool(ctx context.Context, sessionID string, tool string, opts *sessionToolOptions) error { + input := map[string]interface{}{} + if strings.TrimSpace(opts.inputSource) != "" { + parsed, err := parseInputSource(opts.inputSource) + if err != nil { + return cliExitError{Code: 2, Err: err} + } + input = parsed + } else if opts.stdin != nil { + data, _ := io.ReadAll(opts.stdin) + if len(strings.TrimSpace(string(data))) > 0 { + if err := json.Unmarshal(data, &input); err != nil { + return cliExitError{Code: 2, Err: fmt.Errorf("parse stdin JSON: %w", err)} + } + } + } + payload := map[string]interface{}{"target": opts.target, "input": input} + resp, err := makeRequest(ctx, http.MethodPost, "/api/v1/sessions/"+sessionID+"/tools/"+tool, payload, "application/json") + if err != nil { + return cliExitError{Code: 3, Err: err} + } + var decoded map[string]interface{} + body, err := readJSONResponse(resp, &decoded) + if err != nil { + return cliExitError{Code: 3, Err: err} + } + if resp.StatusCode >= http.StatusBadRequest { + return cliExitError{Code: httpExitCode(resp.StatusCode), Err: fmt.Errorf("session tool failed with status %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))} + } + return writeValue(opts.stdout, decoded, autoOutputFormat(opts.outputFormat, false)) +} + +func newSessionWorkflowsCommand() *cobra.Command { + var output string + cmd := &cobra.Command{ + Use: "workflows ", + Short: "List workflows associated with a session", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := commandContext() + defer cancel() + resp, err := makeRequest(ctx, http.MethodPost, "/api/v1/agentic/query", map[string]interface{}{ + "resource": "workflows", + "filters": map[string]interface{}{"session_id": args[0]}, + }, "application/json") + if err != nil { + return cliExitError{Code: 3, Err: err} + } + var decoded map[string]interface{} + body, err := readJSONResponse(resp, &decoded) + if err != nil { + return cliExitError{Code: 3, Err: err} + } + if resp.StatusCode >= http.StatusBadRequest { + return cliExitError{Code: httpExitCode(resp.StatusCode), Err: fmt.Errorf("session workflows failed with status %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))} + } + return writeValue(os.Stdout, decoded, autoOutputFormat(output, false)) + }, + } + cmd.Flags().StringVarP(&output, "output", "o", "json", "Output format: json, pretty, yaml") + return cmd +} diff --git a/control-plane/internal/cli/session_test.go b/control-plane/internal/cli/session_test.go new file mode 100644 index 000000000..d7f79d742 --- /dev/null +++ b/control-plane/internal/cli/session_test.go @@ -0,0 +1,73 @@ +package cli + +import ( + "bytes" + "context" + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRunSessionOfferPostsSDPAndWritesRawAnswer(t *testing.T) { + var gotBody string + var gotContentType string + withTriggerTestServer(t, func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/api/v1/sessions/sess-1/realtime-offer", r.URL.Path) + require.Equal(t, "openai", r.URL.Query().Get("provider")) + require.Equal(t, "webrtc", r.URL.Query().Get("transport")) + gotContentType = r.Header.Get("Content-Type") + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + gotBody = string(body) + w.Header().Set("Content-Type", "application/sdp") + _, _ = w.Write([]byte("v=0\r\nanswer\r\n")) + }) + + var stdout bytes.Buffer + err := runSessionOffer(context.Background(), "sess-1", &sessionOfferOptions{ + provider: "openai", + transport: "webrtc", + sdpSource: "v=0\r\noffer\r\n", + outputFormat: "raw", + stdout: &stdout, + }) + require.NoError(t, err) + require.Equal(t, "application/sdp", gotContentType) + require.Equal(t, "v=0\r\noffer\r\n", gotBody) + require.Equal(t, "v=0\r\nanswer\r\n", stdout.String()) +} + +func TestRunSessionOfferReadsSDPFromStdinAndCanWrapJSON(t *testing.T) { + withTriggerTestServer(t, func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + require.Equal(t, "v=0\nstdin-offer\n", string(body)) + w.Header().Set("Content-Type", "application/sdp") + _, _ = w.Write([]byte("v=0\nstdin-answer\n")) + }) + + var stdout bytes.Buffer + err := runSessionOffer(context.Background(), "sess-stdin", &sessionOfferOptions{ + provider: "openai", + transport: "webrtc", + outputFormat: "json", + stdin: strings.NewReader("v=0\nstdin-offer\n"), + stdout: &stdout, + }) + require.NoError(t, err) + require.JSONEq(t, `{"answer_sdp":"v=0\nstdin-answer\n"}`, stdout.String()) +} + +func TestRunSessionOfferRequiresSDP(t *testing.T) { + var stdout bytes.Buffer + err := runSessionOffer(context.Background(), "sess-empty", &sessionOfferOptions{ + provider: "openai", + transport: "webrtc", + stdin: strings.NewReader(" "), + stdout: &stdout, + }) + require.ErrorContains(t, err, "SDP offer required") +} diff --git a/control-plane/internal/handlers/sessions.go b/control-plane/internal/handlers/sessions.go new file mode 100644 index 000000000..3efb0b764 --- /dev/null +++ b/control-plane/internal/handlers/sessions.go @@ -0,0 +1,311 @@ +package handlers + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "net/url" + "os" + "strings" + "time" + + "github.com/Agent-Field/agentfield/control-plane/internal/storage" + "github.com/Agent-Field/agentfield/control-plane/pkg/types" + "github.com/gin-gonic/gin" +) + +type StartSessionRequest struct { + Provider string `json:"provider"` + Transport string `json:"transport"` + Model string `json:"model,omitempty"` + Voice string `json:"voice,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +type ToolSessionRequest struct { + Target string `json:"target,omitempty"` + Input map[string]interface{} `json:"input,omitempty"` +} + +type sessionDefinition struct { + Name string `json:"name"` + Provider string `json:"provider"` + Transport string `json:"transport"` + Model string `json:"model,omitempty"` + Modalities []string `json:"modalities,omitempty"` + Voice string `json:"voice,omitempty"` + Tools []string `json:"tools,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +func StartSessionHandler(store storage.StorageProvider) gin.HandlerFunc { + return func(c *gin.Context) { + nodeID, sessionName, ok := splitSessionTarget(c.Param("target")) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "session target must be ."}) + return + } + + definition, found := lookupSessionDefinition(c, store, nodeID, sessionName) + if !found { + return + } + + var req StartSessionRequest + if c.Request.Body != nil { + _ = c.ShouldBindJSON(&req) + } + + provider := firstNonEmptySession(req.Provider, definition.Provider) + transport := firstNonEmptySession(req.Transport, definition.Transport) + capability, err := types.ValidateSessionTransport(provider, transport) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": err.Error(), + "provider": provider, + "transport": transport, + }) + return + } + if capability.Provider != definition.Provider || capability.Transport != definition.Transport { + c.JSON(http.StatusBadRequest, gin.H{ + "error": fmt.Sprintf( + "session %s.%s is registered for provider=%s transport=%s; requested provider=%s transport=%s", + nodeID, + sessionName, + definition.Provider, + definition.Transport, + capability.Provider, + capability.Transport, + ), + }) + return + } + + sessionID := "sess_" + time.Now().UTC().Format("20060102_150405") + "_" + shortRandom() + model := firstNonEmptySession(req.Model, definition.Model) + voice := firstNonEmptySession(req.Voice, definition.Voice) + c.JSON(http.StatusCreated, gin.H{ + "session_id": sessionID, + "target": nodeID + "." + sessionName, + "provider": capability.Provider, + "transport": capability.Transport, + "model": model, + "voice": voice, + "modalities": definition.Modalities, + "tool_targets": sessionToolTargets(nodeID, definition.Tools), + "offer_url": fmt.Sprintf("/api/v1/sessions/%s/realtime-offer", url.PathEscape(sessionID)), + "tool_url": fmt.Sprintf("/api/v1/sessions/%s/tools/{tool}", url.PathEscape(sessionID)), + "created_at": time.Now().UTC().Format(time.RFC3339Nano), + }) + } +} + +func SessionRealtimeOfferHandler(store storage.StorageProvider) gin.HandlerFunc { + return func(c *gin.Context) { + _ = store + provider := strings.TrimSpace(c.Query("provider")) + transport := strings.TrimSpace(c.Query("transport")) + if provider == "" || transport == "" { + var req StartSessionRequest + _ = c.ShouldBindJSON(&req) + provider = req.Provider + transport = req.Transport + } + if _, err := types.ValidateSessionTransport(provider, transport); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if types.NormalizeSessionTransportValue(transport) != "webrtc" { + c.JSON(http.StatusBadRequest, gin.H{"error": "realtime-offer requires transport=webrtc"}) + return + } + if types.NormalizeSessionTransportValue(provider) != "openai" { + c.JSON(http.StatusBadRequest, gin.H{"error": "webrtc realtime offers currently require provider=openai"}) + return + } + if strings.TrimSpace(os.Getenv("OPENAI_API_KEY")) == "" { + c.JSON(http.StatusBadGateway, gin.H{"error": "OPENAI_API_KEY is required for provider=openai transport=webrtc"}) + return + } + sdp, err := io.ReadAll(c.Request.Body) + if err != nil || strings.TrimSpace(string(sdp)) == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "SDP offer body is required"}) + return + } + answer, err := createOpenAIRealtimeCall( + c.Request.Context(), + c.Param("session_id"), + string(sdp), + firstNonEmptySession(c.Query("model"), "gpt-realtime-2"), + firstNonEmptySession(c.Query("voice"), "marin"), + ) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{ + "error": err.Error(), + "boundary": "control-plane -> realtime provider", + "provider": "openai", + }) + return + } + c.Data(http.StatusOK, "application/sdp", []byte(answer)) + } +} + +func SessionToolHandler(store storage.StorageProvider, timeout time.Duration, internalToken string) gin.HandlerFunc { + return func(c *gin.Context) { + _ = store + sessionID := strings.TrimSpace(c.Param("session_id")) + toolName := strings.TrimSpace(c.Param("tool")) + var req ToolSessionRequest + if err := c.ShouldBindJSON(&req); err != nil && err != io.EOF { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid JSON body"}) + return + } + target := firstNonEmptySession(req.Target, toolName) + if !strings.Contains(target, ".") { + c.JSON(http.StatusBadRequest, gin.H{"error": "session tool target must be ."}) + return + } + + body, _ := json.Marshal(gin.H{"input": req.Input}) + scheme := "http" + if c.Request.TLS != nil { + scheme = "https" + } + executeURL := fmt.Sprintf("%s://%s/api/v1/execute/async/%s", scheme, c.Request.Host, url.PathEscape(target)) + forwardReq, err := http.NewRequestWithContext(c.Request.Context(), http.MethodPost, executeURL, strings.NewReader(string(body))) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + forwardReq.Header.Set("Content-Type", "application/json") + forwardReq.Header.Set("X-Session-ID", sessionID) + if internalToken != "" { + forwardReq.Header.Set("Authorization", "Bearer "+internalToken) + } + + client := &http.Client{Timeout: timeout} + resp, err := client.Do(forwardReq) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + c.Data(resp.StatusCode, resp.Header.Get("Content-Type"), respBody) + } +} + +func lookupSessionDefinition(c *gin.Context, store storage.StorageProvider, nodeID string, sessionName string) (sessionDefinition, bool) { + agent, err := store.GetAgent(c.Request.Context(), nodeID) + if err != nil || agent == nil { + c.JSON(http.StatusNotFound, gin.H{"error": "agent not found"}) + return sessionDefinition{}, false + } + if agent.Metadata.Custom == nil { + c.JSON(http.StatusNotFound, gin.H{"error": "agent has no registered sessions"}) + return sessionDefinition{}, false + } + rawSessions, ok := agent.Metadata.Custom["sessions"].([]interface{}) + if !ok { + c.JSON(http.StatusNotFound, gin.H{"error": "agent has no registered sessions"}) + return sessionDefinition{}, false + } + for _, raw := range rawSessions { + bytes, _ := json.Marshal(raw) + var definition sessionDefinition + if json.Unmarshal(bytes, &definition) == nil && definition.Name == sessionName { + return definition, true + } + } + c.JSON(http.StatusNotFound, gin.H{"error": "session not registered"}) + return sessionDefinition{}, false +} + +func splitSessionTarget(target string) (string, string, bool) { + target = strings.TrimSpace(target) + parts := strings.SplitN(target, ".", 2) + if len(parts) != 2 || strings.TrimSpace(parts[0]) == "" || strings.TrimSpace(parts[1]) == "" { + return "", "", false + } + return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]), true +} + +func firstNonEmptySession(values ...string) string { + for _, value := range values { + if strings.TrimSpace(value) != "" { + return strings.TrimSpace(value) + } + } + return "" +} + +func sessionToolTargets(nodeID string, tools []string) map[string]string { + targets := map[string]string{} + for _, tool := range tools { + tool = strings.TrimSpace(tool) + if tool == "" { + continue + } + if strings.Contains(tool, ".") { + parts := strings.SplitN(tool, ".", 2) + targets[parts[1]] = tool + } else { + targets[tool] = nodeID + "." + tool + } + } + return targets +} + +func createOpenAIRealtimeCall(ctx context.Context, sessionID string, sdp string, model string, voice string) (string, error) { + var body bytes.Buffer + writer := multipart.NewWriter(&body) + if err := writer.WriteField("sdp", sdp); err != nil { + return "", err + } + sessionConfig := map[string]interface{}{ + "type": "realtime", + "model": model, + "instructions": "You are a realtime voice front end for an AgentField session. Use registered tools to route agent work through the AgentField control plane.", + "audio": map[string]interface{}{"output": map[string]interface{}{"voice": voice}}, + "tool_choice": "auto", + } + sessionBytes, _ := json.Marshal(sessionConfig) + if err := writer.WriteField("session", string(sessionBytes)); err != nil { + return "", err + } + if err := writer.Close(); err != nil { + return "", err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://api.openai.com/v1/realtime/calls", &body) + if err != nil { + return "", err + } + req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(os.Getenv("OPENAI_API_KEY"))) + req.Header.Set("Content-Type", writer.FormDataContentType()) + hash := sha256.Sum256([]byte(sessionID)) + req.Header.Set("OpenAI-Safety-Identifier", hex.EncodeToString(hash[:])[:32]) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + answer, _ := io.ReadAll(resp.Body) + if resp.StatusCode >= http.StatusBadRequest { + return "", fmt.Errorf("OpenAI realtime call failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(answer))) + } + return string(answer), nil +} + +func shortRandom() string { + return fmt.Sprintf("%08x", time.Now().UnixNano()&0xffffffff) +} diff --git a/control-plane/internal/handlers/sessions_test.go b/control-plane/internal/handlers/sessions_test.go new file mode 100644 index 000000000..d571e8d9e --- /dev/null +++ b/control-plane/internal/handlers/sessions_test.go @@ -0,0 +1,66 @@ +package handlers + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/Agent-Field/agentfield/control-plane/pkg/types" + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/require" +) + +func TestStartSessionHandlerCreatesSessionForRegisteredDefinition(t *testing.T) { + gin.SetMode(gin.TestMode) + store := &nodeRESTStorageStub{agent: sessionTestAgent()} + router := gin.New() + router.POST("/api/v1/sessions/:target/start", StartSessionHandler(store)) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/sessions/support.voice/start", bytes.NewBufferString(`{"provider":"openai","transport":"webrtc"}`)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, http.StatusCreated, rec.Code) + var body map[string]interface{} + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &body)) + require.Equal(t, "support.voice", body["target"]) + require.Equal(t, "openai", body["provider"]) + require.Equal(t, "webrtc", body["transport"]) + require.Equal(t, map[string]interface{}{"resolve_voice_turn": "support.resolve_voice_turn"}, body["tool_targets"]) +} + +func TestStartSessionHandlerRejectsUnsupportedTransport(t *testing.T) { + gin.SetMode(gin.TestMode) + store := &nodeRESTStorageStub{agent: sessionTestAgent()} + router := gin.New() + router.POST("/api/v1/sessions/:target/start", StartSessionHandler(store)) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/sessions/support.voice/start", bytes.NewBufferString(`{"provider":"openrouter","transport":"webrtc"}`)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), "does not infer or switch providers") +} + +func sessionTestAgent() *types.AgentNode { + return &types.AgentNode{ + ID: "support", + Metadata: types.AgentMetadata{Custom: map[string]interface{}{ + "sessions": []interface{}{ + map[string]interface{}{ + "name": "voice", + "provider": "openai", + "transport": "webrtc", + "model": "gpt-realtime-2", + "modalities": []interface{}{"audio", "text"}, + "tools": []interface{}{"support.resolve_voice_turn"}, + }, + }, + }}, + } +} diff --git a/control-plane/internal/server/routes_core.go b/control-plane/internal/server/routes_core.go index 3db40f11c..6dd350aa3 100644 --- a/control-plane/internal/server/routes_core.go +++ b/control-plane/internal/server/routes_core.go @@ -146,6 +146,13 @@ func (s *AgentFieldServer) registerCoreRoutes(agentAPI *gin.RouterGroup) { agentAPI.GET("/executions/:execution_id/notes", handlers.GetExecutionNotesHandler(s.storage, s.noteOwnershipEnforced())) agentAPI.POST("/workflow/executions/events", handlers.WorkflowExecutionEventHandler(s.storage)) + sessionGroup := agentAPI.Group("/sessions") + { + sessionGroup.POST("/:target/start", handlers.StartSessionHandler(s.storage)) + sessionGroup.POST("/:session_id/realtime-offer", handlers.SessionRealtimeOfferHandler(s.storage)) + sessionGroup.POST("/:session_id/tools/:tool", handlers.SessionToolHandler(s.storage, s.config.AgentField.ExecutionQueue.AgentCallTimeout, s.config.Features.DID.Authorization.InternalToken)) + } + // Workflow endpoints will be reintroduced once the simplified execution pipeline lands. } diff --git a/control-plane/pkg/types/session_transport.go b/control-plane/pkg/types/session_transport.go new file mode 100644 index 000000000..476933210 --- /dev/null +++ b/control-plane/pkg/types/session_transport.go @@ -0,0 +1,77 @@ +package types + +import ( + "fmt" + "sort" + "strings" +) + +var SupportedSessionTransports = map[string][]string{ + "openai": {"webrtc", "websocket"}, + "openrouter": {"audio_turns"}, +} + +type SessionTransportCapability struct { + Provider string `json:"provider"` + Transport string `json:"transport"` +} + +type SessionTransportError struct { + Provider string + Transport string + Supported []string +} + +func (e *SessionTransportError) Error() string { + return fmt.Sprintf( + "unsupported session transport %q for provider %q. Supported transports: %s. AgentField does not infer or switch providers; set provider and transport explicitly.", + e.Transport, + e.Provider, + strings.Join(e.Supported, ", "), + ) +} + +func NormalizeSessionTransportValue(value string) string { + return strings.ReplaceAll(strings.ToLower(strings.TrimSpace(value)), "-", "_") +} + +func ValidateSessionTransport(provider string, transport string) (SessionTransportCapability, error) { + normalizedProvider := NormalizeSessionTransportValue(provider) + normalizedTransport := NormalizeSessionTransportValue(transport) + + if normalizedProvider == "" { + return SessionTransportCapability{}, fmt.Errorf("session provider is required; AgentField does not infer providers") + } + if normalizedTransport == "" { + return SessionTransportCapability{}, fmt.Errorf("session transport is required; AgentField does not infer transports") + } + + supported, ok := SupportedSessionTransports[normalizedProvider] + if !ok { + known := make([]string, 0, len(SupportedSessionTransports)) + for knownProvider := range SupportedSessionTransports { + known = append(known, knownProvider) + } + sort.Strings(known) + return SessionTransportCapability{}, fmt.Errorf( + "unknown session provider %q. Known providers: %s. Register provider capabilities before using a custom session provider", + provider, + strings.Join(known, ", "), + ) + } + + for _, candidate := range supported { + if candidate == normalizedTransport { + return SessionTransportCapability{ + Provider: normalizedProvider, + Transport: normalizedTransport, + }, nil + } + } + + return SessionTransportCapability{}, &SessionTransportError{ + Provider: normalizedProvider, + Transport: normalizedTransport, + Supported: append([]string(nil), supported...), + } +} diff --git a/control-plane/pkg/types/session_transport_test.go b/control-plane/pkg/types/session_transport_test.go new file mode 100644 index 000000000..8dd980b52 --- /dev/null +++ b/control-plane/pkg/types/session_transport_test.go @@ -0,0 +1,65 @@ +package types + +import ( + "errors" + "strings" + "testing" +) + +func TestValidateSessionTransportAcceptsExplicitSupportedPairs(t *testing.T) { + capability, err := ValidateSessionTransport("OpenAI", "WebRTC") + if err != nil { + t.Fatalf("ValidateSessionTransport returned error: %v", err) + } + + if capability.Provider != "openai" { + t.Fatalf("provider = %q, want openai", capability.Provider) + } + if capability.Transport != "webrtc" { + t.Fatalf("transport = %q, want webrtc", capability.Transport) + } +} + +func TestValidateSessionTransportRejectsProviderTransportMismatch(t *testing.T) { + _, err := ValidateSessionTransport("openrouter", "webrtc") + if err == nil { + t.Fatal("expected error") + } + + var transportErr *SessionTransportError + if !errors.As(err, &transportErr) { + t.Fatalf("error type = %T, want *SessionTransportError", err) + } + if transportErr.Provider != "openrouter" { + t.Fatalf("provider = %q, want openrouter", transportErr.Provider) + } + if transportErr.Transport != "webrtc" { + t.Fatalf("transport = %q, want webrtc", transportErr.Transport) + } + if got := err.Error(); got == "" || !containsAll(got, []string{"audio_turns", "does not infer or switch providers"}) { + t.Fatalf("unexpected error message: %q", got) + } +} + +func TestValidateSessionTransportRequiresExplicitTransport(t *testing.T) { + _, err := ValidateSessionTransport("openai", "") + if err == nil || !containsAll(err.Error(), []string{"transport is required"}) { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestValidateSessionTransportRejectsUnknownProvider(t *testing.T) { + _, err := ValidateSessionTransport("custom", "webrtc") + if err == nil || !containsAll(err.Error(), []string{"unknown session provider", "custom"}) { + t.Fatalf("unexpected error: %v", err) + } +} + +func containsAll(value string, needles []string) bool { + for _, needle := range needles { + if !strings.Contains(value, needle) { + return false + } + } + return true +} diff --git a/control-plane/pkg/types/types.go b/control-plane/pkg/types/types.go index 20d2b685f..7ddd75d40 100644 --- a/control-plane/pkg/types/types.go +++ b/control-plane/pkg/types/types.go @@ -877,6 +877,11 @@ type Session struct { SessionName *string `json:"session_name,omitempty" db:"session_name"` // DAG Relationship Fields + // TODO(session-linking): Define explicit session edge semantics before exposing + // session-to-session APIs. Parent/root IDs should support lifecycle edges such + // as transfer, bridge, companion, and escalation, while agent work inside each + // session continues to use app.call/session.call so reasoner executions remain + // normal workflow DAG nodes under the session. ParentSessionID *string `json:"parent_session_id,omitempty" db:"parent_session_id"` RootSessionID *string `json:"root_session_id,omitempty" db:"root_session_id"` diff --git a/sdk/go/agent/agent.go b/sdk/go/agent/agent.go index 566f97988..0f4238bde 100644 --- a/sdk/go/agent/agent.go +++ b/sdk/go/agent/agent.go @@ -12,8 +12,8 @@ import ( "net/http" "net/url" "os" - "strings" "runtime" + "strings" "sync" "time" @@ -153,10 +153,10 @@ type Reasoner struct { // Use it as a typed argument to WithTriggers — the control plane registers // the binding and minting of the public ingest URL happens server-side. type EventTrigger struct { - Source string - Types []string - SecretEnv string - Config map[string]any + Source string + Types []string + SecretEnv string + Config map[string]any } // ScheduleTrigger describes a cron-style schedule binding for a reasoner. @@ -468,6 +468,7 @@ type Agent struct { client *client.Client httpClient *http.Client reasoners map[string]*Reasoner + sessions map[string]SessionDefinition aiClient *ai.Client // AI/LLM client memory *Memory // Memory system for state management @@ -554,6 +555,7 @@ func New(cfg Config) (*Agent, error) { cfg: cfg, httpClient: httpClient, reasoners: make(map[string]*Reasoner), + sessions: make(map[string]SessionDefinition), aiClient: aiClient, memory: NewMemory(cfg.MemoryBackend), stopLease: make(chan struct{}), @@ -1013,6 +1015,7 @@ func (a *Agent) discoveryPayload() map[string]any { "deployment_type": deployment, "reasoners": reasoners, "skills": []map[string]any{}, + "sessions": a.SessionDefinitions(), } } diff --git a/sdk/go/agent/agent_lifecycle.go b/sdk/go/agent/agent_lifecycle.go index 143c9343c..dd8b2fcac 100644 --- a/sdk/go/agent/agent_lifecycle.go +++ b/sdk/go/agent/agent_lifecycle.go @@ -161,10 +161,13 @@ func (a *Agent) registerNode(ctx context.Context) error { "environment": "development", "platform": "go", }, - "sdk": map[string]any{ - "language": "go", + "custom": map[string]any{ + "sdk": map[string]any{ + "language": "go", + }, + "sessions": a.SessionDefinitions(), + "tags": a.cfg.Tags, }, - "tags": a.cfg.Tags, }, Features: map[string]any{}, DeploymentType: a.cfg.DeploymentType, diff --git a/sdk/go/agent/session.go b/sdk/go/agent/session.go new file mode 100644 index 000000000..42ce1116e --- /dev/null +++ b/sdk/go/agent/session.go @@ -0,0 +1,78 @@ +package agent + +type SessionDefinition struct { + Name string `json:"name"` + Provider string `json:"provider"` + Transport string `json:"transport"` + Model string `json:"model,omitempty"` + Modalities []string `json:"modalities"` + Voice string `json:"voice,omitempty"` + Tools []string `json:"tools"` + Metadata map[string]any `json:"metadata"` +} + +type SessionOption func(*SessionDefinition) + +func WithSessionModel(model string) SessionOption { + return func(s *SessionDefinition) { s.Model = model } +} + +func WithSessionModalities(modalities ...string) SessionOption { + return func(s *SessionDefinition) { s.Modalities = append([]string(nil), modalities...) } +} + +func WithSessionVoice(voice string) SessionOption { + return func(s *SessionDefinition) { s.Voice = voice } +} + +func WithSessionTools(tools ...string) SessionOption { + return func(s *SessionDefinition) { s.Tools = append([]string(nil), tools...) } +} + +func WithSessionMetadata(metadata map[string]any) SessionOption { + return func(s *SessionDefinition) { + s.Metadata = map[string]any{} + for key, value := range metadata { + s.Metadata[key] = value + } + } +} + +func (a *Agent) RegisterSession(name string, provider string, transport string, opts ...SessionOption) error { + capability, err := ValidateSessionTransport(provider, transport) + if err != nil { + return err + } + + definition := SessionDefinition{ + Name: name, + Provider: capability.Provider, + Transport: capability.Transport, + Modalities: []string{"audio", "text"}, + Tools: []string{}, + Metadata: map[string]any{}, + } + for _, opt := range opts { + opt(&definition) + } + if len(definition.Modalities) == 0 { + definition.Modalities = []string{"audio", "text"} + } + if definition.Tools == nil { + definition.Tools = []string{} + } + if definition.Metadata == nil { + definition.Metadata = map[string]any{} + } + + a.sessions[name] = definition + return nil +} + +func (a *Agent) SessionDefinitions() []SessionDefinition { + sessions := make([]SessionDefinition, 0, len(a.sessions)) + for _, session := range a.sessions { + sessions = append(sessions, session) + } + return sessions +} diff --git a/sdk/go/agent/session_test.go b/sdk/go/agent/session_test.go new file mode 100644 index 000000000..f0c9c3689 --- /dev/null +++ b/sdk/go/agent/session_test.go @@ -0,0 +1,43 @@ +package agent + +import "testing" + +func TestAgentRegisterSessionStoresExplicitDefinition(t *testing.T) { + a, err := New(Config{NodeID: "support", Version: "v1"}) + if err != nil { + t.Fatalf("New returned error: %v", err) + } + + if err := a.RegisterSession( + "voice", + "openai", + "webrtc", + WithSessionModel("gpt-realtime-2"), + WithSessionVoice("marin"), + WithSessionTools("support.resolve_voice_turn"), + ); err != nil { + t.Fatalf("RegisterSession returned error: %v", err) + } + + sessions := a.SessionDefinitions() + if len(sessions) != 1 { + t.Fatalf("len(sessions) = %d, want 1", len(sessions)) + } + if sessions[0].Provider != "openai" || sessions[0].Transport != "webrtc" { + t.Fatalf("session provider/transport = %s/%s", sessions[0].Provider, sessions[0].Transport) + } + if sessions[0].Tools[0] != "support.resolve_voice_turn" { + t.Fatalf("tool target = %q", sessions[0].Tools[0]) + } +} + +func TestAgentRegisterSessionRejectsInvalidTransport(t *testing.T) { + a, err := New(Config{NodeID: "support", Version: "v1"}) + if err != nil { + t.Fatalf("New returned error: %v", err) + } + + if err := a.RegisterSession("voice", "openrouter", "webrtc"); err == nil { + t.Fatal("expected invalid provider/transport error") + } +} diff --git a/sdk/go/agent/session_transport.go b/sdk/go/agent/session_transport.go new file mode 100644 index 000000000..d7f239354 --- /dev/null +++ b/sdk/go/agent/session_transport.go @@ -0,0 +1,77 @@ +package agent + +import ( + "fmt" + "sort" + "strings" +) + +var SupportedSessionTransports = map[string][]string{ + "openai": {"webrtc", "websocket"}, + "openrouter": {"audio_turns"}, +} + +type SessionTransportCapability struct { + Provider string + Transport string +} + +type SessionTransportError struct { + Provider string + Transport string + Supported []string +} + +func (e *SessionTransportError) Error() string { + return fmt.Sprintf( + "unsupported session transport %q for provider %q. Supported transports: %s. AgentField does not infer or switch providers; set provider and transport explicitly.", + e.Transport, + e.Provider, + strings.Join(e.Supported, ", "), + ) +} + +func NormalizeSessionTransportValue(value string) string { + return strings.ReplaceAll(strings.ToLower(strings.TrimSpace(value)), "-", "_") +} + +func ValidateSessionTransport(provider string, transport string) (SessionTransportCapability, error) { + normalizedProvider := NormalizeSessionTransportValue(provider) + normalizedTransport := NormalizeSessionTransportValue(transport) + + if normalizedProvider == "" { + return SessionTransportCapability{}, fmt.Errorf("session provider is required; AgentField does not infer providers") + } + if normalizedTransport == "" { + return SessionTransportCapability{}, fmt.Errorf("session transport is required; AgentField does not infer transports") + } + + supported, ok := SupportedSessionTransports[normalizedProvider] + if !ok { + known := make([]string, 0, len(SupportedSessionTransports)) + for knownProvider := range SupportedSessionTransports { + known = append(known, knownProvider) + } + sort.Strings(known) + return SessionTransportCapability{}, fmt.Errorf( + "unknown session provider %q. Known providers: %s. Register provider capabilities before using a custom session provider", + provider, + strings.Join(known, ", "), + ) + } + + for _, candidate := range supported { + if candidate == normalizedTransport { + return SessionTransportCapability{ + Provider: normalizedProvider, + Transport: normalizedTransport, + }, nil + } + } + + return SessionTransportCapability{}, &SessionTransportError{ + Provider: normalizedProvider, + Transport: normalizedTransport, + Supported: append([]string(nil), supported...), + } +} diff --git a/sdk/go/agent/session_transport_test.go b/sdk/go/agent/session_transport_test.go new file mode 100644 index 000000000..9e26090c2 --- /dev/null +++ b/sdk/go/agent/session_transport_test.go @@ -0,0 +1,65 @@ +package agent + +import ( + "errors" + "strings" + "testing" +) + +func TestValidateSessionTransportAcceptsExplicitSupportedPairs(t *testing.T) { + capability, err := ValidateSessionTransport("OpenAI", "WebRTC") + if err != nil { + t.Fatalf("ValidateSessionTransport returned error: %v", err) + } + + if capability.Provider != "openai" { + t.Fatalf("provider = %q, want openai", capability.Provider) + } + if capability.Transport != "webrtc" { + t.Fatalf("transport = %q, want webrtc", capability.Transport) + } +} + +func TestValidateSessionTransportRejectsProviderTransportMismatch(t *testing.T) { + _, err := ValidateSessionTransport("openrouter", "webrtc") + if err == nil { + t.Fatal("expected error") + } + + var transportErr *SessionTransportError + if !errors.As(err, &transportErr) { + t.Fatalf("error type = %T, want *SessionTransportError", err) + } + if transportErr.Provider != "openrouter" { + t.Fatalf("provider = %q, want openrouter", transportErr.Provider) + } + if transportErr.Transport != "webrtc" { + t.Fatalf("transport = %q, want webrtc", transportErr.Transport) + } + if got := err.Error(); got == "" || !containsAllSessionTransport(got, []string{"audio_turns", "does not infer or switch providers"}) { + t.Fatalf("unexpected error message: %q", got) + } +} + +func TestValidateSessionTransportRequiresExplicitTransport(t *testing.T) { + _, err := ValidateSessionTransport("openai", "") + if err == nil || !containsAllSessionTransport(err.Error(), []string{"transport is required"}) { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestValidateSessionTransportRejectsUnknownProvider(t *testing.T) { + _, err := ValidateSessionTransport("custom", "webrtc") + if err == nil || !containsAllSessionTransport(err.Error(), []string{"unknown session provider", "custom"}) { + t.Fatalf("unexpected error: %v", err) + } +} + +func containsAllSessionTransport(value string, needles []string) bool { + for _, needle := range needles { + if !strings.Contains(value, needle) { + return false + } + } + return true +} diff --git a/sdk/python/agentfield/__init__.py b/sdk/python/agentfield/__init__.py index 644d894d2..c2b2f6952 100644 --- a/sdk/python/agentfield/__init__.py +++ b/sdk/python/agentfield/__init__.py @@ -60,6 +60,13 @@ ) from .client import ApprovalRequestResponse, ApprovalResult, ApprovalStatusResponse from .triggers import EventTrigger, ScheduleTrigger, TriggerContext +from .session_transport import ( + SessionTransportCapability, + SessionTransportError, + SUPPORTED_SESSION_TRANSPORTS, + validate_session_transport, +) +from .sessions import RealtimeSession, SessionDefinition, SessionTurn from .decorators import on_event, on_schedule, reasoner from .tool_calling import ( ToolCallConfig, @@ -140,6 +147,14 @@ "EventTrigger", "ScheduleTrigger", "TriggerContext", + # Session transport validation + "SessionTransportCapability", + "SessionTransportError", + "SUPPORTED_SESSION_TRANSPORTS", + "validate_session_transport", + "RealtimeSession", + "SessionDefinition", + "SessionTurn", "on_event", "on_schedule", "reasoner", diff --git a/sdk/python/agentfield/agent.py b/sdk/python/agentfield/agent.py index a60a37802..a208d5cf0 100644 --- a/sdk/python/agentfield/agent.py +++ b/sdk/python/agentfield/agent.py @@ -60,6 +60,10 @@ from agentfield.async_config import AsyncConfig from agentfield.async_execution_manager import AsyncExecutionManager from agentfield.pydantic_utils import convert_function_args, should_convert_args +from agentfield.sessions import ( + RealtimeSession, + build_session_definition, +) from fastapi import FastAPI, Request, HTTPException from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse @@ -657,6 +661,7 @@ def __init__( # Using Dict[str, Entry] with __slots__ dataclasses for minimal footprint self._reasoner_registry: Dict[str, ReasonerEntry] = {} self._skill_registry: Dict[str, SkillEntry] = {} + self._session_registry: Dict[str, Dict[str, Any]] = {} # VC override tracking (still needed for _effective_component_vc_setting) self._reasoner_vc_overrides: Dict[str, bool] = {} @@ -1540,8 +1545,59 @@ def _build_agent_metadata(self) -> Optional[Dict[str, Any]]: metadata["tags"] = self.agent_tags if self.author: metadata["author"] = self.author + if self._session_registry: + metadata["sessions"] = [ + entry["definition"].to_dict() + for entry in self._session_registry.values() + ] return metadata if metadata else None + @property + def sessions(self) -> List[Dict[str, Any]]: + return [ + entry["definition"].to_dict() + for entry in self._session_registry.values() + ] + + def session( + self, + name: str, + *, + provider: str, + transport: str, + model: Optional[str] = None, + modalities: Optional[List[str]] = None, + voice: Optional[str] = None, + tools: Optional[List[str]] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> Callable[[Callable[[RealtimeSession], Awaitable[Any]]], Callable[[RealtimeSession], Awaitable[Any]]]: + """Register a realtime/voice session endpoint. + + Provider and transport are both explicit; AgentField does not infer or + switch them. Unsupported combinations fail at declaration time and again + at control-plane session start. + """ + + definition = build_session_definition( + name, + provider=provider, + transport=transport, + model=model, + modalities=modalities, + voice=voice, + tools=tools, + metadata=metadata, + ) + + def decorator( + func: Callable[[RealtimeSession], Awaitable[Any]] + ) -> Callable[[RealtimeSession], Awaitable[Any]]: + self._session_registry[name] = {"definition": definition, "handler": func} + setattr(func, "_agentfield_session", definition) + return func + + return decorator + def _build_vc_metadata(self) -> Dict[str, Any]: """Produce a serializable VC policy snapshot for control-plane visibility.""" effective_reasoners = { diff --git a/sdk/python/agentfield/execution_context.py b/sdk/python/agentfield/execution_context.py index 1e2a23da8..02556f80d 100644 --- a/sdk/python/agentfield/execution_context.py +++ b/sdk/python/agentfield/execution_context.py @@ -67,12 +67,10 @@ def to_headers(self) -> Dict[str, str]: The AgentField backend issues fresh execution IDs for child nodes. """ - parent_execution = self.parent_execution_id or self.execution_id - headers: Dict[str, str] = { _RUN_HEADER: self.run_id, "X-Workflow-ID": self.workflow_id or self.run_id, - _PARENT_EXECUTION_HEADER: parent_execution, + _PARENT_EXECUTION_HEADER: self.execution_id, _EXECUTION_HEADER: self.execution_id, "X-Workflow-Run-ID": self.run_id, } diff --git a/sdk/python/agentfield/session_transport.py b/sdk/python/agentfield/session_transport.py new file mode 100644 index 000000000..06307f4be --- /dev/null +++ b/sdk/python/agentfield/session_transport.py @@ -0,0 +1,74 @@ +"""Session provider/transport capability validation. + +The SDK deliberately does not infer a transport from provider, caller type, or +runtime environment. Session authors choose both knobs explicitly; this module +only validates that the combination is supported. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, FrozenSet + + +SUPPORTED_SESSION_TRANSPORTS: Dict[str, FrozenSet[str]] = { + "openai": frozenset({"webrtc", "websocket"}), + "openrouter": frozenset({"audio_turns"}), +} + + +class SessionTransportError(ValueError): + """Raised when a session provider/transport pair is unsupported.""" + + def __init__(self, provider: str, transport: str, supported: FrozenSet[str]) -> None: + self.provider = provider + self.transport = transport + self.supported = supported + supported_display = ", ".join(sorted(supported)) or "none" + super().__init__( + f"Unsupported session transport '{transport}' for provider '{provider}'. " + f"Supported transports: {supported_display}. AgentField does not infer " + "or switch providers; set provider and transport explicitly." + ) + + +@dataclass(frozen=True) +class SessionTransportCapability: + provider: str + transport: str + + +def normalize_session_transport_value(value: str) -> str: + return value.strip().lower().replace("-", "_") + + +def validate_session_transport(provider: str, transport: str) -> SessionTransportCapability: + """Validate an explicit session provider/transport pair. + + This is intended to run in SDK registration paths and again in the control + plane session-start path. It does not infer defaults. + """ + + normalized_provider = normalize_session_transport_value(provider) + normalized_transport = normalize_session_transport_value(transport) + + if not normalized_provider: + raise ValueError("Session provider is required; AgentField does not infer providers.") + if not normalized_transport: + raise ValueError("Session transport is required; AgentField does not infer transports.") + + supported = SUPPORTED_SESSION_TRANSPORTS.get(normalized_provider) + if supported is None: + known = ", ".join(sorted(SUPPORTED_SESSION_TRANSPORTS)) + raise ValueError( + f"Unknown session provider '{provider}'. Known providers: {known}. " + "Register provider capabilities before using a custom session provider." + ) + + if normalized_transport not in supported: + raise SessionTransportError(normalized_provider, normalized_transport, supported) + + return SessionTransportCapability( + provider=normalized_provider, + transport=normalized_transport, + ) diff --git a/sdk/python/agentfield/sessions.py b/sdk/python/agentfield/sessions.py new file mode 100644 index 000000000..fd2a83a02 --- /dev/null +++ b/sdk/python/agentfield/sessions.py @@ -0,0 +1,100 @@ +"""Realtime/session DX primitives for AgentField agents.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Awaitable, Callable, Dict, List, Optional + +from .session_transport import validate_session_transport + + +SessionHandler = Callable[["RealtimeSession"], Awaitable[Any]] + + +@dataclass(frozen=True) +class SessionDefinition: + name: str + provider: str + transport: str + model: Optional[str] = None + modalities: List[str] = field(default_factory=lambda: ["audio", "text"]) + voice: Optional[str] = None + tools: List[str] = field(default_factory=list) + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return { + "name": self.name, + "provider": self.provider, + "transport": self.transport, + "model": self.model, + "modalities": list(self.modalities), + "voice": self.voice, + "tools": list(self.tools), + "metadata": dict(self.metadata), + } + + +@dataclass(frozen=True) +class SessionTurn: + text: Optional[str] = None + transcript: Optional[str] = None + audio: Optional[Any] = None + audio_format: Optional[str] = None + channel: Optional[str] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + +class RealtimeSession: + """Session handler context. + + The control plane owns media transport. This object is the app-facing + context passed to a session handler and keeps agent work on app.call. + """ + + def __init__(self, app: Any, session_id: str, definition: SessionDefinition): + self.app = app + self.session_id = session_id + self.definition = definition + self._outbox: List[Dict[str, Any]] = [] + + async def input(self) -> SessionTurn: + raise RuntimeError( + "session.input() is populated by the AgentField control plane transport adapter" + ) + + async def call(self, target: str, *args: Any, **kwargs: Any) -> Any: + return await self.app.call(target, *args, **kwargs) + + async def say(self, text: str, **metadata: Any) -> Dict[str, Any]: + event = {"type": "speech", "text": text, "metadata": metadata} + self._outbox.append(event) + return event + + @property + def outbox(self) -> List[Dict[str, Any]]: + return list(self._outbox) + + +def build_session_definition( + name: str, + *, + provider: str, + transport: str, + model: Optional[str] = None, + modalities: Optional[List[str]] = None, + voice: Optional[str] = None, + tools: Optional[List[str]] = None, + metadata: Optional[Dict[str, Any]] = None, +) -> SessionDefinition: + capability = validate_session_transport(provider, transport) + return SessionDefinition( + name=name, + provider=capability.provider, + transport=capability.transport, + model=model, + modalities=list(modalities or ["audio", "text"]), + voice=voice, + tools=list(tools or []), + metadata=dict(metadata or {}), + ) diff --git a/sdk/python/tests/test_agent_session.py b/sdk/python/tests/test_agent_session.py new file mode 100644 index 000000000..18dbb9eee --- /dev/null +++ b/sdk/python/tests/test_agent_session.py @@ -0,0 +1,40 @@ +import pytest + +from agentfield import Agent, SessionTransportError + + +def test_app_session_registers_voice_metadata(): + app = Agent("support", auto_register=False) + + @app.session( + "voice", + provider="openai", + model="gpt-realtime-2", + transport="webrtc", + modalities=["audio", "text"], + voice="marin", + tools=["launch_support_workflow"], + ) + async def voice(session): + return session + + assert app.sessions == [ + { + "name": "voice", + "provider": "openai", + "transport": "webrtc", + "model": "gpt-realtime-2", + "modalities": ["audio", "text"], + "voice": "marin", + "tools": ["launch_support_workflow"], + "metadata": {}, + } + ] + assert app._build_agent_metadata()["sessions"] == app.sessions + + +def test_app_session_rejects_unsupported_provider_transport_pair(): + app = Agent("support", auto_register=False) + + with pytest.raises(SessionTransportError): + app.session("voice", provider="openrouter", transport="webrtc") diff --git a/sdk/python/tests/test_execution_context_core.py b/sdk/python/tests/test_execution_context_core.py index f697de6eb..bc9c3f29b 100644 --- a/sdk/python/tests/test_execution_context_core.py +++ b/sdk/python/tests/test_execution_context_core.py @@ -28,7 +28,7 @@ def test_to_headers_includes_optional_fields(): assert headers["X-Workflow-ID"] == "wf-1" assert headers["X-Execution-ID"] == "exec-1" - assert headers["X-Parent-Execution-ID"] == "parent-1" + assert headers["X-Parent-Execution-ID"] == "exec-1" assert headers["X-Parent-Workflow-ID"] == "wf-parent" assert headers["X-Session-ID"] == "sess-1" assert headers["X-Caller-DID"] == "did:caller" diff --git a/sdk/python/tests/test_session_transport.py b/sdk/python/tests/test_session_transport.py new file mode 100644 index 000000000..107aa3863 --- /dev/null +++ b/sdk/python/tests/test_session_transport.py @@ -0,0 +1,33 @@ +import pytest + +from agentfield.session_transport import ( + SessionTransportError, + validate_session_transport, +) + + +def test_validate_session_transport_accepts_explicit_supported_pairs(): + capability = validate_session_transport("OpenAI", "WebRTC") + + assert capability.provider == "openai" + assert capability.transport == "webrtc" + + +def test_validate_session_transport_rejects_provider_transport_mismatch(): + with pytest.raises(SessionTransportError) as exc: + validate_session_transport("openrouter", "webrtc") + + assert exc.value.provider == "openrouter" + assert exc.value.transport == "webrtc" + assert "Supported transports: audio_turns" in str(exc.value) + assert "does not infer or switch providers" in str(exc.value) + + +def test_validate_session_transport_requires_explicit_transport(): + with pytest.raises(ValueError, match="transport is required"): + validate_session_transport("openai", "") + + +def test_validate_session_transport_rejects_unknown_provider(): + with pytest.raises(ValueError, match="Unknown session provider 'custom'"): + validate_session_transport("custom", "webrtc") diff --git a/sdk/python/uv.lock b/sdk/python/uv.lock index ef3693b01..c887b218e 100644 --- a/sdk/python/uv.lock +++ b/sdk/python/uv.lock @@ -4,7 +4,7 @@ requires-python = ">=3.10, <3.14" [[package]] name = "agentfield" -version = "0.1.89rc2" +version = "0.1.92-rc.2" source = { editable = "." } dependencies = [ { name = "aiohttp" }, diff --git a/sdk/typescript/src/agent/Agent.ts b/sdk/typescript/src/agent/Agent.ts index 8160893e8..753cc8023 100644 --- a/sdk/typescript/src/agent/Agent.ts +++ b/sdk/typescript/src/agent/Agent.ts @@ -52,6 +52,12 @@ import { ProcessLogRing, registerAgentfieldLogsRoute } from './processLogs.js'; +import { + buildSessionDefinition, + type RealtimeSession, + type SessionDefinition, + type SessionOptions +} from '../session.js'; interface WildcardParams extends ParamsDictionary { 0: string; @@ -96,6 +102,7 @@ export class Agent { private readonly memoryWatchers: Array<{ pattern: string; handler: MemoryWatchHandler; scope?: string; scopeId?: string }> = []; private readonly localVerifier?: LocalVerifier; private readonly realtimeValidationFunctions = new Set(); + private readonly sessions = new Map Promise | unknown }>(); private readonly processLogRing = new ProcessLogRing(); private readonly executionLogger: ExecutionLogger; /** Tracks an AbortController per in-flight execution_id so the @@ -183,6 +190,19 @@ export class Agent { this.skills.includeRouter(router); } + session( + name: string, + options: SessionOptions, + handler: (session: RealtimeSession) => Promise | unknown + ) { + this.sessions.set(name, { definition: buildSessionDefinition(name, options), handler }); + return this; + } + + sessionDefinitions(): SessionDefinition[] { + return Array.from(this.sessions.values()).map((entry) => entry.definition); + } + handler(adapter?: (event: any, context?: any) => ServerlessEvent): AgentHandler { return async (event: any, res?: any): Promise => { // If a response object is provided, treat this as a standard HTTP request (e.g., Vercel/Netlify) @@ -1148,7 +1168,8 @@ export class Agent { version: this.config.version, deployment_type: deploymentType, reasoners: this.reasonerDefinitions(), - skills: this.skillDefinitions() + skills: this.skillDefinitions(), + sessions: this.sessionDefinitions() }; } @@ -1490,7 +1511,8 @@ export class Agent { sdk: { language: 'typescript', version: AGENTFIELD_TS_SDK_VERSION - } + }, + sessions: this.sessionDefinitions() } } }); diff --git a/sdk/typescript/src/index.ts b/sdk/typescript/src/index.ts index c3a000502..bc2c65c62 100644 --- a/sdk/typescript/src/index.ts +++ b/sdk/typescript/src/index.ts @@ -26,3 +26,5 @@ export * from './types/skill.js'; export * from './harness/index.js'; export * from './status/ExecutionStatus.js'; export * from './approval/ApprovalClient.js'; +export * from './sessionTransport.js'; +export * from './session.js'; diff --git a/sdk/typescript/src/session.ts b/sdk/typescript/src/session.ts new file mode 100644 index 000000000..792d09151 --- /dev/null +++ b/sdk/typescript/src/session.ts @@ -0,0 +1,59 @@ +import { validateSessionTransport } from './sessionTransport.js'; + +export interface SessionDefinition { + name: string; + provider: string; + transport: string; + model?: string; + modalities: string[]; + voice?: string; + tools: string[]; + metadata: Record; +} + +export interface SessionOptions { + provider: string; + transport: string; + model?: string; + modalities?: string[]; + voice?: string; + tools?: string[]; + metadata?: Record; +} + +export interface SessionTurn { + text?: string; + transcript?: string; + audio?: unknown; + audioFormat?: string; + channel?: string; + metadata?: Record; +} + +export class RealtimeSession { + readonly sessionId: string; + readonly definition: SessionDefinition; + + constructor(sessionId: string, definition: SessionDefinition) { + this.sessionId = sessionId; + this.definition = definition; + } + + async input(): Promise { + throw new Error('session.input() is populated by the AgentField control plane transport adapter'); + } +} + +export function buildSessionDefinition(name: string, options: SessionOptions): SessionDefinition { + const capability = validateSessionTransport(options.provider, options.transport); + return { + name, + provider: capability.provider, + transport: capability.transport, + model: options.model, + modalities: options.modalities ?? ['audio', 'text'], + voice: options.voice, + tools: options.tools ?? [], + metadata: options.metadata ?? {} + }; +} diff --git a/sdk/typescript/src/sessionTransport.ts b/sdk/typescript/src/sessionTransport.ts new file mode 100644 index 000000000..46699fb2b --- /dev/null +++ b/sdk/typescript/src/sessionTransport.ts @@ -0,0 +1,65 @@ +export const SUPPORTED_SESSION_TRANSPORTS = { + openai: ['webrtc', 'websocket'], + openrouter: ['audio_turns'] +} as const; + +export type SessionProvider = keyof typeof SUPPORTED_SESSION_TRANSPORTS; +export type SessionTransport = (typeof SUPPORTED_SESSION_TRANSPORTS)[SessionProvider][number]; + +export interface SessionTransportCapability { + provider: SessionProvider; + transport: SessionTransport; +} + +export class SessionTransportError extends Error { + readonly provider: string; + readonly transport: string; + readonly supported: readonly string[]; + + constructor(provider: string, transport: string, supported: readonly string[]) { + const supportedDisplay = supported.length > 0 ? supported.join(', ') : 'none'; + super( + `Unsupported session transport '${transport}' for provider '${provider}'. ` + + `Supported transports: ${supportedDisplay}. AgentField does not infer ` + + 'or switch providers; set provider and transport explicitly.' + ); + this.name = 'SessionTransportError'; + this.provider = provider; + this.transport = transport; + this.supported = supported; + } +} + +export function normalizeSessionTransportValue(value: string): string { + return value.trim().toLowerCase().replace(/-/g, '_'); +} + +export function validateSessionTransport(provider: string, transport: string): SessionTransportCapability { + const normalizedProvider = normalizeSessionTransportValue(provider); + const normalizedTransport = normalizeSessionTransportValue(transport); + + if (!normalizedProvider) { + throw new Error('Session provider is required; AgentField does not infer providers.'); + } + if (!normalizedTransport) { + throw new Error('Session transport is required; AgentField does not infer transports.'); + } + + const supported = SUPPORTED_SESSION_TRANSPORTS[normalizedProvider as SessionProvider]; + if (!supported) { + const known = Object.keys(SUPPORTED_SESSION_TRANSPORTS).sort().join(', '); + throw new Error( + `Unknown session provider '${provider}'. Known providers: ${known}. ` + + 'Register provider capabilities before using a custom session provider.' + ); + } + + if (!supported.includes(normalizedTransport as never)) { + throw new SessionTransportError(normalizedProvider, normalizedTransport, supported); + } + + return { + provider: normalizedProvider as SessionProvider, + transport: normalizedTransport as SessionTransport + }; +} diff --git a/sdk/typescript/tests/agent.test.ts b/sdk/typescript/tests/agent.test.ts index c57151fe9..775943d68 100644 --- a/sdk/typescript/tests/agent.test.ts +++ b/sdk/typescript/tests/agent.test.ts @@ -13,6 +13,31 @@ describe('Agent', () => { expect(agent.skills.all().map((s) => s.name)).toContain('format'); }); + it('registers explicit realtime session definitions', () => { + const agent = new Agent({ nodeId: 'support-agent', devMode: true }); + agent.session('voice', { + provider: 'openai', + transport: 'webrtc', + model: 'gpt-realtime-2', + modalities: ['audio', 'text'], + voice: 'marin', + tools: ['support.resolve_voice_turn'] + }, async () => ({})); + + expect(agent.sessionDefinitions()).toEqual([ + { + name: 'voice', + provider: 'openai', + transport: 'webrtc', + model: 'gpt-realtime-2', + modalities: ['audio', 'text'], + voice: 'marin', + tools: ['support.resolve_voice_turn'], + metadata: {} + } + ]); + }); + it('includes routers with prefixes', () => { const router = new AgentRouter({ prefix: 'simulation' }); router.reasoner('run', async () => ({})); diff --git a/sdk/typescript/tests/agent_runtime_paths.test.ts b/sdk/typescript/tests/agent_runtime_paths.test.ts index 128bca9dc..fb3c24a6b 100644 --- a/sdk/typescript/tests/agent_runtime_paths.test.ts +++ b/sdk/typescript/tests/agent_runtime_paths.test.ts @@ -222,6 +222,7 @@ describe('Agent runtime paths', () => { version: '1.0.0', deployment_type: 'serverless', reasoners: [], + sessions: [], skills: [] } }); diff --git a/sdk/typescript/tests/session_transport.test.ts b/sdk/typescript/tests/session_transport.test.ts new file mode 100644 index 000000000..926e1e391 --- /dev/null +++ b/sdk/typescript/tests/session_transport.test.ts @@ -0,0 +1,31 @@ +import { describe, expect, it } from 'vitest'; +import { SessionTransportError, validateSessionTransport } from '../src/sessionTransport.js'; + +describe('session transport validation', () => { + it('accepts explicit supported pairs', () => { + expect(validateSessionTransport('OpenAI', 'WebRTC')).toEqual({ + provider: 'openai', + transport: 'webrtc' + }); + }); + + it('rejects provider and transport mismatches', () => { + expect(() => validateSessionTransport('openrouter', 'webrtc')).toThrow(SessionTransportError); + expect(() => validateSessionTransport('openrouter', 'webrtc')).toThrow( + /Supported transports: audio_turns/ + ); + expect(() => validateSessionTransport('openrouter', 'webrtc')).toThrow( + /does not infer or switch providers/ + ); + }); + + it('requires explicit transport', () => { + expect(() => validateSessionTransport('openai', '')).toThrow(/transport is required/); + }); + + it('rejects unknown providers', () => { + expect(() => validateSessionTransport('custom', 'webrtc')).toThrow( + /Unknown session provider 'custom'/ + ); + }); +}); From 0b49010c0212208387fcda61c06df3fdf521f756 Mon Sep 17 00:00:00 2001 From: Santosh Date: Thu, 11 Jun 2026 16:13:42 -0400 Subject: [PATCH 2/3] fix: align session route wildcards --- control-plane/internal/handlers/sessions.go | 8 ++++++-- control-plane/internal/handlers/sessions_test.go | 14 ++++++++++++++ control-plane/internal/server/routes_core.go | 4 ++-- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/control-plane/internal/handlers/sessions.go b/control-plane/internal/handlers/sessions.go index 3efb0b764..975cdbdfb 100644 --- a/control-plane/internal/handlers/sessions.go +++ b/control-plane/internal/handlers/sessions.go @@ -141,7 +141,7 @@ func SessionRealtimeOfferHandler(store storage.StorageProvider) gin.HandlerFunc } answer, err := createOpenAIRealtimeCall( c.Request.Context(), - c.Param("session_id"), + sessionPathID(c), string(sdp), firstNonEmptySession(c.Query("model"), "gpt-realtime-2"), firstNonEmptySession(c.Query("voice"), "marin"), @@ -161,7 +161,7 @@ func SessionRealtimeOfferHandler(store storage.StorageProvider) gin.HandlerFunc func SessionToolHandler(store storage.StorageProvider, timeout time.Duration, internalToken string) gin.HandlerFunc { return func(c *gin.Context) { _ = store - sessionID := strings.TrimSpace(c.Param("session_id")) + sessionID := sessionPathID(c) toolName := strings.TrimSpace(c.Param("tool")) var req ToolSessionRequest if err := c.ShouldBindJSON(&req); err != nil && err != io.EOF { @@ -203,6 +203,10 @@ func SessionToolHandler(store storage.StorageProvider, timeout time.Duration, in } } +func sessionPathID(c *gin.Context) string { + return firstNonEmptySession(c.Param("session_id"), c.Param("target")) +} + func lookupSessionDefinition(c *gin.Context, store storage.StorageProvider, nodeID string, sessionName string) (sessionDefinition, bool) { agent, err := store.GetAgent(c.Request.Context(), nodeID) if err != nil || agent == nil { diff --git a/control-plane/internal/handlers/sessions_test.go b/control-plane/internal/handlers/sessions_test.go index d571e8d9e..06dc838be 100644 --- a/control-plane/internal/handlers/sessions_test.go +++ b/control-plane/internal/handlers/sessions_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/Agent-Field/agentfield/control-plane/pkg/types" "github.com/gin-gonic/gin" @@ -47,6 +48,19 @@ func TestStartSessionHandlerRejectsUnsupportedTransport(t *testing.T) { require.Contains(t, rec.Body.String(), "does not infer or switch providers") } +func TestSessionRoutesRegisterTogether(t *testing.T) { + gin.SetMode(gin.TestMode) + store := &nodeRESTStorageStub{agent: sessionTestAgent()} + + require.NotPanics(t, func() { + router := gin.New() + group := router.Group("/api/v1/sessions") + group.POST("/:target/start", StartSessionHandler(store)) + group.POST("/:target/realtime-offer", SessionRealtimeOfferHandler(store)) + group.POST("/:target/tools/:tool", SessionToolHandler(store, time.Second, "")) + }) +} + func sessionTestAgent() *types.AgentNode { return &types.AgentNode{ ID: "support", diff --git a/control-plane/internal/server/routes_core.go b/control-plane/internal/server/routes_core.go index 6dd350aa3..7131d475c 100644 --- a/control-plane/internal/server/routes_core.go +++ b/control-plane/internal/server/routes_core.go @@ -149,8 +149,8 @@ func (s *AgentFieldServer) registerCoreRoutes(agentAPI *gin.RouterGroup) { sessionGroup := agentAPI.Group("/sessions") { sessionGroup.POST("/:target/start", handlers.StartSessionHandler(s.storage)) - sessionGroup.POST("/:session_id/realtime-offer", handlers.SessionRealtimeOfferHandler(s.storage)) - sessionGroup.POST("/:session_id/tools/:tool", handlers.SessionToolHandler(s.storage, s.config.AgentField.ExecutionQueue.AgentCallTimeout, s.config.Features.DID.Authorization.InternalToken)) + sessionGroup.POST("/:target/realtime-offer", handlers.SessionRealtimeOfferHandler(s.storage)) + sessionGroup.POST("/:target/tools/:tool", handlers.SessionToolHandler(s.storage, s.config.AgentField.ExecutionQueue.AgentCallTimeout, s.config.Features.DID.Authorization.InternalToken)) } // Workflow endpoints will be reintroduced once the simplified execution pipeline lands. From aa7d592009f061f24f1de878b7b359b4a8625a7c Mon Sep 17 00:00:00 2001 From: Santosh Date: Thu, 11 Jun 2026 16:35:29 -0400 Subject: [PATCH 3/3] test: cover session control-plane paths --- control-plane/internal/cli/session_test.go | 182 ++++++++++ .../internal/handlers/sessions_test.go | 322 ++++++++++++++++++ 2 files changed, 504 insertions(+) diff --git a/control-plane/internal/cli/session_test.go b/control-plane/internal/cli/session_test.go index d7f79d742..a43eba2e5 100644 --- a/control-plane/internal/cli/session_test.go +++ b/control-plane/internal/cli/session_test.go @@ -3,28 +3,89 @@ package cli import ( "bytes" "context" + "encoding/json" + "errors" "io" "net/http" + "os" + "path/filepath" "strings" "testing" "github.com/stretchr/testify/require" ) +func TestNewSessionCommandRegistersSubcommands(t *testing.T) { + cmd := NewSessionCommand() + var uses []string + for _, sub := range cmd.Commands() { + uses = append(uses, sub.Use) + } + + require.Contains(t, uses, "start .") + require.Contains(t, uses, "offer ") + require.Contains(t, uses, "tool ") + require.Contains(t, uses, "workflows ") +} + +func TestRunSessionStartPostsPayloadAndWritesResponse(t *testing.T) { + var gotBody map[string]interface{} + withTriggerTestServer(t, func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/api/v1/sessions/support.voice/start", r.URL.Path) + require.Equal(t, http.MethodPost, r.Method) + require.Equal(t, "application/json", r.Header.Get("Content-Type")) + require.NoError(t, json.NewDecoder(r.Body).Decode(&gotBody)) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"session_id":"sess-1","offer_url":"/api/v1/sessions/sess-1/realtime-offer"}`)) + }) + + var stdout bytes.Buffer + err := runSessionStart(context.Background(), "support.voice", &sessionStartOptions{ + provider: "openai", + transport: "webrtc", + model: "gpt-realtime-2", + voice: "marin", + outputFormat: "json", + stdout: &stdout, + }) + + require.NoError(t, err) + require.Equal(t, map[string]interface{}{ + "provider": "openai", + "transport": "webrtc", + "model": "gpt-realtime-2", + "voice": "marin", + }, gotBody) + require.JSONEq(t, `{"session_id":"sess-1","offer_url":"/api/v1/sessions/sess-1/realtime-offer"}`, stdout.String()) +} + +func TestRunSessionStartReturnsStatusErrors(t *testing.T) { + withTriggerTestServer(t, func(w http.ResponseWriter, r *http.Request) { + http.Error(w, `{"error":"bad session"}`, http.StatusBadRequest) + }) + + err := runSessionStart(context.Background(), "support.voice", &sessionStartOptions{stdout: &bytes.Buffer{}}) + + require.ErrorContains(t, err, "session start failed with status 400") +} + func TestRunSessionOfferPostsSDPAndWritesRawAnswer(t *testing.T) { var gotBody string var gotContentType string + var gotAPIKey string withTriggerTestServer(t, func(w http.ResponseWriter, r *http.Request) { require.Equal(t, "/api/v1/sessions/sess-1/realtime-offer", r.URL.Path) require.Equal(t, "openai", r.URL.Query().Get("provider")) require.Equal(t, "webrtc", r.URL.Query().Get("transport")) gotContentType = r.Header.Get("Content-Type") + gotAPIKey = r.Header.Get("X-API-Key") body, err := io.ReadAll(r.Body) require.NoError(t, err) gotBody = string(body) w.Header().Set("Content-Type", "application/sdp") _, _ = w.Write([]byte("v=0\r\nanswer\r\n")) }) + apiKey = "test-key" var stdout bytes.Buffer err := runSessionOffer(context.Background(), "sess-1", &sessionOfferOptions{ @@ -36,6 +97,7 @@ func TestRunSessionOfferPostsSDPAndWritesRawAnswer(t *testing.T) { }) require.NoError(t, err) require.Equal(t, "application/sdp", gotContentType) + require.Equal(t, "test-key", gotAPIKey) require.Equal(t, "v=0\r\noffer\r\n", gotBody) require.Equal(t, "v=0\r\nanswer\r\n", stdout.String()) } @@ -71,3 +133,123 @@ func TestRunSessionOfferRequiresSDP(t *testing.T) { }) require.ErrorContains(t, err, "SDP offer required") } + +func TestRunSessionOfferReturnsStatusErrors(t *testing.T) { + withTriggerTestServer(t, func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "provider rejected", http.StatusBadGateway) + }) + + err := runSessionOffer(context.Background(), "sess-1", &sessionOfferOptions{ + provider: "openai", + transport: "webrtc", + sdpSource: "v=0", + stdout: &bytes.Buffer{}, + }) + + require.ErrorContains(t, err, "session offer failed with status 502") +} + +func TestReadSessionSDPFromFileAndInline(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "offer.sdp") + require.NoError(t, os.WriteFile(path, []byte("v=0\nfile-offer\n"), 0o600)) + + got, err := readSessionSDP("@"+path, nil) + require.NoError(t, err) + require.Equal(t, "v=0\nfile-offer\n", got) + + got, err = readSessionSDP("v=0\ninline\n", nil) + require.NoError(t, err) + require.Equal(t, "v=0\ninline\n", got) +} + +func TestReadSessionSDPRejectsBadSources(t *testing.T) { + dir := t.TempDir() + emptyPath := filepath.Join(dir, "empty.sdp") + require.NoError(t, os.WriteFile(emptyPath, []byte(" "), 0o600)) + + _, err := readSessionSDP("@", nil) + require.ErrorContains(t, err, "SDP file path is required") + + _, err = readSessionSDP("@"+filepath.Join(dir, "missing.sdp"), nil) + require.ErrorContains(t, err, "read SDP file") + + _, err = readSessionSDP("@"+emptyPath, nil) + require.ErrorContains(t, err, "is empty") + + _, err = readSessionSDP("-", errReader{}) + require.ErrorContains(t, err, "read SDP from stdin") +} + +func TestRunSessionToolPostsPayloadAndWritesResponse(t *testing.T) { + var gotBody map[string]interface{} + withTriggerTestServer(t, func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/api/v1/sessions/sess-1/tools/resolve", r.URL.Path) + require.NoError(t, json.NewDecoder(r.Body).Decode(&gotBody)) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"execution_id":"exec-1","status":"queued"}`)) + }) + + var stdout bytes.Buffer + err := runSessionTool(context.Background(), "sess-1", "resolve", &sessionToolOptions{ + target: "support.resolve", + inputSource: `{"topic":"billing"}`, + outputFormat: "pretty", + stdout: &stdout, + }) + + require.NoError(t, err) + require.Equal(t, map[string]interface{}{ + "target": "support.resolve", + "input": map[string]interface{}{"topic": "billing"}, + }, gotBody) + require.Contains(t, stdout.String(), `"execution_id": "exec-1"`) +} + +func TestRunSessionToolReadsInputFromStdin(t *testing.T) { + var gotBody map[string]interface{} + withTriggerTestServer(t, func(w http.ResponseWriter, r *http.Request) { + require.NoError(t, json.NewDecoder(r.Body).Decode(&gotBody)) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"ok":true}`)) + }) + + err := runSessionTool(context.Background(), "sess-1", "support.resolve", &sessionToolOptions{ + stdin: strings.NewReader(`{"topic":"shipping"}`), + stdout: &bytes.Buffer{}, + }) + + require.NoError(t, err) + require.Equal(t, map[string]interface{}{ + "target": "", + "input": map[string]interface{}{"topic": "shipping"}, + }, gotBody) +} + +func TestRunSessionToolReturnsInputAndStatusErrors(t *testing.T) { + err := runSessionTool(context.Background(), "sess-1", "resolve", &sessionToolOptions{ + inputSource: "{", + stdout: &bytes.Buffer{}, + }) + require.ErrorContains(t, err, "parse json") + + err = runSessionTool(context.Background(), "sess-1", "resolve", &sessionToolOptions{ + stdin: strings.NewReader("{"), + stdout: &bytes.Buffer{}, + }) + require.ErrorContains(t, err, "parse stdin JSON") + + withTriggerTestServer(t, func(w http.ResponseWriter, r *http.Request) { + http.Error(w, `{"error":"bad tool"}`, http.StatusBadRequest) + }) + err = runSessionTool(context.Background(), "sess-1", "resolve", &sessionToolOptions{ + stdout: &bytes.Buffer{}, + }) + require.ErrorContains(t, err, "session tool failed with status 400") +} + +type errReader struct{} + +func (errReader) Read([]byte) (int, error) { + return 0, errors.New("boom") +} diff --git a/control-plane/internal/handlers/sessions_test.go b/control-plane/internal/handlers/sessions_test.go index 06dc838be..e957047e2 100644 --- a/control-plane/internal/handlers/sessions_test.go +++ b/control-plane/internal/handlers/sessions_test.go @@ -3,8 +3,10 @@ package handlers import ( "bytes" "encoding/json" + "io" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -48,6 +50,87 @@ func TestStartSessionHandlerRejectsUnsupportedTransport(t *testing.T) { require.Contains(t, rec.Body.String(), "does not infer or switch providers") } +func TestStartSessionHandlerRejectsInvalidOrMissingDefinitions(t *testing.T) { + gin.SetMode(gin.TestMode) + + tests := []struct { + name string + store *nodeRESTStorageStub + path string + status int + message string + }{ + { + name: "bad target", + store: &nodeRESTStorageStub{agent: sessionTestAgent()}, + path: "/api/v1/sessions/support/start", + status: http.StatusBadRequest, + message: "session target must be", + }, + { + name: "missing agent", + store: &nodeRESTStorageStub{}, + path: "/api/v1/sessions/support.voice/start", + status: http.StatusNotFound, + message: "agent not found", + }, + { + name: "missing metadata", + store: &nodeRESTStorageStub{agent: &types.AgentNode{ID: "support"}}, + path: "/api/v1/sessions/support.voice/start", + status: http.StatusNotFound, + message: "agent has no registered sessions", + }, + { + name: "missing sessions list", + store: &nodeRESTStorageStub{agent: &types.AgentNode{ + ID: "support", + Metadata: types.AgentMetadata{Custom: map[string]interface{}{"sessions": "bad"}}, + }}, + path: "/api/v1/sessions/support.voice/start", + status: http.StatusNotFound, + message: "agent has no registered sessions", + }, + { + name: "unknown session", + store: &nodeRESTStorageStub{agent: sessionTestAgent()}, + path: "/api/v1/sessions/support.chat/start", + status: http.StatusNotFound, + message: "session not registered", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + router := gin.New() + router.POST("/api/v1/sessions/:target/start", StartSessionHandler(tt.store)) + + req := httptest.NewRequest(http.MethodPost, tt.path, bytes.NewBufferString(`{}`)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, tt.status, rec.Code) + require.Contains(t, rec.Body.String(), tt.message) + }) + } +} + +func TestStartSessionHandlerRejectsCapabilityOverrideMismatch(t *testing.T) { + gin.SetMode(gin.TestMode) + store := &nodeRESTStorageStub{agent: sessionTestAgent()} + router := gin.New() + router.POST("/api/v1/sessions/:target/start", StartSessionHandler(store)) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/sessions/support.voice/start", bytes.NewBufferString(`{"provider":"openai","transport":"websocket"}`)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), "registered for provider=openai transport=webrtc") +} + func TestSessionRoutesRegisterTogether(t *testing.T) { gin.SetMode(gin.TestMode) store := &nodeRESTStorageStub{agent: sessionTestAgent()} @@ -61,6 +144,245 @@ func TestSessionRoutesRegisterTogether(t *testing.T) { }) } +func TestSessionRealtimeOfferHandlerValidatesInputs(t *testing.T) { + gin.SetMode(gin.TestMode) + store := &nodeRESTStorageStub{} + + tests := []struct { + name string + path string + body string + status int + message string + envKey string + custom bool + }{ + { + name: "missing provider", + path: "/api/v1/sessions/sess-1/realtime-offer", + body: `{}`, + status: http.StatusBadRequest, + message: "session provider is required", + }, + { + name: "non webrtc transport", + path: "/api/v1/sessions/sess-1/realtime-offer?provider=openai&transport=websocket", + body: "v=0", + status: http.StatusBadRequest, + message: "requires transport=webrtc", + }, + { + name: "non openai provider", + path: "/api/v1/sessions/sess-1/realtime-offer?provider=custom&transport=webrtc", + body: "v=0", + status: http.StatusBadRequest, + message: "require provider=openai", + custom: true, + }, + { + name: "missing key", + path: "/api/v1/sessions/sess-1/realtime-offer?provider=openai&transport=webrtc", + body: "v=0", + status: http.StatusBadGateway, + message: "OPENAI_API_KEY is required", + }, + { + name: "missing sdp", + path: "/api/v1/sessions/sess-1/realtime-offer?provider=openai&transport=webrtc", + body: " ", + status: http.StatusBadRequest, + message: "SDP offer body is required", + envKey: "test-key", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.custom { + original := types.SupportedSessionTransports["custom"] + types.SupportedSessionTransports["custom"] = []string{"webrtc"} + t.Cleanup(func() { + if original == nil { + delete(types.SupportedSessionTransports, "custom") + } else { + types.SupportedSessionTransports["custom"] = original + } + }) + } + if tt.envKey != "" { + t.Setenv("OPENAI_API_KEY", tt.envKey) + } else { + t.Setenv("OPENAI_API_KEY", "") + } + router := gin.New() + router.POST("/api/v1/sessions/:target/realtime-offer", SessionRealtimeOfferHandler(store)) + + req := httptest.NewRequest(http.MethodPost, tt.path, strings.NewReader(tt.body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, tt.status, rec.Code) + require.Contains(t, rec.Body.String(), tt.message) + }) + } +} + +func TestSessionRealtimeOfferHandlerCallsRealtimeProvider(t *testing.T) { + gin.SetMode(gin.TestMode) + t.Setenv("OPENAI_API_KEY", "test-key") + originalTransport := http.DefaultClient.Transport + t.Cleanup(func() { http.DefaultClient.Transport = originalTransport }) + + var gotSafetyID string + var gotSession string + http.DefaultClient.Transport = roundTripFunc(func(req *http.Request) (*http.Response, error) { + require.Equal(t, "https://api.openai.com/v1/realtime/calls", req.URL.String()) + require.Equal(t, "Bearer test-key", req.Header.Get("Authorization")) + gotSafetyID = req.Header.Get("OpenAI-Safety-Identifier") + + reader, err := req.MultipartReader() + require.NoError(t, err) + for { + part, err := reader.NextPart() + if err == io.EOF { + break + } + require.NoError(t, err) + data, err := io.ReadAll(part) + require.NoError(t, err) + if part.FormName() == "session" { + gotSession = string(data) + } + } + + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader("v=0\r\nanswer\r\n")), + }, nil + }) + + router := gin.New() + router.POST("/api/v1/sessions/:target/realtime-offer", SessionRealtimeOfferHandler(&nodeRESTStorageStub{})) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/sessions/sess-1/realtime-offer?provider=openai&transport=webrtc&model=gpt-test&voice=cedar", strings.NewReader("v=0\r\noffer\r\n")) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.Equal(t, "application/sdp", rec.Header().Get("Content-Type")) + require.Equal(t, "v=0\r\nanswer\r\n", rec.Body.String()) + require.Len(t, gotSafetyID, 32) + require.Contains(t, gotSession, `"model":"gpt-test"`) + require.Contains(t, gotSession, `"voice":"cedar"`) +} + +func TestSessionRealtimeOfferHandlerSurfacesProviderErrors(t *testing.T) { + gin.SetMode(gin.TestMode) + t.Setenv("OPENAI_API_KEY", "test-key") + originalTransport := http.DefaultClient.Transport + t.Cleanup(func() { http.DefaultClient.Transport = originalTransport }) + http.DefaultClient.Transport = roundTripFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusBadRequest, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader("bad offer")), + }, nil + }) + + router := gin.New() + router.POST("/api/v1/sessions/:target/realtime-offer", SessionRealtimeOfferHandler(&nodeRESTStorageStub{})) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/sessions/sess-1/realtime-offer?provider=openai&transport=webrtc", strings.NewReader("v=0")) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, http.StatusBadGateway, rec.Code) + require.Contains(t, rec.Body.String(), "OpenAI realtime call failed") +} + +func TestSessionToolHandlerValidatesAndForwards(t *testing.T) { + gin.SetMode(gin.TestMode) + router := gin.New() + var gotSessionID string + var gotAuth string + var gotBody map[string]interface{} + router.POST("/api/v1/execute/async/:target", func(c *gin.Context) { + gotSessionID = c.GetHeader("X-Session-ID") + gotAuth = c.GetHeader("Authorization") + require.NoError(t, c.ShouldBindJSON(&gotBody)) + require.Equal(t, "support.resolve", c.Param("target")) + c.JSON(http.StatusAccepted, gin.H{"execution_id": "exec-1"}) + }) + router.POST("/api/v1/sessions/:target/tools/:tool", SessionToolHandler(&nodeRESTStorageStub{}, time.Second, "internal-token")) + server := httptest.NewServer(router) + t.Cleanup(server.Close) + + req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/sessions/sess-1/tools/resolve", strings.NewReader(`{"target":"support.resolve","input":{"topic":"billing"}}`)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := server.Client().Do(req) + require.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + require.Equal(t, http.StatusAccepted, resp.StatusCode) + require.JSONEq(t, `{"execution_id":"exec-1"}`, string(body)) + require.Equal(t, "sess-1", gotSessionID) + require.Equal(t, "Bearer internal-token", gotAuth) + require.Equal(t, map[string]interface{}{"input": map[string]interface{}{"topic": "billing"}}, gotBody) +} + +func TestSessionToolHandlerRejectsInvalidRequests(t *testing.T) { + gin.SetMode(gin.TestMode) + router := gin.New() + router.POST("/api/v1/sessions/:target/tools/:tool", SessionToolHandler(&nodeRESTStorageStub{}, time.Second, "")) + + tests := []struct { + name string + body string + message string + }{ + {name: "bad json", body: "{", message: "invalid JSON body"}, + {name: "bad target", body: `{"target":"resolve"}`, message: "session tool target must be"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/api/v1/sessions/sess-1/tools/resolve", strings.NewReader(tt.body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), tt.message) + }) + } +} + +func TestSessionHelpers(t *testing.T) { + node, session, ok := splitSessionTarget(" support.voice ") + require.True(t, ok) + require.Equal(t, "support", node) + require.Equal(t, "voice", session) + + _, _, ok = splitSessionTarget("support") + require.False(t, ok) + require.Equal(t, "fallback", firstNonEmptySession(" ", "fallback")) + require.Equal(t, map[string]string{ + "local": "support.local", + "resolve": "support.resolve", + }, sessionToolTargets("support", []string{"", "local", "support.resolve"})) +} + +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} + func sessionTestAgent() *types.AgentNode { return &types.AgentNode{ ID: "support",