Skip to content

Fix: memory leak and exit problems #586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions examples/servers/simple-prompt/mcp_simple_prompt/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,23 @@ async def get_prompt(
if transport == "sse":
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Mount, Route

sse = SseServerTransport("/messages/")

async def handle_sse(request):
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await app.run(
streams[0], streams[1], app.create_initialization_options()
)
with anyio.CancelScope() as cancel_scope:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
lambda: cancel_scope.cancel(),
) as streams:
await app.run(
streams[0], streams[1], app.create_initialization_options()
)
return Response(status_code=200)

starlette_app = Starlette(
debug=True,
Expand Down
18 changes: 12 additions & 6 deletions examples/servers/simple-resource/mcp_simple_resource/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,23 @@ async def read_resource(uri: FileUrl) -> str | bytes:
if transport == "sse":
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Mount, Route

sse = SseServerTransport("/messages/")

async def handle_sse(request):
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await app.run(
streams[0], streams[1], app.create_initialization_options()
)
with anyio.CancelScope() as cancel_scope:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
lambda: cancel_scope.cancel(),
) as streams:
await app.run(
streams[0], streams[1], app.create_initialization_options()
)
return Response(status_code=200)

starlette_app = Starlette(
debug=True,
Expand Down
21 changes: 14 additions & 7 deletions examples/servers/simple-tool/mcp_simple_tool/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,24 @@ async def list_tools() -> list[types.Tool]:
if transport == "sse":
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Mount, Route

sse = SseServerTransport("/messages/")

async def handle_sse(request):
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await app.run(
streams[0], streams[1], app.create_initialization_options()
)
async def handle_sse(request: Request):
with anyio.CancelScope() as cancel_scope:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
lambda: cancel_scope.cancel(),
) as streams:
await app.run(
streams[0], streams[1], app.create_initialization_options()
)
return Response(status_code=200)

starlette_app = Starlette(
debug=True,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ dependencies = [
"httpx-sse>=0.4",
"pydantic>=2.7.2,<3.0.0",
"starlette>=0.27",
"sse-starlette>=1.6.1",
"sse-starlette>=2.3.0",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sse-starlette supports client_close_handler_callable since version 2.3.0.

"pydantic-settings>=2.5.2",
"uvicorn>=0.23.1; sys_platform != 'emscripten'",
]
Expand Down
26 changes: 15 additions & 11 deletions src/mcp/server/fastmcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Mount, Route

from mcp.server.fastmcp.exceptions import ResourceError
Expand Down Expand Up @@ -482,17 +483,20 @@ def sse_app(self) -> Starlette:
"""Return an instance of the SSE server app."""
sse = SseServerTransport(self.settings.message_path)

async def handle_sse(request: Request) -> None:
async with sse.connect_sse(
request.scope,
request.receive,
request._send, # type: ignore[reportPrivateUsage]
) as streams:
await self._mcp_server.run(
streams[0],
streams[1],
self._mcp_server.create_initialization_options(),
)
async def handle_sse(request: Request) -> Response:
with anyio.CancelScope() as cancel_scope:
async with sse.connect_sse(
request.scope,
request.receive,
request._send, # type: ignore[reportPrivateUsage]
callback=lambda: cancel_scope.cancel(),
) as streams:
await self._mcp_server.run(
streams[0],
streams[1],
self._mcp_server.create_initialization_options(),
)
return Response(status_code=200)

return Starlette(
debug=self.settings.debug,
Expand Down
23 changes: 20 additions & 3 deletions src/mcp/server/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async def handle_sse(request):
"""

import logging
from collections.abc import Callable
from contextlib import asynccontextmanager
from typing import Any
from urllib.parse import quote
Expand All @@ -43,7 +44,7 @@ async def handle_sse(request):
from sse_starlette import EventSourceResponse
from starlette.requests import Request
from starlette.responses import Response
from starlette.types import Receive, Scope, Send
from starlette.types import Message, Receive, Scope, Send

import mcp.types as types

Expand Down Expand Up @@ -79,7 +80,13 @@ def __init__(self, endpoint: str) -> None:
logger.debug(f"SseServerTransport initialized with endpoint: {endpoint}")

@asynccontextmanager
async def connect_sse(self, scope: Scope, receive: Receive, send: Send):
async def connect_sse(
self,
scope: Scope,
receive: Receive,
send: Send,
callback: Callable[[], None] | None = None,
):
if scope["type"] != "http":
logger.error("connect_sse received non-HTTP request")
raise ValueError("connect_sse can only handle HTTP requests")
Expand Down Expand Up @@ -120,9 +127,19 @@ async def sse_writer():
}
)

async def client_close_handler(message: Message) -> None:
await read_stream_writer.aclose()
await write_stream_reader.aclose()
del self._read_stream_writers[session_id]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
del self._read_stream_writers[session_id]
self._read_stream_writers.pop(session_id, None)

if callback:
callback()
logger.debug(f"Closed SSE session with ID: {session_id}")

async with anyio.create_task_group() as tg:
response = EventSourceResponse(
content=sse_stream_reader, data_sender_callable=sse_writer
content=sse_stream_reader,
data_sender_callable=sse_writer,
client_close_handler_callable=client_close_handler, # type: ignore[arg-type]
)
logger.debug("Starting SSE response task")
tg.start_soon(response, scope, receive, send)
Expand Down
20 changes: 13 additions & 7 deletions tests/shared/test_sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pydantic import AnyUrl
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Mount, Route

from mcp.client.session import ClientSession
Expand Down Expand Up @@ -83,13 +84,18 @@ def make_server_app() -> Starlette:
sse = SseServerTransport("/messages/")
server = ServerTest()

async def handle_sse(request: Request) -> None:
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await server.run(
streams[0], streams[1], server.create_initialization_options()
)
async def handle_sse(request: Request) -> Response:
with anyio.CancelScope() as cancel_scope:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
lambda: cancel_scope.cancel(),
) as streams:
await server.run(
streams[0], streams[1], server.create_initialization_options()
)
return Response(status_code=200)

app = Starlette(
routes=[
Expand Down
Loading
Loading