[SPARK-51717][SS][RocksDB] Fix SST mismatch corruption that can happen for second snapshot created for a new query #50512

Closed
@@ -704,6 +704,7 @@ private[spark] object LogKeys {
case object RIGHT_EXPR extends LogKey
case object RIGHT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey
case object RMSE extends LogKey
case object ROCKS_DB_FILE_MAPPING extends LogKey
case object ROCKS_DB_LOG_LEVEL extends LogKey
case object ROCKS_DB_LOG_MESSAGE extends LogKey
case object RPC_ADDRESS extends LogKey
@@ -885,6 +885,10 @@ class RocksDB(

val (dfsFileSuffix, immutableFileMapping) = rocksDBFileMapping.createSnapshotFileMapping(
fileManager, checkpointDir, version)
logInfo(log"RocksDB file mapping after creating snapshot file mapping for version " +
  log"${MDC(LogKeys.VERSION_NUM, version)}:\n" +
  log"${MDC(LogKeys.ROCKS_DB_FILE_MAPPING, rocksDBFileMapping)}")

> **HeartSaVioR** (Contributor): @micheal-o How large is this message in general? We had a PR that reduced logging for the RocksDB state store provider, and I wouldn't like to reintroduce that issue. If it is considerably large, we'd need to make this debug level.
> cc @anishshri-db

> **micheal-o** (Contributor, Author): @HeartSaVioR This is logged only when we create a snapshot (e.g. once per 10 batches), hence I don't expect it to be noisy or large. The content is just the list of local SST files and the DFS files they are mapped to. From past experience this is also not typically large; on average it should be fewer than 10-20 SST files.
>
> This would really make debugging this type of issue easier and faster.

val newSnapshot = Some(RocksDBSnapshot(
checkpointDir,
version,
@@ -1344,6 +1348,16 @@ class RocksDBFileMapping {
// from reusing SST files which have not been yet persisted to DFS,
val snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo] = ConcurrentHashMap.newKeySet()

/**
* Clear everything stored in the file mapping.
*/
def clear(): Unit = {
localFileMappings.clear()
snapshotsPendingUpload.clear()
}

override def toString: String = localFileMappings.toString()

/**
* Get the mapped DFS file for the given local file for a DFS load operation.
* If the currently mapped DFS file was mapped in the same or newer version as the version we
@@ -1360,14 +1374,21 @@ class RocksDBFileMapping {
fileManager: RocksDBFileManager,
localFileName: String,
versionToLoad: Long): Option[RocksDBImmutableFile] = {
-getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToLoad)
+getDfsFileWithIncompatibilityCheck(
+  fileManager,
+  localFileName,
+  // We can't reuse the current local file since it was added in the same or newer version
+  // as the version we want to load
+  (fileVersion, _) => fileVersion >= versionToLoad
+)
}

/**
* Get the mapped DFS file for the given local file for a DFS save (i.e. checkpoint) operation.
* If the currently mapped DFS file was mapped in the same or newer version as the version we
-* want to save (or was generated in a version which has not been uploaded to DFS yet),
-* the mapped DFS file is ignored. In this scenario, the local mapping to this DFS file
+* want to save (or was generated in a version which has not been uploaded to DFS yet)
+* or the mapped dfs file isn't the same size as the local file,
+* then the mapped DFS file is ignored. In this scenario, the local mapping to this DFS file
* will be cleared, and function will return None.
*
* @note If the file was added in current version (i.e. versionToSave - 1), we can reuse it.
@@ -1378,19 +1399,26 @@
*/
private def getDfsFileForSave(
fileManager: RocksDBFileManager,
-localFileName: String,
+localFile: File,
versionToSave: Long): Option[RocksDBImmutableFile] = {
-getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToSave)
+getDfsFileWithIncompatibilityCheck(
+  fileManager,
+  localFile.getName,
+  (dfsFileVersion, dfsFile) =>
+    // The DFS file is not the same as the file we want to save, either if
+    // the DFS file was added in the same or higher version, or the file size is different
+    dfsFileVersion >= versionToSave || dfsFile.sizeBytes != localFile.length()
+)
}

> **HeartSaVioR** (Contributor): Just to make clear: would dfsFile.sizeBytes make another call to the remote FS, or does it already contain the info?

> **micheal-o** (Contributor, Author): It already contains the info in dfsFile. See the RocksDBSstFile case class.

-private def getDfsFileWithVersionCheck(
+private def getDfsFileWithIncompatibilityCheck(
   fileManager: RocksDBFileManager,
   localFileName: String,
-  isIncompatibleVersion: Long => Boolean): Option[RocksDBImmutableFile] = {
+  isIncompatible: (Long, RocksDBImmutableFile) => Boolean): Option[RocksDBImmutableFile] = {
localFileMappings.get(localFileName).map { case (dfsFileMappedVersion, dfsFile) =>
val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile)
val versionSnapshotInfo = RocksDBVersionSnapshotInfo(dfsFileMappedVersion, dfsFileSuffix)
-if (isIncompatibleVersion(dfsFileMappedVersion) ||
+if (isIncompatible(dfsFileMappedVersion, dfsFile) ||
snapshotsPendingUpload.contains(versionSnapshotInfo)) {
// the mapped dfs file cannot be used, delete from mapping
remove(localFileName)
@@ -1432,7 +1460,7 @@ class RocksDBFileMapping {
val dfsFilesSuffix = UUID.randomUUID().toString
val snapshotFileMapping = localImmutableFiles.map { f =>
val localFileName = f.getName
-val existingDfsFile = getDfsFileForSave(fileManager, localFileName, version)
+val existingDfsFile = getDfsFileForSave(fileManager, f, version)
val dfsFile = existingDfsFile.getOrElse {
val newDfsFileName = fileManager.newDFSFileName(localFileName, dfsFilesSuffix)
val newDfsFile = RocksDBImmutableFile(localFileName, newDfsFileName, sizeBytes = f.length())
@@ -302,6 +302,8 @@ class RocksDBFileManager(
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localDir.mkdirs()
// Since we cleared the local dir, we should also clear the local file mapping
rocksDBFileMapping.clear()
RocksDBCheckpointMetadata(Seq.empty, 0)
} else {
// Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
@@ -320,6 +322,10 @@
}
logFilesInDir(localDir, log"Loaded checkpoint files " +
log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
logInfo(log"RocksDB file mapping after loading checkpoint version " +
  log"${MDC(LogKeys.VERSION_NUM, version)} from DFS:\n" +
  log"${MDC(LogKeys.ROCKS_DB_FILE_MAPPING, rocksDBFileMapping)}")

> **HeartSaVioR** (Contributor): Ditto about the length of the log message.

> **micheal-o** (Contributor, Author): Same as my other comment above. This one is only logged on load, which happens less frequently in the typical case. When loads are very frequent, this log would actually be useful, because that is when we typically see issues.

metadata
}
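The clear-on-version-0 change to RocksDBFileManager above can be sketched in isolation. This is a minimal sketch with simplified, hypothetical types (`FileMapping`, `loadCheckpoint` are stand-ins, not the actual Spark classes): loading version 0 wipes the local directory, so any stale local-to-DFS mapping must be wiped with it, otherwise a later snapshot could map new local SSTs to old DFS files that no longer match.

```scala
import scala.collection.mutable

// Simplified stand-in for RocksDBFileMapping; names and shapes are assumptions.
class FileMapping {
  // localFileName -> (versionMappedAt, dfsFileName)
  val localFileMappings = mutable.Map[String, (Long, String)]()
  def clear(): Unit = localFileMappings.clear()
}

// Mirrors the fix: a fresh load of version 0 deletes the local dir, so the
// mapping built against the deleted files must be dropped along with them.
def loadCheckpoint(version: Long, mapping: FileMapping): Unit = {
  if (version == 0) {
    // ... delete local dir and recreate it (elided) ...
    mapping.clear()
  } else {
    // ... unzip checkpoint files from DFS and rebuild state (elided) ...
  }
}
```

Without the `clear()` call, a mapping entry created before the directory wipe would survive into the replayed lineage, which is exactly the stale-reuse scenario the new test below exercises.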

@@ -3031,6 +3031,52 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
}

testWithChangelogCheckpointingEnabled(
"SPARK-51717 - validate that RocksDB file mapping is cleared " +
"when we reload version 0 after we have created a snapshot to avoid SST mismatch") {
withTempDir { dir =>
val conf = dbConf.copy(minDeltasForSnapshot = 2)
val hadoopConf = new Configuration()
val remoteDir = dir.getCanonicalPath
withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
db.load(0)
db.put("a", "1")
db.put("b", "1")
db.commit()

db.load(1)
db.put("a", "1")
db.commit() // we will create a snapshot for v2

// invalidate the db, so next load will reload from dfs
db.rollback()

// We will replay the changelog from 0 -> 2 since the v2 snapshot hasn't been uploaded yet.
// We had a bug where the file mapping was not cleared when we started from v0 again,
// hence files of the v2 snapshot were reused if the v2 snapshot was uploaded
// after this load(2) but before the v3 snapshot
db.load(2)
// add a larger row to make sure new sst size is different
db.put("b", "1555315569874537247638950872648")

// now upload v2 snapshot
db.doMaintenance()

// we will create a snapshot for v3. We shouldn't reuse the files of the v2 snapshot,
// given that v3 was not created from the v2 snapshot since we replayed the changelog from 0 -> 2
db.commit()

db.doMaintenance() // upload v3 snapshot

// invalidate the db, so next load will reload from dfs
db.rollback()

// loading v3 from dfs should be successful and no SST mismatch error
db.load(3)
}
}
}
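The core check this PR adds to getDfsFileForSave can also be sketched standalone: a previously mapped DFS file is reused for a save only if it was mapped in an older version and its size matches the local SST file. The types and names below (`DfsFile`, `dfsFileForSave`) are simplified assumptions for illustration, not the actual Spark API.

```scala
// Simplified stand-in for RocksDBImmutableFile; an assumption for illustration.
case class DfsFile(dfsFileName: String, sizeBytes: Long)

// Returns the reusable DFS file for a save at `versionToSave`, or None when
// the mapping must be discarded. The size comparison is what SPARK-51717 adds:
// a same-named local SST with a different size is a different file, so reusing
// the old DFS file would corrupt the snapshot.
def dfsFileForSave(
    mapping: Map[String, (Long, DfsFile)], // localName -> (mappedVersion, dfsFile)
    localName: String,
    localSizeBytes: Long,
    versionToSave: Long): Option[DfsFile] = {
  mapping.get(localName).flatMap { case (mappedVersion, dfsFile) =>
    val incompatible =
      mappedVersion >= versionToSave || dfsFile.sizeBytes != localSizeBytes
    if (incompatible) None else Some(dfsFile)
  }
}
```

In the test above, the larger row written after `db.load(2)` is what makes the local SST size differ from the already-mapped DFS file, so the size branch of this predicate is the one that prevents the stale reuse.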

test("ensure local files deleted on filesystem" +
" are cleaned from dfs file mapping") {
def getSSTFiles(dir: File): Set[File] = {