From 55df2cf68210a2940158cdbf8037210b6e8a013b Mon Sep 17 00:00:00 2001 From: Catalin Pop Date: Fri, 15 Sep 2023 02:09:59 +0300 Subject: [PATCH 1/3] - LikeCloseEvent reason should be options. server can close connection with a code without providing any reason - adding option shouldRetry -> bool that is meant to replace isFatalConnectionProblem(deprecated in the original javascript library) . this way we can configure to override and reconnect on certain fatal errors --- .../lib/src/graphql_transport_ws.dart | 163 ++++++++++-------- 1 file changed, 92 insertions(+), 71 deletions(-) diff --git a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart index 638e65a4..637a2327 100644 --- a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart +++ b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart @@ -102,24 +102,24 @@ class TransportWsEvent { message = null, event = null; const TransportWsEvent.connected( - WebSocketChannel this.socket, - this.payload, - ) : type = TransportWsEventType.connected, + WebSocketChannel this.socket, + this.payload, + ) : type = TransportWsEventType.connected, received = null, message = null, event = null; const TransportWsEvent.ping( - this.payload, { - required bool this.received, - }) : type = TransportWsEventType.ping, + this.payload, { + required bool this.received, + }) : type = TransportWsEventType.ping, socket = null, message = null, event = null; const TransportWsEvent.pong( - this.payload, { - required bool this.received, - }) : type = TransportWsEventType.pong, + this.payload, { + required bool this.received, + }) : type = TransportWsEventType.pong, socket = null, message = null, event = null; @@ -161,17 +161,17 @@ class TransportWsEventHandler { /// Also, the second argument is the optional payload that the server may /// send through the `ConnectionAck` message. final T? Function(WebSocketChannel socket, Map? payload)? - connected; + connected; /// The first argument communicates whether the ping was received from the server. /// If `false`, the ping was sent by the client. final T? Function(Map? payload, {required bool received})? - ping; + ping; /// The first argument communicates whether the pong was received from the server. /// If `false`, the pong was sent by the client. final T? Function(Map? payload, {required bool received})? - pong; + pong; /// Called for all **valid** messages received by the client. Mainly useful for /// debugging and logging received messages. @@ -203,15 +203,15 @@ class TransportWsEventHandler { /// The [TransportWsEventType]s that this handler will handle. Set eventTypesHandled() => { - if (connecting != null) TransportWsEventType.connecting, - if (opened != null) TransportWsEventType.opened, - if (connected != null) TransportWsEventType.connected, - if (ping != null) TransportWsEventType.ping, - if (pong != null) TransportWsEventType.pong, - if (message != null) TransportWsEventType.message, - if (closed != null) TransportWsEventType.closed, - if (error != null) TransportWsEventType.error, - }; + if (connecting != null) TransportWsEventType.connecting, + if (opened != null) TransportWsEventType.opened, + if (connected != null) TransportWsEventType.connected, + if (ping != null) TransportWsEventType.ping, + if (pong != null) TransportWsEventType.pong, + if (message != null) TransportWsEventType.message, + if (closed != null) TransportWsEventType.closed, + if (error != null) TransportWsEventType.error, + }; } /// Called for all **valid** messages received by the client. Mainly useful for @@ -382,6 +382,17 @@ class TransportWsClientOptions { /// @default [randomizedExponentialBackoff] final Future Function(int retries) retryWait; + /// Check if the close event or connection error is fatal. If you return `false`, + /// the client will fail immediately without additional retries; however, if you + /// return `true`, the client will keep retrying until the `retryAttempts` have + /// been exceeded. + /// The argument is whatever has been thrown during the connection phase. + /// Beware, the library classifies a few close events as fatal regardless of + /// what is returned here. They are listed in the documentation of the + /// `retryAttempts` option. + /// @default [shouldRetryDefault] + final bool Function(Object errOrCloseEvent) shouldRetry; + /// Check if the close event or connection error is fatal. If you return `true`, /// the client will fail immediately without additional retries; however, if you /// return `false`, the client will keep retrying until the `retryAttempts` have @@ -428,11 +439,11 @@ class TransportWsClientOptions { /// A function that encodes the request message to json string before sending it over the network. final FutureOr Function(TransportWsMessage message) - graphQLSocketMessageEncoder; + graphQLSocketMessageEncoder; /// The default [graphQLSocketMessageEncoder] that encodes the request message to json string. static String defaultGraphQLSocketMessageEncoder( - TransportWsMessage message) => + TransportWsMessage message) => jsonEncode(message); /// A function that decodes the incoming http response to `Map`, @@ -445,7 +456,7 @@ class TransportWsClientOptions { /// The default [graphQLSocketMessageDecoder] that decodes the request message from a json string. static Map? defaultGraphQLSocketMessageDecoder( - dynamic message) => + dynamic message) => jsonDecode(message as String) as Map?; /// A function that logs events within the execution of the [TransportWsClient]. @@ -464,6 +475,7 @@ class TransportWsClientOptions { this.disablePong = false, this.retryAttempts = 0, this.retryWait = randomizedExponentialBackoff, + this.shouldRetry = shouldRetryDefault, this.isFatalConnectionProblem = isFatalConnectionProblemDefault, this.eventHandlers, this.generateID = generateUUID, @@ -501,11 +513,34 @@ class TransportWsClientOptions { /// /// Reference: https://gist.github.com/jed/982883 static String generateUUID() => "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx" - .replaceAllMapped(RegExp("[xy]"), (c) { - final int r = (_random.nextDouble() * 16).floor() | 0; - final v = c.group(0) == "x" ? r : (r & 0x3) | 0x8; - return v.toRadixString(16); - }); + .replaceAllMapped(RegExp("[xy]"), (c) { + final int r = (_random.nextDouble() * 16).floor() | 0; + final v = c.group(0) == "x" ? r : (r & 0x3) | 0x8; + return v.toRadixString(16); + }); + + /// By default, connection should not retry on fatal errors + static bool shouldRetryDefault(Object errOrCloseEvent){ + if (errOrCloseEvent is LikeCloseEvent && + (_isFatalInternalCloseCode(errOrCloseEvent.code) || + const [ + CloseCode.internalServerError, + CloseCode.internalClientError, + CloseCode.badRequest, + CloseCode.badResponse, + CloseCode.unauthorized, + // CloseCode.Forbidden, might grant access out after retry + CloseCode.subprotocolNotAcceptable, + // CloseCode.ConnectionInitialisationTimeout, might not time out after retry + // CloseCode.ConnectionAcknowledgementTimeout, might not time out after retry + CloseCode.subscriberAlreadyExists, + CloseCode.tooManyInitialisationRequests, + // 4499, // Terminated, probably because the socket froze, we want to retry + ].contains(errOrCloseEvent.code))) { + return false; + } + return true; + } } class _Connected { @@ -513,9 +548,9 @@ class _Connected { final Future throwOnClose; _Connected( - this.socket, - this.throwOnClose, - ); + this.socket, + this.throwOnClose, + ); } /// @category Client */ @@ -530,9 +565,9 @@ abstract class TransportWsClient { /// uses the `sink` to emit received data or errors. Returns a _cleanup_ /// function used for dropping the subscription and cleaning stuff up. void Function() subscribe( - Request payload, - EventSink sink, - ); + Request payload, + EventSink sink, + ); /// Terminates the WebSocket abruptly and immediately. /// @@ -588,29 +623,15 @@ class _ConnectionState { /// Checks the `connect` problem and evaluates if the client should retry. bool shouldRetryConnectOrThrow(Object errOrCloseEvent) { options.log?.call("shouldRetryConnectOrThrow $errOrCloseEvent"); - // some close codes are worth reporting immediately - if (errOrCloseEvent is LikeCloseEvent && - (_isFatalInternalCloseCode(errOrCloseEvent.code) || - const [ - CloseCode.internalServerError, - CloseCode.internalClientError, - CloseCode.badRequest, - CloseCode.badResponse, - CloseCode.unauthorized, - // CloseCode.Forbidden, might grant access out after retry - CloseCode.subprotocolNotAcceptable, - // CloseCode.ConnectionInitialisationTimeout, might not time out after retry - // CloseCode.ConnectionAcknowledgementTimeout, might not time out after retry - CloseCode.subscriberAlreadyExists, - CloseCode.tooManyInitialisationRequests, - // 4499, // Terminated, probably because the socket froze, we want to retry - ].contains(errOrCloseEvent.code))) { - throw errOrCloseEvent; - } // client was disposed, no retries should proceed regardless if (disposed) return false; + // some close codes are worth reporting immediately + if(!options.shouldRetry(errOrCloseEvent)){ + throw errOrCloseEvent; + } + // normal closure (possibly all subscriptions have completed) // if no locks were acquired in the meantime, shouldnt try again if (errOrCloseEvent is LikeCloseEvent && errOrCloseEvent.code == 1000) { @@ -691,7 +712,7 @@ class _ConnectionState { ?.cancel(); // in case where a pong was received before a ping (this is valid behaviour) queuedPing = Timer(options.keepAlive, () async { final _pingMsg = - await options.graphQLSocketMessageEncoder(PingMessage(null)); + await options.graphQLSocketMessageEncoder(PingMessage(null)); if (isOpen) { socket.sink.add(_pingMsg); emitter.emit(TransportWsEvent.ping(null, received: false)); @@ -768,7 +789,7 @@ class _ConnectionState { bool acknowledged = false; late final StreamSubscription _messageSubs; _messageSubs = socket.stream.listen( - (Object? msg) async { + (Object? msg) async { options.log?.call( "socket.stream protocol:${socket.protocol} msg:${msg}", ); @@ -849,9 +870,9 @@ class _ConnectionState { socket.closeCode == null ? "DONE" : LikeCloseEvent( - code: socket.closeCode!, - reason: socket.closeReason!, - ), + code: socket.closeCode!, + reason: socket.closeReason, + ), ), onError: (Object err) => onError?.call(err), ); @@ -860,7 +881,7 @@ class _ConnectionState { onOpen(); })() .onError( - (error, stackTrace) => denied( + (error, stackTrace) => denied( WebSocketLinkServerException( originalException: error, originalStackTrace: stackTrace, @@ -906,7 +927,7 @@ class _ConnectionState { // if the socket is still open Future.delayed( options.lazyCloseTimeout, - () { + () { if (locks == 0 && isOpen) { complete(); } @@ -938,9 +959,9 @@ class _Client extends TransportWsClient { @override void Function() subscribe( - Request payload, - EventSink sink, - ) { + Request payload, + EventSink sink, + ) { final id = options.generateID(); options.log?.call("subscribe $id"); @@ -985,7 +1006,7 @@ class _Client extends TransportWsClient { releaser = () async { final _completeMsg = - await options.graphQLSocketMessageEncoder(CompleteMessage(id)); + await options.graphQLSocketMessageEncoder(CompleteMessage(id)); if (!done && state.isOpen) { // if not completed already and socket is open, send complete message to server on release socket.sink.add(_completeMsg); @@ -1062,9 +1083,9 @@ class _Connection { class _Emitter { final Map> listeners; final void Function() Function( - String id, - void Function(TransportWsMessage) listener, - ) onMessage; + String id, + void Function(TransportWsMessage) listener, + ) onMessage; final void Function(String logMessage)? log; _Emitter({ required this.listeners, @@ -1117,7 +1138,7 @@ TransportWsClient createClient(TransportWsClientOptions options) { } final Map> listeners = - Map.fromIterables( + Map.fromIterables( TransportWsEventType.values, TransportWsEventType.values.map((e) => []), ); @@ -1153,7 +1174,7 @@ class LikeCloseEvent { final int code; /// Returns the WebSocket connection close reason provided by the server. */ - final String reason; + final String? reason; final bool? wasClean; @@ -1209,7 +1230,7 @@ class TransportWebSocketLink extends Link { }; return controller.stream.map( - (response) { + (response) { if (response.data == null && response.errors == null) { throw WebSocketLinkServerException( parsedResponse: response, From 50da165c4ffd5af4622301ee17d6dc61f7ff783a Mon Sep 17 00:00:00 2001 From: Catalin Pop Date: Sun, 8 Oct 2023 19:50:19 +0300 Subject: [PATCH 2/3] - format code --- .../lib/src/graphql_transport_ws.dart | 119 +++++++++--------- 1 file changed, 62 insertions(+), 57 deletions(-) diff --git a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart index 637a2327..2962323c 100644 --- a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart +++ b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart @@ -95,31 +95,33 @@ class TransportWsEvent { received = null, message = null, event = null; + const TransportWsEvent.opened(WebSocketChannel this.socket) : type = TransportWsEventType.opened, payload = null, received = null, message = null, event = null; - const TransportWsEvent.connected( - WebSocketChannel this.socket, - this.payload, - ) : type = TransportWsEventType.connected, + + const TransportWsEvent.connected(WebSocketChannel this.socket, + this.payload,) + : type = TransportWsEventType.connected, received = null, message = null, event = null; - const TransportWsEvent.ping( - this.payload, { - required bool this.received, - }) : type = TransportWsEventType.ping, + const TransportWsEvent.ping(this.payload, { + required bool this.received, + }) + : type = TransportWsEventType.ping, socket = null, message = null, event = null; - const TransportWsEvent.pong( - this.payload, { - required bool this.received, - }) : type = TransportWsEventType.pong, + + const TransportWsEvent.pong(this.payload, { + required bool this.received, + }) + : type = TransportWsEventType.pong, socket = null, message = null, event = null; @@ -130,12 +132,14 @@ class TransportWsEvent { received = null, socket = null, event = null; + const TransportWsEvent.closed(Object this.event) : type = TransportWsEventType.closed, payload = null, received = null, message = null, socket = null; + const TransportWsEvent.error(Object this.event) : type = TransportWsEventType.error, payload = null, @@ -202,16 +206,17 @@ class TransportWsEventHandler { T? handle(TransportWsEvent event) => event.execute(this); /// The [TransportWsEventType]s that this handler will handle. - Set eventTypesHandled() => { - if (connecting != null) TransportWsEventType.connecting, - if (opened != null) TransportWsEventType.opened, - if (connected != null) TransportWsEventType.connected, - if (ping != null) TransportWsEventType.ping, - if (pong != null) TransportWsEventType.pong, - if (message != null) TransportWsEventType.message, - if (closed != null) TransportWsEventType.closed, - if (error != null) TransportWsEventType.error, - }; + Set eventTypesHandled() => + { + if (connecting != null) TransportWsEventType.connecting, + if (opened != null) TransportWsEventType.opened, + if (connected != null) TransportWsEventType.connected, + if (ping != null) TransportWsEventType.ping, + if (pong != null) TransportWsEventType.pong, + if (message != null) TransportWsEventType.message, + if (closed != null) TransportWsEventType.closed, + if (error != null) TransportWsEventType.error, + }; } /// Called for all **valid** messages received by the client. Mainly useful for @@ -512,15 +517,16 @@ class TransportWsClientOptions { /// in case you need more uniqueness. /// /// Reference: https://gist.github.com/jed/982883 - static String generateUUID() => "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx" - .replaceAllMapped(RegExp("[xy]"), (c) { - final int r = (_random.nextDouble() * 16).floor() | 0; - final v = c.group(0) == "x" ? r : (r & 0x3) | 0x8; - return v.toRadixString(16); - }); + static String generateUUID() => + "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx" + .replaceAllMapped(RegExp("[xy]"), (c) { + final int r = (_random.nextDouble() * 16).floor() | 0; + final v = c.group(0) == "x" ? r : (r & 0x3) | 0x8; + return v.toRadixString(16); + }); /// By default, connection should not retry on fatal errors - static bool shouldRetryDefault(Object errOrCloseEvent){ + static bool shouldRetryDefault(Object errOrCloseEvent) { if (errOrCloseEvent is LikeCloseEvent && (_isFatalInternalCloseCode(errOrCloseEvent.code) || const [ @@ -547,10 +553,8 @@ class _Connected { final WebSocketChannel socket; final Future throwOnClose; - _Connected( - this.socket, - this.throwOnClose, - ); + _Connected(this.socket, + this.throwOnClose,); } /// @category Client */ @@ -564,10 +568,8 @@ abstract class TransportWsClient { /// Subscribes through the WebSocket following the config parameters. It /// uses the `sink` to emit received data or errors. Returns a _cleanup_ /// function used for dropping the subscription and cleaning stuff up. - void Function() subscribe( - Request payload, - EventSink sink, - ); + void Function() subscribe(Request payload, + EventSink sink,); /// Terminates the WebSocket abruptly and immediately. /// @@ -628,7 +630,7 @@ class _ConnectionState { if (disposed) return false; // some close codes are worth reporting immediately - if(!options.shouldRetry(errOrCloseEvent)){ + if (!options.shouldRetry(errOrCloseEvent)) { throw errOrCloseEvent; } @@ -866,14 +868,15 @@ class _ConnectionState { ); } }, - onDone: () => onClose?.call( - socket.closeCode == null - ? "DONE" - : LikeCloseEvent( - code: socket.closeCode!, - reason: socket.closeReason, - ), - ), + onDone: () => + onClose?.call( + socket.closeCode == null + ? "DONE" + : LikeCloseEvent( + code: socket.closeCode!, + reason: socket.closeReason, + ), + ), onError: (Object err) => onError?.call(err), ); @@ -881,13 +884,14 @@ class _ConnectionState { onOpen(); })() .onError( - (error, stackTrace) => denied( - WebSocketLinkServerException( - originalException: error, - originalStackTrace: stackTrace, - ), - stackTrace, - ), + (error, stackTrace) => + denied( + WebSocketLinkServerException( + originalException: error, + originalStackTrace: stackTrace, + ), + stackTrace, + ), ); return _comp.future; } @@ -950,7 +954,9 @@ class _Client extends TransportWsClient { _Client({required this.state}); final _ConnectionState state; + _Emitter get emitter => state.emitter; + @override TransportWsClientOptions get options => state.options; @@ -958,10 +964,8 @@ class _Client extends TransportWsClient { void Function() on(TransportWsEventHandler listener) => emitter.on(listener); @override - void Function() subscribe( - Request payload, - EventSink sink, - ) { + void Function() subscribe(Request payload, + EventSink sink,) { final id = options.generateID(); options.log?.call("subscribe $id"); @@ -1087,6 +1091,7 @@ class _Emitter { void Function(TransportWsMessage) listener, ) onMessage; final void Function(String logMessage)? log; + _Emitter({ required this.listeners, required this.onMessage, From 5a56eda8237f7370cb46886bfd0696180fcab049 Mon Sep 17 00:00:00 2001 From: Catalin Pop Date: Mon, 9 Oct 2023 13:35:22 +0300 Subject: [PATCH 3/3] apply dart format . --set-exit-if-changed --- .../lib/src/graphql_transport_ws.dart | 105 +++++++++--------- 1 file changed, 54 insertions(+), 51 deletions(-) diff --git a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart index 2962323c..dc72592a 100644 --- a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart +++ b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart @@ -103,25 +103,26 @@ class TransportWsEvent { message = null, event = null; - const TransportWsEvent.connected(WebSocketChannel this.socket, - this.payload,) - : type = TransportWsEventType.connected, + const TransportWsEvent.connected( + WebSocketChannel this.socket, + this.payload, + ) : type = TransportWsEventType.connected, received = null, message = null, event = null; - const TransportWsEvent.ping(this.payload, { + const TransportWsEvent.ping( + this.payload, { required bool this.received, - }) - : type = TransportWsEventType.ping, + }) : type = TransportWsEventType.ping, socket = null, message = null, event = null; - const TransportWsEvent.pong(this.payload, { + const TransportWsEvent.pong( + this.payload, { required bool this.received, - }) - : type = TransportWsEventType.pong, + }) : type = TransportWsEventType.pong, socket = null, message = null, event = null; @@ -165,17 +166,17 @@ class TransportWsEventHandler { /// Also, the second argument is the optional payload that the server may /// send through the `ConnectionAck` message. final T? Function(WebSocketChannel socket, Map? payload)? - connected; + connected; /// The first argument communicates whether the ping was received from the server. /// If `false`, the ping was sent by the client. final T? Function(Map? payload, {required bool received})? - ping; + ping; /// The first argument communicates whether the pong was received from the server. /// If `false`, the pong was sent by the client. final T? Function(Map? payload, {required bool received})? - pong; + pong; /// Called for all **valid** messages received by the client. Mainly useful for /// debugging and logging received messages. @@ -206,8 +207,7 @@ class TransportWsEventHandler { T? handle(TransportWsEvent event) => event.execute(this); /// The [TransportWsEventType]s that this handler will handle. - Set eventTypesHandled() => - { + Set eventTypesHandled() => { if (connecting != null) TransportWsEventType.connecting, if (opened != null) TransportWsEventType.opened, if (connected != null) TransportWsEventType.connected, @@ -444,11 +444,11 @@ class TransportWsClientOptions { /// A function that encodes the request message to json string before sending it over the network. final FutureOr Function(TransportWsMessage message) - graphQLSocketMessageEncoder; + graphQLSocketMessageEncoder; /// The default [graphQLSocketMessageEncoder] that encodes the request message to json string. static String defaultGraphQLSocketMessageEncoder( - TransportWsMessage message) => + TransportWsMessage message) => jsonEncode(message); /// A function that decodes the incoming http response to `Map`, @@ -461,7 +461,7 @@ class TransportWsClientOptions { /// The default [graphQLSocketMessageDecoder] that decodes the request message from a json string. static Map? defaultGraphQLSocketMessageDecoder( - dynamic message) => + dynamic message) => jsonDecode(message as String) as Map?; /// A function that logs events within the execution of the [TransportWsClient]. @@ -517,8 +517,7 @@ class TransportWsClientOptions { /// in case you need more uniqueness. /// /// Reference: https://gist.github.com/jed/982883 - static String generateUUID() => - "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx" + static String generateUUID() => "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx" .replaceAllMapped(RegExp("[xy]"), (c) { final int r = (_random.nextDouble() * 16).floor() | 0; final v = c.group(0) == "x" ? r : (r & 0x3) | 0x8; @@ -553,8 +552,10 @@ class _Connected { final WebSocketChannel socket; final Future throwOnClose; - _Connected(this.socket, - this.throwOnClose,); + _Connected( + this.socket, + this.throwOnClose, + ); } /// @category Client */ @@ -568,8 +569,10 @@ abstract class TransportWsClient { /// Subscribes through the WebSocket following the config parameters. It /// uses the `sink` to emit received data or errors. Returns a _cleanup_ /// function used for dropping the subscription and cleaning stuff up. - void Function() subscribe(Request payload, - EventSink sink,); + void Function() subscribe( + Request payload, + EventSink sink, + ); /// Terminates the WebSocket abruptly and immediately. /// @@ -714,7 +717,7 @@ class _ConnectionState { ?.cancel(); // in case where a pong was received before a ping (this is valid behaviour) queuedPing = Timer(options.keepAlive, () async { final _pingMsg = - await options.graphQLSocketMessageEncoder(PingMessage(null)); + await options.graphQLSocketMessageEncoder(PingMessage(null)); if (isOpen) { socket.sink.add(_pingMsg); emitter.emit(TransportWsEvent.ping(null, received: false)); @@ -791,7 +794,7 @@ class _ConnectionState { bool acknowledged = false; late final StreamSubscription _messageSubs; _messageSubs = socket.stream.listen( - (Object? msg) async { + (Object? msg) async { options.log?.call( "socket.stream protocol:${socket.protocol} msg:${msg}", ); @@ -868,15 +871,14 @@ class _ConnectionState { ); } }, - onDone: () => - onClose?.call( - socket.closeCode == null - ? "DONE" - : LikeCloseEvent( - code: socket.closeCode!, - reason: socket.closeReason, - ), - ), + onDone: () => onClose?.call( + socket.closeCode == null + ? "DONE" + : LikeCloseEvent( + code: socket.closeCode!, + reason: socket.closeReason, + ), + ), onError: (Object err) => onError?.call(err), ); @@ -884,14 +886,13 @@ class _ConnectionState { onOpen(); })() .onError( - (error, stackTrace) => - denied( - WebSocketLinkServerException( - originalException: error, - originalStackTrace: stackTrace, - ), - stackTrace, - ), + (error, stackTrace) => denied( + WebSocketLinkServerException( + originalException: error, + originalStackTrace: stackTrace, + ), + stackTrace, + ), ); return _comp.future; } @@ -931,7 +932,7 @@ class _ConnectionState { // if the socket is still open Future.delayed( options.lazyCloseTimeout, - () { + () { if (locks == 0 && isOpen) { complete(); } @@ -964,8 +965,10 @@ class _Client extends TransportWsClient { void Function() on(TransportWsEventHandler listener) => emitter.on(listener); @override - void Function() subscribe(Request payload, - EventSink sink,) { + void Function() subscribe( + Request payload, + EventSink sink, + ) { final id = options.generateID(); options.log?.call("subscribe $id"); @@ -1010,7 +1013,7 @@ class _Client extends TransportWsClient { releaser = () async { final _completeMsg = - await options.graphQLSocketMessageEncoder(CompleteMessage(id)); + await options.graphQLSocketMessageEncoder(CompleteMessage(id)); if (!done && state.isOpen) { // if not completed already and socket is open, send complete message to server on release socket.sink.add(_completeMsg); @@ -1087,9 +1090,9 @@ class _Connection { class _Emitter { final Map> listeners; final void Function() Function( - String id, - void Function(TransportWsMessage) listener, - ) onMessage; + String id, + void Function(TransportWsMessage) listener, + ) onMessage; final void Function(String logMessage)? log; _Emitter({ @@ -1143,7 +1146,7 @@ TransportWsClient createClient(TransportWsClientOptions options) { } final Map> listeners = - Map.fromIterables( + Map.fromIterables( TransportWsEventType.values, TransportWsEventType.values.map((e) => []), ); @@ -1235,7 +1238,7 @@ class TransportWebSocketLink extends Link { }; return controller.stream.map( - (response) { + (response) { if (response.data == null && response.errors == null) { throw WebSocketLinkServerException( parsedResponse: response,