From 03043a3885b4093632f5ec91b4b75040169a8e2c Mon Sep 17 00:00:00 2001 From: micheal-o Date: Thu, 3 Apr 2025 23:11:17 -0700 Subject: [PATCH 1/3] fix --- .../execution/streaming/state/RocksDB.scala | 33 ++++++++++--- .../streaming/state/RocksDBFileManager.scala | 2 + .../streaming/state/RocksDBSuite.scala | 46 +++++++++++++++++++ 3 files changed, 75 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 56f253b523358..569152479a75a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -1344,6 +1344,14 @@ class RocksDBFileMapping { // from reusing SST files which have not been yet persisted to DFS, val snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo] = ConcurrentHashMap.newKeySet() + /** + * Clear everything stored in the file mapping. + */ + def clear(): Unit = { + localFileMappings.clear() + snapshotsPendingUpload.clear() + } + /** * Get the mapped DFS file for the given local file for a DFS load operation. 
* If the currently mapped DFS file was mapped in the same or newer version as the version we @@ -1360,7 +1368,13 @@ class RocksDBFileMapping { fileManager: RocksDBFileManager, localFileName: String, versionToLoad: Long): Option[RocksDBImmutableFile] = { - getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToLoad) + getDfsFileWithVersionCheck( + fileManager, + localFileName, + // We can't reuse the current local file since it was added in the same or newer version + // as the version we want to load + (fileVersion, _) => fileVersion >= versionToLoad + ) } /** @@ -1378,19 +1392,26 @@ class RocksDBFileMapping { */ private def getDfsFileForSave( fileManager: RocksDBFileManager, - localFileName: String, + localFile: File, versionToSave: Long): Option[RocksDBImmutableFile] = { - getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToSave) + getDfsFileWithVersionCheck( + fileManager, + localFile.getName, + (dfsFileVersion, dfsFile) => + // The DFS file is not the same as the file we want to save, either if + // the DFS file was added in the same or higher version, or the file size is different + dfsFileVersion >= versionToSave || dfsFile.sizeBytes != localFile.length() + ) } private def getDfsFileWithVersionCheck( fileManager: RocksDBFileManager, localFileName: String, - isIncompatibleVersion: Long => Boolean): Option[RocksDBImmutableFile] = { + isIncompatible: (Long, RocksDBImmutableFile) => Boolean): Option[RocksDBImmutableFile] = { localFileMappings.get(localFileName).map { case (dfsFileMappedVersion, dfsFile) => val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile) val versionSnapshotInfo = RocksDBVersionSnapshotInfo(dfsFileMappedVersion, dfsFileSuffix) - if (isIncompatibleVersion(dfsFileMappedVersion) || + if (isIncompatible(dfsFileMappedVersion, dfsFile) || snapshotsPendingUpload.contains(versionSnapshotInfo)) { // the mapped dfs file cannot be used, delete from mapping remove(localFileName) @@ -1432,7 +1453,7 @@ class 
RocksDBFileMapping { val dfsFilesSuffix = UUID.randomUUID().toString val snapshotFileMapping = localImmutableFiles.map { f => val localFileName = f.getName - val existingDfsFile = getDfsFileForSave(fileManager, localFileName, version) + val existingDfsFile = getDfsFileForSave(fileManager, f, version) val dfsFile = existingDfsFile.getOrElse { val newDfsFileName = fileManager.newDFSFileName(localFileName, dfsFilesSuffix) val newDfsFile = RocksDBImmutableFile(localFileName, newDfsFileName, sizeBytes = f.length()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index d077928711de9..2fc8617aa67b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -302,6 +302,8 @@ class RocksDBFileManager( val metadata = if (version == 0) { if (localDir.exists) Utils.deleteRecursively(localDir) localDir.mkdirs() + // Since we cleared the local dir, we should also clear the local file mapping + rocksDBFileMapping.clear() RocksDBCheckpointMetadata(Seq.empty, 0) } else { // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 0c6824f76d016..2fc486fe162ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -3031,6 +3031,52 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + testWithChangelogCheckpointingEnabled( + "SPARK-51717 - validate that RocksDB file mapping 
is cleared " + + "when we reload version 0 after we have created a snapshot to avoid SST mismatch") { + withTempDir { dir => + val conf = dbConf.copy(minDeltasForSnapshot = 2) + val hadoopConf = new Configuration() + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db => + db.load(0) + db.put("a", "1") + db.put("b", "1") + db.commit() + + db.load(1) + db.put("a", "1") + db.commit() // we will create a snapshot for v2 + + // invalidate the db, so next load will reload from dfs + db.rollback() + + // We will replay changelog from 0 -> 2 since the v2 snapshot hasn't been uploaded yet. + // We had a bug where file mapping is not being cleared when we start from v0 again, + // hence files of v2 snapshot were being reused, if v2 snapshot is uploaded + // after this load(2) but before v3 snapshot + db.load(2) + // add a larger row to make sure new sst size is different + db.put("b", "1555315569874537247638950872648") + + // now upload v2 snapshot + db.doMaintenance() + + // we will create a snapshot for v3. 
We shouldn't reuse files of v2 snapshot, + // given that v3 was not created from v2 snapshot since we replayed changelog from 0 -> 2 + db.commit() + + db.doMaintenance() // upload v3 snapshot + + // invalidate the db, so next load will reload from dfs + db.rollback() + + // loading v3 from dfs should be successful and no SST mismatch error + db.load(3) + } + } + } + test("ensure local files deleted on filesystem" + " are cleaned from dfs file mapping") { def getSSTFiles(dir: File): Set[File] = { From 4c7969bf25b04d68fb7a5339b3d416d78ac136e4 Mon Sep 17 00:00:00 2001 From: micheal-o Date: Fri, 4 Apr 2025 20:50:33 -0700 Subject: [PATCH 2/3] nit --- .../spark/sql/execution/streaming/state/RocksDB.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 569152479a75a..2ba0a7e35b743 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -1368,7 +1368,7 @@ class RocksDBFileMapping { fileManager: RocksDBFileManager, localFileName: String, versionToLoad: Long): Option[RocksDBImmutableFile] = { - getDfsFileWithVersionCheck( + getDfsFileWithIncompatibilityCheck( fileManager, localFileName, // We can't reuse the current local file since it was added in the same or newer version @@ -1380,8 +1380,9 @@ class RocksDBFileMapping { /** * Get the mapped DFS file for the given local file for a DFS save (i.e. checkpoint) operation. * If the currently mapped DFS file was mapped in the same or newer version as the version we - * want to save (or was generated in a version which has not been uploaded to DFS yet), - * the mapped DFS file is ignored. 
In this scenario, the local mapping to this DFS file + * want to save (or was generated in a version which has not been uploaded to DFS yet) + * or the mapped dfs file isn't the same size as the local file, + * then the mapped DFS file is ignored. In this scenario, the local mapping to this DFS file * will be cleared, and function will return None. * * @note If the file was added in current version (i.e. versionToSave - 1), we can reuse it. @@ -1394,7 +1395,7 @@ class RocksDBFileMapping { fileManager: RocksDBFileManager, localFile: File, versionToSave: Long): Option[RocksDBImmutableFile] = { - getDfsFileWithVersionCheck( + getDfsFileWithIncompatibilityCheck( fileManager, localFile.getName, (dfsFileVersion, dfsFile) => @@ -1404,7 +1405,7 @@ class RocksDBFileMapping { ) } - private def getDfsFileWithVersionCheck( + private def getDfsFileWithIncompatibilityCheck( fileManager: RocksDBFileManager, localFileName: String, isIncompatible: (Long, RocksDBImmutableFile) => Boolean): Option[RocksDBImmutableFile] = { From e6be1526cf35af31c9f2661abcd7050d4789380d Mon Sep 17 00:00:00 2001 From: micheal-o Date: Mon, 7 Apr 2025 22:26:11 -0700 Subject: [PATCH 3/3] log file mapping --- .../src/main/scala/org/apache/spark/internal/LogKey.scala | 1 + .../spark/sql/execution/streaming/state/RocksDB.scala | 6 ++++++ .../sql/execution/streaming/state/RocksDBFileManager.scala | 4 ++++ 3 files changed, 11 insertions(+) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 318f32c52b904..fe2be9b39f328 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -704,6 +704,7 @@ private[spark] object LogKeys { case object RIGHT_EXPR extends LogKey case object RIGHT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey case object RMSE extends LogKey + case object ROCKS_DB_FILE_MAPPING extends LogKey case 
object ROCKS_DB_LOG_LEVEL extends LogKey case object ROCKS_DB_LOG_MESSAGE extends LogKey case object RPC_ADDRESS extends LogKey diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 2ba0a7e35b743..c53756cc55459 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -885,6 +885,10 @@ class RocksDB( val (dfsFileSuffix, immutableFileMapping) = rocksDBFileMapping.createSnapshotFileMapping( fileManager, checkpointDir, version) + logInfo(log"RocksDB file mapping after creating snapshot file mapping for version " + + log"${MDC(LogKeys.VERSION_NUM, version)}:\n" + + log"${MDC(LogKeys.ROCKS_DB_FILE_MAPPING, rocksDBFileMapping)}") + val newSnapshot = Some(RocksDBSnapshot( checkpointDir, version, @@ -1352,6 +1356,8 @@ class RocksDBFileMapping { snapshotsPendingUpload.clear() } + override def toString: String = localFileMappings.toString() + /** * Get the mapped DFS file for the given local file for a DFS load operation. 
* If the currently mapped DFS file was mapped in the same or newer version as the version we diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 2fc8617aa67b2..eb533dd796072 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -322,6 +322,10 @@ class RocksDBFileManager( } logFilesInDir(localDir, log"Loaded checkpoint files " + log"for version ${MDC(LogKeys.VERSION_NUM, version)}") + logInfo(log"RocksDB file mapping after loading checkpoint version " + + log"${MDC(LogKeys.VERSION_NUM, version)} from DFS:\n" + + log"${MDC(LogKeys.ROCKS_DB_FILE_MAPPING, rocksDBFileMapping)}") + metadata }