From 92ad5c0404afd7568036aca1f2b6f6cf4ae3ca57 Mon Sep 17 00:00:00 2001 From: Anurag Pant Date: Tue, 24 Jun 2025 14:07:35 -0700 Subject: [PATCH 1/2] feat: Make maximum message size configurable --- src/mcp/server/fastmcp/server.py | 2 ++ src/mcp/server/streamable_http.py | 7 +++++- src/mcp/server/streamable_http_manager.py | 7 +++++- .../fastmcp/test_configure_input_size.py | 23 +++++++++++++++++++ tests/shared/test_streamable_http.py | 22 ++++++++++++++++++ 5 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 tests/server/fastmcp/test_configure_input_size.py diff --git a/src/mcp/server/fastmcp/server.py b/src/mcp/server/fastmcp/server.py index 956a8aa78..a509ff751 100644 --- a/src/mcp/server/fastmcp/server.py +++ b/src/mcp/server/fastmcp/server.py @@ -93,6 +93,7 @@ class Settings(BaseSettings, Generic[LifespanResultT]): # StreamableHTTP settings json_response: bool = False stateless_http: bool = False # If True, uses true stateless mode (new transport per request) + maximum_message_size: int | None = None # Specified in bytes # resource settings warn_on_duplicate_resources: bool = True @@ -838,6 +839,7 @@ def streamable_http_app(self) -> Starlette: json_response=self.settings.json_response, stateless=self.settings.stateless_http, # Use the stateless setting security_settings=self.settings.transport_security, + maximum_message_size=self.settings.maximum_message_size, ) # Create the ASGI handler diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index d46549929..74fb83cb7 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -135,6 +135,7 @@ class StreamableHTTPServerTransport: _write_stream: MemoryObjectSendStream[SessionMessage] | None = None _write_stream_reader: MemoryObjectReceiveStream[SessionMessage] | None = None _security: TransportSecurityMiddleware + _maximum_message_size: int = MAXIMUM_MESSAGE_SIZE def __init__( self, @@ -142,6 +143,7 @@ def __init__( is_json_response_enabled: bool = False, event_store: EventStore | None = None, security_settings: TransportSecuritySettings | None = None, + maximum_message_size: int | None = None, ) -> None: """ Initialize a new StreamableHTTP server transport. @@ -155,6 +157,8 @@ def __init__( resumability will be enabled, allowing clients to reconnect and resume messages. security_settings: Optional security settings for DNS rebinding protection. + maximum_message_size: Optional configurable maximum message size specified + in bytes Raises: ValueError: If the session ID contains invalid characters. @@ -166,6 +170,7 @@ def __init__( self.is_json_response_enabled = is_json_response_enabled self._event_store = event_store self._security = TransportSecurityMiddleware(security_settings) + self._maximum_message_size = maximum_message_size if maximum_message_size else MAXIMUM_MESSAGE_SIZE self._request_streams: dict[ RequestId, tuple[ @@ -329,7 +334,7 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re # Parse the body - only read it once body = await request.body() - if len(body) > MAXIMUM_MESSAGE_SIZE: + if len(body) > self._maximum_message_size: response = self._create_error_response( "Payload Too Large: Message exceeds maximum size", HTTPStatus.REQUEST_ENTITY_TOO_LARGE, diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 41b807388..e652770a3 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -52,7 +52,8 @@ class StreamableHTTPSessionManager: json_response: Whether to use JSON responses instead of SSE streams stateless: If True, creates a completely fresh transport for each request with no session tracking or state persistence between requests. - + maximum_message_size: Optional configurable maximum message size specified + in bytes """ def __init__( @@ -62,12 +63,14 @@ def __init__( json_response: bool = False, stateless: bool = False, security_settings: TransportSecuritySettings | None = None, + maximum_message_size: int | None = None, ): self.app = app self.event_store = event_store self.json_response = json_response self.stateless = stateless self.security_settings = security_settings + self.maximum_message_size = maximum_message_size # Session tracking (only used if not stateless) self._session_creation_lock = anyio.Lock() @@ -166,6 +169,7 @@ async def _handle_stateless_request( is_json_response_enabled=self.json_response, event_store=None, # No event store in stateless mode security_settings=self.security_settings, + maximum_message_size=self.maximum_message_size, ) # Start server in a new task @@ -222,6 +226,7 @@ async def _handle_stateful_request( is_json_response_enabled=self.json_response, event_store=self.event_store, # May be None (no resumability) security_settings=self.security_settings, + maximum_message_size=self.maximum_message_size, ) assert http_transport.mcp_session_id is not None diff --git a/tests/server/fastmcp/test_configure_input_size.py b/tests/server/fastmcp/test_configure_input_size.py new file mode 100644 index 000000000..c3dd97959 --- /dev/null +++ b/tests/server/fastmcp/test_configure_input_size.py @@ -0,0 +1,23 @@ +"""Test that the maximum http input size is configurable via FastMCP settings""" + +import pytest + +from mcp.server.fastmcp import FastMCP + + +@pytest.mark.anyio +async def test_configure_input_size(): + """Create a FastMCP server with StreamableHTTP transport.""" + configured_input_size = 1024 + mcp = FastMCP("Test Server", maximum_message_size=configured_input_size) + + # Add a simple tool + @mcp.tool(description="A simple echo tool") + def echo(message: str) -> str: + return f"Echo: {message}" + + # Create the StreamableHTTP app + _ = mcp.streamable_http_app() + + # Check that the maximum input size is set correctly + assert mcp.session_manager.maximum_message_size == configured_input_size diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 1ffcc13b0..61f640cc6 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -236,6 +236,7 @@ def create_app(is_json_response_enabled=False, event_store: EventStore | None = event_store=event_store, json_response=is_json_response_enabled, security_settings=security_settings, + maximum_message_size=1024, ) # Create an ASGI application that uses the session manager @@ -491,6 +492,27 @@ def test_method_not_allowed(basic_server, basic_server_url): assert "Method Not Allowed" in response.text +def test_maximum_message_size_validation(basic_server, basic_server_url): + """Test maximum allowed input size validation.""" + + # Test with input greater than 1024 bytes + large_input = "A" * 1300 # string of size 1300 bytes + response = requests.post( + f"{basic_server_url}/mcp", + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + }, + json={ + "jsonrpc": "2.0", + "method": "call_tool", + "body": {"name": "test_tool_with_standalone_notification", "args": {"text": large_input}}, + }, + ) + assert response.status_code == 413 + assert "Payload Too Large" in response.text + + def test_session_validation(basic_server, basic_server_url): """Test session ID validation.""" # session_id not used directly in this test From 1afb342cc35da9c3ab1deaedf3557e0948fdb63b Mon Sep 17 00:00:00 2001 From: Anurag Pant Date: Wed, 25 Jun 2025 15:13:55 -0700 Subject: [PATCH 2/2] perf: Stream request body in chunks with size validation --- src/mcp/server/streamable_http.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 74fb83cb7..ef65fc4e5 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -332,15 +332,21 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re await response(scope, receive, send) return - # Parse the body - only read it once - body = await request.body() - if len(body) > self._maximum_message_size: - response = self._create_error_response( - "Payload Too Large: Message exceeds maximum size", - HTTPStatus.REQUEST_ENTITY_TOO_LARGE, - ) - await response(scope, receive, send) - return + # Parse the body by streaming and joining chunks + chunks: list[bytes] = [] + total_message_size = 0 + async for chunk in request.stream(): + total_message_size += len(chunk) + # Validate that body is smaller than maximum allowed message size + if total_message_size > self._maximum_message_size: + response = self._create_error_response( + "Payload Too Large: Message exceeds maximum size", + HTTPStatus.REQUEST_ENTITY_TOO_LARGE, + ) + await response(scope, receive, send) + return + chunks.append(chunk) + body = b"".join(chunks) try: raw_message = json.loads(body)