Skip to content

Commit 9baa40f

Browse files
committed
feat: add client Events() method and anti-fragile STDIO transport - Add Events() method to Client interface and implementation - Add comprehensive test for Events() method access - Implement anti-fragile JSON-RPC filtering in STDIO transport - Add isValidJSONRPC() validation to filter out log messages and noise - Add comprehensive test coverage for filtering behavior - Enhance events_integration example with client event demonstration - Ensure robust communication in noisy environments
1 parent 7e29937 commit 9baa40f

File tree

5 files changed

+448
-20
lines changed

5 files changed

+448
-20
lines changed

client/client.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,19 @@ type Client interface {
227227
// GetSamplingHandler returns the currently registered sampling handler.
228228
GetSamplingHandler() SamplingHandler
229229

230+
// Events returns the events subject for subscribing to client events.
231+
//
232+
// This provides access to the event system for monitoring client lifecycle,
233+
// errors, and other significant events. Subscribers can listen for specific
234+
// event types such as initialization, connection changes, and errors.
235+
//
236+
// Example:
237+
// events := client.Events()
238+
// events.Subscribe(events.TopicClientError, func(event events.ClientErrorEvent) {
239+
// log.Printf("Client error: %s", event.Error)
240+
// })
241+
Events() *events.Subject
242+
230243
// RequestSampling initiates a sampling request to the server.
231244
//
232245
// This is the unified method for all sampling operations, supporting both
@@ -676,3 +689,8 @@ func (c *clientImpl) sendBatchRequestWithTimeout(requests []map[string]interface
676689

677690
return result, nil
678691
}
692+
693+
// Events returns the events subject for subscribing to client events.
694+
func (c *clientImpl) Events() *events.Subject {
695+
return c.events
696+
}

client/test/v20250326/client_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,3 +262,14 @@ func TestRoots_v20250326(t *testing.T) {
262262
t.Errorf("Remove root params not as expected: %v", removeParams)
263263
}
264264
}
265+
266+
// TestClientEvents_v20250326 tests that the Events() method returns the events subject
267+
func TestClientEvents_v20250326(t *testing.T) {
268+
c, _ := setupTest(t)
269+
270+
// Test that Events() method returns a non-nil events subject
271+
events := c.Events()
272+
if events == nil {
273+
t.Fatal("Expected Events() to return non-nil events subject")
274+
}
275+
}

examples/events_integration/main.go

Lines changed: 123 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"log/slog"
77
"time"
88

9+
"github.com/localrivet/gomcp/client"
910
"github.com/localrivet/gomcp/events"
1011
"github.com/localrivet/gomcp/server"
1112
)
@@ -102,6 +103,31 @@ type RequestFailedEvent struct {
102103
Metadata map[string]any `json:"metadata,omitempty"`
103104
}
104105

106+
// Client-side event types
107+
type ClientInitializedEvent struct {
108+
ClientName string `json:"clientName"`
109+
ServerURL string `json:"serverUrl"`
110+
ProtocolVersion string `json:"protocolVersion"`
111+
InitializedAt time.Time `json:"initializedAt"`
112+
Capabilities map[string]any `json:"capabilities"`
113+
Metadata map[string]any `json:"metadata,omitempty"`
114+
}
115+
116+
type ClientConnectionEvent struct {
117+
Status string `json:"status"`
118+
ServerURL string `json:"serverUrl"`
119+
Timestamp time.Time `json:"timestamp"`
120+
Error string `json:"error,omitempty"`
121+
Metadata map[string]any `json:"metadata,omitempty"`
122+
}
123+
124+
type ClientErrorEvent struct {
125+
Operation string `json:"operation"`
126+
Error string `json:"error"`
127+
OccurredAt time.Time `json:"occurredAt"`
128+
Metadata map[string]any `json:"metadata,omitempty"`
129+
}
130+
105131
func main() {
106132
// Create a server with events
107133
srv := server.NewServer("events-demo")
@@ -130,18 +156,28 @@ func main() {
130156
logger.Info(" 💭 prompt execution", "topic", events.TopicPromptExecuted)
131157
logger.Info(" ❌ request failures", "topic", events.TopicRequestFailed)
132158
logger.Info("")
133-
logger.Info("🧪 Testing Event System:")
159+
logger.Info("🧪 Testing Server Event System:")
134160

135-
// Demonstrate event publishing with some test events
161+
// Demonstrate server event publishing with some test events
136162
demonstrateEvents(srv, logger)
137163

164+
logger.Info("")
165+
logger.Info("🧪 Testing Client Event System:")
166+
167+
// Demonstrate client events
168+
demonstrateClientEvents(logger)
169+
138170
logger.Info("")
139171
logger.Info("✅ Events integration example complete!")
140172
logger.Info("💡 In a real MCP server, these events fire automatically when:")
141173
logger.Info(" - Server starts and clients connect")
142174
logger.Info(" - Tools and resources are registered")
143175
logger.Info(" - Clients execute tools, access resources, or use prompts")
144176
logger.Info(" - Errors occur during request processing")
177+
logger.Info("💡 In a real MCP client, events fire when:")
178+
logger.Info(" - Client initializes and connects to servers")
179+
logger.Info(" - Connection status changes")
180+
logger.Info(" - Errors occur during client operations")
145181
}
146182

147183
func setupEventSubscriptions(srv server.Server, logger *slog.Logger) {
@@ -422,3 +458,88 @@ func demonstrateEvents(srv server.Server, logger *slog.Logger) {
422458
// Give events time to process
423459
time.Sleep(50 * time.Millisecond)
424460
}
461+
462+
func demonstrateClientEvents(logger *slog.Logger) {
463+
logger.Info("📢 Demonstrating client events system...")
464+
465+
// Create a simple client to demonstrate client-side events
466+
c, err := client.NewClient("test-client", client.WithStdio())
467+
if err != nil {
468+
logger.Info("Failed to create client", "error", err)
469+
return
470+
}
471+
472+
// Get the client's events subject
473+
clientEvents := c.Events()
474+
475+
// Subscribe to client events (if the topics exist, otherwise demonstrate with custom topics)
476+
events.Subscribe[ClientInitializedEvent](clientEvents, "client.initialized",
477+
func(ctx context.Context, evt ClientInitializedEvent) error {
478+
logger.Info("🚀 Client initialized",
479+
"clientName", evt.ClientName,
480+
"serverURL", evt.ServerURL,
481+
"protocolVersion", evt.ProtocolVersion)
482+
return nil
483+
})
484+
485+
events.Subscribe[ClientConnectionEvent](clientEvents, "client.connection",
486+
func(ctx context.Context, evt ClientConnectionEvent) error {
487+
logger.Info("🔌 Client connection status",
488+
"status", evt.Status,
489+
"serverURL", evt.ServerURL)
490+
return nil
491+
})
492+
493+
events.Subscribe[ClientErrorEvent](clientEvents, "client.error",
494+
func(ctx context.Context, evt ClientErrorEvent) error {
495+
logger.Info("❌ Client error",
496+
"operation", evt.Operation,
497+
"error", evt.Error)
498+
return nil
499+
})
500+
501+
// Publish test client events
502+
testClientEvent := ClientInitializedEvent{
503+
ClientName: "test-client",
504+
ServerURL: "stdio://server",
505+
ProtocolVersion: "2025-03-26",
506+
InitializedAt: time.Now(),
507+
Capabilities: map[string]any{"supports": []string{"greet", "calculate"}},
508+
Metadata: make(map[string]any),
509+
}
510+
511+
if err := events.Publish[ClientInitializedEvent](clientEvents, "client.initialized", testClientEvent); err != nil {
512+
logger.Info("Failed to publish client initialized event", "error", err)
513+
}
514+
515+
time.Sleep(10 * time.Millisecond)
516+
517+
testClientConnectionEvent := ClientConnectionEvent{
518+
Status: "connected",
519+
ServerURL: "stdio://server",
520+
Timestamp: time.Now(),
521+
Metadata: make(map[string]any),
522+
}
523+
524+
if err := events.Publish[ClientConnectionEvent](clientEvents, "client.connection", testClientConnectionEvent); err != nil {
525+
logger.Info("Failed to publish client connection event", "error", err)
526+
}
527+
528+
time.Sleep(10 * time.Millisecond)
529+
530+
testClientErrorEvent := ClientErrorEvent{
531+
Operation: "tool_call",
532+
Error: "Demonstration client error",
533+
OccurredAt: time.Now(),
534+
Metadata: make(map[string]any),
535+
}
536+
537+
if err := events.Publish[ClientErrorEvent](clientEvents, "client.error", testClientErrorEvent); err != nil {
538+
logger.Info("Failed to publish client error event", "error", err)
539+
}
540+
541+
// Give events time to process
542+
time.Sleep(50 * time.Millisecond)
543+
544+
logger.Info("✅ Client events demonstration complete")
545+
}

transport/stdio/stdio.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package stdio
66

77
import (
88
"bufio"
9+
"encoding/json"
910
"errors"
1011
"io"
1112
"os"
@@ -15,6 +16,48 @@ import (
1516
"github.com/localrivet/gomcp/transport"
1617
)
1718

19+
// isValidJSONRPC checks if a message appears to be a valid JSON-RPC message.
20+
// This provides anti-fragile behavior by filtering out log messages and other noise.
21+
func isValidJSONRPC(data []byte) bool {
22+
// Quick check: must be valid JSON
23+
var raw map[string]interface{}
24+
if err := json.Unmarshal(data, &raw); err != nil {
25+
return false
26+
}
27+
28+
// Must have jsonrpc field with value "2.0"
29+
jsonrpc, ok := raw["jsonrpc"]
30+
if !ok {
31+
return false
32+
}
33+
if jsonrpcStr, ok := jsonrpc.(string); !ok || jsonrpcStr != "2.0" {
34+
return false
35+
}
36+
37+
// Must be one of: request (has method + id), response (has id + result/error), or notification (has method, no id)
38+
_, hasMethod := raw["method"]
39+
_, hasID := raw["id"]
40+
_, hasResult := raw["result"]
41+
_, hasError := raw["error"]
42+
43+
// Request: method + id
44+
if hasMethod && hasID {
45+
return true
46+
}
47+
48+
// Response: id + (result or error)
49+
if hasID && (hasResult || hasError) {
50+
return true
51+
}
52+
53+
// Notification: method, no id
54+
if hasMethod && !hasID {
55+
return true
56+
}
57+
58+
return false
59+
}
60+
1861
// Transport implements the transport.Transport interface for Standard I/O.
1962
type Transport struct {
2063
transport.BaseTransport
@@ -140,6 +183,19 @@ func (t *Transport) readLoop() {
140183
continue
141184
}
142185

186+
// Anti-fragile filtering: only process valid JSON-RPC messages
187+
if !isValidJSONRPC([]byte(line)) {
188+
// Log filtered message if debug enabled
189+
if debugHandler := t.GetDebugHandler(); debugHandler != nil {
190+
if len(line) > 100 {
191+
debugHandler("stdio transport filtered non-JSON-RPC: " + line[:100] + "...")
192+
} else {
193+
debugHandler("stdio transport filtered non-JSON-RPC: " + line)
194+
}
195+
}
196+
continue
197+
}
198+
143199
// Log received message if debug enabled
144200
if debugHandler := t.GetDebugHandler(); debugHandler != nil {
145201
if len(line) > 100 {

0 commit comments

Comments
 (0)