Skip to content

Commit db06f3e

Browse files
committed
[SPARK-42105][SS][DOCS] Reflect the change of SPARK-40925 to SS guide doc
### What changes were proposed in this pull request? This PR proposes to update Structured Streaming guide doc to reflect the change of SPARK-40925. ### Why are the changes needed? SPARK-40925 addressed majority of limitation on global watermark, but we still haven't updated the doc. ### Does this PR introduce _any_ user-facing change? Yes, documentation fix. ### How was this patch tested? Built the doc page via bundle. Here's a screenshot. <img width="979" alt="screenshot-SPARK-42105-update" src="https://user-images.githubusercontent.com/1317309/213628538-330bb326-4dd4-4212-9687-157ffac32429.png"> Closes apache#39662 from HeartSaVioR/SPARK-42105. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 9c53845 commit db06f3e

File tree

1 file changed

+7
-33
lines changed

1 file changed

+7
-33
lines changed

docs/structured-streaming-programming-guide.md

+7-33
Original file line numberDiff line numberDiff line change
@@ -1834,20 +1834,23 @@ Though Spark cannot check and force it, the state function should be implemented
18341834
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
18351835
Some of them are as follows.
18361836

1837-
- Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.
1838-
18391837
- Limit and take the first N rows are not supported on streaming Datasets.
18401838

18411839
- Distinct operations on streaming Datasets are not supported.
18421840

1843-
- Deduplication operation is not supported after aggregation on a streaming Datasets.
1844-
18451841
- Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
18461842

18471843
- Few types of outer joins on streaming Datasets are not supported. See the
18481844
<a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a>
18491845
for more details.
18501846

1847+
- Chaining multiple stateful operations on streaming Datasets is not supported with Update and Complete mode.
1848+
- In addition, below operations followed by other stateful operation is not supported in Append mode.
1849+
- stream-stream time interval join (inner/outer)
1850+
- flatMapGroupsWithState
1851+
- A known workaround is to split your streaming query into multiple queries having a single stateful operation per each query,
1852+
and ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
1853+
18511854
In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
18521855

18531856
- `count()` - Cannot return a single count from a streaming Dataset. Instead, use `ds.groupBy().count()` which returns a streaming Dataset containing a running count.
@@ -1863,35 +1866,6 @@ For example, sorting on the input stream is not supported, as it requires keepin
18631866
track of all the data received in the stream. This is therefore fundamentally hard to execute
18641867
efficiently.
18651868

1866-
### Limitation of global watermark
1867-
1868-
In Append mode, if a stateful operation emits rows older than current watermark plus allowed late record delay,
1869-
they will be "late rows" in downstream stateful operations (as Spark uses global watermark). Note that these rows may be discarded.
1870-
This is a limitation of a global watermark, and it could potentially cause a correctness issue.
1871-
1872-
Spark will check the logical plan of query and log a warning when Spark detects such a pattern.
1873-
1874-
Any of the stateful operation(s) after any of below stateful operations can have this issue:
1875-
1876-
* streaming aggregation in Append mode
1877-
* stream-stream outer join
1878-
* `mapGroupsWithState` and `flatMapGroupsWithState` in Append mode (depending on the implementation of the state function)
1879-
1880-
As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function
1881-
emits late rows if the operator uses Append mode.
1882-
1883-
Spark provides two ways to check the number of late rows on stateful operators which would help you identify the issue:
1884-
1885-
1. On Spark UI: check the metrics in stateful operator nodes in query execution details page in SQL tab
1886-
2. On Streaming Query Listener: check "numRowsDroppedByWatermark" in "stateOperators" in QueryProcessEvent.
1887-
1888-
Please note that "numRowsDroppedByWatermark" represents the number of "dropped" rows by watermark, which is not always same as the count of "late input rows" for the operator.
1889-
It depends on the implementation of the operator - e.g. streaming aggregation does pre-aggregate input rows and checks the late inputs against pre-aggregated inputs,
1890-
hence the number is not same as the number of original input rows. You'd like to just check the fact whether the value is zero or non-zero.
1891-
1892-
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
1893-
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
1894-
18951869
### State Store
18961870

18971871
State store is a versioned key-value store which provides both read and write operations. In

0 commit comments

Comments
 (0)