Skip to content

Commit a727557

Browse files
make transport functional
1 parent fa7bfe8 commit a727557

File tree

3 files changed

+392
-157
lines changed

3 files changed

+392
-157
lines changed
Lines changed: 58 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
import docker.client
1+
from aiodocker import Docker
22
from contextlib import asynccontextmanager
33
import anyio
44
import anyio.lowlevel
5-
import anyio.to_thread
65
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
76
from loguru import logger
87
from config.final import DockerMCPServer
98
from mcp import types
109

10+
1111
@asynccontextmanager
1212
async def docker_client(server: DockerMCPServer):
1313
"""
@@ -23,66 +23,72 @@ async def docker_client(server: DockerMCPServer):
2323
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
2424
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
2525

26-
client = docker.client.from_env()
27-
container = client.containers.run(
28-
image=server.image,
29-
name=server.container_name if server.container_name else None,
30-
command=server.args if server.args else None,
31-
environment=server.env if isinstance(server.env, (dict, list)) else {},
32-
detach=True,
33-
stdin_open=True,
34-
stdout=True,
35-
stderr=True,
36-
tty=False,
37-
)
26+
docker = Docker()
3827

39-
logger.debug(f"made instance of docker client for {container.name}")
28+
try:
29+
# Pull the image if not available
30+
await docker.images.pull(server.image)
4031

41-
# Attach to container's input/output streams
42-
attach_socket = container.attach_socket(params={"stdin": 1, "stdout": 1, "stderr": 1, "stream": 1})
43-
attach_socket._sock.setblocking(False)
32+
# Create the container
33+
container = await docker.containers.create({
34+
"Image": server.image,
35+
"Args": server.args,
36+
"OpenStdin": True,
37+
"AttachStdout": True,
38+
"AttachStderr": True,
39+
"Tty": False,
40+
"HostConfig": {"AutoRemove": True},
41+
})
4442

45-
async def read_from_stdout():
46-
try:
47-
async with read_stream_writer:
48-
buffer = ""
49-
while True:
50-
chunk = await anyio.to_thread.run_sync(lambda: attach_socket._sock.recv(1024))
51-
if not chunk:
52-
break
43+
await container.start()
44+
logger.debug(f"Started Docker container {container.id}")
5345

54-
chunk = chunk.decode("utf-8")
55-
lines = (buffer + chunk).split("\n")
56-
buffer = lines.pop()
46+
# Attach to the container's input/output streams
47+
attach_result = container.attach(stdout=True, stdin=True)
5748

58-
for line in lines:
59-
try:
60-
message = types.JSONRPCMessage.model_validate_json(line)
61-
except Exception as exc:
62-
await read_stream_writer.send(exc)
49+
async def read_from_stdout():
50+
try:
51+
async with read_stream_writer:
52+
buffer = ""
53+
while True:
54+
msg = await attach_result.read_out()
55+
if msg is None:
6356
continue
57+
chunk = msg.data
58+
if isinstance(chunk, bytes):
59+
chunk = chunk.decode("utf-8")
60+
lines = (buffer + chunk).split("\n")
61+
buffer = lines.pop()
6462

65-
await read_stream_writer.send(message)
66-
except anyio.ClosedResourceError:
67-
await anyio.lowlevel.checkpoint()
63+
for line in lines:
64+
try:
65+
json_message = types.JSONRPCMessage.model_validate_json(line)
66+
await read_stream_writer.send(json_message)
67+
except Exception as exc:
68+
await read_stream_writer.send(exc)
69+
except anyio.ClosedResourceError:
70+
await anyio.lowlevel.checkpoint()
71+
72+
async def write_to_stdin():
73+
try:
74+
async with write_stream_reader:
75+
async for message in write_stream_reader:
76+
json = message.model_dump_json(by_alias=True, exclude_none=True)
77+
await attach_result.write_in(json.encode("utf-8") + b"\n")
78+
except anyio.ClosedResourceError:
79+
await anyio.lowlevel.checkpoint()
6880

69-
async def write_to_stdin():
7081
try:
71-
async with write_stream_reader:
72-
async for message in write_stream_reader:
73-
json = message.model_dump_json(by_alias=True, exclude_none=True)
74-
await anyio.to_thread.run_sync(lambda: attach_socket._sock.sendall((json + "\n").encode("utf-8")))
75-
except anyio.ClosedResourceError:
76-
await anyio.lowlevel.checkpoint()
82+
async with anyio.create_task_group() as tg:
83+
tg.start_soon(read_from_stdout)
84+
tg.start_soon(write_to_stdin)
85+
yield read_stream, write_stream
86+
finally:
87+
await container.stop()
88+
await container.delete()
7789

78-
try:
79-
async with anyio.create_task_group() as tg:
80-
tg.start_soon(read_from_stdout)
81-
tg.start_soon(write_to_stdin)
82-
yield read_stream, write_stream
8390
except Exception as e:
8491
logger.error(f"Error in docker client: {e}")
8592
finally:
86-
attach_socket._sock.close()
87-
# Do not stop or remove the container
88-
logger.debug(f"Container {container.name} remains running.")
93+
await docker.close()
94+
logger.debug("Docker client closed.")

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ description = "A middleware to provide an openAI compatible endpoint that can ca
55
readme = "README.md"
66
requires-python = ">=3.11"
77
dependencies = [
8+
"aiodocker>=0.24.0",
89
"deepmerge>=2.0",
9-
"docker>=7.1.0",
1010
"fastapi>=0.115.6",
1111
"httpx>=0.28.1",
1212
"httpx-sse>=0.4.0",

0 commit comments

Comments
 (0)