Skip to content

Commit 6c1dac8

Browse files
1 parent 75d0f96 commit 6c1dac8

File tree

2 files changed

+134
-2
lines changed

2 files changed

+134
-2
lines changed

packages/cli/src/util/createServer.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import * as E from 'fp-ts/Either';
77
import { pipe } from 'fp-ts/function';
88
import * as pino from 'pino';
99
import * as signale from 'signale';
10-
import * as split from 'split2';
10+
import * as split from './customTransformStream';
1111
import { PassThrough, Readable } from 'stream';
1212
import { LOG_COLOR_MAP } from '../const/options';
1313
import { CreatePrism } from './runner';
@@ -132,7 +132,7 @@ function pipeOutputToSignale(stream: Readable) {
132132
? prefix.concat(' ' + chalk.bold.white(`${logLine.input.method} ${logLine.input.url.path}`))
133133
: prefix;
134134
}
135-
135+
136136
stream.pipe(split(JSON.parse)).on('data', (logLine: PrismLogDescriptor) => {
137137
signale[logLine.level]({ prefix: constructPrefix(logLine), message: logLine.msg });
138138
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
'use strict'
2+
3+
import { Transform, TransformCallback, TransformOptions } from 'stream'
4+
import { StringDecoder } from 'string_decoder'
5+
6+
const kLast = Symbol('last')
7+
const kDecoder = Symbol('decoder')
8+
9+
interface SplitStreamOptions extends TransformOptions {
10+
matcher?: RegExp | string
11+
mapper?: (input: string) => any
12+
maxLength?: number
13+
skipOverflow?: boolean
14+
}
15+
16+
function transform(this: any, chunk: Buffer, enc: string, cb: TransformCallback): void {
17+
let decodedChunk = this[kDecoder].write(chunk); // Decode the chunk to a string
18+
19+
// Accumulate the chunk in the last buffer
20+
this[kLast] += decodedChunk;
21+
22+
try {
23+
// Try parsing the JSON data
24+
// Assuming the data is an array of JSON objects or newline-delimited JSON objects
25+
while (this[kLast]) {
26+
let obj;
27+
try {
28+
obj = JSON.parse(this[kLast]); // Try to parse the JSON
29+
this.push(obj); // Process the valid JSON object
30+
this[kLast] = ''; // Clear the accumulated data after processing
31+
} catch (error) {
32+
// If JSON is invalid, break out and wait for more data to accumulate
33+
break;
34+
}
35+
}
36+
} catch (error) {
37+
return cb(error as Error); // Return error through callback if an unexpected error occurs
38+
}
39+
40+
this.overflow = this[kLast].length > this.maxLength
41+
if (this.overflow && !this.skipOverflow) {
42+
cb(new Error('maximum buffer reached'))
43+
return
44+
}
45+
cb()
46+
}
47+
48+
function flush(this: any, cb: TransformCallback): void {
49+
// forward any gibberish left in there
50+
this[kLast] += this[kDecoder].end()
51+
52+
if (this[kLast]) {
53+
try {
54+
push(this, this.mapper(this[kLast]))
55+
} catch (error) {
56+
return cb(error)
57+
}
58+
}
59+
60+
cb()
61+
}
62+
63+
function push(self: any, val: any): void {
64+
if (val !== undefined) {
65+
self.push(val)
66+
}
67+
}
68+
69+
function noop(incoming: any): any {
70+
return incoming
71+
}
72+
73+
function split(
74+
matcher: RegExp | string = /\r?\n/,
75+
mapper: (input: string) => any = noop,
76+
options: SplitStreamOptions = {}
77+
): Transform {
78+
// Set defaults for any arguments not supplied.
79+
options = options || {}
80+
81+
// Test arguments explicitly.
82+
switch (arguments.length) {
83+
case 1:
84+
// If mapper is only argument.
85+
if (typeof matcher === 'function') {
86+
mapper = matcher
87+
matcher = /\r?\n/
88+
// If options is only argument.
89+
} else if (typeof matcher === 'object' && !(matcher instanceof RegExp) && !matcher[Symbol.split]) {
90+
options = matcher
91+
matcher = /\r?\n/
92+
}
93+
break
94+
95+
case 2:
96+
// If mapper and options are arguments.
97+
if (typeof matcher === 'function') {
98+
options = mapper
99+
mapper = matcher
100+
matcher = /\r?\n/
101+
// If matcher and options are arguments.
102+
} else if (typeof mapper === 'object') {
103+
options = mapper
104+
mapper = noop
105+
}
106+
}
107+
108+
options = Object.assign({}, options)
109+
options.autoDestroy = true
110+
options.transform = transform
111+
options.flush = flush
112+
options.readableObjectMode = true
113+
114+
const stream = new Transform(options)
115+
116+
stream[kLast] = ''
117+
stream[kDecoder] = new StringDecoder('utf8')
118+
stream.matcher = matcher
119+
stream.mapper = mapper
120+
stream.maxLength = options.maxLength
121+
stream.skipOverflow = options.skipOverflow || false
122+
stream.overflow = false
123+
stream._destroy = function (err: Error, cb: () => void): void {
124+
// Weird Node v12 bug that we need to work around
125+
this._writableState.errorEmitted = false
126+
cb(err)
127+
}
128+
129+
return stream
130+
}
131+
132+
export = split

0 commit comments

Comments
 (0)