diff --git a/src/protocol/protocol.ts b/src/protocol/protocol.ts index cb4da86..51aad19 100644 --- a/src/protocol/protocol.ts +++ b/src/protocol/protocol.ts @@ -109,24 +109,38 @@ export class WireProtocol { if (this.#isPendingResponse) return; this.#isPendingResponse = true; while (this.#pendingResponses.size > 0) { - const headerBuffer = await this.read_socket(16); - if (!headerBuffer) { - throw new MongoDriverError("Invalid response header"); - } - const header = parseHeader(headerBuffer); - let bodyBytes = header.messageLength - 16; - if (bodyBytes < 0) bodyBytes = 0; - const bodyBuffer = await this.read_socket(header.messageLength - 16); - if (!bodyBuffer) { - throw new MongoDriverError("Invalid response body"); - } - const pendingMessage = this.#pendingResponses.get(header.responseTo); - this.#pendingResponses.delete(header.responseTo); try { - const reply = deserializeMessage(header, bodyBuffer); - pendingMessage?.resolve(reply); - } catch (e) { - pendingMessage?.reject(e); + const headerBuffer = await this.read_socket(16); + if (!headerBuffer) { + throw new MongoDriverError("Invalid response header"); + } + const header = parseHeader(headerBuffer); + let bodyBytes = header.messageLength - 16; + if (bodyBytes < 0) bodyBytes = 0; + const bodyBuffer = await this.read_socket(header.messageLength - 16); + if (!bodyBuffer) { + throw new MongoDriverError("Invalid response body"); + } + const pendingMessage = this.#pendingResponses.get(header.responseTo); + this.#pendingResponses.delete(header.responseTo); + try { + const reply = deserializeMessage(header, bodyBuffer); + pendingMessage?.resolve(reply); + } catch (e) { + pendingMessage?.reject(e); + } + } catch (error) { + // If an error occurred in the above block, we won't be able to know for + // sure which specific message triggered the error. + // Though since the state appears to be so broken that we can't even + // read the header anymore, it's likely that the connection has + // simply closed. + // We'll just reject all pending messages so that the user can + // handle these themselves. + for (const pendingMessage of this.#pendingResponses.values()) { + pendingMessage.reject(error); + } + this.#pendingResponses.clear(); } } this.#isPendingResponse = false;