diff --git a/lib/connection.js b/lib/connection.js index d36ea6aa..b48e2257 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -50,6 +50,7 @@ class Connection extends EventEmitter { this.recvSinceLastCheck = false; this.expectSocketClose = false; + this.blocked = false; this.freeChannels = new BitSet(); this.channels = [{ channel: { accept: channel0(this) }, @@ -317,6 +318,18 @@ class Connection extends EventEmitter { // ignored. We also have to shut down all the channels. toClosing (capturedStack, k) { var send = this.sendMethod.bind(this); + invalidateSend(this, 'Connection closing', capturedStack); + + if(this.blocked) { + // if the connection is blocked, we will not receive a close-ok until the connection + // becomes unblocked, which may be some time. So just terminate the connection + // immediately + if (k) + k(); + var s = stackCapture('Forcibly closing as connection is blocked'); + this.toClosed(s, undefined); + return; + } this.accept = function (f) { if (f.id === defs.ConnectionCloseOk) { @@ -330,7 +343,6 @@ class Connection extends EventEmitter { } // else ignore frame }; - invalidateSend(this, 'Connection closing', capturedStack); } _closeChannels (capturedStack) { @@ -358,6 +370,7 @@ class Connection extends EventEmitter { // This is certainly true now, if it wasn't before this.expectSocketClose = true; this.stream.end(); + this.stream.destroy(); this.emit('close', maybeErr); } @@ -606,9 +619,11 @@ function channel0(connection) { connection.toClosed(s, e); } else if (f.id === defs.ConnectionBlocked) { + connection.blocked = true; connection.emit('blocked', f.fields.reason); } else if (f.id === defs.ConnectionUnblocked) { + connection.blocked = false; connection.emit('unblocked'); } else {