1
1
/* eslint-disable max-depth */
2
2
/* eslint-disable complexity */
3
3
4
+ import { HTTPParser } from '@achingbrain/http-parser-js'
4
5
import { multiaddr , protocols } from '@multiformats/multiaddr'
5
6
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
6
- // @ts -expect-error missing types
7
- import { milo } from '@perseveranza-pets/milo/index-with-wasm.js'
8
7
import defer from 'p-defer'
9
- import { Uint8ArrayList , isUint8ArrayList } from 'uint8arraylist'
8
+ import { type Uint8ArrayList } from 'uint8arraylist'
9
+
10
10
interface Fetch { ( req : Request ) : Promise < Response > }
11
11
12
+ const METHOD_GET = 1
13
+
14
+ function getStringMethod ( method : number ) : string {
15
+ if ( method === 1 ) {
16
+ return 'GET'
17
+ }
18
+
19
+ return 'UNKNOWN'
20
+ }
21
+
12
22
interface Duplex < TSource , TSink = TSource , RSink = Promise < void > > {
13
23
source : AsyncIterable < TSource > | Iterable < TSource >
14
24
sink ( source : AsyncIterable < TSink > | Iterable < TSink > ) : RSink
@@ -85,8 +95,6 @@ export async function handleRequestViaDuplex (s: Duplex<Uint8Array | Uint8ArrayL
85
95
await writeResponseToDuplex ( s , resp )
86
96
}
87
97
88
- const BUFFER_SIZE = 16 << 10
89
-
90
98
/**
91
99
* Exported for testing.
92
100
*
@@ -100,85 +108,37 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
100
108
return [
101
109
msgPromise . promise ,
102
110
( async ( ) => {
103
- const unconsumedChunks = new Uint8ArrayList ( )
104
-
105
- const textDecoder = new TextDecoder ( )
106
- const ptr = milo . alloc ( BUFFER_SIZE )
107
-
108
- const parser = milo . create ( )
109
- // Simplifies implementation at the cost of storing data twice
110
- milo . setManageUnconsumed ( parser , true )
111
-
112
- const bodyStreamControllerPromise = defer < ReadableStreamController < Uint8Array > > ( )
113
- const body = new ReadableStream < Uint8Array > ( {
114
- async start ( controller ) {
115
- bodyStreamControllerPromise . resolve ( controller )
116
- }
117
- } )
118
- const bodyStreamController = await bodyStreamControllerPromise . promise
119
-
120
- // Response
121
- let status = ''
122
- let reason = ''
123
-
124
- // Requests
125
- let url = ''
126
- let method = ''
127
-
111
+ const body = new TransformStream ( )
112
+ const writer = body . writable . getWriter ( )
113
+ let messageComplete = false
128
114
let fulfilledMsgPromise = false
129
115
130
- milo . setOnStatus ( parser , ( _ : unknown , from : number , size : number ) => {
131
- status = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
132
- } )
133
- milo . setOnReason ( parser , ( _ : unknown , from : number , size : number ) => {
134
- reason = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
135
- } )
136
- milo . setOnUrl ( parser , ( _ : unknown , from : number , size : number ) => {
137
- url = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
138
- } )
139
- milo . setOnMethod ( parser , ( _ : unknown , from : number , size : number ) => {
140
- method = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
141
- } )
116
+ const parser = new HTTPParser ( expectRequest ? 'REQUEST' : 'RESPONSE' )
117
+ parser [ HTTPParser . kOnHeadersComplete ] = ( info ) => {
118
+ fulfilledMsgPromise = true
142
119
143
- milo . setOnRequest ( parser , ( ) => {
144
- if ( ! expectRequest ) {
145
- msgPromise . reject ( new Error ( 'Received request instead of response' ) )
146
- fulfilledMsgPromise = true
147
- }
148
- } )
149
- milo . setOnResponse ( parser , ( ) => {
150
- if ( expectRequest ) {
151
- msgPromise . reject ( new Error ( 'Received response instead of request' ) )
152
- fulfilledMsgPromise = true
120
+ // Handle the headers
121
+ const headers = new Headers ( )
122
+
123
+ for ( let i = 0 ; i < info . headers . length ; i += 2 ) {
124
+ headers . set ( info . headers [ i ] , info . headers [ i + 1 ] )
153
125
}
154
- } )
155
126
156
- // Handle the headers
157
- const headers = new Headers ( )
158
- let lastHeaderName : string = ''
127
+ let reqBody : ReadableStream | null = body . readable
159
128
160
- milo . setOnHeaderName ( parser , ( _ : unknown , from : number , size : number ) => {
161
- lastHeaderName = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
162
- } )
163
- milo . setOnHeaderValue ( parser , ( _ : unknown , from : number , size : number ) => {
164
- const headerVal = textDecoder . decode ( unconsumedChunks . subarray ( from , from + size ) )
165
- headers . set ( lastHeaderName , headerVal )
166
- } )
167
- milo . setOnHeaders ( parser , ( _ : unknown , from : number , size : number ) => {
168
129
// Headers are parsed. We can return the response
169
130
try {
170
131
if ( expectRequest ) {
171
- let reqBody : ReadableStream < Uint8Array > | null = body
172
- if ( method === 'GET' ) {
132
+ if ( info . method === METHOD_GET ) {
173
133
reqBody = null
174
134
}
175
135
176
- const urlWithHost = `https://${ headers . get ( 'Host' ) ?? 'unknown_host._libp2p' } ${ url } `
136
+ const urlWithHost = `https://${ headers . get ( 'Host' ) ?? 'unknown_host._libp2p' } ${ info . url } `
177
137
detectBrokenRequestBody ( ) . then ( async ( broken ) => {
178
138
let req : Request
179
139
if ( ! broken ) {
180
140
req = new Request ( urlWithHost , {
181
- method,
141
+ method : getStringMethod ( info . method ) ,
182
142
body : reqBody ,
183
143
headers,
184
144
// @ts -expect-error this is required by NodeJS despite being the only reasonable option https://fetch.spec.whatwg.org/#requestinit
@@ -187,7 +147,7 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
187
147
} else {
188
148
if ( reqBody === null ) {
189
149
req = new Request ( urlWithHost , {
190
- method,
150
+ method : getStringMethod ( info . method ) ,
191
151
headers
192
152
} )
193
153
} else {
@@ -211,7 +171,7 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
211
171
offset += parts [ i ] . byteLength
212
172
}
213
173
req = new Request ( urlWithHost , {
214
- method,
174
+ method : getStringMethod ( info . method ) ,
215
175
body,
216
176
headers
217
177
} )
@@ -223,63 +183,51 @@ export function readHTTPMsg (expectRequest: boolean, r: Duplex<Uint8Array | Uint
223
183
msgPromise . reject ( err )
224
184
} )
225
185
} else {
226
- let respBody : ReadableStream < Uint8Array > | null = body
227
- if ( status === ' 204' ) {
186
+ let respBody : ReadableStream < Uint8Array > | null = body . readable
187
+ if ( info . statusCode === 204 ) {
228
188
respBody = null
229
189
}
230
190
const resp = new Response ( respBody , {
231
191
headers,
232
- status : parseInt ( status ) ,
233
- statusText : reason
192
+ status : info . statusCode ,
193
+ statusText : info . statusMessage
234
194
} )
235
195
msgPromise . resolve ( resp )
236
196
fulfilledMsgPromise = true
237
197
}
238
198
} catch ( error ) {
239
199
msgPromise . reject ( error )
240
200
}
241
- } )
242
-
243
- // Handle the body
244
- milo . setOnData ( parser , ( _ : unknown , from : number , size : number ) => {
245
- const c : Uint8Array = unconsumedChunks . subarray ( from , from + size )
246
- // @ts -expect-error Unclear why this fails typecheck. TODO debug
247
- bodyStreamController . enqueue ( c )
248
- } )
249
- milo . setOnError ( parser , ( ) => {
250
- bodyStreamController . error ( new Error ( 'Error parsing HTTP message' ) )
251
- } )
252
-
253
- let messageComplete = false
254
- milo . setOnMessageComplete ( parser , ( ) => {
255
- bodyStreamController . close ( )
201
+ }
202
+ parser [ HTTPParser . kOnBody ] = ( buf ) => {
203
+ writer . write ( buf )
204
+ . catch ( ( err : Error ) => {
205
+ msgPromise . reject ( err )
206
+ } )
207
+ }
208
+ parser [ HTTPParser . kOnMessageComplete ] = ( ) => {
256
209
messageComplete = true
257
- } )
210
+ writer . close ( )
211
+ . catch ( ( err : Error ) => {
212
+ msgPromise . reject ( err )
213
+ } )
214
+ }
258
215
259
216
// Consume data
260
- for await ( let chunks of r . source ) {
261
- if ( ! isUint8ArrayList ( chunks ) ) {
262
- chunks = new Uint8ArrayList ( chunks )
263
- }
264
- for ( const chunk of chunks ) {
265
- unconsumedChunks . append ( chunk )
266
- const buffer = new Uint8Array ( milo . memory . buffer , ptr , BUFFER_SIZE )
267
- buffer . set ( chunk , 0 )
268
- const consumed = milo . parse ( parser , ptr , chunk . length )
269
- unconsumedChunks . consume ( consumed )
270
- }
217
+ for await ( const chunks of r . source ) {
218
+ const chunk = chunks . subarray ( )
219
+ parser . execute ( chunk )
271
220
}
272
- milo . finish ( parser )
221
+
222
+ parser . finish ( )
273
223
274
224
if ( ! messageComplete ) {
275
- bodyStreamController . error ( new Error ( 'Incomplete HTTP message' ) )
225
+ await writer . abort ( new Error ( 'Incomplete HTTP message' ) )
226
+
276
227
if ( ! fulfilledMsgPromise ) {
277
228
msgPromise . reject ( new Error ( 'Incomplete HTTP message' ) )
278
229
}
279
230
}
280
-
281
- milo . destroy ( parser )
282
- milo . dealloc ( ptr , BUFFER_SIZE )
283
231
} ) ( )
284
232
]
285
233
}
0 commit comments