diff --git a/Sources/MCP/Base/Transports/HTTPClientTransport.swift b/Sources/MCP/Base/Transports/HTTPClientTransport.swift index 9cf93a0..d4d7f5a 100644 --- a/Sources/MCP/Base/Transports/HTTPClientTransport.swift +++ b/Sources/MCP/Base/Transports/HTTPClientTransport.swift @@ -103,7 +103,7 @@ public actor HTTPClientTransport: Actor, Transport { request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - let (responseData, response) = try await session.data(for: request) + let (responseStream, response) = try await session.bytes(for: request) guard let httpResponse = response as? HTTPURLResponse else { throw MCPError.internalError("Invalid HTTP response") @@ -124,15 +124,19 @@ public actor HTTPClientTransport: Actor, Transport { // For SSE, the processing happens in the streaming task if contentType.contains("text/event-stream") { logger.debug("Received SSE response, processing in streaming task") - // The streaming is handled by the SSE task if active + try await self.processSSE(responseStream) return } // For JSON responses, deliver the data directly - if contentType.contains("application/json") && !responseData.isEmpty { - logger.debug("Received JSON response", metadata: ["size": "\(responseData.count)"]) - messageContinuation.yield(responseData) + if contentType.contains("application/json") { + var buffer = Data() + for try await byte in responseStream { + buffer.append(byte) + } + messageContinuation.yield(buffer) } + case 404: // If we get a 404 with a session ID, it means our session is invalid if sessionID != nil { @@ -141,6 +145,11 @@ public actor HTTPClientTransport: Actor, Transport { throw MCPError.internalError("Session expired") } throw MCPError.internalError("Endpoint not found") + + case 405: + // MCP Transport spec states that if a server doesn't support streaming from the GET, + // it should return a 405. So lets cancel the task. + self.streamingTask?.cancel() default: throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)") } @@ -189,10 +198,17 @@ public actor HTTPClientTransport: Actor, Transport { request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } + // S.T. A different approach to resumability must be figured out + // to properly support the spec. Since SSE can come in from the + // POSTs, their sse message id's end up getting saves to lastEventID + // and then passed to the GET which tells the server you are trying + // to resume something that is "done". When that happens the GET + // will not return notifications. + // Add Last-Event-ID header for resumability if available - if let lastEventID = lastEventID { - request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID") - } +// if let lastEventID = lastEventID { +// request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID") +// } logger.debug("Starting SSE connection") @@ -219,6 +235,10 @@ public actor HTTPClientTransport: Actor, Transport { self.sessionID = newSessionID } + try await self.processSSE(stream) + } + + private func processSSE(_ stream: URLSession.AsyncBytes) async throws { // Process the SSE stream var buffer = "" var eventType = "" @@ -309,4 +329,5 @@ public actor HTTPClientTransport: Actor, Transport { } } #endif + }