Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 277 additions & 0 deletions router-tests/mcp_hot_reload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package integration

import (
"encoding/json"
"os"
"path/filepath"
"testing"
"time"

"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/config"
"go.uber.org/goleak"
)

func TestMCPOperationHotReload(t *testing.T) {
t.Parallel()
operationsDir := t.TempDir()
storageProviderID := "mcp_hot_reload_test_id"

t.Run("List Updated User Operations On Addition and Removal", func(t *testing.T) {

testenv.Run(t, &testenv.Config{
MCP: config.MCPConfiguration{
Enabled: true,
Storage: config.MCPStorageConfig{
ProviderID: storageProviderID,
},
HotReloadConfig: config.MCPOperationsHotReloadConfig{
Enabled: true,
Interval: 1 * time.Second,
},
},
RouterOptions: []core.Option{
core.WithStorageProviders(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{
{
ID: storageProviderID,
Path: operationsDir,
},
},
}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {

toolsRequest := mcp.ListToolsRequest{}
resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
require.NoError(t, err)

initialToolsCount := len(resp.Tools)

mcpOperationFile := filepath.Join(operationsDir, "main.graphql")

// write mcp operation content
err = os.WriteFile(mcpOperationFile, []byte("query getEmployeeNotes($id: Int!) {\nemployee(id: $id) {\nid\nnotes\n}\n}"), 0644)
assert.NoError(t, err)

require.EventuallyWithT(t, func(t *assert.CollectT) {

resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
assert.NoError(t, err)
assert.Len(t, resp.Tools, initialToolsCount+1)

// verity getEmployeeNotes operation is present
require.Contains(t, resp.Tools, mcp.Tool{
Name: "execute_operation_get_employee_notes",
Description: "Executes the GraphQL operation 'getEmployeeNotes' of type query.",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{"id": map[string]interface{}{"type": "integer"}},
Required: []string{"id"},
},
RawInputSchema: json.RawMessage(nil),
Annotations: mcp.ToolAnnotation{
Title: "Execute operation getEmployeeNotes",
ReadOnlyHint: mcp.ToBoolPtr(true),
IdempotentHint: mcp.ToBoolPtr(true),
OpenWorldHint: mcp.ToBoolPtr(true),
},
})
}, 10*time.Second, 100*time.Millisecond)

err = os.Remove(mcpOperationFile)
assert.NoError(t, err)

assert.EventuallyWithT(t, func(t *assert.CollectT) {

resp, err = xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
assert.NoError(t, err)
assert.Len(t, resp.Tools, initialToolsCount)

// verity getEmployeeNotes operation tool is properly removed
require.NotContains(t, resp.Tools, mcp.Tool{
Name: "execute_operation_get_employee_notes",
Description: "Executes the GraphQL operation 'getEmployeeNotes' of type query.",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{"id": map[string]interface{}{"type": "integer"}},
Required: []string{"id"},
},
RawInputSchema: json.RawMessage(nil),
Annotations: mcp.ToolAnnotation{
Title: "Execute operation getEmployeeNotes",
ReadOnlyHint: mcp.ToBoolPtr(true),
IdempotentHint: mcp.ToBoolPtr(true),
OpenWorldHint: mcp.ToBoolPtr(true),
},
})

}, 10*time.Second, 100*time.Millisecond)

})
})

t.Run("List Updated User Operations On Content Update", func(t *testing.T) {

testenv.Run(t, &testenv.Config{
MCP: config.MCPConfiguration{
Enabled: true,
Storage: config.MCPStorageConfig{
ProviderID: storageProviderID,
},
HotReloadConfig: config.MCPOperationsHotReloadConfig{
Enabled: true,
Interval: 1 * time.Second,
},
},
RouterOptions: []core.Option{
core.WithStorageProviders(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{
{
ID: storageProviderID,
Path: operationsDir,
},
},
}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {

mcpOperationFile := filepath.Join(operationsDir, "main.graphql")

// write mcp operation content
require.NoError(t, os.WriteFile(mcpOperationFile, []byte("query getEmployeeNotes($id: Int!) {\nemployee(id: $id) {\nid\nnotes\n}\n}"), 0o600))

require.EventuallyWithT(t, func(t *assert.CollectT) {

toolsRequest := mcp.ListToolsRequest{}
resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
assert.NoError(t, err)

// verity getEmployeeNotes operation is present
require.Contains(t, resp.Tools, mcp.Tool{
Name: "execute_operation_get_employee_notes",
Description: "Executes the GraphQL operation 'getEmployeeNotes' of type query.",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{"id": map[string]interface{}{"type": "integer"}},
Required: []string{"id"},
},
RawInputSchema: json.RawMessage(nil),
Annotations: mcp.ToolAnnotation{
Title: "Execute operation getEmployeeNotes",
ReadOnlyHint: mcp.ToBoolPtr(true),
IdempotentHint: mcp.ToBoolPtr(true),
OpenWorldHint: mcp.ToBoolPtr(true),
},
})
}, 10*time.Second, 100*time.Millisecond)

// update mcp operation content
require.NoError(t, os.WriteFile(mcpOperationFile, []byte("\nquery getEmployeeNotesUpdatedTitle($id: Int!) {\nemployee(id: $id) {\nid\nnotes\n}\n}"), 0o600))

require.EventuallyWithT(t, func(t *assert.CollectT) {

toolsRequest := mcp.ListToolsRequest{}
resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
assert.NoError(t, err)

// verity getEmployeeNotesUpdatedTitle operation is present
require.Contains(t, resp.Tools, mcp.Tool{
Name: "execute_operation_get_employee_notes_updated_title",
Description: "Executes the GraphQL operation 'getEmployeeNotesUpdatedTitle' of type query.",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{"id": map[string]interface{}{"type": "integer"}},
Required: []string{"id"},
},
RawInputSchema: json.RawMessage(nil),
Annotations: mcp.ToolAnnotation{
Title: "Execute operation getEmployeeNotesUpdatedTitle",
ReadOnlyHint: mcp.ToBoolPtr(true),
IdempotentHint: mcp.ToBoolPtr(true),
OpenWorldHint: mcp.ToBoolPtr(true),
},
})
}, 10*time.Second, 100*time.Millisecond)
})
})
}

func TestShutDownMCPGoRoutineLeaks(t *testing.T) {

defer goleak.VerifyNone(t,
goleak.IgnoreTopFunction("github.com/hashicorp/consul/sdk/freeport.checkFreedPorts"), // Freeport, spawned by init
goleak.IgnoreAnyFunction("net/http.(*conn).serve"), // HTTPTest server I can't close if I want to keep the problematic goroutine open for the test
)

operationsDir := t.TempDir()
storageProviderID := "mcp_hot_reload_test_id"

xEnv, err := testenv.CreateTestEnv(t, &testenv.Config{
MCP: config.MCPConfiguration{
Enabled: true,
Storage: config.MCPStorageConfig{
ProviderID: storageProviderID,
},
HotReloadConfig: config.MCPOperationsHotReloadConfig{
Enabled: true,
Interval: 1 * time.Second,
},
},
RouterOptions: []core.Option{
core.WithStorageProviders(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{
{
ID: storageProviderID,
Path: operationsDir,
},
},
}),
},
})

require.NoError(t, err)

mcpOperationFile := filepath.Join(operationsDir, "main.graphql")
// write mcp operation content
require.NoError(t, os.WriteFile(mcpOperationFile, []byte("query getEmployeeNotes($id: Int!) {\nemployee(id: $id) {\nid\nnotes\n}\n}"), 0o600))

// Verify GoRoutines are properly setup for Hot Reloading
require.EventuallyWithT(t, func(t *assert.CollectT) {

toolsRequest := mcp.ListToolsRequest{}
resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
assert.NoError(t, err)

require.Contains(t, resp.Tools, mcp.Tool{
Name: "execute_operation_get_employee_notes",
Description: "Executes the GraphQL operation 'getEmployeeNotes' of type query.",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{"id": map[string]interface{}{"type": "integer"}},
Required: []string{"id"},
},
RawInputSchema: json.RawMessage(nil),
Annotations: mcp.ToolAnnotation{
Title: "Execute operation getEmployeeNotes",
ReadOnlyHint: mcp.ToBoolPtr(true),
IdempotentHint: mcp.ToBoolPtr(true),
OpenWorldHint: mcp.ToBoolPtr(true),
},
})
}, 10*time.Second, 100*time.Millisecond)

xEnv.Shutdown()

toolsRequest := mcp.ListToolsRequest{}
resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
if assert.Error(t, err) {
require.ErrorIs(t, err, testenv.ErrEnvironmentClosed)
}
require.Nil(t, resp)

}
20 changes: 11 additions & 9 deletions router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,17 +1431,19 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node
}

if testConfig.MCP.Enabled {
// Add Storage provider
routerOpts = append(routerOpts, core.WithStorageProviders(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{
{
ID: "test",
Path: "testdata/mcp_operations",
if testConfig.MCP.Storage.ProviderID == "" {
// Add Storage provider
routerOpts = append(routerOpts, core.WithStorageProviders(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{
{
ID: "test",
Path: "testdata/mcp_operations",
},
},
},
}))
}))

testConfig.MCP.Storage.ProviderID = "test"
testConfig.MCP.Storage.ProviderID = "test"
}

routerOpts = append(routerOpts, core.WithMCP(testConfig.MCP))
}
Expand Down
15 changes: 14 additions & 1 deletion router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,9 +1077,22 @@ func (s *graphServer) buildGraphMux(ctx context.Context,

// We support the MCP only on the base graph. Feature flags are not supported yet.
if featureFlagName == "" && s.mcpServer != nil {
if mErr := s.mcpServer.Reload(executor.ClientSchema); mErr != nil {
if mErr := s.mcpServer.Reload(ctx, executor.ClientSchema); mErr != nil {
return nil, fmt.Errorf("failed to reload MCP server: %w", mErr)
}
go func() {
for {
select {
case <-s.mcpServer.ReloadOperationsChannel():
s.logger.Log(zap.InfoLevel, "Reloading mcp server!")
if mErr := s.mcpServer.Reload(ctx, executor.ClientSchema); mErr != nil {
return
}
Comment on lines +1221 to +1223
Copy link

@coderabbitai coderabbitai bot Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle reload errors without terminating the goroutine

If Reload returns an error, the goroutine exits and no further hot reloads will be processed. This could leave the system unable to recover from transient reload failures.

Consider logging the error and continuing to listen for reload signals:

 case <-s.mcpServer.ReloadOperationsChannel():
     s.logger.Log(zap.InfoLevel, "Reloading mcp server!")
     if mErr := s.mcpServer.Reload(ctx, executor.ClientSchema); mErr != nil {
-        return
+        s.logger.Error("Failed to reload MCP server", zap.Error(mErr))
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if mErr := s.mcpServer.Reload(ctx, executor.ClientSchema); mErr != nil {
return
}
case <-s.mcpServer.ReloadOperationsChannel():
s.logger.Log(zap.InfoLevel, "Reloading mcp server!")
if mErr := s.mcpServer.Reload(ctx, executor.ClientSchema); mErr != nil {
s.logger.Error("Failed to reload MCP server", zap.Error(mErr))
}
🤖 Prompt for AI Agents
In router/core/graph_server.go around lines 1088 to 1090, the current code
returns immediately if s.mcpServer.Reload returns an error, causing the
goroutine to exit and stop processing further reloads. Modify the code to log
the error instead of returning, allowing the goroutine to continue running and
handle subsequent reload signals without termination.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @endigma, Please let me know if it is the expected behaviour to further retry mcpServer reload operations even after reload failure.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

case <-ctx.Done():
return
}
}
}()
}

if s.Config.cacheWarmup != nil && s.Config.cacheWarmup.Enabled {
Expand Down
1 change: 1 addition & 0 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ func (r *Router) bootstrap(ctx context.Context) error {
mcpserver.WithExcludeMutations(r.mcp.ExcludeMutations),
mcpserver.WithEnableArbitraryOperations(r.mcp.EnableArbitraryOperations),
mcpserver.WithExposeSchema(r.mcp.ExposeSchema),
mcpserver.WithHotReload(r.mcp.HotReloadConfig.Enabled, r.mcp.HotReloadConfig.Interval),
}

// Determine the router GraphQL endpoint
Expand Down
22 changes: 14 additions & 8 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,14 +905,15 @@ type CacheWarmupConfiguration struct {
}

type MCPConfiguration struct {
Enabled bool `yaml:"enabled" envDefault:"false" env:"MCP_ENABLED"`
Server MCPServer `yaml:"server,omitempty"`
Storage MCPStorageConfig `yaml:"storage,omitempty"`
GraphName string `yaml:"graph_name" envDefault:"mygraph" env:"MCP_GRAPH_NAME"`
ExcludeMutations bool `yaml:"exclude_mutations" envDefault:"false" env:"MCP_EXCLUDE_MUTATIONS"`
EnableArbitraryOperations bool `yaml:"enable_arbitrary_operations" envDefault:"false" env:"MCP_ENABLE_ARBITRARY_OPERATIONS"`
ExposeSchema bool `yaml:"expose_schema" envDefault:"false" env:"MCP_EXPOSE_SCHEMA"`
RouterURL string `yaml:"router_url,omitempty" env:"MCP_ROUTER_URL"`
Enabled bool `yaml:"enabled" envDefault:"false" env:"MCP_ENABLED"`
Server MCPServer `yaml:"server,omitempty"`
Storage MCPStorageConfig `yaml:"storage,omitempty"`
GraphName string `yaml:"graph_name" envDefault:"mygraph" env:"MCP_GRAPH_NAME"`
ExcludeMutations bool `yaml:"exclude_mutations" envDefault:"false" env:"MCP_EXCLUDE_MUTATIONS"`
EnableArbitraryOperations bool `yaml:"enable_arbitrary_operations" envDefault:"false" env:"MCP_ENABLE_ARBITRARY_OPERATIONS"`
ExposeSchema bool `yaml:"expose_schema" envDefault:"false" env:"MCP_EXPOSE_SCHEMA"`
RouterURL string `yaml:"router_url,omitempty" env:"MCP_ROUTER_URL"`
HotReloadConfig MCPOperationsHotReloadConfig `yaml:"hot_reload_config,omitempty" envPrefix:"MCP_OPERATIONS_HOT_RELOAD_"`
}

type MCPStorageConfig struct {
Expand All @@ -924,6 +925,11 @@ type MCPServer struct {
BaseURL string `yaml:"base_url,omitempty" env:"MCP_SERVER_BASE_URL"`
}

type MCPOperationsHotReloadConfig struct {
Enabled bool `yaml:"enabled" envDefault:"false" env:"ENABLED"`
Interval time.Duration `yaml:"interval" envDefault:"10s" env:"INTERVAL"`
}

type PluginsConfiguration struct {
Enabled bool `yaml:"enabled" envDefault:"false" env:"ENABLED"`
Path string `yaml:"path" envDefault:"plugins" env:"PATH"`
Expand Down
Loading