diff --git a/src/mcp/client/__main__.py b/src/mcp/client/__main__.py index 84e15bd5..2ec68e56 100644 --- a/src/mcp/client/__main__.py +++ b/src/mcp/client/__main__.py @@ -11,8 +11,8 @@ from mcp.client.session import ClientSession from mcp.client.sse import sse_client from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.shared.message import SessionMessage from mcp.shared.session import RequestResponder -from mcp.types import JSONRPCMessage if not sys.warnoptions: import warnings @@ -36,8 +36,8 @@ async def message_handler( async def run_session( - read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception], - write_stream: MemoryObjectSendStream[JSONRPCMessage], + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], + write_stream: MemoryObjectSendStream[SessionMessage], client_info: types.Implementation | None = None, ): async with ClientSession( diff --git a/src/mcp/client/session.py b/src/mcp/client/session.py index e29797d1..32fb4cbe 100644 --- a/src/mcp/client/session.py +++ b/src/mcp/client/session.py @@ -7,6 +7,7 @@ import mcp.types as types from mcp.shared.context import RequestContext +from mcp.shared.message import SessionMessage from mcp.shared.session import BaseSession, RequestResponder from mcp.shared.version import SUPPORTED_PROTOCOL_VERSIONS @@ -92,8 +93,8 @@ class ClientSession( ): def __init__( self, - read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception], - write_stream: MemoryObjectSendStream[types.JSONRPCMessage], + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], + write_stream: MemoryObjectSendStream[SessionMessage], read_timeout_seconds: timedelta | None = None, sampling_callback: SamplingFnT | None = None, list_roots_callback: ListRootsFnT | None = None, diff --git a/src/mcp/client/sse.py b/src/mcp/client/sse.py index 4f6241a7..ff04d2f9 100644 --- a/src/mcp/client/sse.py +++ b/src/mcp/client/sse.py @@ -10,6 +10,7 @@ from httpx_sse import aconnect_sse import mcp.types as types +from mcp.shared.message import SessionMessage logger = logging.getLogger(__name__) @@ -31,11 +32,11 @@ async def sse_client( `sse_read_timeout` determines how long (in seconds) the client will wait for a new event before disconnecting. All other HTTP operations are controlled by `timeout`. """ - read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception] - read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception] + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] + read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] - write_stream: MemoryObjectSendStream[types.JSONRPCMessage] - write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage] + write_stream: MemoryObjectSendStream[SessionMessage] + write_stream_reader: MemoryObjectReceiveStream[SessionMessage] read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) @@ -97,7 +98,8 @@ async def sse_reader( await read_stream_writer.send(exc) continue - await read_stream_writer.send(message) + session_message = SessionMessage(message) + await read_stream_writer.send(session_message) case _: logger.warning( f"Unknown SSE event: {sse.event}" @@ -111,11 +113,13 @@ async def sse_reader( async def post_writer(endpoint_url: str): try: async with write_stream_reader: - async for message in write_stream_reader: - logger.debug(f"Sending client message: {message}") + async for session_message in write_stream_reader: + logger.debug( + f"Sending client message: {session_message}" + ) response = await client.post( endpoint_url, - json=message.model_dump( + json=session_message.message.model_dump( by_alias=True, mode="json", exclude_none=True, diff --git a/src/mcp/client/stdio/__init__.py b/src/mcp/client/stdio/__init__.py index 83de57a2..e8be5aff 100644 --- a/src/mcp/client/stdio/__init__.py +++ b/src/mcp/client/stdio/__init__.py @@ -11,6 +11,7 @@ from pydantic import BaseModel, Field import mcp.types as types +from mcp.shared.message import SessionMessage from .win32 import ( create_windows_process, @@ -98,11 +99,11 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder Client transport for stdio: this will connect to a server by spawning a process and communicating with it over stdin/stdout. """ - read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception] - read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception] + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] + read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] - write_stream: MemoryObjectSendStream[types.JSONRPCMessage] - write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage] + write_stream: MemoryObjectSendStream[SessionMessage] + write_stream_reader: MemoryObjectReceiveStream[SessionMessage] read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) @@ -143,7 +144,8 @@ async def stdout_reader(): await read_stream_writer.send(exc) continue - await read_stream_writer.send(message) + session_message = SessionMessage(message) + await read_stream_writer.send(session_message) except anyio.ClosedResourceError: await anyio.lowlevel.checkpoint() @@ -152,8 +154,10 @@ async def stdin_writer(): try: async with write_stream_reader: - async for message in write_stream_reader: - json = message.model_dump_json(by_alias=True, exclude_none=True) + async for session_message in write_stream_reader: + json = session_message.message.model_dump_json( + by_alias=True, exclude_none=True + ) await process.stdin.send( (json + "\n").encode( encoding=server.encoding, diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 1e004242..7a8887cd 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -15,6 +15,7 @@ import httpx from httpx_sse import EventSource, aconnect_sse +from mcp.shared.message import SessionMessage from mcp.types import ( ErrorData, JSONRPCError, @@ -52,10 +53,10 @@ async def streamablehttp_client( """ read_stream_writer, read_stream = anyio.create_memory_object_stream[ - JSONRPCMessage | Exception + SessionMessage | Exception ](0) write_stream, write_stream_reader = anyio.create_memory_object_stream[ - JSONRPCMessage + SessionMessage ](0) async def get_stream(): @@ -86,7 +87,8 @@ async def get_stream(): try: message = JSONRPCMessage.model_validate_json(sse.data) logger.debug(f"GET message: {message}") - await read_stream_writer.send(message) + session_message = SessionMessage(message) + await read_stream_writer.send(session_message) except Exception as exc: logger.error(f"Error parsing GET message: {exc}") await read_stream_writer.send(exc) @@ -100,7 +102,8 @@ async def post_writer(client: httpx.AsyncClient): nonlocal session_id try: async with write_stream_reader: - async for message in write_stream_reader: + async for session_message in write_stream_reader: + message = session_message.message # Add session ID to headers if we have one post_headers = request_headers.copy() if session_id: @@ -141,9 +144,10 @@ async def post_writer(client: httpx.AsyncClient): message="Session terminated", ), ) - await read_stream_writer.send( + session_message = SessionMessage( JSONRPCMessage(jsonrpc_error) ) + await read_stream_writer.send(session_message) continue response.raise_for_status() @@ -163,7 +167,8 @@ async def post_writer(client: httpx.AsyncClient): json_message = JSONRPCMessage.model_validate_json( content ) - await read_stream_writer.send(json_message) + session_message = SessionMessage(json_message) + await read_stream_writer.send(session_message) except Exception as exc: logger.error(f"Error parsing JSON response: {exc}") await read_stream_writer.send(exc) @@ -175,11 +180,15 @@ async def post_writer(client: httpx.AsyncClient): async for sse in event_source.aiter_sse(): if sse.event == "message": try: - await read_stream_writer.send( + message = ( JSONRPCMessage.model_validate_json( sse.data ) ) + session_message = SessionMessage(message) + await read_stream_writer.send( + session_message + ) except Exception as exc: logger.exception("Error parsing message") await read_stream_writer.send(exc) diff --git a/src/mcp/client/websocket.py b/src/mcp/client/websocket.py index 2c2ed38b..ac542fb3 100644 --- a/src/mcp/client/websocket.py +++ b/src/mcp/client/websocket.py @@ -10,6 +10,7 @@ from websockets.typing import Subprotocol import mcp.types as types +from mcp.shared.message import SessionMessage logger = logging.getLogger(__name__) @@ -19,8 +20,8 @@ async def websocket_client( url: str, ) -> AsyncGenerator[ tuple[ - MemoryObjectReceiveStream[types.JSONRPCMessage | Exception], - MemoryObjectSendStream[types.JSONRPCMessage], + MemoryObjectReceiveStream[SessionMessage | Exception], + MemoryObjectSendStream[SessionMessage], ], None, ]: @@ -39,10 +40,10 @@ async def websocket_client( # Create two in-memory streams: # - One for incoming messages (read_stream, written by ws_reader) # - One for outgoing messages (write_stream, read by ws_writer) - read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception] - read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception] - write_stream: MemoryObjectSendStream[types.JSONRPCMessage] - write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage] + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] + read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] + write_stream: MemoryObjectSendStream[SessionMessage] + write_stream_reader: MemoryObjectReceiveStream[SessionMessage] read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) @@ -59,7 +60,8 @@ async def ws_reader(): async for raw_text in ws: try: message = types.JSONRPCMessage.model_validate_json(raw_text) - await read_stream_writer.send(message) + session_message = SessionMessage(message) + await read_stream_writer.send(session_message) except ValidationError as exc: # If JSON parse or model validation fails, send the exception await read_stream_writer.send(exc) @@ -70,9 +72,9 @@ async def ws_writer(): sends them to the server. """ async with write_stream_reader: - async for message in write_stream_reader: + async for session_message in write_stream_reader: # Convert to a dict, then to JSON - msg_dict = message.model_dump( + msg_dict = session_message.message.model_dump( by_alias=True, mode="json", exclude_none=True ) await ws.send(json.dumps(msg_dict)) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index b47f5305..a31d52a6 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -84,6 +84,7 @@ async def main(): from mcp.server.stdio import stdio_server as stdio_server from mcp.shared.context import RequestContext from mcp.shared.exceptions import McpError +from mcp.shared.message import SessionMessage from mcp.shared.session import RequestResponder logger = logging.getLogger(__name__) @@ -471,8 +472,8 @@ async def handler(req: types.CompleteRequest): async def run( self, - read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception], - write_stream: MemoryObjectSendStream[types.JSONRPCMessage], + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], + write_stream: MemoryObjectSendStream[SessionMessage], initialization_options: InitializationOptions, # When False, exceptions are returned as messages to the client. # When True, exceptions are raised, which will cause the server to shut down diff --git a/src/mcp/server/session.py b/src/mcp/server/session.py index 07e5a315..7a5ace5e 100644 --- a/src/mcp/server/session.py +++ b/src/mcp/server/session.py @@ -47,6 +47,7 @@ async def handle_list_prompts(ctx: RequestContext) -> list[types.Prompt]: import mcp.types as types from mcp.server.models import InitializationOptions +from mcp.shared.message import SessionMessage from mcp.shared.session import ( BaseSession, RequestResponder, @@ -82,8 +83,8 @@ class ServerSession( def __init__( self, - read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception], - write_stream: MemoryObjectSendStream[types.JSONRPCMessage], + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], + write_stream: MemoryObjectSendStream[SessionMessage], init_options: InitializationOptions, standalone_mode: bool = False, ) -> None: diff --git a/src/mcp/server/sse.py b/src/mcp/server/sse.py index d051c25b..c781c64d 100644 --- a/src/mcp/server/sse.py +++ b/src/mcp/server/sse.py @@ -46,6 +46,7 @@ async def handle_sse(request): from starlette.types import Receive, Scope, Send import mcp.types as types +from mcp.shared.message import SessionMessage logger = logging.getLogger(__name__) @@ -63,9 +64,7 @@ class SseServerTransport: """ _endpoint: str - _read_stream_writers: dict[ - UUID, MemoryObjectSendStream[types.JSONRPCMessage | Exception] - ] + _read_stream_writers: dict[UUID, MemoryObjectSendStream[SessionMessage | Exception]] def __init__(self, endpoint: str) -> None: """ @@ -85,11 +84,11 @@ async def connect_sse(self, scope: Scope, receive: Receive, send: Send): raise ValueError("connect_sse can only handle HTTP requests") logger.debug("Setting up SSE connection") - read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception] - read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception] + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] + read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] - write_stream: MemoryObjectSendStream[types.JSONRPCMessage] - write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage] + write_stream: MemoryObjectSendStream[SessionMessage] + write_stream_reader: MemoryObjectReceiveStream[SessionMessage] read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) @@ -109,12 +108,12 @@ async def sse_writer(): await sse_stream_writer.send({"event": "endpoint", "data": session_uri}) logger.debug(f"Sent endpoint event: {session_uri}") - async for message in write_stream_reader: - logger.debug(f"Sending message via SSE: {message}") + async for session_message in write_stream_reader: + logger.debug(f"Sending message via SSE: {session_message}") await sse_stream_writer.send( { "event": "message", - "data": message.model_dump_json( + "data": session_message.message.model_dump_json( by_alias=True, exclude_none=True ), } @@ -169,7 +168,8 @@ async def handle_post_message( await writer.send(err) return - logger.debug(f"Sending message to writer: {message}") + session_message = SessionMessage(message) + logger.debug(f"Sending session message to writer: {session_message}") response = Response("Accepted", status_code=202) await response(scope, receive, send) - await writer.send(message) + await writer.send(session_message) diff --git a/src/mcp/server/stdio.py b/src/mcp/server/stdio.py index 0e0e4912..f0bbe5a3 100644 --- a/src/mcp/server/stdio.py +++ b/src/mcp/server/stdio.py @@ -27,6 +27,7 @@ async def run_server(): from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream import mcp.types as types +from mcp.shared.message import SessionMessage @asynccontextmanager @@ -47,11 +48,11 @@ async def stdio_server( if not stdout: stdout = anyio.wrap_file(TextIOWrapper(sys.stdout.buffer, encoding="utf-8")) - read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception] - read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception] + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] + read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] - write_stream: MemoryObjectSendStream[types.JSONRPCMessage] - write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage] + write_stream: MemoryObjectSendStream[SessionMessage] + write_stream_reader: MemoryObjectReceiveStream[SessionMessage] read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) @@ -66,15 +67,18 @@ async def stdin_reader(): await read_stream_writer.send(exc) continue - await read_stream_writer.send(message) + session_message = SessionMessage(message) + await read_stream_writer.send(session_message) except anyio.ClosedResourceError: await anyio.lowlevel.checkpoint() async def stdout_writer(): try: async with write_stream_reader: - async for message in write_stream_reader: - json = message.model_dump_json(by_alias=True, exclude_none=True) + async for session_message in write_stream_reader: + json = session_message.message.model_dump_json( + by_alias=True, exclude_none=True + ) await stdout.write(json + "\n") await stdout.flush() except anyio.ClosedResourceError: diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 36c4b636..8929a50f 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -24,6 +24,7 @@ from starlette.responses import Response from starlette.types import Receive, Scope, Send +from mcp.shared.message import SessionMessage from mcp.types import ( INTERNAL_ERROR, INVALID_PARAMS, @@ -125,10 +126,10 @@ class StreamableHTTPServerTransport: """ # Server notification streams for POST requests as well as standalone SSE stream - _read_stream_writer: MemoryObjectSendStream[JSONRPCMessage | Exception] | None = ( + _read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] | None = ( None ) - _write_stream_reader: MemoryObjectReceiveStream[JSONRPCMessage] | None = None + _write_stream_reader: MemoryObjectReceiveStream[SessionMessage] | None = None def __init__( self, @@ -378,7 +379,8 @@ async def _handle_post_request( await response(scope, receive, send) # Process the message after sending the response - await writer.send(message) + session_message = SessionMessage(message) + await writer.send(session_message) return @@ -394,7 +396,8 @@ async def _handle_post_request( if self.is_json_response_enabled: # Process the message - await writer.send(message) + session_message = SessionMessage(message) + await writer.send(session_message) try: # Process messages from the request-specific stream # We need to collect all messages until we get a response @@ -500,7 +503,8 @@ async def sse_writer(): async with anyio.create_task_group() as tg: tg.start_soon(response, scope, receive, send) # Then send the message to be processed by the server - await writer.send(message) + session_message = SessionMessage(message) + await writer.send(session_message) except Exception: logger.exception("SSE response error") # Clean up the request stream if something goes wrong @@ -516,7 +520,7 @@ async def sse_writer(): ) await response(scope, receive, send) if writer: - await writer.send(err) + await writer.send(Exception(err)) return async def _handle_get_request(self, request: Request, send: Send) -> None: @@ -794,8 +798,8 @@ async def connect( self, ) -> AsyncGenerator[ tuple[ - MemoryObjectReceiveStream[JSONRPCMessage | Exception], - MemoryObjectSendStream[JSONRPCMessage], + MemoryObjectReceiveStream[SessionMessage | Exception], + MemoryObjectSendStream[SessionMessage], ], None, ]: @@ -808,10 +812,10 @@ async def connect( # Create the memory streams for this connection read_stream_writer, read_stream = anyio.create_memory_object_stream[ - JSONRPCMessage | Exception + SessionMessage | Exception ](0) write_stream, write_stream_reader = anyio.create_memory_object_stream[ - JSONRPCMessage + SessionMessage ](0) # Store the streams @@ -823,8 +827,9 @@ async def connect( # Create a message router that distributes messages to request streams async def message_router(): try: - async for message in write_stream_reader: + async for session_message in write_stream_reader: # Determine which request stream(s) should receive this message + message = session_message.message target_request_id = None if isinstance( message.root, JSONRPCNotification | JSONRPCRequest diff --git a/src/mcp/server/websocket.py b/src/mcp/server/websocket.py index aee855cf..9dc3f2a2 100644 --- a/src/mcp/server/websocket.py +++ b/src/mcp/server/websocket.py @@ -8,6 +8,7 @@ from starlette.websockets import WebSocket import mcp.types as types +from mcp.shared.message import SessionMessage logger = logging.getLogger(__name__) @@ -22,11 +23,11 @@ async def websocket_server(scope: Scope, receive: Receive, send: Send): websocket = WebSocket(scope, receive, send) await websocket.accept(subprotocol="mcp") - read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception] - read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception] + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] + read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] - write_stream: MemoryObjectSendStream[types.JSONRPCMessage] - write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage] + write_stream: MemoryObjectSendStream[SessionMessage] + write_stream_reader: MemoryObjectReceiveStream[SessionMessage] read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) @@ -41,15 +42,18 @@ async def ws_reader(): await read_stream_writer.send(exc) continue - await read_stream_writer.send(client_message) + session_message = SessionMessage(client_message) + await read_stream_writer.send(session_message) except anyio.ClosedResourceError: await websocket.close() async def ws_writer(): try: async with write_stream_reader: - async for message in write_stream_reader: - obj = message.model_dump_json(by_alias=True, exclude_none=True) + async for session_message in write_stream_reader: + obj = session_message.message.model_dump_json( + by_alias=True, exclude_none=True + ) await websocket.send_text(obj) except anyio.ClosedResourceError: await websocket.close() diff --git a/src/mcp/shared/memory.py b/src/mcp/shared/memory.py index abf87a3a..b53f8dd6 100644 --- a/src/mcp/shared/memory.py +++ b/src/mcp/shared/memory.py @@ -19,11 +19,11 @@ SamplingFnT, ) from mcp.server import Server -from mcp.types import JSONRPCMessage +from mcp.shared.message import SessionMessage MessageStream = tuple[ - MemoryObjectReceiveStream[JSONRPCMessage | Exception], - MemoryObjectSendStream[JSONRPCMessage], + MemoryObjectReceiveStream[SessionMessage | Exception], + MemoryObjectSendStream[SessionMessage], ] @@ -40,10 +40,10 @@ async def create_client_server_memory_streams() -> ( """ # Create streams for both directions server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[ - JSONRPCMessage | Exception + SessionMessage | Exception ](1) client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[ - JSONRPCMessage | Exception + SessionMessage | Exception ](1) client_streams = (server_to_client_receive, client_to_server_send) diff --git a/src/mcp/shared/message.py b/src/mcp/shared/message.py new file mode 100644 index 00000000..c9341c36 --- /dev/null +++ b/src/mcp/shared/message.py @@ -0,0 +1,35 @@ +""" +Message wrapper with metadata support. + +This module defines a wrapper type that combines JSONRPCMessage with metadata +to support transport-specific features like resumability. +""" + +from dataclasses import dataclass + +from mcp.types import JSONRPCMessage, RequestId + + +@dataclass +class ClientMessageMetadata: + """Metadata specific to client messages.""" + + resumption_token: str | None = None + + +@dataclass +class ServerMessageMetadata: + """Metadata specific to server messages.""" + + related_request_id: RequestId | None = None + + +MessageMetadata = ClientMessageMetadata | ServerMessageMetadata | None + + +@dataclass +class SessionMessage: + """A message with specific metadata for transport-specific features.""" + + message: JSONRPCMessage + metadata: MessageMetadata = None diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 3a01cb04..d6d7ee56 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -12,6 +12,7 @@ from typing_extensions import Self from mcp.shared.exceptions import McpError +from mcp.shared.message import SessionMessage from mcp.types import ( CancelledNotification, ClientNotification, @@ -172,8 +173,8 @@ class BaseSession( def __init__( self, - read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception], - write_stream: MemoryObjectSendStream[JSONRPCMessage], + read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], + write_stream: MemoryObjectSendStream[SessionMessage], receive_request_type: type[ReceiveRequestT], receive_notification_type: type[ReceiveNotificationT], # If none, reading will never time out @@ -241,7 +242,8 @@ async def send_request( # TODO: Support progress callbacks - await self._write_stream.send(JSONRPCMessage(jsonrpc_request)) + session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_request)) + await self._write_stream.send(session_message) try: with anyio.fail_after( @@ -293,14 +295,16 @@ async def send_notification( jsonrpc="2.0", **notification.model_dump(by_alias=True, mode="json", exclude_none=True), ) - await self._write_stream.send(JSONRPCMessage(jsonrpc_notification)) + session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_notification)) + await self._write_stream.send(session_message) async def _send_response( self, request_id: RequestId, response: SendResultT | ErrorData ) -> None: if isinstance(response, ErrorData): jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=request_id, error=response) - await self._write_stream.send(JSONRPCMessage(jsonrpc_error)) + session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error)) + await self._write_stream.send(session_message) else: jsonrpc_response = JSONRPCResponse( jsonrpc="2.0", @@ -309,7 +313,8 @@ async def _send_response( by_alias=True, mode="json", exclude_none=True ), ) - await self._write_stream.send(JSONRPCMessage(jsonrpc_response)) + session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_response)) + await self._write_stream.send(session_message) async def _receive_loop(self) -> None: async with ( @@ -319,15 +324,15 @@ async def _receive_loop(self) -> None: async for message in self._read_stream: if isinstance(message, Exception): await self._handle_incoming(message) - elif isinstance(message.root, JSONRPCRequest): + elif isinstance(message.message.root, JSONRPCRequest): validated_request = self._receive_request_type.model_validate( - message.root.model_dump( + message.message.root.model_dump( by_alias=True, mode="json", exclude_none=True ) ) responder = RequestResponder( - request_id=message.root.id, + request_id=message.message.root.id, request_meta=validated_request.root.params.meta if validated_request.root.params else None, @@ -342,10 +347,10 @@ async def _receive_loop(self) -> None: if not responder._completed: # type: ignore[reportPrivateUsage] await self._handle_incoming(responder) - elif isinstance(message.root, JSONRPCNotification): + elif isinstance(message.message.root, JSONRPCNotification): try: notification = self._receive_notification_type.model_validate( - message.root.model_dump( + message.message.root.model_dump( by_alias=True, mode="json", exclude_none=True ) ) @@ -361,12 +366,12 @@ async def _receive_loop(self) -> None: # For other validation errors, log and continue logging.warning( f"Failed to validate notification: {e}. " - f"Message was: {message.root}" + f"Message was: {message.message.root}" ) else: # Response or error - stream = self._response_streams.pop(message.root.id, None) + stream = self._response_streams.pop(message.message.root.id, None) if stream: - await stream.send(message.root) + await stream.send(message.message.root) else: await self._handle_incoming( RuntimeError( diff --git a/tests/client/test_session.py b/tests/client/test_session.py index 543ebb2f..6abcf70c 100644 --- a/tests/client/test_session.py +++ b/tests/client/test_session.py @@ -3,6 +3,7 @@ import mcp.types as types from mcp.client.session import DEFAULT_CLIENT_INFO, ClientSession +from mcp.shared.message import SessionMessage from mcp.shared.session import RequestResponder from mcp.types import ( LATEST_PROTOCOL_VERSION, @@ -24,10 +25,10 @@ @pytest.mark.anyio async def test_client_session_initialize(): client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[ - JSONRPCMessage + SessionMessage ](1) server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[ - JSONRPCMessage + SessionMessage ](1) initialized_notification = None @@ -35,7 +36,8 @@ async def test_client_session_initialize(): async def mock_server(): nonlocal initialized_notification - jsonrpc_request = await client_to_server_receive.receive() + session_message = await client_to_server_receive.receive() + jsonrpc_request = session_message.message assert isinstance(jsonrpc_request.root, JSONRPCRequest) request = ClientRequest.model_validate( jsonrpc_request.model_dump(by_alias=True, mode="json", exclude_none=True) @@ -59,17 +61,20 @@ async def mock_server(): async with server_to_client_send: await server_to_client_send.send( - JSONRPCMessage( - JSONRPCResponse( - jsonrpc="2.0", - id=jsonrpc_request.root.id, - result=result.model_dump( - by_alias=True, mode="json", exclude_none=True - ), + SessionMessage( + JSONRPCMessage( + JSONRPCResponse( + jsonrpc="2.0", + id=jsonrpc_request.root.id, + result=result.model_dump( + by_alias=True, mode="json", exclude_none=True + ), + ) ) ) ) - jsonrpc_notification = await client_to_server_receive.receive() + session_notification = await client_to_server_receive.receive() + jsonrpc_notification = session_notification.message assert isinstance(jsonrpc_notification.root, JSONRPCNotification) initialized_notification = ClientNotification.model_validate( jsonrpc_notification.model_dump( @@ -116,10 +121,10 @@ async def message_handler( @pytest.mark.anyio async def test_client_session_custom_client_info(): client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[ - JSONRPCMessage + SessionMessage ](1) server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[ - JSONRPCMessage + SessionMessage ](1) custom_client_info = Implementation(name="test-client", version="1.2.3") @@ -128,7 +133,8 @@ async def test_client_session_custom_client_info(): async def mock_server(): nonlocal received_client_info - jsonrpc_request = await client_to_server_receive.receive() + session_message = await client_to_server_receive.receive() + jsonrpc_request = session_message.message assert isinstance(jsonrpc_request.root, JSONRPCRequest) request = ClientRequest.model_validate( jsonrpc_request.model_dump(by_alias=True, mode="json", exclude_none=True) @@ -146,13 +152,15 @@ async def mock_server(): async with server_to_client_send: await server_to_client_send.send( - JSONRPCMessage( - JSONRPCResponse( - jsonrpc="2.0", - id=jsonrpc_request.root.id, - result=result.model_dump( - by_alias=True, mode="json", exclude_none=True - ), + SessionMessage( + JSONRPCMessage( + JSONRPCResponse( + jsonrpc="2.0", + id=jsonrpc_request.root.id, + result=result.model_dump( + by_alias=True, mode="json", exclude_none=True + ), + ) ) ) ) @@ -181,10 +189,10 @@ async def mock_server(): @pytest.mark.anyio async def test_client_session_default_client_info(): client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[ - JSONRPCMessage + SessionMessage ](1) server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[ - JSONRPCMessage + SessionMessage ](1) received_client_info = None @@ -192,7 +200,8 @@ async def test_client_session_default_client_info(): async def mock_server(): nonlocal received_client_info - jsonrpc_request = await client_to_server_receive.receive() + session_message = await client_to_server_receive.receive() + jsonrpc_request = session_message.message assert isinstance(jsonrpc_request.root, JSONRPCRequest) request = ClientRequest.model_validate( jsonrpc_request.model_dump(by_alias=True, mode="json", exclude_none=True) @@ -210,13 +219,15 @@ async def mock_server(): async with server_to_client_send: await server_to_client_send.send( - JSONRPCMessage( - JSONRPCResponse( - jsonrpc="2.0", - id=jsonrpc_request.root.id, - result=result.model_dump( - by_alias=True, mode="json", exclude_none=True - ), + SessionMessage( + JSONRPCMessage( + JSONRPCResponse( + jsonrpc="2.0", + id=jsonrpc_request.root.id, + result=result.model_dump( + by_alias=True, mode="json", exclude_none=True + ), + ) ) ) ) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 95747ffd..523ba199 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -3,6 +3,7 @@ import pytest from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.shared.message import SessionMessage from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse tee: str = shutil.which("tee") # type: ignore @@ -22,7 +23,8 @@ async def test_stdio_client(): async with write_stream: for message in messages: - await write_stream.send(message) + session_message = SessionMessage(message) + await write_stream.send(session_message) read_messages = [] async with read_stream: @@ -30,7 +32,7 @@ async def test_stdio_client(): if isinstance(message, Exception): raise message - read_messages.append(message) + read_messages.append(message.message) if len(read_messages) == 2: break diff --git a/tests/issues/test_192_request_id.py b/tests/issues/test_192_request_id.py index 00e18789..cf5eb608 100644 --- a/tests/issues/test_192_request_id.py +++ b/tests/issues/test_192_request_id.py @@ -3,6 +3,7 @@ from mcp.server.lowlevel import NotificationOptions, Server from mcp.server.models import InitializationOptions +from mcp.shared.message import SessionMessage from mcp.types import ( LATEST_PROTOCOL_VERSION, ClientCapabilities, @@ -64,8 +65,10 @@ async def run_server(): jsonrpc="2.0", ) - await client_writer.send(JSONRPCMessage(root=init_req)) - await server_reader.receive() # Get init response but don't need to check it + await client_writer.send(SessionMessage(JSONRPCMessage(root=init_req))) + response = ( + await server_reader.receive() + ) # Get init response but don't need to check it # Send initialized notification initialized_notification = JSONRPCNotification( @@ -73,21 +76,23 @@ async def run_server(): params=NotificationParams().model_dump(by_alias=True, exclude_none=True), jsonrpc="2.0", ) - await client_writer.send(JSONRPCMessage(root=initialized_notification)) + await client_writer.send( + SessionMessage(JSONRPCMessage(root=initialized_notification)) + ) # Send ping request with custom ID ping_request = JSONRPCRequest( id=custom_request_id, method="ping", params={}, jsonrpc="2.0" ) - await client_writer.send(JSONRPCMessage(root=ping_request)) + await client_writer.send(SessionMessage(JSONRPCMessage(root=ping_request))) # Read response response = await server_reader.receive() # Verify response ID matches request ID assert ( - response.root.id == custom_request_id + response.message.root.id == custom_request_id ), "Response ID should match request ID" # Cancel server task diff --git a/tests/server/test_lifespan.py b/tests/server/test_lifespan.py index 309a44b8..a3ff59bc 100644 --- a/tests/server/test_lifespan.py +++ b/tests/server/test_lifespan.py @@ -10,6 +10,7 @@ from mcp.server.fastmcp import Context, FastMCP from mcp.server.lowlevel.server import NotificationOptions, Server from mcp.server.models import InitializationOptions +from mcp.shared.message import SessionMessage from mcp.types import ( ClientCapabilities, Implementation, @@ -82,41 +83,49 @@ async def run_server(): clientInfo=Implementation(name="test-client", version="0.1.0"), ) await send_stream1.send( - JSONRPCMessage( - root=JSONRPCRequest( - jsonrpc="2.0", - id=1, - method="initialize", - params=TypeAdapter(InitializeRequestParams).dump_python(params), + SessionMessage( + JSONRPCMessage( + root=JSONRPCRequest( + jsonrpc="2.0", + id=1, + method="initialize", + params=TypeAdapter(InitializeRequestParams).dump_python(params), + ) ) ) ) response = await receive_stream2.receive() + response = response.message # Send initialized notification await send_stream1.send( - JSONRPCMessage( - root=JSONRPCNotification( - jsonrpc="2.0", - method="notifications/initialized", + SessionMessage( + JSONRPCMessage( + root=JSONRPCNotification( + jsonrpc="2.0", + method="notifications/initialized", + ) ) ) ) # Call the tool to verify lifespan context await send_stream1.send( - JSONRPCMessage( - root=JSONRPCRequest( - jsonrpc="2.0", - id=2, - method="tools/call", - params={"name": "check_lifespan", "arguments": {}}, + SessionMessage( + JSONRPCMessage( + root=JSONRPCRequest( + jsonrpc="2.0", + id=2, + method="tools/call", + params={"name": "check_lifespan", "arguments": {}}, + ) ) ) ) # Get response and verify response = await receive_stream2.receive() + response = response.message assert response.root.result["content"][0]["text"] == "true" # Cancel server task @@ -178,41 +187,49 @@ async def run_server(): clientInfo=Implementation(name="test-client", version="0.1.0"), ) await send_stream1.send( - JSONRPCMessage( - root=JSONRPCRequest( - jsonrpc="2.0", - id=1, - method="initialize", - params=TypeAdapter(InitializeRequestParams).dump_python(params), + SessionMessage( + JSONRPCMessage( + root=JSONRPCRequest( + jsonrpc="2.0", + id=1, + method="initialize", + params=TypeAdapter(InitializeRequestParams).dump_python(params), + ) ) ) ) response = await receive_stream2.receive() + response = response.message # Send initialized notification await send_stream1.send( - JSONRPCMessage( - root=JSONRPCNotification( - jsonrpc="2.0", - method="notifications/initialized", + SessionMessage( + JSONRPCMessage( + root=JSONRPCNotification( + jsonrpc="2.0", + method="notifications/initialized", + ) ) ) ) # Call the tool to verify lifespan context await send_stream1.send( - JSONRPCMessage( - root=JSONRPCRequest( - jsonrpc="2.0", - id=2, - method="tools/call", - params={"name": "check_lifespan", "arguments": {}}, + SessionMessage( + JSONRPCMessage( + root=JSONRPCRequest( + jsonrpc="2.0", + id=2, + method="tools/call", + params={"name": "check_lifespan", "arguments": {}}, + ) ) ) ) # Get response and verify response = await receive_stream2.receive() + response = response.message assert response.root.result["content"][0]["text"] == "true" # Cancel server task diff --git a/tests/server/test_session.py b/tests/server/test_session.py index 561a94b6..f2f03358 100644 --- a/tests/server/test_session.py +++ b/tests/server/test_session.py @@ -7,11 +7,11 @@ from mcp.server.lowlevel import NotificationOptions from mcp.server.models import InitializationOptions from mcp.server.session import ServerSession +from mcp.shared.message import SessionMessage from mcp.shared.session import RequestResponder from mcp.types import ( ClientNotification, InitializedNotification, - JSONRPCMessage, PromptsCapability, ResourcesCapability, ServerCapabilities, @@ -21,10 +21,10 @@ @pytest.mark.anyio async def test_server_session_initialize(): server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[ - JSONRPCMessage + SessionMessage ](1) client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[ - JSONRPCMessage + SessionMessage ](1) # Create a message handler to catch exceptions diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 85c5bf21..c546a716 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -4,6 +4,7 @@ import pytest from mcp.server.stdio import stdio_server +from mcp.shared.message import SessionMessage from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse @@ -29,7 +30,7 @@ async def test_stdio_server(): async for message in read_stream: if isinstance(message, Exception): raise message - received_messages.append(message) + received_messages.append(message.message) if len(received_messages) == 2: break @@ -50,7 +51,8 @@ async def test_stdio_server(): async with write_stream: for response in responses: - await write_stream.send(response) + session_message = SessionMessage(response) + await write_stream.send(session_message) stdout.seek(0) output_lines = stdout.readlines()