Skip to content

Commit 1e4dc0d

Browse files
siying authored and HeartSaVioR committed
[SPARK-51724][SS] RocksDB StateStore's lineage manager should be synchronized
### Why are the changes needed? RocksDB State Store's Lineage Manager currently isn't synchronized, but it can be accessed by both the DB loading thread and the maintenance thread. In theory, this can produce a wrong lineage: 1. the maintenance thread gets the current lineage 2. a task's commit() appends a new lineage item to the lineage 3. the maintenance thread truncates its (now stale) copy and stores it back. In this case, the new lineage item added in step 2 is lost. It should be fixed by simply synchronizing those operations. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Ran the existing unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50520 from siying/lineagemanagerrace. Authored-by: Siying Dong <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]> (cherry picked from commit 187adb8) Signed-off-by: Jungtaek Lim <[email protected]>
1 parent e896843 commit 1e4dc0d

File tree

1 file changed

+15
-10
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state

1 file changed

+15
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1462,8 +1462,7 @@ class RocksDB(
14621462
// This is relatively aggressive because, even if the uploading succeeds,
14631463
// it is not necessarily the one written to the commit log. But we can always load lineage
14641464
// from commit log so it is fine.
1465-
lineageManager.resetLineage(lineageManager.getLineageForCurrVersion()
1466-
.filter(i => i.version >= snapshot.version))
1465+
lineageManager.truncateFromVersion(snapshot.version)
14671466
logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: " +
14681467
log"Upload snapshot of version ${MDC(LogKeys.VERSION_NUM, snapshot.version)}, " +
14691468
log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " +
@@ -1975,27 +1974,33 @@ case class AcquiredThreadInfo(
19751974
private[sql] class RocksDBLineageManager {
19761975
@volatile private var lineage: Array[LineageItem] = Array.empty
19771976

1978-
override def toString: String = lineage.map {
1979-
case LineageItem(version, uuid) => s"$version: $uuid"
1980-
}.mkString(" ")
1977+
override def toString: String = synchronized {
1978+
lineage.map {
1979+
case LineageItem(version, uuid) => s"$version: $uuid"
1980+
}.mkString(" ")
1981+
}
19811982

1982-
def appendLineageItem(item: LineageItem): Unit = {
1983+
def appendLineageItem(item: LineageItem): Unit = synchronized {
19831984
lineage = lineage :+ item
19841985
}
19851986

1986-
def resetLineage(newLineage: Array[LineageItem]): Unit = {
1987+
def truncateFromVersion(versionToKeep: Long): Unit = synchronized {
1988+
resetLineage(getLineageForCurrVersion().filter(i => i.version >= versionToKeep))
1989+
}
1990+
1991+
def resetLineage(newLineage: Array[LineageItem]): Unit = synchronized {
19871992
lineage = newLineage
19881993
}
19891994

1990-
def getLineageForCurrVersion(): Array[LineageItem] = {
1995+
def getLineageForCurrVersion(): Array[LineageItem] = synchronized {
19911996
lineage.clone()
19921997
}
19931998

1994-
def contains(item: LineageItem): Boolean = {
1999+
def contains(item: LineageItem): Boolean = synchronized {
19952000
lineage.contains(item)
19962001
}
19972002

1998-
def clear(): Unit = {
2003+
def clear(): Unit = synchronized {
19992004
lineage = Array.empty
20002005
}
20012006
}

0 commit comments

Comments
 (0)