
Commit c7024d3

siying and HeartSaVioR committed
[SPARK-51714][SS] Add Failure Ingestion test to test state store checkpoint format V2
### Why are the changes needed?

The new state store checkpoint format needs failure tolerance tests to make sure the implementation is correct and delivers the behavior we would like.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It is test code itself.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50508 from siying/ingest_failure9.

Lead-authored-by: Siying Dong <[email protected]>
Co-authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 3f16577 commit c7024d3

File tree: 6 files changed, +899 -4 lines changed


Diff for: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala (+17 -1)
@@ -135,7 +135,23 @@ class RocksDB(
   private val nativeStats = rocksDbOptions.statistics()
 
   private val workingDir = createTempDir("workingDir")
-  private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
+
+  protected def createFileManager(
+      dfsRootDir: String,
+      localTempDir: File,
+      hadoopConf: Configuration,
+      codecName: String,
+      loggingId: String): RocksDBFileManager = {
+    new RocksDBFileManager(
+      dfsRootDir,
+      localTempDir,
+      hadoopConf,
+      codecName,
+      loggingId = loggingId
+    )
+  }
+
+  private val fileManager = createFileManager(dfsRootDir, createTempDir("fileManager"),
     hadoopConf, conf.compressionCodec, loggingId = loggingId)
   private val byteArrayPair = new ByteArrayPair()
   private val commitLatencyMs = new mutable.HashMap[String, Long]()

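The new `createFileManager` hook turns the file manager into an overridable seam, so a test can swap in a fault-prone implementation without touching the production path. A minimal sketch of how a failure-ingestion test might use it, assuming the constructor shape shown in `createRocksDB` below and hypothetical `FailureInjectionRocksDB` / `FailureInjectionFileManager` test helpers (neither appears in this excerpt):

```scala
import java.io.File

import org.apache.hadoop.conf.Configuration

// Hypothetical test-only subclass; FailureInjectionRocksDB and
// FailureInjectionFileManager are illustrative names, not part of this commit.
class FailureInjectionRocksDB(
    dfsRootDir: String,
    conf: RocksDBConf,
    localRootDir: File,
    hadoopConf: Configuration,
    loggingId: String,
    useColumnFamilies: Boolean,
    enableStateStoreCheckpointIds: Boolean,
    partitionId: Int)
  extends RocksDB(dfsRootDir, conf, localRootDir, hadoopConf, loggingId,
    useColumnFamilies, enableStateStoreCheckpointIds, partitionId) {

  // Swap in a file manager that can throw on chosen checkpoint operations.
  override protected def createFileManager(
      dfsRootDir: String,
      localTempDir: File,
      hadoopConf: Configuration,
      codecName: String,
      loggingId: String): RocksDBFileManager = {
    new FailureInjectionFileManager(dfsRootDir, localTempDir, hadoopConf,
      codecName, loggingId)
  }
}
```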
Diff for: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala (+6 -2)
@@ -32,7 +32,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
 import org.apache.commons.io.{FilenameUtils, IOUtils}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
@@ -133,8 +133,12 @@ class RocksDBFileManager(
 
   import RocksDBImmutableFile._
 
+  protected def getFileSystem(myDfsRootDir: String, myHadoopConf: Configuration): FileSystem = {
+    new Path(myDfsRootDir).getFileSystem(myHadoopConf)
+  }
+
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
-  private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
+  private val fs = getFileSystem(dfsRootDir, hadoopConf)
   private val onlyZipFiles = new PathFilter {
     override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
   }

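With `getFileSystem` overridable, a test file manager can hand back a wrapped Hadoop `FileSystem` that fails on demand, which is how checkpoint upload/download faults get injected. A sketch under the assumption of a hypothetical `FailureInjectionFileSystem` delegating wrapper (not part of this excerpt):

```scala
import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical test-only file manager; FailureInjectionFileSystem is an
// assumed wrapper that delegates to the real FileSystem but fails on demand.
class FailureInjectionFileManager(
    dfsRootDir: String,
    localTempDir: File,
    hadoopConf: Configuration,
    codecName: String,
    loggingId: String)
  extends RocksDBFileManager(dfsRootDir, localTempDir, hadoopConf, codecName,
    loggingId = loggingId) {

  // Return the wrapped file system so DFS operations can be made to fail.
  override protected def getFileSystem(
      myDfsRootDir: String,
      myHadoopConf: Configuration): FileSystem = {
    new FailureInjectionFileSystem(new Path(myDfsRootDir).getFileSystem(myHadoopConf))
  }
}
```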
Diff for: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala (+21 -1)
@@ -524,13 +524,33 @@ private[sql] class RocksDBStateStoreProvider
   @volatile private var stateStoreEncoding: String = _
   @volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _
 
+  protected def createRocksDB(
+      dfsRootDir: String,
+      conf: RocksDBConf,
+      localRootDir: File,
+      hadoopConf: Configuration,
+      loggingId: String,
+      useColumnFamilies: Boolean,
+      enableStateStoreCheckpointIds: Boolean,
+      partitionId: Int = 0): RocksDB = {
+    new RocksDB(
+      dfsRootDir,
+      conf,
+      localRootDir,
+      hadoopConf,
+      loggingId,
+      useColumnFamilies,
+      enableStateStoreCheckpointIds,
+      partitionId)
+  }
+
   private[sql] lazy val rocksDB = {
     val dfsRootDir = stateStoreId.storeCheckpointLocation().toString
     val storeIdStr = s"StateStoreId(opId=${stateStoreId.operatorId}," +
       s"partId=${stateStoreId.partitionId},name=${stateStoreId.storeName})"
     val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
     val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr)
-    new RocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, storeIdStr,
+    createRocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, storeIdStr,
       useColumnFamilies, storeConf.enableStateStoreCheckpointIds, stateStoreId.partitionId)
   }

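The provider-level seam completes the chain: a test provider returns the failure-injecting `RocksDB`, and a streaming query can then be pointed at that provider via the `spark.sql.streaming.stateStore.providerClass` configuration. A sketch, reusing the hypothetical classes introduced above:

```scala
import java.io.File

import org.apache.hadoop.conf.Configuration

// Hypothetical provider wiring the failure-injecting RocksDB through the
// new createRocksDB seam; class names here are illustrative only.
class FailureInjectionStateStoreProvider extends RocksDBStateStoreProvider {
  override protected def createRocksDB(
      dfsRootDir: String,
      conf: RocksDBConf,
      localRootDir: File,
      hadoopConf: Configuration,
      loggingId: String,
      useColumnFamilies: Boolean,
      enableStateStoreCheckpointIds: Boolean,
      partitionId: Int): RocksDB = {
    new FailureInjectionRocksDB(dfsRootDir, conf, localRootDir, hadoopConf,
      loggingId, useColumnFamilies, enableStateStoreCheckpointIds, partitionId)
  }
}
```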
Diff for: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala (+8 -0)
@@ -988,6 +988,14 @@ object StateStore extends Logging {
 
   /** Stop maintenance thread and reset the maintenance task */
   def stopMaintenanceTask(): Unit = loadedProviders.synchronized {
+    stopMaintenanceTaskWithoutLock()
+  }
+
+  /**
+   * Only used for unit tests. The function doesn't hold the loadedProviders lock. Calling
+   * it can work around a deadlock condition where a maintenance task is waiting for the lock.
+   */
+  private[streaming] def stopMaintenanceTaskWithoutLock(): Unit = {
     if (maintenanceThreadPool != null) {
       maintenanceThreadPoolLock.synchronized {
         maintenancePartitions.clear()

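The lock-free variant matters for these tests because an injected failure can leave a maintenance task blocked on the `loadedProviders` lock while test teardown already holds it; calling the usual `stopMaintenanceTask()` there would deadlock. A sketch of the intended test-side teardown, assuming the test sits in a package where `private[streaming]` is visible and `runScenario` is a hypothetical test body:

```scala
import org.apache.spark.sql.execution.streaming.state.StateStore

// Hypothetical helper for a failure-injection suite. runScenario is an
// assumed test body that drives a streaming query while injecting failures.
def withFailureInjection(runScenario: () => Unit): Unit = {
  try {
    runScenario()
  } finally {
    // Bypass the loadedProviders lock: a maintenance task blocked on that
    // lock would otherwise deadlock against stopMaintenanceTask().
    StateStore.stopMaintenanceTaskWithoutLock()
  }
}
```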