diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 25ede8c9d..f22650452 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -19,6 +19,6 @@ jobs: submodules: true - uses: actions/setup-node@v1 with: - node-version: 14 + node-version: 19 - run: npm install - run: npm test diff --git a/index.bs b/index.bs index da6a50aed..182b72aab 100644 --- a/index.bs +++ b/index.bs @@ -19,6 +19,7 @@ spec:infra; type:dfn; text:list spec:html; type:dfn; text:entangle spec:html; type:dfn; text:message port post message steps spec:html; type:dfn; text:port message queue +spec:html; type:dfn; text:transferable objects
@@ -571,7 +572,7 @@ callback UnderlyingSourceStartCallback = any (ReadableStreamController controlle
 callback UnderlyingSourcePullCallback = Promise (ReadableStreamController controller);
 callback UnderlyingSourceCancelCallback = Promise (optional any reason);
 
-enum ReadableStreamType { "bytes" };
+enum ReadableStreamType { "bytes", "owning" };
 
 
 
@@ -639,8 +640,7 @@ enum ReadableStreamType { "bytes" }; something more persistently wrong. -
type (byte streams - only)
+
type

Can be set to "bytes" to signal that the constructed {{ReadableStream}} is a readable byte stream. This ensures that the resulting @@ -651,8 +651,16 @@ enum ReadableStreamType { "bytes" };

For an example of how to set up a readable byte stream, including using the different controller interface, see [[#example-rbs-push]]. -

Setting any value other than "{{ReadableStreamType/bytes}}" or undefined will cause the - {{ReadableStream()}} constructor to throw an exception. +

Can be set to "owning" to signal that the + constructed {{ReadableStream}} will own chunks (via transfer or serialization) before enqueuing them. + This ensures that enqueued chunks are not mutable by the source. + Transferred or serialized chunks may have closing steps which are executed if + enqueued chunks are dequeued without being provided to the application, for instance when + a {{ReadableStream}} is errored. +

+ +

Setting any value other than "{{ReadableStreamType/bytes}}", "{{ReadableStreamType/owning}}" + or undefined will cause the {{ReadableStream()}} constructor to throw an exception.

autoAllocateChunkSize (byte streams only)
@@ -801,7 +809,8 @@ option. If {{UnderlyingSource/type}} is set to undefined (including via omission 1. Perform ? [$SetUpReadableByteStreamControllerFromUnderlyingSource$]([=this=], |underlyingSource|, |underlyingSourceDict|, |highWaterMark|). 1. Otherwise, - 1. Assert: |underlyingSourceDict|["{{UnderlyingSource/type}}"] does not [=map/exist=]. + 1. Assert: |underlyingSourceDict|["{{UnderlyingSource/type}}"] does not [=map/exist=] or + is "{{ReadableStreamType/owning}}". 1. Let |sizeAlgorithm| be ! [$ExtractSizeAlgorithm$](|strategy|). 1. Let |highWaterMark| be ? [$ExtractHighWaterMark$](|strategy|, 1). 1. Perform ? [$SetUpReadableStreamDefaultControllerFromUnderlyingSource$]([=this=], @@ -1461,7 +1470,7 @@ interface ReadableStreamDefaultController { readonly attribute unrestricted double? desiredSize; undefined close(); - undefined enqueue(optional any chunk); + undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { }); undefined error(optional any e); }; @@ -1523,6 +1532,10 @@ the following table: \[[stream]] The {{ReadableStream}} instance controlled + + \[[isOwning]] + A boolean flag indicating whether to take ownership of enqueued chunks + via transfer or serialization.

Methods and properties

@@ -1569,11 +1582,12 @@ the following table:
The enqueue(|chunk|) method steps are: + for="ReadableStreamDefaultController">enqueue(|chunk|, |options|) method steps are: + 1. Let |transferList| be |options|["transfer"]. 1. If ! [$ReadableStreamDefaultControllerCanCloseOrEnqueue$]([=this=]) is false, throw a {{TypeError}} exception. - 1. Perform ? [$ReadableStreamDefaultControllerEnqueue$]([=this=], |chunk|). + 1. Perform ? [$ReadableStreamDefaultControllerEnqueue$]([=this=], |chunk|, |transferList|).
@@ -2045,7 +2059,7 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
CreateReadableStream(|startAlgorithm|, |pullAlgorithm|, - |cancelAlgorithm|[, |highWaterMark|, [, |sizeAlgorithm|]]) performs the following steps: + |cancelAlgorithm|[, |highWaterMark|[, |sizeAlgorithm|[, |type|]]]) performs the following steps: 1. If |highWaterMark| was not passed, set it to 1. 1. If |sizeAlgorithm| was not passed, set it to an algorithm that returns 1. @@ -2053,8 +2067,9 @@ The following abstract operations operate on {{ReadableStream}} instances at a h 1. Let |stream| be a [=new=] {{ReadableStream}}. 1. Perform ! [$InitializeReadableStream$](|stream|). 1. Let |controller| be a [=new=] {{ReadableStreamDefaultController}}. + 1. Let |isOwning| be true if |type| is "{{ReadableStreamType/owning}}" and false otherwise. 1. Perform ? [$SetUpReadableStreamDefaultController$](|stream|, |controller|, |startAlgorithm|, - |pullAlgorithm|, |cancelAlgorithm|, |highWaterMark|, |sizeAlgorithm|). + |pullAlgorithm|, |cancelAlgorithm|, |highWaterMark|, |sizeAlgorithm|, |isOwning|). 1. Return |stream|.

This abstract operation will throw an exception if and only if the supplied @@ -2141,6 +2156,26 @@ The following abstract operations operate on {{ReadableStream}} instances at a h the operations below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not be used. Instead, the streams must be manipulated directly. +

+ {{WritableStream}} "{{WritableStreamType/owning}}" and {{ReadableStream}} "{{ReadableStreamType/owning}}" + has some impact on the piping operation: + - Piping a not owning {{ReadableStream}} to a not owning {{WritableStream}} will enqueue + the exact samne JavaScript values. + - Piping an owning {{ReadableStream}} to an owning {{WritableStream}} is similar to the above case, + as ownership will be transferred from the {{ReadableStream}} to the {{WritableStream}}. + In case of error during the piping operation, the user Agent is responsible to call the + necessary [=closing steps=] of the JavaScript values that were dequeud from the + {{ReadableStream}} but not successfully enqueued in the {{WritableStream}}. + - Piping an owning {{ReadableStream}} to a not owning {{WritableStream}} will enqueue + the serialized/transfered JavaScript values. + In case of error during the piping operation, the user Agent is responsible to call the + necessary [=closing steps=] of the JavaScript values that were dequeud from the + {{ReadableStream}} but not successfully enqueued in the {{WritableStream}}. + Note that [=closing steps=] of the enqueued JavaScript values in the {{WritableStream}} + will not be called automatically. It is up to the application to handle this. + - Piping a not owning {{ReadableStream}} to an owning {{WritableStream}} will trigger + serialization of the {{ReadableStream}} chunks when enqueued in {{WritableStream}}. + Piping may fail if the serialization fails, say in case of transferable-only JavaScript values. * Backpressure must be enforced: * While [$WritableStreamDefaultWriterGetDesiredSize$](|writer|) is ≤ 0 or is null, the user agent must not read from |reader|. @@ -2235,8 +2270,8 @@ create them does not matter. objects|transferring=] their [=chunks=]. However, it does introduce a noticeable asymmetry between the two branches, and limits the possible [=chunks=] to serializable ones. [[!HTML]] - If |stream| is a [=readable byte stream=], then |cloneForBranch2| is ignored and chunks are cloned - unconditionally. + If |stream| is a [=readable byte stream=], or if |stream| type is "{{ReadableStreamType/owning}}", + then |cloneForBranch2| is ignored and chunks are cloned unconditionally.

In this standard ReadableStreamTee is always called with |cloneForBranch2| set to false; other specifications pass true via the [=ReadableStream/tee=] wrapper algorithm. @@ -2247,6 +2282,8 @@ create them does not matter. 1. Assert: |cloneForBranch2| is a boolean. 1. If |stream|.[=ReadableStream/[[controller]]=] [=implements=] {{ReadableByteStreamController}}, return ? [$ReadableByteStreamTee$](|stream|). + 1. If |stream|.[=ReadableStream/[[controller]]=].[=ReadableStreamDefaultController/[[isOwning]]=] + is true, return ? [$ReadableStreamDefaultTee$](|stream|, true). 1. Return ? [$ReadableStreamDefaultTee$](|stream|, |cloneForBranch2|).

@@ -2287,10 +2324,10 @@ create them does not matter. 1. Otherwise, set |chunk2| to |cloneResult|.\[[Value]]. 1. If |canceled1| is false, perform ! [$ReadableStreamDefaultControllerEnqueue$](|branch1|.[=ReadableStream/[[controller]]=], - |chunk1|). + |chunk1|, undefined). 1. If |canceled2| is false, perform ! [$ReadableStreamDefaultControllerEnqueue$](|branch2|.[=ReadableStream/[[controller]]=], - |chunk2|). + |chunk2|, undefined). 1. Set |reading| to false. 1. If |readAgain| is true, perform |pullAlgorithm|. @@ -2950,13 +2987,19 @@ The following abstract operations support the implementation of the
ReadableStreamDefaultControllerEnqueue(|controller|, - |chunk|) performs the following steps: + |chunk|, |transferList|) performs the following steps: 1. If ! [$ReadableStreamDefaultControllerCanCloseOrEnqueue$](|controller|) is false, return. 1. Let |stream| be |controller|.[=ReadableStreamDefaultController/[[stream]]=]. 1. If ! [$IsReadableStreamLocked$](|stream|) is true and ! - [$ReadableStreamGetNumReadRequests$](|stream|) > 0, perform ! - [$ReadableStreamFulfillReadRequest$](|stream|, |chunk|, false). + [$ReadableStreamGetNumReadRequests$](|stream|) > 0, perform the following steps: + 1. Let |internalChunk| be |chunk|. + 1. If |controller|.[=ReadableStreamDefaultController/[[isOwning]]=] is true, perform the following steps: + 1. Set |internalChunk| to [$StructuredTransferOrClone$](|chunk|, |transferList|). + 1. If |internalChunk| is an abrupt completion, + 1. Perform ! [$ReadableStreamDefaultControllerError$](|controller|, |internalChunk|.\[[Value]]). + 1. Return |internalChunk|. + 1. Perform ! [$ReadableStreamFulfillReadRequest$](|stream|, |internalChunk|, false). 1. Otherwise, 1. Let |result| be the result of performing |controller|.[=ReadableStreamDefaultController/[[strategySizeAlgorithm]]=], passing in |chunk|, @@ -2965,7 +3008,7 @@ The following abstract operations support the implementation of the 1. Perform ! [$ReadableStreamDefaultControllerError$](|controller|, |result|.\[[Value]]). 1. Return |result|. 1. Let |chunkSize| be |result|.\[[Value]]. - 1. Let |enqueueResult| be [$EnqueueValueWithSize$](|controller|, |chunk|, |chunkSize|). + 1. Let |enqueueResult| be [$EnqueueValueWithSize$](|controller|, |chunk|, |chunkSize|, |transferList|). 1. If |enqueueResult| is an abrupt completion, 1. Perform ! [$ReadableStreamDefaultControllerError$](|controller|, |enqueueResult|.\[[Value]]). 1. Return |enqueueResult|. @@ -3029,7 +3072,7 @@ The following abstract operations support the implementation of the SetUpReadableStreamDefaultController(|stream|, |controller|, |startAlgorithm|, |pullAlgorithm|, |cancelAlgorithm|, |highWaterMark|, - |sizeAlgorithm|) performs the following steps: + |sizeAlgorithm|, |isOwning|) performs the following steps: 1. Assert: |stream|.[=ReadableStream/[[controller]]=] is undefined. 1. Set |controller|.[=ReadableStreamDefaultController/[[stream]]=] to |stream|. @@ -3039,8 +3082,9 @@ The following abstract operations support the implementation of the |controller|.[=ReadableStreamDefaultController/[[pullAgain]]=], and |controller|.[=ReadableStreamDefaultController/[[pulling]]=] to false. 1. Set |controller|.[=ReadableStreamDefaultController/[[strategySizeAlgorithm]]=] to - |sizeAlgorithm| and |controller|.[=ReadableStreamDefaultController/[[strategyHWM]]=] to - |highWaterMark|. + |sizeAlgorithm|, |controller|.[=ReadableStreamDefaultController/[[strategyHWM]]=] to + |highWaterMark| and |controller|.[=ReadableStreamDefaultController/[[isOwning]]=] to + |isOwning|. 1. Set |controller|.[=ReadableStreamDefaultController/[[pullAlgorithm]]=] to |pullAlgorithm|. 1. Set |controller|.[=ReadableStreamDefaultController/[[cancelAlgorithm]]=] to |cancelAlgorithm|. 1. Set |stream|.[=ReadableStream/[[controller]]=] to |controller|. @@ -3065,6 +3109,8 @@ The following abstract operations support the implementation of the 1. Let |startAlgorithm| be an algorithm that returns undefined. 1. Let |pullAlgorithm| be an algorithm that returns [=a promise resolved with=] undefined. 1. Let |cancelAlgorithm| be an algorithm that returns [=a promise resolved with=] undefined. + 1. Let |isOwning| be true if |underlyingSourceDict|["{{UnderlyingSource/type}}"] is + "{{ReadableStreamType/owning}}" and false otherwise. 1. If |underlyingSourceDict|["{{UnderlyingSource/start}}"] [=map/exists=], then set |startAlgorithm| to an algorithm which returns the result of [=invoking=] |underlyingSourceDict|["{{UnderlyingSource/start}}"] with argument list @@ -3078,7 +3124,7 @@ The following abstract operations support the implementation of the [=invoking=] |underlyingSourceDict|["{{UnderlyingSource/cancel}}"] with argument list « |reason| » and [=callback this value=] |underlyingSource|. 1. Perform ? [$SetUpReadableStreamDefaultController$](|stream|, |controller|, |startAlgorithm|, - |pullAlgorithm|, |cancelAlgorithm|, |highWaterMark|, |sizeAlgorithm|). + |pullAlgorithm|, |cancelAlgorithm|, |highWaterMark|, |sizeAlgorithm|, |isOwning|).

Byte stream controllers

@@ -3979,13 +4025,14 @@ dictionary UnderlyingSink { UnderlyingSinkWriteCallback write; UnderlyingSinkCloseCallback close; UnderlyingSinkAbortCallback abort; - any type; + WritableStreamType type; }; callback UnderlyingSinkStartCallback = any (WritableStreamDefaultController controller); callback UnderlyingSinkWriteCallback = Promise (any chunk, WritableStreamDefaultController controller); callback UnderlyingSinkCloseCallback = Promise (); callback UnderlyingSinkAbortCallback = Promise (optional any reason); +enum WritableStreamType { "owning" };
@@ -4071,8 +4118,15 @@ callback UnderlyingSinkAbortCallback = Promise (optional any reason);
type
-

This property is reserved for future use, so any attempts to supply a value will throw an - exception. +

Can be set to "owning" to signal that the + constructed {{WritableStream}} will own chunks (via transfer or serialization) before enqueuing them. + This ensures that enqueued chunks are not mutable by the source. + Transferred or serialized chunks may have [=closing steps=]] which are executed if + enqueued chunks are dequeued without being provided to the application, for instance when + a {{WritableStream}} is errored. + +

Setting any value other than "{{WritableStreamType/owning}}" or undefined will cause the + {{WritableStream()}} constructor to throw an exception.

The controller argument passed to {{UnderlyingSink/start|start()}} and @@ -4141,10 +4195,6 @@ as seen for example in [[#example-ws-no-backpressure]].

We cannot declare the |underlyingSink| argument as having the {{UnderlyingSink}} type directly, because doing so would lose the reference to the original object. We need to retain the object so we can [=invoke=] the various methods on it. - 1. If |underlyingSinkDict|["{{UnderlyingSink/type}}"] [=map/exists=], throw a {{RangeError}} - exception. -

This is to allow us to add new potential types in the future, without - backward-compatibility concerns. 1. Perform ! [$InitializeWritableStream$]([=this=]). 1. Let |sizeAlgorithm| be ! [$ExtractSizeAlgorithm$](|strategy|). 1. Let |highWaterMark| be ? [$ExtractHighWaterMark$](|strategy|, 1). @@ -4239,7 +4289,7 @@ interface WritableStreamDefaultWriter { Promise abort(optional any reason); Promise close(); undefined releaseLock(); - Promise write(optional any chunk); + Promise write(optional any chunk, optional StructuredSerializeOptions options = { }); }; @@ -4324,19 +4374,23 @@ following table: lock on the writer for the duration of the write; the lock instead simply prevents other [=producers=] from writing in an interleaved manner. -

await writer.{{WritableStreamDefaultWriter/write(chunk)|write}}(chunk) +
await writer.{{WritableStreamDefaultWriter/write(chunk, options)|write}}(chunk, options)

Writes the given [=chunk=] to the writable stream, by waiting until any previous writes have finished successfully, and then sending the [=chunk=] to the [=underlying sink=]'s {{UnderlyingSink/write|write()}} method. It will return a promise that fulfills with undefined upon a successful write, or rejects if the write fails or stream becomes errored before the writing process is initiated. +

If the writable stream has its type set to "{{WritableStreamType/owning}}", the given [=chunk=] + is serialized with [=options=], which ensures that mutating [=chunk=] will have no impact on what + is written by the writable stream.

Note that what "success" means is up to the [=underlying sink=]; it might indicate simply that the [=chunk=] has been accepted, and not necessarily that it is safely saved to its ultimate destination. -

If chunk is mutable, [=producers=] are advised to +

If chunk is mutable and the writable stream type is + not "{{WritableStreamType/owning}}", [=producers=] are advised to avoid mutating it after passing it to {{WritableStreamDefaultWriter/write()}}, until after the promise returned by {{WritableStreamDefaultWriter/write()}} settles. This ensures that the [=underlying sink=] receives and processes the same value that was passed in. @@ -4404,12 +4458,13 @@ following table:

- The write(|chunk|) + The write(|chunk|, |options|) method steps are: + 1. Let |transferList| be |options|["transfer"]. 1. If [=this=].[=WritableStreamDefaultWriter/[[stream]]=] is undefined, return [=a promise rejected with=] a {{TypeError}} exception. - 1. Return ! [$WritableStreamDefaultWriterWrite$]([=this=], |chunk|). + 1. Return ! [$WritableStreamDefaultWriterWrite$]([=this=], |chunk|, |transferList|).

The {{WritableStreamDefaultController}} class

@@ -4480,6 +4535,10 @@ the following table: \[[writeAlgorithm]] A promise-returning algorithm, taking one argument (the [=chunk=] to write), which writes data to the [=underlying sink=] + + \[[isOwning]] + A boolean flag indicating whether to take ownership of enqueued chunks + via transfer or serialization. The close sentinel is a unique value enqueued into @@ -4571,15 +4630,16 @@ The following abstract operations operate on {{WritableStream}} instances at a h
CreateWritableStream(|startAlgorithm|, |writeAlgorithm|, - |closeAlgorithm|, |abortAlgorithm|, |highWaterMark|, |sizeAlgorithm|) performs the following + |closeAlgorithm|, |abortAlgorithm|, |highWaterMark|, |sizeAlgorithm|[, |type|]) performs the following steps: 1. Assert: ! [$IsNonNegativeNumber$](|highWaterMark|) is true. 1. Let |stream| be a [=new=] {{WritableStream}}. 1. Perform ! [$InitializeWritableStream$](|stream|). 1. Let |controller| be a [=new=] {{WritableStreamDefaultController}}. + 1. Let |isOwning| be true if |type| is "{{WritableStreamType/owning}}" and false otherwise. 1. Perform ? [$SetUpWritableStreamDefaultController$](|stream|, |controller|, |startAlgorithm|, - |writeAlgorithm|, |closeAlgorithm|, |abortAlgorithm|, |highWaterMark|, |sizeAlgorithm|). + |writeAlgorithm|, |closeAlgorithm|, |abortAlgorithm|, |highWaterMark|, |sizeAlgorithm|, |isOwning|). 1. Return |stream|.

This abstract operation will throw an exception if and only if the supplied @@ -5044,7 +5104,7 @@ The following abstract operations support the implementation and manipulation of

WritableStreamDefaultWriterWrite(|writer|, |chunk|) + id="writable-stream-default-writer-write">WritableStreamDefaultWriterWrite(|writer|, |chunk|, |transferList|) performs the following steps: 1. Let |stream| be |writer|.[=WritableStreamDefaultWriter/[[stream]]=]. @@ -5063,7 +5123,7 @@ The following abstract operations support the implementation and manipulation of |stream|.[=WritableStream/[[storedError]]=]. 1. Assert: |state| is "`writable`". 1. Let |promise| be ! [$WritableStreamAddWriteRequest$](|stream|). - 1. Perform ! [$WritableStreamDefaultControllerWrite$](|controller|, |chunk|, |chunkSize|). + 1. Perform ! [$WritableStreamDefaultControllerWrite$](|controller|, |chunk|, |chunkSize|, |transferList|). 1. Return |promise|.
@@ -5077,7 +5137,7 @@ The following abstract operations support the implementation of the SetUpWritableStreamDefaultController(|stream|, |controller|, |startAlgorithm|, |writeAlgorithm|, |closeAlgorithm|, |abortAlgorithm|, - |highWaterMark|, |sizeAlgorithm|) performs the following steps: + |highWaterMark|, |sizeAlgorithm|, |isOwning|) performs the following steps: 1. Assert: |stream| [=implements=] {{WritableStream}}. 1. Assert: |stream|.[=WritableStream/[[controller]]=] is undefined. @@ -5088,6 +5148,7 @@ The following abstract operations support the implementation of the 1. Set |controller|.[=WritableStreamDefaultController/[[started]]=] to false. 1. Set |controller|.[=WritableStreamDefaultController/[[strategySizeAlgorithm]]=] to |sizeAlgorithm|. + 1. Set |controller|.[=WritableStreamDefaultController/[[isOwning]]=] to |isOwning|. 1. Set |controller|.[=WritableStreamDefaultController/[[strategyHWM]]=] to |highWaterMark|. 1. Set |controller|.[=WritableStreamDefaultController/[[writeAlgorithm]]=] to |writeAlgorithm|. 1. Set |controller|.[=WritableStreamDefaultController/[[closeAlgorithm]]=] to |closeAlgorithm|. @@ -5117,6 +5178,8 @@ The following abstract operations support the implementation of the 1. Let |writeAlgorithm| be an algorithm that returns [=a promise resolved with=] undefined. 1. Let |closeAlgorithm| be an algorithm that returns [=a promise resolved with=] undefined. 1. Let |abortAlgorithm| be an algorithm that returns [=a promise resolved with=] undefined. + 1. Let |isOwning| be true if |underlyingSinkDict|["{{UnderlyingSink/type}}"] is + "{{WritableStreamType/owning}}" and false otherwise. 1. If |underlyingSinkDict|["{{UnderlyingSink/start}}"] [=map/exists=], then set |startAlgorithm| to an algorithm which returns the result of [=invoking=] |underlyingSinkDict|["{{UnderlyingSink/start}}"] with argument list « |controller| » @@ -5134,7 +5197,7 @@ The following abstract operations support the implementation of the |underlyingSinkDict|["{{UnderlyingSink/abort}}"] with argument list « |reason| » and [=callback this value=] |underlyingSink|. 1. Perform ? [$SetUpWritableStreamDefaultController$](|stream|, |controller|, |startAlgorithm|, - |writeAlgorithm|, |closeAlgorithm|, |abortAlgorithm|, |highWaterMark|, |sizeAlgorithm|). + |writeAlgorithm|, |closeAlgorithm|, |abortAlgorithm|, |highWaterMark|, |sizeAlgorithm|, |isOwning|).
@@ -5186,7 +5249,7 @@ The following abstract operations support the implementation of the id="writable-stream-default-controller-close">WritableStreamDefaultControllerClose(|controller|) performs the following steps: - 1. Perform ! [$EnqueueValueWithSize$](|controller|, [=close sentinel=], 0). + 1. Perform ! [$EnqueueValueWithSize$](|controller|, [=close sentinel=], 0, undefined). 1. Perform ! [$WritableStreamDefaultControllerAdvanceQueueIfNeeded$](|controller|).
@@ -5288,9 +5351,9 @@ The following abstract operations support the implementation of the
WritableStreamDefaultControllerWrite(|controller|, - |chunk|, |chunkSize|) performs the following steps: + |chunk|, |chunkSize|, |transferList|) performs the following steps: - 1. Let |enqueueResult| be [$EnqueueValueWithSize$](|controller|, |chunk|, |chunkSize|). + 1. Let |enqueueResult| be [$EnqueueValueWithSize$](|controller|, |chunk|, |chunkSize|, |transferList|). 1. If |enqueueResult| is an abrupt completion, 1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|controller|, |enqueueResult|.\[[Value]]). @@ -5429,8 +5492,8 @@ dictionary Transformer { TransformerStartCallback start; TransformerTransformCallback transform; TransformerFlushCallback flush; - any readableType; - any writableType; + ReadableStreamType readableType; + WritableStreamType writableType; }; callback TransformerStartCallback = any (TransformStreamDefaultController controller); @@ -5500,13 +5563,12 @@ callback TransformerTransformCallback = Promise (any chunk, Transform
readableType
-

This property is reserved for future use, so any attempts to supply a value will throw an - exception. +

This property allows specializing the [=readable side=]. Only "{{ReadableStreamType/owning}}" is allowed. + Any other value will throw an exception.

writableType
-

This property is reserved for future use, so any attempts to supply a value will throw an - exception. +

This property allows specializing the [=writable side=]. Only "{{WritableStreamType/owning}}" is allowed.

The controller object passed to {{Transformer/start|start()}}, @@ -5553,17 +5615,17 @@ side=], or to terminate or error the stream.

We cannot declare the |transformer| argument as having the {{Transformer}} type directly, because doing so would lose the reference to the original object. We need to retain the object so we can [=invoke=] the various methods on it. - 1. If |transformerDict|["{{Transformer/readableType}}"] [=map/exists=], throw a {{RangeError}} - exception. - 1. If |transformerDict|["{{Transformer/writableType}}"] [=map/exists=], throw a {{RangeError}} - exception. + 1. If |transformerDict|["{{Transformer/readableType}}"] [=map/exists=] + and is not "{{ReadableStreamType/owning}}", throw a {{TypeError}} exception. + 1. Assert: |transformerDict|["{{Transformer/writableType}}"] [=map/exists|does not exit=] + or its value is "{{WritableStreamType/owning}}". 1. Let |readableHighWaterMark| be ? [$ExtractHighWaterMark$](|readableStrategy|, 0). 1. Let |readableSizeAlgorithm| be ! [$ExtractSizeAlgorithm$](|readableStrategy|). 1. Let |writableHighWaterMark| be ? [$ExtractHighWaterMark$](|writableStrategy|, 1). 1. Let |writableSizeAlgorithm| be ! [$ExtractSizeAlgorithm$](|writableStrategy|). 1. Let |startPromise| be [=a new promise=]. 1. Perform ! [$InitializeTransformStream$]([=this=], |startPromise|, |writableHighWaterMark|, - |writableSizeAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|). + |writableSizeAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|, |transformerDict|). 1. Perform ? [$SetUpTransformStreamDefaultControllerFromTransformer$]([=this=], |transformer|, |transformerDict|). 1. If |transformerDict|["{{Transformer/start}}"] [=map/exists=], then [=resolve=] |startPromise| @@ -5649,7 +5711,7 @@ The Web IDL definition for the {{TransformStreamDefaultController}} class is giv interface TransformStreamDefaultController { readonly attribute unrestricted double? desiredSize; - undefined enqueue(optional any chunk); + undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { }); undefined error(optional any reason); undefined terminate(); }; @@ -5687,10 +5749,10 @@ the following table:

Returns the [=desired size to fill a stream's internal queue|desired size to fill the readable side's internal queue=]. It can be negative, if the queue is over-full. -

controller.{{TransformStreamDefaultController/enqueue()|enqueue}}(chunk) +
controller.{{TransformStreamDefaultController/enqueue()|enqueue}}(chunk, options)

Enqueues the given [=chunk=] chunk in the [=readable side=] of the controlled - transform stream. + transform stream with options.

controller.{{TransformStreamDefaultController/error()|error}}(e)
@@ -5715,9 +5777,9 @@ the following table:
The enqueue(|chunk|) method steps are: - - 1. Perform ? [$TransformStreamDefaultControllerEnqueue$]([=this=], |chunk|). + for="TransformStreamDefaultController">enqueue(|chunk|, |options|) method steps are: + 1. Let |transferList| be |options|["transfer"]. + 1. Perform ? [$TransformStreamDefaultControllerEnqueue$]([=this=], |chunk|, |transferList|).
@@ -5744,7 +5806,7 @@ The following abstract operations operate on {{TransformStream}} instances at a InitializeTransformStream(|stream|, |startPromise|, |writableHighWaterMark|, |writableSizeAlgorithm|, |readableHighWaterMark|, - |readableSizeAlgorithm|) performs the following steps: + |readableSizeAlgorithm|, |transformerDict|) performs the following steps: 1. Let |startAlgorithm| be an algorithm that returns |startPromise|. 1. Let |writeAlgorithm| be the following steps, taking a |chunk| argument: @@ -5755,14 +5817,14 @@ The following abstract operations operate on {{TransformStream}} instances at a 1. Return ! [$TransformStreamDefaultSinkCloseAlgorithm$](|stream|). 1. Set |stream|.[=TransformStream/[[writable]]=] to ! [$CreateWritableStream$](|startAlgorithm|, |writeAlgorithm|, |closeAlgorithm|, |abortAlgorithm|, |writableHighWaterMark|, - |writableSizeAlgorithm|). + |writableSizeAlgorithm|, |transformerDict|.["writableType"]). 1. Let |pullAlgorithm| be the following steps: 1. Return ! [$TransformStreamDefaultSourcePullAlgorithm$](|stream|). 1. Let |cancelAlgorithm| be the following steps, taking a |reason| argument: 1. Perform ! [$TransformStreamErrorWritableAndUnblockWrite$](|stream|, |reason|). 1. Return [=a promise resolved with=] undefined. 1. Set |stream|.[=TransformStream/[[readable]]=] to ! [$CreateReadableStream$](|startAlgorithm|, - |pullAlgorithm|, |cancelAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|). + |pullAlgorithm|, |cancelAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|, |transformerDict|.["readableType"]). 1. Set |stream|.[=TransformStream/[[backpressure]]=] and |stream|.[=TransformStream/[[backpressureChangePromise]]=] to undefined.

The [=TransformStream/[[backpressure]]=] slot is set to undefined so that it can @@ -5840,7 +5902,28 @@ The following abstract operations support the implementaiton of the 1. Let |controller| be a [=new=] {{TransformStreamDefaultController}}. 1. Let |transformAlgorithm| be the following steps, taking a |chunk| argument: - 1. Let |result| be [$TransformStreamDefaultControllerEnqueue$](|controller|, |chunk|). + 1. Let |result| be [$TransformStreamDefaultControllerEnqueue$](|controller|, |chunk|, undefined). +

+ {{WritableStream}} "{{WritableStreamType/owning}}" and {{ReadableStream}} "{{ReadableStreamType/owning}}" + have some impact on the identity transform, similar to [$ReadableStreamPipeTo$]. + - Enqueuing a chunk from a not owning {{WritableStream}} to a not owning {{ReadableStream}} will enqueue + the exact same JavaScript values. + - Enqueuing a chunk from an owning {{WritableStream}} to an owning {{ReadableStream}} is similar to the above case, + as ownership will be transferred from the {{WritableStream}} to the {{ReadableStream}}. + In case of error during the enqueuing operation, the user agent is responsible to call the + necessary [=closing steps=] of the JavaScript values that were unsuccessfully enqueud on the + {{ReadableStream}}. + - Enqueuing a chunk from an owning {{WritableStream}} to a not owning {{ReadableStream}} will enqueue + the serialized/transfered JavaScript values. + In case of error during the enqueuing operation, the user agent is responsible to call the + necessary [=closing steps=] of the JavaScript values that were unsuccessfully enqueued on the + {{ReadableStream}}. + Note that [=closing steps=] of the enqueued JavaScript values in the {{ReadableStream}} + will not be called automatically. It is up to the application to handle this. + - Enqueuing a chunk from a not owning {{WritableStream}} to an owning {{ReadableStream}} will trigger + serialization of the {{WritableStream}} chunks when enqueued in {{ReadableStream}}. + Enqueuing may fail if the serialization fails, say in case of transferable-only JavaScript values. +
1. If |result| is an abrupt completion, return [=a promise rejected with=] |result|.\[[Value]]. 1. Otherwise, return [=a promise resolved with=] undefined. 1. Let |flushAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined. @@ -5876,7 +5959,7 @@ The following abstract operations support the implementaiton of the
TransformStreamDefaultControllerEnqueue(|controller|, - |chunk|) performs the following steps: + |chunk|, |transferList|) performs the following steps: 1. Let |stream| be |controller|.[=TransformStreamDefaultController/[[stream]]=]. 1. Let |readableController| be @@ -5884,7 +5967,7 @@ The following abstract operations support the implementaiton of the 1. If ! [$ReadableStreamDefaultControllerCanCloseOrEnqueue$](|readableController|) is false, throw a {{TypeError}} exception. 1. Let |enqueueResult| be [$ReadableStreamDefaultControllerEnqueue$](|readableController|, - |chunk|). + |chunk|, |transferList|). 1. If |enqueueResult| is an abrupt completion, 1. Perform ! [$TransformStreamErrorWritableAndUnblockWrite$](|stream|, |enqueueResult|.\[[Value]]). @@ -6362,13 +6445,17 @@ for="value-with-size">value and size.
EnqueueValueWithSize(|container|, |value|, |size|) performs the + id="enqueue-value-with-size">EnqueueValueWithSize(|container|, |value|, |size|, |transferList|) performs the following steps: 1. Assert: |container| has \[[queue]] and \[[queueTotalSize]] internal slots. 1. If ! [$IsNonNegativeNumber$](|size|) is false, throw a {{RangeError}} exception. 1. If |size| is +∞, throw a {{RangeError}} exception. - 1. [=list/Append=] a new [=value-with-size=] with [=value-with-size/value=] |value| and + 1. Let |enqueuedValue| be |value|. + 1. If |container| has a \[[isOwning]] internal slot whose value is true, perform the following steps: + 1. Set |enqueuedValue| to [$StructuredTransferOrClone$](|value|, |transferList|). + 1. If |enqueuedValue| is an abrupt completion, return |enqueuedValue|. + 1. [=list/Append=] a new [=value-with-size=] with [=value-with-size/value=] |enqueuedValue| and [=value-with-size/size=] |size| to |container|.\[[queue]]. 1. Set |container|.\[[queueTotalSize]] to |container|.\[[queueTotalSize]] + |size|.
@@ -6388,6 +6475,10 @@ for="value-with-size">value and size. performs the following steps: 1. Assert: |container| has \[[queue]] and \[[queueTotalSize]] internal slots. + 1. If |container| has a \[[isOwning]] internal slot whose value is true, perform the following steps until |container|.\[[queue]] + is [=list/is empty|empty=]: + 1. Let |chunk| be ! [$DequeueValue$]([=this=]). + 1. If |chunk| has [=closing steps=], perform the [=closing steps=] given |chunk|. 1. Set |container|.\[[queue]] to a new empty [=list=]. 1. Set |container|.\[[queueTotalSize]] to 0.
@@ -6447,7 +6538,7 @@ abstract operations are used to implement these "cross-realm transforms". 1. Let |value| be ! [$Get$](|data|, "`value`"). 1. Assert: [$Type$](|type|) is String. 1. If |type| is "`chunk`", - 1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](|controller|, |value|). + 1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](|controller|, |value|, undefined). 1. Otherwise, if |type| is "`close`", 1. Perform ! [$ReadableStreamDefaultControllerClose$](|controller|). 1. Disentangle |port|. @@ -6596,6 +6687,14 @@ The following abstract operations are a grab-bag of utilities. 1. Return ? [$StructuredDeserialize$](|serialized|, [=the current Realm=]).
+
+ StructuredTransferOrClone(|value|, |transferList|) + performs the following steps: + 1. Let |serialized| be ! [$StructuredSerializeWithTransfer$](|value|, |transferList|). + 1. Let |deserialized| be ! [$StructuredDeserializeWithTransfer$](|serialized|, [=the current Realm=]). + 1. Return |deserialized|.\[[Deserialized]]. +
+

Using streams in other specifications

Much of this standard concerns itself with the internal machinery of streams. Other specifications @@ -6727,13 +6826,13 @@ mark=] is greater than zero.
- To enqueue the JavaScript value |chunk| into a - {{ReadableStream}} |stream|: + To enqueue the JavaScript value |chunk| + with an optional |transferList| into a {{ReadableStream}} |stream|: 1. If |stream|.[=ReadableStream/[[controller]]=] [=implements=] {{ReadableStreamDefaultController}}, 1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](|stream|.[=ReadableStream/[[controller]]=], - |chunk|). + |chunk|, |transferList|). 1. Otherwise, 1. Assert: |stream|.[=ReadableStream/[[controller]]=] [=implements=] {{ReadableByteStreamController}}. diff --git a/reference-implementation/lib/ReadableStream-impl.js b/reference-implementation/lib/ReadableStream-impl.js index 1eda283e1..47bde08b7 100644 --- a/reference-implementation/lib/ReadableStream-impl.js +++ b/reference-implementation/lib/ReadableStream-impl.js @@ -29,7 +29,7 @@ exports.implementation = class ReadableStreamImpl { this, underlyingSource, underlyingSourceDict, highWaterMark ); } else { - assert(!('type' in underlyingSourceDict)); + assert(!('type' in underlyingSourceDict) || underlyingSourceDict.type === 'owning'); const sizeAlgorithm = ExtractSizeAlgorithm(strategy); const highWaterMark = ExtractHighWaterMark(strategy, 1); aos.SetUpReadableStreamDefaultControllerFromUnderlyingSource( diff --git a/reference-implementation/lib/ReadableStreamDefaultController-impl.js b/reference-implementation/lib/ReadableStreamDefaultController-impl.js index 5c7ec7033..d29faf89b 100644 --- a/reference-implementation/lib/ReadableStreamDefaultController-impl.js +++ b/reference-implementation/lib/ReadableStreamDefaultController-impl.js @@ -17,12 +17,13 @@ exports.implementation = class ReadableStreamDefaultControllerImpl { aos.ReadableStreamDefaultControllerClose(this); } - enqueue(chunk) { + enqueue(chunk, options) { + const transferList = options ? options.transfer : undefined; if (aos.ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) { throw new TypeError('The stream is not in a state that permits enqueue'); } - return aos.ReadableStreamDefaultControllerEnqueue(this, chunk); + return aos.ReadableStreamDefaultControllerEnqueue(this, chunk, transferList); } error(e) { diff --git a/reference-implementation/lib/ReadableStreamDefaultController.webidl b/reference-implementation/lib/ReadableStreamDefaultController.webidl index aeea7249f..dc02cfdcb 100644 --- a/reference-implementation/lib/ReadableStreamDefaultController.webidl +++ b/reference-implementation/lib/ReadableStreamDefaultController.webidl @@ -1,8 +1,12 @@ +dictionary StructuredSerializeOptions { + sequence transfer = []; +}; + [Exposed=(Window,Worker,Worklet)] interface ReadableStreamDefaultController { readonly attribute unrestricted double? desiredSize; undefined close(); - undefined enqueue(optional any chunk); + undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { }); undefined error(optional any e); }; diff --git a/reference-implementation/lib/TransformStream-impl.js b/reference-implementation/lib/TransformStream-impl.js index aaab20efa..44e244202 100644 --- a/reference-implementation/lib/TransformStream-impl.js +++ b/reference-implementation/lib/TransformStream-impl.js @@ -12,11 +12,11 @@ exports.implementation = class TransformStreamImpl { transformer = null; } const transformerDict = Transformer.convert(transformer); - if ('readableType' in transformerDict) { - throw new RangeError('Invalid readableType specified'); + if ('readableType' in transformerDict && transformerDict['readableType'] !== 'owning') { + throw new TypeError('Invalid readableType specified'); } if ('writableType' in transformerDict) { - throw new RangeError('Invalid writableType specified'); + assert(transformerDict['writableType'] !== 'owning'); } const readableHighWaterMark = ExtractHighWaterMark(readableStrategy, 0); @@ -27,7 +27,8 @@ exports.implementation = class TransformStreamImpl { const startPromise = newPromise(); aos.InitializeTransformStream( - this, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm + this, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm, + transformerDict ); aos.SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict); diff --git a/reference-implementation/lib/TransformStreamDefaultController-impl.js b/reference-implementation/lib/TransformStreamDefaultController-impl.js index e28e9ae2f..2cb3644ca 100644 --- a/reference-implementation/lib/TransformStreamDefaultController-impl.js +++ b/reference-implementation/lib/TransformStreamDefaultController-impl.js @@ -9,8 +9,9 @@ exports.implementation = class TransformStreamDefaultController { return rsAOs.ReadableStreamDefaultControllerGetDesiredSize(readableController); } - enqueue(chunk) { - aos.TransformStreamDefaultControllerEnqueue(this, chunk); + enqueue(chunk, options) { + const transferList = options ? options.transfer : undefined; + aos.TransformStreamDefaultControllerEnqueue(this, chunk, transferList); } error(reason) { diff --git a/reference-implementation/lib/TransformStreamDefaultController.webidl b/reference-implementation/lib/TransformStreamDefaultController.webidl index 75cdd23ff..a66dd421f 100644 --- a/reference-implementation/lib/TransformStreamDefaultController.webidl +++ b/reference-implementation/lib/TransformStreamDefaultController.webidl @@ -1,8 +1,12 @@ +dictionary StructuredSerializeOptions { + sequence transfer = []; +}; + [Exposed=(Window,Worker,Worklet)] interface TransformStreamDefaultController { readonly attribute unrestricted double? desiredSize; - undefined enqueue(optional any chunk); + undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { }); undefined error(optional any reason); undefined terminate(); }; diff --git a/reference-implementation/lib/Transformer.webidl b/reference-implementation/lib/Transformer.webidl index eefea2b0d..e91c6d760 100644 --- a/reference-implementation/lib/Transformer.webidl +++ b/reference-implementation/lib/Transformer.webidl @@ -2,8 +2,8 @@ dictionary Transformer { TransformerStartCallback start; TransformerTransformCallback transform; TransformerFlushCallback flush; - any readableType; - any writableType; + ReadableStreamType readableType; + WritableStreamType writableType; }; callback TransformerStartCallback = any (TransformStreamDefaultController controller); diff --git a/reference-implementation/lib/UnderlyingSink.webidl b/reference-implementation/lib/UnderlyingSink.webidl index 53ae15c8e..ba193d4df 100644 --- a/reference-implementation/lib/UnderlyingSink.webidl +++ b/reference-implementation/lib/UnderlyingSink.webidl @@ -3,10 +3,12 @@ dictionary UnderlyingSink { UnderlyingSinkWriteCallback write; UnderlyingSinkCloseCallback close; UnderlyingSinkAbortCallback abort; - any type; + WritableStreamType type; }; callback UnderlyingSinkStartCallback = any (WritableStreamDefaultController controller); callback UnderlyingSinkWriteCallback = Promise (any chunk, WritableStreamDefaultController controller); callback UnderlyingSinkCloseCallback = Promise (); callback UnderlyingSinkAbortCallback = Promise (optional any reason); + +enum WritableStreamType { "owning" }; diff --git a/reference-implementation/lib/UnderlyingSource.webidl b/reference-implementation/lib/UnderlyingSource.webidl index 7a0047638..b8aa75b70 100644 --- a/reference-implementation/lib/UnderlyingSource.webidl +++ b/reference-implementation/lib/UnderlyingSource.webidl @@ -12,4 +12,4 @@ callback UnderlyingSourceStartCallback = any (ReadableStreamController controlle callback UnderlyingSourcePullCallback = Promise (ReadableStreamController controller); callback UnderlyingSourceCancelCallback = Promise (optional any reason); -enum ReadableStreamType { "bytes" }; +enum ReadableStreamType { "bytes", "owning" }; diff --git a/reference-implementation/lib/WritableStream-impl.js b/reference-implementation/lib/WritableStream-impl.js index 7d23802d4..06a801c3a 100644 --- a/reference-implementation/lib/WritableStream-impl.js +++ b/reference-implementation/lib/WritableStream-impl.js @@ -12,7 +12,7 @@ exports.implementation = class WritableStreamImpl { underlyingSink = null; } const underlyingSinkDict = UnderlyingSink.convert(underlyingSink); - if ('type' in underlyingSinkDict) { + if ('type' in underlyingSinkDict && underlyingSinkDict.type !== 'owning') { throw new RangeError('Invalid type is specified'); } diff --git a/reference-implementation/lib/WritableStreamDefaultWriter-impl.js b/reference-implementation/lib/WritableStreamDefaultWriter-impl.js index 1c8934331..10da58236 100644 --- a/reference-implementation/lib/WritableStreamDefaultWriter-impl.js +++ b/reference-implementation/lib/WritableStreamDefaultWriter-impl.js @@ -59,12 +59,13 @@ exports.implementation = class WritableStreamDefaultWriterImpl { aos.WritableStreamDefaultWriterRelease(this); } - write(chunk) { + write(chunk, options) { + const transferList = options ? options.transfer : undefined; if (this._stream === undefined) { return promiseRejectedWith(defaultWriterLockException('write to')); } - return aos.WritableStreamDefaultWriterWrite(this, chunk); + return aos.WritableStreamDefaultWriterWrite(this, chunk, transferList); } }; diff --git a/reference-implementation/lib/WritableStreamDefaultWriter.webidl b/reference-implementation/lib/WritableStreamDefaultWriter.webidl index 6a542d2e1..50a8f9544 100644 --- a/reference-implementation/lib/WritableStreamDefaultWriter.webidl +++ b/reference-implementation/lib/WritableStreamDefaultWriter.webidl @@ -1,3 +1,7 @@ +dictionary StructuredSerializeOptions { + sequence transfer = []; +}; + [Exposed=(Window,Worker,Worklet)] interface WritableStreamDefaultWriter { constructor(WritableStream stream); @@ -9,5 +13,5 @@ interface WritableStreamDefaultWriter { Promise abort(optional any reason); Promise close(); undefined releaseLock(); - Promise write(optional any chunk); + Promise write(optional any chunk, optional StructuredSerializeOptions options = { }); }; diff --git a/reference-implementation/lib/abstract-ops/miscellaneous.js b/reference-implementation/lib/abstract-ops/miscellaneous.js index 08589a740..d4d9a054f 100644 --- a/reference-implementation/lib/abstract-ops/miscellaneous.js +++ b/reference-implementation/lib/abstract-ops/miscellaneous.js @@ -20,3 +20,18 @@ exports.CloneAsUint8Array = O => { const buffer = O.buffer.slice(O.byteOffset, O.byteOffset + O.byteLength); return new Uint8Array(buffer); }; + +exports.StructuredTransferOrClone = (value, transferList) => { + return globalThis.structuredClone(value, { transfer: transferList }); +}; + +exports.RunCloseSteps = value => { + if (typeof value.close === 'function') { + return; + } + try { + value.close(); + } catch (closeException) { + // Nothing to do. + } +}; diff --git a/reference-implementation/lib/abstract-ops/queue-with-sizes.js b/reference-implementation/lib/abstract-ops/queue-with-sizes.js index 22086caa5..2a0d5b21d 100644 --- a/reference-implementation/lib/abstract-ops/queue-with-sizes.js +++ b/reference-implementation/lib/abstract-ops/queue-with-sizes.js @@ -1,6 +1,6 @@ 'use strict'; const assert = require('assert'); -const { IsNonNegativeNumber } = require('./miscellaneous.js'); +const { IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } = require('./miscellaneous.js'); exports.DequeueValue = container => { assert('_queue' in container && '_queueTotalSize' in container); @@ -15,7 +15,7 @@ exports.DequeueValue = container => { return pair.value; }; -exports.EnqueueValueWithSize = (container, value, size) => { +exports.EnqueueValueWithSize = (container, value, size, transferList) => { assert('_queue' in container && '_queueTotalSize' in container); if (!IsNonNegativeNumber(size)) { @@ -24,7 +24,9 @@ exports.EnqueueValueWithSize = (container, value, size) => { if (size === Infinity) { throw new RangeError('Size must be a finite, non-NaN, non-negative number.'); } - + if (container._isOwning && !container._isPipeToOptimizedTransfer) { + value = StructuredTransferOrClone(value, transferList); + } container._queue.push({ value, size }); container._queueTotalSize += size; }; @@ -40,6 +42,12 @@ exports.PeekQueueValue = container => { exports.ResetQueue = container => { assert('_queue' in container && '_queueTotalSize' in container); + if (container._isOwning) { + while (container._queue.length > 0) { + const value = exports.DequeueValue(container); + RunCloseSteps(value); + } + } container._queue = []; container._queueTotalSize = 0; }; diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index db1da4c73..a02fac012 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -6,7 +6,8 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re require('../helpers/webidl.js'); const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } = require('./ecmascript.js'); -const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js'); +const { CloneAsUint8Array, IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } = + require('./miscellaneous.js'); const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, @@ -81,15 +82,17 @@ function AcquireReadableStreamDefaultReader(stream) { } function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark = 1, - sizeAlgorithm = () => 1) { + sizeAlgorithm = () => 1, type = undefined) { assert(IsNonNegativeNumber(highWaterMark) === true); const stream = ReadableStream.new(globalThis); InitializeReadableStream(stream); const controller = ReadableStreamDefaultController.new(globalThis); + const isOwning = type === 'owning'; + SetUpReadableStreamDefaultController( - stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm + stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, isOwning ); return stream; @@ -136,6 +139,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC const reader = AcquireReadableStreamDefaultReader(source); const writer = AcquireWritableStreamDefaultWriter(dest); + writer._stream._controller._isPipeToOptimizedTransfer = source._controller._isOwning && dest._controller._isOwning; source._disturbed = true; @@ -206,7 +210,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC { chunkSteps: chunk => { currentWrite = transformPromiseWith( - WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {} + WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => { + if (reader._stream._controller._isOwning) { + RunCloseSteps(chunk); + } + } ); resolveRead(false); }, @@ -319,6 +327,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } function finalize(isError, error) { + writer._stream._controller._isPipeToOptimizedTransfer = undefined; WritableStreamDefaultWriterRelease(writer); ReadableStreamDefaultReaderRelease(reader); @@ -340,7 +349,7 @@ function ReadableStreamTee(stream, cloneForBranch2) { if (ReadableByteStreamController.isImpl(stream._controller)) { return ReadableByteStreamTee(stream); } - return ReadableStreamDefaultTee(stream, cloneForBranch2); + return ReadableStreamDefaultTee(stream, stream._controller._isOwning ? true : cloneForBranch2); } function ReadableStreamDefaultTee(stream, cloneForBranch2) { @@ -392,10 +401,10 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) { // } if (canceled1 === false) { - ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1); + ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1, undefined); } if (canceled2 === false) { - ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2); + ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2, undefined); } reading = false; @@ -1074,7 +1083,7 @@ function ReadableStreamDefaultControllerClose(controller) { } } -function ReadableStreamDefaultControllerEnqueue(controller, chunk) { +function ReadableStreamDefaultControllerEnqueue(controller, chunk, transferList) { if (ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === false) { return; } @@ -1082,6 +1091,14 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) { const stream = controller._stream; if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) { + if (controller._isOwning) { + try { + chunk = StructuredTransferOrClone(chunk, transferList); + } catch (chunkCloneError) { + ReadableStreamDefaultControllerError(controller, chunkCloneError); + throw chunkCloneError; + } + } ReadableStreamFulfillReadRequest(stream, chunk, false); } else { let chunkSize; @@ -1093,7 +1110,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) { } try { - EnqueueValueWithSize(controller, chunk, chunkSize); + EnqueueValueWithSize(controller, chunk, chunkSize, transferList); } catch (enqueueE) { ReadableStreamDefaultControllerError(controller, enqueueE); throw enqueueE; @@ -1148,7 +1165,7 @@ function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) { } function SetUpReadableStreamDefaultController( - stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm) { + stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, isOwning) { assert(stream._controller === undefined); controller._stream = stream; @@ -1169,6 +1186,8 @@ function SetUpReadableStreamDefaultController( controller._pullAlgorithm = pullAlgorithm; controller._cancelAlgorithm = cancelAlgorithm; + controller._isOwning = isOwning; + stream._controller = controller; const startResult = startAlgorithm(); @@ -1195,7 +1214,7 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource( let startAlgorithm = () => undefined; let pullAlgorithm = () => promiseResolvedWith(undefined); let cancelAlgorithm = () => promiseResolvedWith(undefined); - + const isOwning = underlyingSourceDict.type === 'owning'; if ('start' in underlyingSourceDict) { startAlgorithm = () => underlyingSourceDict.start.call(underlyingSource, controller); } @@ -1207,8 +1226,8 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource( } SetUpReadableStreamDefaultController( - stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm - ); + stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, + isOwning); } // Byte stream controllers diff --git a/reference-implementation/lib/abstract-ops/transform-streams.js b/reference-implementation/lib/abstract-ops/transform-streams.js index 8e3f5fcc3..7c95fd9fb 100644 --- a/reference-implementation/lib/abstract-ops/transform-streams.js +++ b/reference-implementation/lib/abstract-ops/transform-streams.js @@ -25,7 +25,7 @@ Object.assign(exports, { // CreateTransformStream is not implemented since it is only meant for external specs. function InitializeTransformStream( - stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm) { + stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm, transformDict) { function startAlgorithm() { return startPromise; } @@ -43,7 +43,8 @@ function InitializeTransformStream( } stream._writable = CreateWritableStream( - startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm + startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm, + transformDict['writableType'] ); function pullAlgorithm() { @@ -56,7 +57,8 @@ function InitializeTransformStream( } stream._readable = CreateReadableStream( - startAlgorithm, pullAlgorithm, cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm + startAlgorithm, pullAlgorithm, cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm, + transformDict['readableType'] ); // The [[backpressure]] slot is set to undefined so that it can be initialised by TransformStreamSetBackpressure. @@ -116,11 +118,17 @@ function SetUpTransformStreamDefaultController(stream, controller, transformAlgo function SetUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) { const controller = TransformStreamDefaultController.new(globalThis); + const readableController = stream.readable._controller; + const writableController = stream.writable._controller; + let transformAlgorithm = chunk => { try { TransformStreamDefaultControllerEnqueue(controller, chunk); return promiseResolvedWith(undefined); } catch (transformResultE) { + if (writableController._isOwning) { + RunCloseSteps(chunk); + } return promiseRejectedWith(transformResultE); } }; @@ -129,6 +137,8 @@ function SetUpTransformStreamDefaultControllerFromTransformer(stream, transforme if ('transform' in transformerDict) { transformAlgorithm = chunk => transformerDict.transform.call(transformer, chunk, controller); + } else { + readableController._isPipeToOptimizedTransfer = writableController._isOwning && readableController._isOwning; } if ('flush' in transformerDict) { flushAlgorithm = () => transformerDict.flush.call(transformer, controller); @@ -142,7 +152,7 @@ function TransformStreamDefaultControllerClearAlgorithms(controller) { controller._flushAlgorithm = undefined; } -function TransformStreamDefaultControllerEnqueue(controller, chunk) { +function TransformStreamDefaultControllerEnqueue(controller, chunk, transferList) { verbose('TransformStreamDefaultControllerEnqueue()'); const stream = controller._stream; @@ -155,7 +165,7 @@ function TransformStreamDefaultControllerEnqueue(controller, chunk) { // accept TransformStreamDefaultControllerEnqueue() calls. try { - ReadableStreamDefaultControllerEnqueue(readableController, chunk); + ReadableStreamDefaultControllerEnqueue(readableController, chunk, transferList); } catch (e) { // This happens when readableStrategy.size() throws. TransformStreamErrorWritableAndUnblockWrite(stream, e); diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index cf303bfe7..9bebc518d 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -44,16 +44,17 @@ function AcquireWritableStreamDefaultWriter(stream) { } function CreateWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark = 1, - sizeAlgorithm = () => 1) { + sizeAlgorithm = () => 1, type = undefined) { assert(IsNonNegativeNumber(highWaterMark) === true); const stream = WritableStream.new(globalThis); InitializeWritableStream(stream); const controller = WritableStreamDefaultController.new(globalThis); + const isOwning = type === 'owning'; SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, - abortAlgorithm, highWaterMark, sizeAlgorithm); + abortAlgorithm, highWaterMark, sizeAlgorithm, isOwning); return stream; } @@ -529,7 +530,7 @@ function WritableStreamDefaultWriterWrite(writer, chunk) { // Default controllers function SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, - abortAlgorithm, highWaterMark, sizeAlgorithm) { + abortAlgorithm, highWaterMark, sizeAlgorithm, isOwning) { assert(WritableStream.isImpl(stream)); assert(stream._controller === undefined); @@ -551,6 +552,8 @@ function SetUpWritableStreamDefaultController(stream, controller, startAlgorithm controller._closeAlgorithm = closeAlgorithm; controller._abortAlgorithm = abortAlgorithm; + controller._isOwning = isOwning; + const backpressure = WritableStreamDefaultControllerGetBackpressure(controller); WritableStreamUpdateBackpressure(stream, backpressure); @@ -579,6 +582,7 @@ function SetUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyi let writeAlgorithm = () => promiseResolvedWith(undefined); let closeAlgorithm = () => promiseResolvedWith(undefined); let abortAlgorithm = () => promiseResolvedWith(undefined); + const isOwning = underlyingSinkDict.type === 'owning'; if ('start' in underlyingSinkDict) { startAlgorithm = () => underlyingSinkDict.start.call(underlyingSink, controller); @@ -594,8 +598,8 @@ function SetUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyi } SetUpWritableStreamDefaultController( - stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm - ); + stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm, + isOwning); } function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { @@ -637,7 +641,7 @@ function WritableStreamDefaultControllerClearAlgorithms(controller) { } function WritableStreamDefaultControllerClose(controller) { - EnqueueValueWithSize(controller, closeSentinel, 0); + EnqueueValueWithSize(controller, closeSentinel, 0, undefined); WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); } @@ -729,7 +733,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) { function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) { try { - EnqueueValueWithSize(controller, chunk, chunkSize); + EnqueueValueWithSize(controller, chunk, chunkSize, undefined); } catch (enqueueE) { WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueE); return; diff --git a/reference-implementation/run-web-platform-tests.js b/reference-implementation/run-web-platform-tests.js index 836911eeb..8d857c9f7 100644 --- a/reference-implementation/run-web-platform-tests.js +++ b/reference-implementation/run-web-platform-tests.js @@ -35,7 +35,9 @@ async function main() { const excludeGlobs = [ // These tests use ArrayBuffers backed by WebAssembly.Memory objects, which *should* be non-transferable. // However, our TransferArrayBuffer implementation cannot detect these, and will incorrectly "transfer" them anyway. - 'readable-byte-streams/non-transferable-buffers.any.html' + 'readable-byte-streams/non-transferable-buffers.any.html', + 'readable-streams/owning-type-message-port.any.html', // MessagePort is not defined. + 'readable-streams/owning-type-video-frame.any.html' // VideoFrame is not defined. ]; const anyTestPattern = /\.any\.html$/; @@ -58,6 +60,7 @@ async function main() { } }; }; + window.structuredClone = globalThis.structuredClone; window.eval(bundledJS); }, filter(testPath) { diff --git a/reference-implementation/web-platform-tests b/reference-implementation/web-platform-tests index 6a46d9cb8..c844711f6 160000 --- a/reference-implementation/web-platform-tests +++ b/reference-implementation/web-platform-tests @@ -1 +1 @@ -Subproject commit 6a46d9cb8d20c510a620141c721b81b460a4ee55 +Subproject commit c844711f65688541d135cba1f5015a899128fcf8