11/* eslint-disable max-depth */
22/* eslint-disable complexity */
33
4+ import { HTTPParser } from '@achingbrain/http-parser-js'
45import { multiaddr , protocols } from '@multiformats/multiaddr'
56import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
6- // @ts -expect-error missing types
7- import { milo } from '@perseveranza-pets/milo/index-with-wasm.js'
87import defer from 'p-defer'
9- import { Uint8ArrayList , isUint8ArrayList } from 'uint8arraylist'
8+ import { type Uint8ArrayList } from 'uint8arraylist'
9+
1010interface Fetch { ( req : Request ) : Promise < Response > }
1111
12+ const METHOD_GET = 1
13+
14+ function getStringMethod ( method : number ) : string {
15+ if ( method === 1 ) {
16+ return 'GET'
17+ }
18+
19+ return 'UNKNOWN'
20+ }
21+
1222interface Duplex < TSource , TSink = TSource , RSink = Promise < void > > {
1323 source : AsyncIterable < TSource > | Iterable < TSource >
1424 sink ( source : AsyncIterable < TSink > | Iterable < TSink > ) : RSink
@@ -85,8 +95,6 @@ export async function handleRequestViaDuplex (s: Duplex<Uint8Array | Uint8ArrayL
8595 await writeResponseToDuplex ( s , resp )
8696}
8797
88- const BUFFER_SIZE = 16 << 10
89-
9098/**
9199 * Exported for testing.
92100 *
@@ -100,85 +108,37 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
100108 return [
101109 msgPromise . promise ,
102110 ( async ( ) => {
103- const unconsumedChunks = new Uint8ArrayList ( )
104-
105- const textDecoder = new TextDecoder ( )
106- const ptr = milo . alloc ( BUFFER_SIZE )
107-
108- const parser = milo . create ( )
109- // Simplifies implementation at the cost of storing data twice
110- milo . setManageUnconsumed ( parser , true )
111-
112- const bodyStreamControllerPromise = defer < ReadableStreamController < Uint8Array > > ( )
113- const body = new ReadableStream < Uint8Array > ( {
114- async start ( controller ) {
115- bodyStreamControllerPromise . resolve ( controller )
116- }
117- } )
118- const bodyStreamController = await bodyStreamControllerPromise . promise
119-
120- // Response
121- let status = ''
122- let reason = ''
123-
124- // Requests
125- let url = ''
126- let method = ''
127-
111+ const body = new TransformStream ( )
112+ const writer = body . writable . getWriter ( )
113+ let messageComplete = false
128114 let fulfilledMsgPromise = false
129115
130- milo . setOnStatus ( parser , ( _ : unknown , from : number , size : number ) => {
131- status = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
132- } )
133- milo . setOnReason ( parser , ( _ : unknown , from : number , size : number ) => {
134- reason = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
135- } )
136- milo . setOnUrl ( parser , ( _ : unknown , from : number , size : number ) => {
137- url = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
138- } )
139- milo . setOnMethod ( parser , ( _ : unknown , from : number , size : number ) => {
140- method = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
141- } )
116+ const parser = new HTTPParser ( expectRequest ? 'REQUEST' : 'RESPONSE' )
117+ parser [ HTTPParser . kOnHeadersComplete ] = ( info ) => {
118+ fulfilledMsgPromise = true
142119
143- milo . setOnRequest ( parser , ( ) => {
144- if ( ! expectRequest ) {
145- msgPromise . reject ( new Error ( 'Received request instead of response' ) )
146- fulfilledMsgPromise = true
147- }
148- } )
149- milo . setOnResponse ( parser , ( ) => {
150- if ( expectRequest ) {
151- msgPromise . reject ( new Error ( 'Received response instead of request' ) )
152- fulfilledMsgPromise = true
120+ // Handle the headers
121+ const headers = new Headers ( )
122+
123+ for ( let i = 0 ; i < info . headers . length ; i += 2 ) {
124+ headers . set ( info . headers [ i ] , info . headers [ i + 1 ] )
153125 }
154- } )
155126
156- // Handle the headers
157- const headers = new Headers ( )
158- let lastHeaderName : string = ''
127+ let reqBody : ReadableStream | null = body . readable
159128
160- milo . setOnHeaderName ( parser , ( _ : unknown , from : number , size : number ) => {
161- lastHeaderName = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
162- } )
163- milo . setOnHeaderValue ( parser , ( _ : unknown , from : number , size : number ) => {
164- const headerVal = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
165- headers . set ( lastHeaderName , headerVal )
166- } )
167- milo . setOnHeaders ( parser , ( _ : unknown , from : number , size : number ) => {
168129 // Headers are parsed. We can return the response
169130 try {
170131 if ( expectRequest ) {
171- let reqBody : ReadableStream < Uint8Array > | null = body
172- if ( method === 'GET' ) {
132+ if ( info . method === METHOD_GET ) {
173133 reqBody = null
174134 }
175135
176- const urlWithHost = `https://${ headers . get ( 'Host' ) ?? 'unknown_host._libp2p' } ${ url } `
136+ const urlWithHost = `https://${ headers . get ( 'Host' ) ?? 'unknown_host._libp2p' } ${ info . url } `
177137 detectBrokenRequestBody ( ) . then ( async ( broken ) => {
178138 let req : Request
179139 if ( ! broken ) {
180140 req = new Request ( urlWithHost , {
181- method,
141+ method : getStringMethod ( info . method ) ,
182142 body : reqBody ,
183143 headers,
184144 // @ts -expect-error this is required by NodeJS despite being the only reasonable option https://fetch.spec.whatwg.org/#requestinit
@@ -187,7 +147,7 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
187147 } else {
188148 if ( reqBody === null ) {
189149 req = new Request ( urlWithHost , {
190- method,
150+ method : getStringMethod ( info . method ) ,
191151 headers
192152 } )
193153 } else {
@@ -211,7 +171,7 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
211171 offset += parts [ i ] . byteLength
212172 }
213173 req = new Request ( urlWithHost , {
214- method,
174+ method : getStringMethod ( info . method ) ,
215175 body,
216176 headers
217177 } )
@@ -223,63 +183,51 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
223183 msgPromise . reject ( err )
224184 } )
225185 } else {
226- let respBody : ReadableStream < Uint8Array > | null = body
227- if ( status === ' 204' ) {
186+ let respBody : ReadableStream < Uint8Array > | null = body . readable
187+ if ( info . statusCode === 204 ) {
228188 respBody = null
229189 }
230190 const resp = new Response ( respBody , {
231191 headers,
232- status : parseInt ( status ) ,
233- statusText : reason
192+ status : info . statusCode ,
193+ statusText : info . statusMessage
234194 } )
235195 msgPromise . resolve ( resp )
236196 fulfilledMsgPromise = true
237197 }
238198 } catch ( error ) {
239199 msgPromise . reject ( error )
240200 }
241- } )
242-
243- // Handle the body
244- milo . setOnData ( parser , ( _ : unknown , from : number , size : number ) => {
245- const c : Uint8Array = unconsumedChunks . subarray ( from , from + size )
246- // @ts -expect-error Unclear why this fails typecheck. TODO debug
247- bodyStreamController . enqueue ( c )
248- } )
249- milo . setOnError ( parser , ( ) => {
250- bodyStreamController . error ( new Error ( 'Error parsing HTTP message' ) )
251- } )
252-
253- let messageComplete = false
254- milo . setOnMessageComplete ( parser , ( ) => {
255- bodyStreamController . close ( )
201+ }
202+ parser [ HTTPParser . kOnBody ] = ( buf ) => {
203+ writer . write ( buf )
204+ . catch ( ( err : Error ) => {
205+ msgPromise . reject ( err )
206+ } )
207+ }
208+ parser [ HTTPParser . kOnMessageComplete ] = ( ) => {
256209 messageComplete = true
257- } )
210+ writer . close ( )
211+ . catch ( ( err : Error ) => {
212+ msgPromise . reject ( err )
213+ } )
214+ }
258215
259216 // Consume data
260- for await ( let chunks of r . source ) {
261- if ( ! isUint8ArrayList ( chunks ) ) {
262- chunks = new Uint8ArrayList ( chunks )
263- }
264- for ( const chunk of chunks ) {
265- unconsumedChunks . append ( chunk )
266- const buffer = new Uint8Array ( milo . memory . buffer , ptr , BUFFER_SIZE )
267- buffer . set ( chunk , 0 )
268- const consumed = milo . parse ( parser , ptr , chunk . length )
269- unconsumedChunks . consume ( consumed )
270- }
217+ for await ( const chunks of r . source ) {
218+ const chunk = chunks . subarray ( )
219+ parser . execute ( chunk )
271220 }
272- milo . finish ( parser )
221+
222+ parser . finish ( )
273223
274224 if ( ! messageComplete ) {
275- bodyStreamController . error ( new Error ( 'Incomplete HTTP message' ) )
225+ await writer . abort ( new Error ( 'Incomplete HTTP message' ) )
226+
276227 if ( ! fulfilledMsgPromise ) {
277228 msgPromise . reject ( new Error ( 'Incomplete HTTP message' ) )
278229 }
279230 }
280-
281- milo . destroy ( parser )
282- milo . dealloc ( ptr , BUFFER_SIZE )
283231 } ) ( )
284232 ]
285233}
0 commit comments