Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 282 additions & 0 deletions router-tests/mcp_hot_reload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
package integration

import (
"encoding/json"
"os"
"testing"
"time"

"github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/config"
"go.uber.org/goleak"
)

func TestMCPOperationHotReload(t *testing.T) {
t.Parallel()

t.Run("List Updated User Operations On Addition and Removal", func(t *testing.T) {

operationsDir := t.TempDir()
storageProviderId := "mcp_hot_reload_test_id"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

storageProviderId should be ID and it seems the same across all subtests so can we move it out to test scope?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can.


testenv.Run(t, &testenv.Config{
MCP: config.MCPConfiguration{
Enabled: true,
Storage: config.MCPStorageConfig{
ProviderID: storageProviderId,
},
HotReloadConfig: config.MCPOperationsHotReloadConfig{
Enabled: true,
Interval: 5 * time.Second,
},
},
RouterOptions: []core.Option{
core.WithStorageProviders(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{
{
ID: storageProviderId,
Path: operationsDir,
},
},
}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {

toolsRequest := mcp.ListToolsRequest{}
resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
require.NoError(t, err)

initialToolsCount := len(resp.Tools)

filePath := operationsDir + "/main.graphql"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using filepath.Join is best for this sort of thing, and can it be given a more descriptive name? This applies to all subtests.


// write mcp operation content
err = os.WriteFile(filePath, []byte("query getEmployeeNotes($id: Int!) {\nemployee(id: $id) {\nid\nnotes\n}\n}"), 0644)
assert.NoError(t, err)

require.EventuallyWithT(t, func(t *assert.CollectT) {

resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
assert.NoError(t, err)
assert.Len(t, resp.Tools, initialToolsCount+1)

// verity getEmployeeNotes operation is present
require.Contains(t, resp.Tools, mcp.Tool{
Name: "execute_operation_get_employee_notes",
Description: "Executes the GraphQL operation 'getEmployeeNotes' of type query.",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{"id": map[string]interface{}{"type": "integer"}},
Required: []string{"id"},
},
RawInputSchema: json.RawMessage(nil),
Annotations: mcp.ToolAnnotation{
Title: "Execute operation getEmployeeNotes",
ReadOnlyHint: mcp.ToBoolPtr(true),
IdempotentHint: mcp.ToBoolPtr(true),
OpenWorldHint: mcp.ToBoolPtr(true),
},
})
}, 15*time.Second, 250*time.Millisecond)

err = os.Remove(filePath)
assert.NoError(t, err)

assert.EventuallyWithT(t, func(t *assert.CollectT) {

resp, err = xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
assert.NoError(t, err)
assert.Len(t, resp.Tools, initialToolsCount)

// verity getEmployeeNotes operation tool is properly removed
require.NotContains(t, resp.Tools, mcp.Tool{
Name: "execute_operation_get_employee_notes",
Description: "Executes the GraphQL operation 'getEmployeeNotes' of type query.",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{"id": map[string]interface{}{"type": "integer"}},
Required: []string{"id"},
},
RawInputSchema: json.RawMessage(nil),
Annotations: mcp.ToolAnnotation{
Title: "Execute operation getEmployeeNotes",
ReadOnlyHint: mcp.ToBoolPtr(true),
IdempotentHint: mcp.ToBoolPtr(true),
OpenWorldHint: mcp.ToBoolPtr(true),
},
})

}, 15*time.Second, 250*time.Millisecond)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using assert.Eventually... functions, we recently introduced an option to the watcher to directly provide a non-timing based tick source. You can check watcher_test.go for how to use it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll check and update the test.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of mcp hot reload, we don't have direct access to the watcher config options, the watcher is defined in the loader.go file. So, in this particular scenario it is not possible to use non-timing based tick source.


})
})

t.Run("List Updated User Operations On Content Update", func(t *testing.T) {
operationsDir := t.TempDir()
storageProviderId := "mcp_hot_reload_test_id"

testenv.Run(t, &testenv.Config{
MCP: config.MCPConfiguration{
Enabled: true,
Storage: config.MCPStorageConfig{
ProviderID: storageProviderId,
},
HotReloadConfig: config.MCPOperationsHotReloadConfig{
Enabled: true,
Interval: 5 * time.Second,
},
},
RouterOptions: []core.Option{
core.WithStorageProviders(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{
{
ID: storageProviderId,
Path: operationsDir,
},
},
}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {

filePath := operationsDir + "/main.graphql"

// write mcp operation content
err := os.WriteFile(filePath, []byte("query getEmployeeNotes($id: Int!) {\nemployee(id: $id) {\nid\nnotes\n}\n}"), 0644)
assert.NoError(t, err)

require.EventuallyWithT(t, func(t *assert.CollectT) {

toolsRequest := mcp.ListToolsRequest{}
resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
assert.NoError(t, err)

// verity getEmployeeNotes operation is present
require.Contains(t, resp.Tools, mcp.Tool{
Name: "execute_operation_get_employee_notes",
Description: "Executes the GraphQL operation 'getEmployeeNotes' of type query.",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{"id": map[string]interface{}{"type": "integer"}},
Required: []string{"id"},
},
RawInputSchema: json.RawMessage(nil),
Annotations: mcp.ToolAnnotation{
Title: "Execute operation getEmployeeNotes",
ReadOnlyHint: mcp.ToBoolPtr(true),
IdempotentHint: mcp.ToBoolPtr(true),
OpenWorldHint: mcp.ToBoolPtr(true),
},
})
}, 15*time.Second, 250*time.Millisecond)

// update mcp operation content
err = os.WriteFile(filePath, []byte("\nquery getEmployeeNotesUpdatedTitle($id: Int!) {\nemployee(id: $id) {\nid\nnotes\n}\n}"), 0644)
assert.NoError(t, err)

require.EventuallyWithT(t, func(t *assert.CollectT) {

toolsRequest := mcp.ListToolsRequest{}
resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
assert.NoError(t, err)

// verity getEmployeeNotesUpdatedTitle operation is present
require.Contains(t, resp.Tools, mcp.Tool{
Name: "execute_operation_get_employee_notes_updated_title",
Description: "Executes the GraphQL operation 'getEmployeeNotesUpdatedTitle' of type query.",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{"id": map[string]interface{}{"type": "integer"}},
Required: []string{"id"},
},
RawInputSchema: json.RawMessage(nil),
Annotations: mcp.ToolAnnotation{
Title: "Execute operation getEmployeeNotesUpdatedTitle",
ReadOnlyHint: mcp.ToBoolPtr(true),
IdempotentHint: mcp.ToBoolPtr(true),
OpenWorldHint: mcp.ToBoolPtr(true),
},
})
}, 15*time.Second, 250*time.Millisecond)
})
})
}

func TestShutDownMCPGoRoutineLeaks(t *testing.T) {

defer goleak.VerifyNone(t,
goleak.IgnoreTopFunction("github.com/hashicorp/consul/sdk/freeport.checkFreedPorts"), // Freeport, spawned by init
goleak.IgnoreAnyFunction("net/http.(*conn).serve"), // HTTPTest server I can't close if I want to keep the problematic goroutine open for the test
)

operationsDir := t.TempDir()
storageProviderId := "mcp_hot_reload_test_id"

xEnv, err := testenv.CreateTestEnv(t, &testenv.Config{
MCP: config.MCPConfiguration{
Enabled: true,
Storage: config.MCPStorageConfig{
ProviderID: storageProviderId,
},
HotReloadConfig: config.MCPOperationsHotReloadConfig{
Enabled: true,
Interval: 5 * time.Second,
},
},
RouterOptions: []core.Option{
core.WithStorageProviders(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{
{
ID: storageProviderId,
Path: operationsDir,
},
},
}),
},
})

require.NoError(t, err)

filePath := operationsDir + "/main.graphql"
// write mcp operation content
err = os.WriteFile(filePath, []byte("query getEmployeeNotes($id: Int!) {\nemployee(id: $id) {\nid\nnotes\n}\n}"), 0644)
assert.NoError(t, err)

// Verify GoRoutines are properly setup for Hot Reloading
require.EventuallyWithT(t, func(t *assert.CollectT) {

toolsRequest := mcp.ListToolsRequest{}
resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
assert.NoError(t, err)

require.Contains(t, resp.Tools, mcp.Tool{
Name: "execute_operation_get_employee_notes",
Description: "Executes the GraphQL operation 'getEmployeeNotes' of type query.",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{"id": map[string]interface{}{"type": "integer"}},
Required: []string{"id"},
},
RawInputSchema: json.RawMessage(nil),
Annotations: mcp.ToolAnnotation{
Title: "Execute operation getEmployeeNotes",
ReadOnlyHint: mcp.ToBoolPtr(true),
IdempotentHint: mcp.ToBoolPtr(true),
OpenWorldHint: mcp.ToBoolPtr(true),
},
})
}, 15*time.Second, 250*time.Millisecond)

xEnv.Shutdown()

toolsRequest := mcp.ListToolsRequest{}
resp, err := xEnv.MCPClient.ListTools(xEnv.Context, toolsRequest)
if assert.Error(t, err) {
require.ErrorIs(t, err, testenv.ErrEnvironmentClosed)
}
require.Nil(t, resp)

}
20 changes: 11 additions & 9 deletions router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1341,17 +1341,19 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node
}

if testConfig.MCP.Enabled {
// Add Storage provider
routerOpts = append(routerOpts, core.WithStorageProviders(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{
{
ID: "test",
Path: "testdata/mcp_operations",
if testConfig.MCP.Storage.ProviderID == "" {
// Add Storage provider
routerOpts = append(routerOpts, core.WithStorageProviders(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{
{
ID: "test",
Path: "testdata/mcp_operations",
},
},
},
}))
}))

testConfig.MCP.Storage.ProviderID = "test"
testConfig.MCP.Storage.ProviderID = "test"
}

routerOpts = append(routerOpts, core.WithMCP(testConfig.MCP))
}
Expand Down
15 changes: 14 additions & 1 deletion router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,9 +1079,22 @@ func (s *graphServer) buildGraphMux(ctx context.Context,

// We support the MCP only on the base graph. Feature flags are not supported yet.
if featureFlagName == "" && s.mcpServer != nil {
if mErr := s.mcpServer.Reload(executor.ClientSchema); mErr != nil {
if mErr := s.mcpServer.Reload(ctx, executor.ClientSchema); mErr != nil {
return nil, fmt.Errorf("failed to reload MCP server: %w", mErr)
}
go func() {
for {
select {
case <-s.mcpServer.ReloadOperationsChannel():
s.logger.Log(zap.InfoLevel, "Reloading mcp server!")
if mErr := s.mcpServer.Reload(ctx, executor.ClientSchema); mErr != nil {
return
}
Comment on lines +1221 to +1223
Copy link

@coderabbitai coderabbitai bot Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle reload errors without terminating the goroutine

If Reload returns an error, the goroutine exits and no further hot reloads will be processed. This could leave the system unable to recover from transient reload failures.

Consider logging the error and continuing to listen for reload signals:

 case <-s.mcpServer.ReloadOperationsChannel():
     s.logger.Log(zap.InfoLevel, "Reloading mcp server!")
     if mErr := s.mcpServer.Reload(ctx, executor.ClientSchema); mErr != nil {
-        return
+        s.logger.Error("Failed to reload MCP server", zap.Error(mErr))
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if mErr := s.mcpServer.Reload(ctx, executor.ClientSchema); mErr != nil {
return
}
case <-s.mcpServer.ReloadOperationsChannel():
s.logger.Log(zap.InfoLevel, "Reloading mcp server!")
if mErr := s.mcpServer.Reload(ctx, executor.ClientSchema); mErr != nil {
s.logger.Error("Failed to reload MCP server", zap.Error(mErr))
}
🤖 Prompt for AI Agents
In router/core/graph_server.go around lines 1088 to 1090, the current code
returns immediately if s.mcpServer.Reload returns an error, causing the
goroutine to exit and stop processing further reloads. Modify the code to log
the error instead of returning, allowing the goroutine to continue running and
handle subsequent reload signals without termination.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @endigma, Please let me know if it is the expected behaviour to further retry mcpServer reload operations even after reload failure.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

case <-ctx.Done():
return
}
}
}()
}

if s.Config.cacheWarmup != nil && s.Config.cacheWarmup.Enabled {
Expand Down
1 change: 1 addition & 0 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ func (r *Router) bootstrap(ctx context.Context) error {
mcpserver.WithExcludeMutations(r.mcp.ExcludeMutations),
mcpserver.WithEnableArbitraryOperations(r.mcp.EnableArbitraryOperations),
mcpserver.WithExposeSchema(r.mcp.ExposeSchema),
mcpserver.WithHotReload(r.mcp.HotReloadConfig.Enabled, r.mcp.HotReloadConfig.Interval),
}

// Determine the router GraphQL endpoint
Expand Down
22 changes: 14 additions & 8 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,14 +898,15 @@ type CacheWarmupConfiguration struct {
}

type MCPConfiguration struct {
Enabled bool `yaml:"enabled" envDefault:"false" env:"MCP_ENABLED"`
Server MCPServer `yaml:"server,omitempty"`
Storage MCPStorageConfig `yaml:"storage,omitempty"`
GraphName string `yaml:"graph_name" envDefault:"mygraph" env:"MCP_GRAPH_NAME"`
ExcludeMutations bool `yaml:"exclude_mutations" envDefault:"false" env:"MCP_EXCLUDE_MUTATIONS"`
EnableArbitraryOperations bool `yaml:"enable_arbitrary_operations" envDefault:"false" env:"MCP_ENABLE_ARBITRARY_OPERATIONS"`
ExposeSchema bool `yaml:"expose_schema" envDefault:"false" env:"MCP_EXPOSE_SCHEMA"`
RouterURL string `yaml:"router_url,omitempty" env:"MCP_ROUTER_URL"`
Enabled bool `yaml:"enabled" envDefault:"false" env:"MCP_ENABLED"`
Server MCPServer `yaml:"server,omitempty"`
Storage MCPStorageConfig `yaml:"storage,omitempty"`
GraphName string `yaml:"graph_name" envDefault:"mygraph" env:"MCP_GRAPH_NAME"`
ExcludeMutations bool `yaml:"exclude_mutations" envDefault:"false" env:"MCP_EXCLUDE_MUTATIONS"`
EnableArbitraryOperations bool `yaml:"enable_arbitrary_operations" envDefault:"false" env:"MCP_ENABLE_ARBITRARY_OPERATIONS"`
ExposeSchema bool `yaml:"expose_schema" envDefault:"false" env:"MCP_EXPOSE_SCHEMA"`
RouterURL string `yaml:"router_url,omitempty" env:"MCP_ROUTER_URL"`
HotReloadConfig MCPOperationsHotReloadConfig `yaml:"hot_reload_config,omitempty" envPrefix:"MCP_OPERATIONS_HOT_RELOAD_"`
}

type MCPStorageConfig struct {
Expand All @@ -917,6 +918,11 @@ type MCPServer struct {
BaseURL string `yaml:"base_url,omitempty" env:"MCP_SERVER_BASE_URL"`
}

type MCPOperationsHotReloadConfig struct {
Enabled bool `yaml:"enabled" envDefault:"false" env:"ENABLED"`
Interval time.Duration `yaml:"interval" envDefault:"10s" env:"INTERVAL"`
}

type PluginsConfiguration struct {
Enabled bool `yaml:"enabled" envDefault:"false" env:"ENABLED"`
Path string `yaml:"path" envDefault:"plugins" env:"PATH"`
Expand Down
Loading