Skip to content

Commit cf688a5

Browse files
Keep source locked until the end of the pipe
1 parent 15a9768 commit cf688a5

File tree

2 files changed

+15
-3
lines changed

2 files changed

+15
-3
lines changed

index.bs

+12-1
Original file line numberDiff line numberDiff line change
@@ -2201,6 +2201,14 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
22012201
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
22022202
! [$ReadableStreamBYOBReaderRelease$](|reader|).
22032203
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
2204+
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|).
2205+
<p class="note">The initial reader is released to ensure that any pending read requests
2206+
are immediately aborted, and no more chunks are pulled from |source|. A new reader is
2207+
acquired in order to keep |source| locked until the shutdown is [=finalized=], for example
2208+
to [=cancel a readable stream|cancel=] |source| if necessary.
2209+
This exchange of readers is not observable to author code and the user agent is free to
2210+
implement this differently, for example by keeping the same reader and internally aborting
2211+
its pending read requests.
22042212
1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and !
22052213
[$WritableStreamCloseQueuedOrInFlight$](|dest|) is false,
22062214
1. If any [=chunks=] have been read but not yet written, write them to |dest|.
@@ -2216,6 +2224,7 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
22162224
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
22172225
! [$ReadableStreamBYOBReaderRelease$](|reader|).
22182226
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
2227+
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|).
22192228
1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and !
22202229
[$WritableStreamCloseQueuedOrInFlight$](|dest|) is false,
22212230
1. If any [=chunks=] have been read but not yet written, write them to |dest|.
@@ -2224,8 +2233,10 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
22242233
1. [=Finalize=], passing along |error| if it was given.
22252234
* <dfn id="rs-pipeTo-finalize"><i>Finalize</i></dfn>: both forms of shutdown will eventually ask
22262235
to finalize, optionally with an error |error|, which means to perform the following steps:
2227-
1. Assert: |reader|.[=ReadableStreamGenericReader/[[stream]]=] is undefined.
22282236
1. Perform ! [$WritableStreamDefaultWriterRelease$](|writer|).
2237+
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
2238+
! [$ReadableStreamBYOBReaderRelease$](|reader|).
2239+
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
22292240
1. If |signal| is not undefined, [=AbortSignal/remove=] |abortAlgorithm| from |signal|.
22302241
1. If |error| was given, [=reject=] |promise| with |error|.
22312242
1. Otherwise, [=resolve=] |promise| with undefined.

reference-implementation/lib/abstract-ops/readable-streams.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
134134
assert(IsReadableStreamLocked(source) === false);
135135
assert(IsWritableStreamLocked(dest) === false);
136136

137-
const reader = AcquireReadableStreamDefaultReader(source);
137+
let reader = AcquireReadableStreamDefaultReader(source);
138138
const writer = AcquireWritableStreamDefaultWriter(dest);
139139

140140
source._disturbed = true;
@@ -315,6 +315,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
315315
}
316316
shuttingDown = true;
317317
ReadableStreamDefaultReaderRelease(reader);
318+
reader = AcquireReadableStreamDefaultReader(reader);
318319

319320
if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
320321
uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error));
@@ -324,8 +325,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
324325
}
325326

326327
function finalize(isError, error) {
327-
assert(reader._stream === undefined);
328328
WritableStreamDefaultWriterRelease(writer);
329+
ReadableStreamDefaultReaderRelease(reader);
329330

330331
if (signal !== undefined) {
331332
signal.removeEventListener('abort', abortAlgorithm);

0 commit comments

Comments
 (0)