Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STOP-2386 #2679

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/cli/src/util/createServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import * as E from 'fp-ts/Either';
import { pipe } from 'fp-ts/function';
import * as pino from 'pino';
import * as signale from 'signale';
import * as split from 'split2';
import * as split from './customTransformStream';
import { PassThrough, Readable } from 'stream';
import { LOG_COLOR_MAP } from '../const/options';
import { CreatePrism } from './runner';
Expand Down Expand Up @@ -132,7 +132,7 @@ function pipeOutputToSignale(stream: Readable) {
? prefix.concat(' ' + chalk.bold.white(`${logLine.input.method} ${logLine.input.url.path}`))
: prefix;
}

stream.pipe(split(JSON.parse)).on('data', (logLine: PrismLogDescriptor) => {
signale[logLine.level]({ prefix: constructPrefix(logLine), message: logLine.msg });
});
Expand Down
132 changes: 132 additions & 0 deletions packages/cli/src/util/customTransformStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
'use strict'

import { Transform, TransformCallback, TransformOptions } from 'stream'
import { StringDecoder } from 'string_decoder'

const kLast = Symbol('last')
const kDecoder = Symbol('decoder')

interface SplitStreamOptions extends TransformOptions {
matcher?: RegExp | string
mapper?: (input: string) => any
maxLength?: number
skipOverflow?: boolean
}

function transform(this: any, chunk: Buffer, enc: string, cb: TransformCallback): void {
let decodedChunk = this[kDecoder].write(chunk); // Decode the chunk to a string

// Accumulate the chunk in the last buffer
this[kLast] += decodedChunk;

try {
// Try parsing the JSON data
// Assuming the data is an array of JSON objects or newline-delimited JSON objects
while (this[kLast]) {
let obj;
try {
obj = JSON.parse(this[kLast]); // Try to parse the JSON
this.push(obj); // Process the valid JSON object
this[kLast] = ''; // Clear the accumulated data after processing
} catch (error) {
// If JSON is invalid, break out and wait for more data to accumulate
break;
}
}
} catch (error) {
return cb(error as Error); // Return error through callback if an unexpected error occurs
}

this.overflow = this[kLast].length > this.maxLength
if (this.overflow && !this.skipOverflow) {
cb(new Error('maximum buffer reached'))
return
}
cb()
}

function flush(this: any, cb: TransformCallback): void {
// forward any gibberish left in there
this[kLast] += this[kDecoder].end()

if (this[kLast]) {
try {
push(this, this.mapper(this[kLast]))
} catch (error) {
return cb(error)
}
}

cb()
}

function push(self: any, val: any): void {
if (val !== undefined) {
self.push(val)
}
}

function noop(incoming: any): any {
return incoming
}

function split(
matcher: RegExp | string = /\r?\n/,
mapper: (input: string) => any = noop,
options: SplitStreamOptions = {}
): Transform {
// Set defaults for any arguments not supplied.
options = options || {}

// Test arguments explicitly.
switch (arguments.length) {
case 1:
// If mapper is only argument.
if (typeof matcher === 'function') {
mapper = matcher
matcher = /\r?\n/
// If options is only argument.
} else if (typeof matcher === 'object' && !(matcher instanceof RegExp) && !matcher[Symbol.split]) {
options = matcher
matcher = /\r?\n/
}
break

case 2:
// If mapper and options are arguments.
if (typeof matcher === 'function') {
options = mapper
mapper = matcher
matcher = /\r?\n/
// If matcher and options are arguments.
} else if (typeof mapper === 'object') {
options = mapper
mapper = noop
}
}

options = Object.assign({}, options)
options.autoDestroy = true
options.transform = transform
options.flush = flush
options.readableObjectMode = true

const stream = new Transform(options)

stream[kLast] = ''
stream[kDecoder] = new StringDecoder('utf8')
stream.matcher = matcher
stream.mapper = mapper
stream.maxLength = options.maxLength
stream.skipOverflow = options.skipOverflow || false
stream.overflow = false
stream._destroy = function (err: Error, cb: () => void): void {
// Weird Node v12 bug that we need to work around
this._writableState.errorEmitted = false
cb(err)
}

return stream
}

export = split
Loading