[SPARK-51717][SS][RocksDB] Fix SST mismatch corruption that can happen for second snapshot created for a new query #50512

@@ -1344,6 +1344,14 @@ class RocksDBFileMapping {
// from reusing SST files which have not yet been persisted to DFS,
val snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo] = ConcurrentHashMap.newKeySet()

/**
* Clear everything stored in the file mapping.
*/
def clear(): Unit = {
localFileMappings.clear()
snapshotsPendingUpload.clear()
}

/**
* Get the mapped DFS file for the given local file for a DFS load operation.
* If the currently mapped DFS file was mapped in the same or newer version as the version we
@@ -1360,7 +1368,13 @@ class RocksDBFileMapping {
fileManager: RocksDBFileManager,
localFileName: String,
versionToLoad: Long): Option[RocksDBImmutableFile] = {
getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToLoad)
getDfsFileWithVersionCheck(
fileManager,
localFileName,
// We can't reuse the currently mapped DFS file if it was mapped in the same or a newer
// version than the version we want to load
(fileVersion, _) => fileVersion >= versionToLoad
)
}

/**
@@ -1378,19 +1392,26 @@ class RocksDBFileMapping {
*/
private def getDfsFileForSave(
fileManager: RocksDBFileManager,
localFileName: String,
localFile: File,
versionToSave: Long): Option[RocksDBImmutableFile] = {
getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToSave)
getDfsFileWithVersionCheck(
fileManager,
localFile.getName,
(dfsFileVersion, dfsFile) =>
// The mapped DFS file cannot be reused if it was mapped in the same or a higher version,
// or if its size differs from the local file we want to save
dfsFileVersion >= versionToSave || dfsFile.sizeBytes != localFile.length()
Review thread on the size check above:

Contributor: Just to make clear: would dfsFile.sizeBytes make another call to remote FS, or will it contain the info already?

Contributor (Author): Already contains the info in dfsFile. See RocksDBSstFile case class.

(A sketch of the assumed shape of that case class follows this method.)
)
}
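
To back up the reply in the thread above: a minimal sketch, under the assumption that the DFS file metadata carries its size as a plain field. The definitions below are illustrative and not copied from the Spark source, but they show why dfsFile.sizeBytes is a local field access rather than a call to the remote file system.

// Illustrative sketch only; the real RocksDBImmutableFile / RocksDBSstFile
// definitions live in the Spark source. The point: sizeBytes is a constructor
// field on the metadata object, so reading it never touches the remote file system.
sealed trait RocksDBImmutableFile {
  def localFileName: String
  def dfsFileName: String
  def sizeBytes: Long
}

case class RocksDBSstFile(
    localFileName: String,
    dfsSstFileName: String,
    sizeBytes: Long) extends RocksDBImmutableFile {
  override def dfsFileName: String = dfsSstFileName
}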

private def getDfsFileWithVersionCheck(
fileManager: RocksDBFileManager,
localFileName: String,
isIncompatibleVersion: Long => Boolean): Option[RocksDBImmutableFile] = {
isIncompatible: (Long, RocksDBImmutableFile) => Boolean): Option[RocksDBImmutableFile] = {
localFileMappings.get(localFileName).map { case (dfsFileMappedVersion, dfsFile) =>
val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile)
val versionSnapshotInfo = RocksDBVersionSnapshotInfo(dfsFileMappedVersion, dfsFileSuffix)
if (isIncompatibleVersion(dfsFileMappedVersion) ||
if (isIncompatible(dfsFileMappedVersion, dfsFile) ||
snapshotsPendingUpload.contains(versionSnapshotInfo)) {
// the mapped dfs file cannot be used, delete from mapping
remove(localFileName)
@@ -1432,7 +1453,7 @@ class RocksDBFileMapping {
val dfsFilesSuffix = UUID.randomUUID().toString
val snapshotFileMapping = localImmutableFiles.map { f =>
val localFileName = f.getName
val existingDfsFile = getDfsFileForSave(fileManager, localFileName, version)
val existingDfsFile = getDfsFileForSave(fileManager, f, version)
val dfsFile = existingDfsFile.getOrElse {
val newDfsFileName = fileManager.newDFSFileName(localFileName, dfsFilesSuffix)
val newDfsFile = RocksDBImmutableFile(localFileName, newDfsFileName, sizeBytes = f.length())
@@ -302,6 +302,8 @@ class RocksDBFileManager(
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localDir.mkdirs()
// Since we cleared the local dir, we should also clear the local file mapping
rocksDBFileMapping.clear()
RocksDBCheckpointMetadata(Seq.empty, 0)
} else {
// Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
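To spell out the invariant behind the hunk above: once the local working directory is wiped and recreated for version 0, the local-to-DFS file mapping must be emptied in the same step, otherwise stale entries can point at SST files from a previous snapshot lineage. A minimal sketch with illustrative names (a toy model, not the Spark API):

import java.io.File
import scala.collection.concurrent.TrieMap

// Toy model: a local-to-DFS file mapping tied to a local checkpoint directory.
final class LocalDirState(dir: File) {
  // local file name -> (version the mapping was recorded at, DFS file name)
  val fileMapping: TrieMap[String, (Long, String)] = TrieMap.empty

  def resetForVersionZero(): Unit = {
    if (dir.exists()) {
      dir.listFiles().foreach(_.delete()) // flat directory assumed in this sketch
    }
    dir.mkdirs()
    // Mirrors the fix in this PR: a wiped directory implies a wiped mapping,
    // so no mapping entry can outlive the local file it describes.
    fileMapping.clear()
  }
}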
@@ -3031,6 +3031,52 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
}

testWithChangelogCheckpointingEnabled(
"SPARK-51717 - validate that RocksDB file mapping is cleared " +
"when we reload version 0 after we have created a snapshot to avoid SST mismatch") {
withTempDir { dir =>
val conf = dbConf.copy(minDeltasForSnapshot = 2)
val hadoopConf = new Configuration()
val remoteDir = dir.getCanonicalPath
withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
db.load(0)
db.put("a", "1")
db.put("b", "1")
db.commit()

db.load(1)
db.put("a", "1")
db.commit() // we will create a snapshot for v2

// invalidate the db, so next load will reload from dfs
db.rollback()

// We will replay the changelog from 0 -> 2 since the v2 snapshot hasn't been uploaded yet.
// There was a bug where the file mapping was not cleared when we start from v0 again,
// so SST files from the v2 snapshot could be reused if the v2 snapshot was uploaded
// after this load(2) but before the v3 snapshot was created
db.load(2)
// add a larger row to make sure new sst size is different
db.put("b", "1555315569874537247638950872648")

// now upload v2 snapshot
db.doMaintenance()

// This commit creates a snapshot for v3. It must not reuse files from the v2 snapshot,
// since v3 was not built on top of the v2 snapshot (we replayed the changelog from 0 -> 2)
db.commit()

db.doMaintenance() // upload v3 snapshot

// invalidate the db, so next load will reload from dfs
db.rollback()

// Loading v3 from DFS should succeed without an SST mismatch error
db.load(3)
}
}
}

test("ensure local files deleted on filesystem" +
" are cleaned from dfs file mapping") {
def getSSTFiles(dir: File): Set[File] = {