@@ -225,13 +225,17 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) {
225
225
}()
226
226
227
227
// read loop -- take messages from websocket and write to http request
228
+ bufIn := make (chan []byte )
229
+ bufOut := make (chan []byte )
230
+
228
231
go func () {
229
232
if p .pingInterval > 0 && p .pingWait > 0 && p .pongWait > 0 {
230
233
conn .SetReadDeadline (time .Now ().Add (p .pongWait ))
231
234
conn .SetPongHandler (func (string ) error { conn .SetReadDeadline (time .Now ().Add (p .pongWait )); return nil })
232
235
}
233
236
defer func () {
234
237
cancelFn ()
238
+ close (bufIn )
235
239
}()
236
240
for {
237
241
select {
@@ -250,17 +254,72 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) {
250
254
p .logger .Warnln ("error reading websocket message:" , err )
251
255
return
252
256
}
253
- p .logger .Debugln ("[read] read payload:" , string (payload ))
254
- p .logger .Debugln ("[read] writing to requestBody:" )
255
- n , err := requestBodyW .Write (payload )
256
- requestBodyW .Write ([]byte ("\n " ))
257
- p .logger .Debugln ("[read] wrote to requestBody" , n )
258
- if err != nil {
259
- p .logger .Warnln ("[read] error writing message to upstream http server:" , err )
260
- return
257
+ bufIn <- payload
258
+ }
259
+ }()
260
+
261
+ // Buffer goroutine between conn and request.
262
+ // TODO: add memory limit so connection is closed if client is too fast
263
+ // TODO: use more efficient deque implementation instead of slice
264
+ go func () {
265
+ defer func () {
266
+ cancelFn ()
267
+ }()
268
+ defer close (bufOut )
269
+ buf := [][]byte {}
270
+ for {
271
+ if len (buf ) > 0 {
272
+ select {
273
+ case msg , ok := <- bufIn :
274
+ if ! ok {
275
+ return
276
+ }
277
+ buf = append (buf , msg )
278
+ case bufOut <- buf [0 ]:
279
+ buf = buf [1 :]
280
+ case <- ctx .Done ():
281
+ return
282
+ }
283
+ } else {
284
+ select {
285
+ case msg , ok := <- bufIn :
286
+ if ! ok {
287
+ return
288
+ }
289
+ buf = append (buf , msg )
290
+ case <- ctx .Done ():
291
+ return
292
+ }
261
293
}
262
294
}
263
295
}()
296
+
297
+ go func () {
298
+ defer func () {
299
+ cancelFn ()
300
+ }()
301
+ for {
302
+ select {
303
+ case payload , ok := <- bufOut :
304
+ if ! ok {
305
+ return
306
+ }
307
+ p .logger .Debugln ("[read] read payload:" , string (payload ))
308
+ p .logger .Debugln ("[read] writing to requestBody:" )
309
+ n , err := requestBodyW .Write (payload )
310
+ requestBodyW .Write ([]byte ("\n " ))
311
+ p .logger .Debugln ("[read] wrote to requestBody" , n )
312
+ if err != nil {
313
+ p .logger .Warnln ("[read] error writing message to upstream http server:" , err )
314
+ return
315
+ }
316
+ case <- ctx .Done ():
317
+ p .logger .Debugln ("write loop done" )
318
+ }
319
+ }
320
+ }()
321
+ //
322
+
264
323
// ping write loop
265
324
if p .pingInterval > 0 && p .pingWait > 0 && p .pongWait > 0 {
266
325
go func () {
0 commit comments