Skip to content

StreamableHttp - GET request standalone SSE #561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
May 2, 2025
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2b95598
initial streamable http server
ihrpr Apr 20, 2025
3d790f8
add request validation and tests
ihrpr Apr 20, 2025
27bc01e
session management
ihrpr Apr 20, 2025
3c4cf10
terminations of a session
ihrpr Apr 20, 2025
bce74b3
fix cleaning up
ihrpr Apr 20, 2025
2011579
add happy path test
ihrpr Apr 20, 2025
2cebf08
tests
ihrpr Apr 20, 2025
6c9c320
json mode
ihrpr Apr 20, 2025
ede8cde
clean up
ihrpr Apr 21, 2025
2a3bed8
fix example server
ihrpr Apr 21, 2025
0456b1b
return 405 for get stream
ihrpr Apr 21, 2025
97ca48d
speed up tests
ihrpr Apr 21, 2025
f738cbf
stateless implemetation
ihrpr Apr 21, 2025
92d4287
format
ihrpr Apr 21, 2025
aa9f6e5
uv lock
ihrpr Apr 21, 2025
2fba7f3
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr Apr 21, 2025
45723ea
simplify readme
ihrpr Apr 21, 2025
6b7a616
clean up
ihrpr Apr 21, 2025
b1be691
get sse
ihrpr Apr 22, 2025
201ec99
uv lock
ihrpr Apr 22, 2025
46ec72d
clean up
ihrpr Apr 22, 2025
1902abb
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr Apr 22, 2025
da1df74
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr Apr 22, 2025
9b096dc
add comments to server example where we use related_request_id
ihrpr Apr 23, 2025
bbe79c2
Merge branch 'main' into ihrpr/streamablehttp-server
ihrpr Apr 23, 2025
a0a9c5b
small fixes
ihrpr Apr 23, 2025
a5ac2e0
use mta field for related_request_id
ihrpr Apr 23, 2025
2e615f3
unrelated test and format
ihrpr Apr 23, 2025
cae32e2
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr Apr 25, 2025
58745c7
remove useless sleep
ihrpr Apr 25, 2025
1387929
rename require_initialization to standalone_mode
ihrpr Apr 25, 2025
bccff75
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr Apr 25, 2025
9a6da2e
ruff check
ihrpr Apr 25, 2025
ff70bd6
Merge branch 'main' into ihrpr/streamablehttp-server
ihrpr May 2, 2025
179fbc8
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr May 2, 2025
a979864
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr May 2, 2025
181bea6
Merge branch 'main' into ihrpr/get-sse
ihrpr May 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions examples/servers/simple-streamablehttp-stateless/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# MCP Simple StreamableHttp Stateless Server Example

A stateless MCP server example demonstrating the StreamableHttp transport without maintaining session state. This example is ideal for understanding how to deploy MCP servers in multi-node environments where requests can be routed to any instance.

## Features

- Uses the StreamableHTTP transport in stateless mode (mcp_session_id=None)
- Each request creates a new ephemeral connection
- No session state maintained between requests
- Task lifecycle scoped to individual requests
- Suitable for deployment in multi-node environments


## Usage

Start the server:

```bash
# Using default port 3000
uv run mcp-simple-streamablehttp-stateless

# Using custom port
uv run mcp-simple-streamablehttp-stateless --port 3000

# Custom logging level
uv run mcp-simple-streamablehttp-stateless --log-level DEBUG

# Enable JSON responses instead of SSE streams
uv run mcp-simple-streamablehttp-stateless --json-response
```

The server exposes a tool named "start-notification-stream" that accepts three arguments:

- `interval`: Time between notifications in seconds (e.g., 1.0)
- `count`: Number of notifications to send (e.g., 5)
- `caller`: Identifier string for the caller


## Client

You can connect to this server using an HTTP client. For now, only the TypeScript SDK has streamable HTTP client examples, or you can use [Inspector](https://github.com/modelcontextprotocol/inspector) for testing.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .server import main

if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import contextlib
import logging

import anyio
import click
import mcp.types as types
from mcp.server.lowlevel import Server
from mcp.server.streamableHttp import (
StreamableHTTPServerTransport,
)
from starlette.applications import Starlette
from starlette.routing import Mount

logger = logging.getLogger(__name__)
# Global task group that will be initialized in the lifespan
task_group = None


@contextlib.asynccontextmanager
async def lifespan(app):
"""Application lifespan context manager for managing task group."""
global task_group

async with anyio.create_task_group() as tg:
task_group = tg
logger.info("Application started, task group initialized!")
try:
yield
finally:
logger.info("Application shutting down, cleaning up resources...")
if task_group:
tg.cancel_scope.cancel()
task_group = None
logger.info("Resources cleaned up successfully.")


@click.command()
@click.option("--port", default=3000, help="Port to listen on for HTTP")
@click.option(
"--log-level",
default="INFO",
help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
)
@click.option(
"--json-response",
is_flag=True,
default=False,
help="Enable JSON responses instead of SSE streams",
)
def main(
port: int,
log_level: str,
json_response: bool,
) -> int:
# Configure logging
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

app = Server("mcp-streamable-http-stateless-demo")

@app.call_tool()
async def call_tool(
name: str, arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
ctx = app.request_context
interval = arguments.get("interval", 1.0)
count = arguments.get("count", 5)
caller = arguments.get("caller", "unknown")

# Send the specified number of notifications with the given interval
for i in range(count):
await ctx.session.send_log_message(
level="info",
data=f"Notification {i+1}/{count} from caller: {caller}",
logger="notification_stream",
related_request_id=ctx.request_id,
)
if i < count - 1: # Don't wait after the last notification
await anyio.sleep(interval)

return [
types.TextContent(
type="text",
text=(
f"Sent {count} notifications with {interval}s interval"
f" for caller: {caller}"
),
)
]

@app.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="start-notification-stream",
description=(
"Sends a stream of notifications with configurable count"
" and interval"
),
inputSchema={
"type": "object",
"required": ["interval", "count", "caller"],
"properties": {
"interval": {
"type": "number",
"description": "Interval between notifications in seconds",
},
"count": {
"type": "number",
"description": "Number of notifications to send",
},
"caller": {
"type": "string",
"description": (
"Identifier of the caller to include in notifications"
),
},
},
},
)
]

# ASGI handler for stateless HTTP connections
async def handle_streamable_http(scope, receive, send):
logger.debug("Creating new transport")
# Use lock to prevent race conditions when creating new sessions
http_transport = StreamableHTTPServerTransport(
mcp_session_id=None,
is_json_response_enabled=json_response,
)
async with http_transport.connect() as streams:
read_stream, write_stream = streams

if not task_group:
raise RuntimeError("Task group is not initialized")

async def run_server():
await app.run(
read_stream,
write_stream,
app.create_initialization_options(),
# Runs in standalone mode for stateless deployments
# where clients perform initialization with any node
standalone_mode=True,
)

# Start server task
task_group.start_soon(run_server)

# Handle the HTTP request and return the response
await http_transport.handle_request(scope, receive, send)

# Create an ASGI application using the transport
starlette_app = Starlette(
debug=True,
routes=[
Mount("/mcp", app=handle_streamable_http),
],
lifespan=lifespan,
)

import uvicorn

uvicorn.run(starlette_app, host="0.0.0.0", port=port)

return 0
36 changes: 36 additions & 0 deletions examples/servers/simple-streamablehttp-stateless/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[project]
name = "mcp-simple-streamablehttp-stateless"
version = "0.1.0"
description = "A simple MCP server exposing a StreamableHttp transport in stateless mode"
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Anthropic, PBC." }]
keywords = ["mcp", "llm", "automation", "web", "fetch", "http", "streamable", "stateless"]
license = { text = "MIT" }
dependencies = ["anyio>=4.5", "click>=8.1.0", "httpx>=0.27", "mcp", "starlette", "uvicorn"]

[project.scripts]
mcp-simple-streamablehttp-stateless = "mcp_simple_streamablehttp_stateless.server:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["mcp_simple_streamablehttp_stateless"]

[tool.pyright]
include = ["mcp_simple_streamablehttp_stateless"]
venvPath = "."
venv = ".venv"

[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []

[tool.ruff]
line-length = 88
target-version = "py310"

[tool.uv]
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]
37 changes: 37 additions & 0 deletions examples/servers/simple-streamablehttp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# MCP Simple StreamableHttp Server Example

A simple MCP server example demonstrating the StreamableHttp transport, which enables HTTP-based communication with MCP servers using streaming.

## Features

- Uses the StreamableHTTP transport for server-client communication
- Supports REST API operations (POST, GET, DELETE) for `/mcp` endpoint
- Task management with anyio task groups
- Ability to send multiple notifications over time to the client
- Proper resource cleanup and lifespan management

## Usage

Start the server on the default or custom port:

```bash

# Using custom port
uv run mcp-simple-streamablehttp --port 3000

# Custom logging level
uv run mcp-simple-streamablehttp --log-level DEBUG

# Enable JSON responses instead of SSE streams
uv run mcp-simple-streamablehttp --json-response
```

The server exposes a tool named "start-notification-stream" that accepts three arguments:

- `interval`: Time between notifications in seconds (e.g., 1.0)
- `count`: Number of notifications to send (e.g., 5)
- `caller`: Identifier string for the caller

## Client

You can connect to this server using an HTTP client, for now only Typescript SDK has streamable HTTP client examples or you can use (Inspector)[https://github.com/modelcontextprotocol/inspector]
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .server import main

if __name__ == "__main__":
main()
Loading
Loading