
[SPARK-51714][SS] Add Failure Ingestion test to test state store checkpoint format V2 #50508

Closed · wants to merge 12 commits

Conversation

@siying (Contributor) commented Apr 3, 2025

Why are the changes needed?

The new state store checkpoint format needs failure tolerance tests to make sure the implementation is correct and delivers the behavior we would like.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

The change is test code itself.

Was this patch authored or co-authored using generative AI tooling?

No.

@siying force-pushed the ingest_failure9 branch from 28fb3d8 to a4656e8 on April 3, 2025 18:48
@@ -135,7 +135,23 @@ class RocksDB(
private val nativeStats = rocksDbOptions.statistics()

private val workingDir = createTempDir("workingDir")
private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),

protected def CreateFileManager(
Contributor:
nit: createFileManager

@@ -132,8 +132,12 @@ class RocksDBFileManager(

import RocksDBImmutableFile._

protected def GetFileSystem(myDfsRootDir: String, myHadoopConf: Configuration) : FileSystem = {
Contributor:
nit: getFileSystem

@@ -524,13 +524,33 @@ private[sql] class RocksDBStateStoreProvider
@volatile private var stateStoreEncoding: String = _
@volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _

protected def CreateRocksDB(
Contributor:
Could we change all these functions to start with lower case?

import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager

/** A wrapper file output stream that will throw exception in close() and put the underlying
Contributor:
nit: let's follow the comment style?
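
For readers following the thread, a minimal self-contained sketch of the kind of wrapper being discussed (class and object names are illustrative assumptions, not the PR's actual ones): the stream throws on its first close() and parks the underlying stream in a shared registry so a test can later finish the delayed write.

```scala
import java.io.{IOException, OutputStream}

// Illustrative sketch only; names are assumptions, not the PR's actual classes.
object DelayedStreamRegistry {
  // Streams whose close() was failed on purpose; the test drains and closes them later.
  @volatile var delayed: Seq[OutputStream] = Seq.empty
  def clear(): Unit = delayed = Seq.empty
}

class FailingCloseOutputStream(inner: OutputStream) extends OutputStream {
  private var failedOnce = false

  override def write(b: Int): Unit = inner.write(b)
  override def flush(): Unit = inner.flush()

  override def close(): Unit = {
    if (!failedOnce) {
      failedOnce = true
      // Park the underlying stream so the test can complete the write later.
      DelayedStreamRegistry.delayed :+= inner
      throw new IOException("injected failure in close()")
    }
  }
}
```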

stopMaintenanceTaskNoLock()
}

/** Only used for unit tests. The function doesn't hold lockloadedProviders. Calling
Contributor:
nit: doesn't hold the loadedProviders lock


/** Only used for unit tests. The function doesn't hold lockloadedProviders. Calling
* it can work-around a deadlock condition where a maintenance task is waiting for the lock */
private[streaming] def stopMaintenanceTaskNoLock(): Unit = {
Contributor:
nit: stopMaintenanceTaskWithoutLock

override def close(): Unit = {
if (!closed) {
closed = true
FailureInjectionFileSystem.delayedStreams =
Contributor:
We are adding to a singleton here?

Contributor Author:
Yes, it is a singleton.

Contributor:
Updating singleton fields within this class feels a bit odd. Do you think we can refactor a bit?

@siying (Contributor Author), Apr 3, 2025:
@anishshri-db it's very hard to access any of these objects from the test code, as they are created deep in the state store code while we operate at the DB level. Static variables are the best way I can think of for the unit test to communicate with a file system wrapper that sits that deep. What's the main concern with using a singleton here?

Contributor:
Hmm ok - wonder if there is some way to keep this test/class local, but if not, it's fine. Also cc @HeartSaVioR in case he has seen other patterns for similar cases before.

Contributor:
If the singleton is not reused among suites, it's probably fine.

Contributor:
But I'm concerned that the state of the object instance is "shared" and there is no reset to make sure the tests are isolated.

Shall we have a clear() method to reset the state, and ensure tests relying on this object call that method beforehand?

Also, I'd feel more comfortable moving the content of this file into RocksDBCheckpointFailureInjectionSuite, since the object instance is "unsafe" to use across test suites. (There are some environments where multiple test suites are executed in parallel.)
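
For concreteness, a sketch of the reset helper being suggested here; the regex, stream, and rename fields echo names visible in this PR's diff, but the exact field set, types, defaults, and clear() itself are assumptions rather than the PR's actual code.

```scala
import java.io.OutputStream

// Sketch only: a shared failure-injection switchboard with a single reset point.
object FailureInjectionFileSystem {
  // Files matching these patterns fail in copyFromLocalFile (field name from the diff).
  var failPreCopyFromLocalFileNameRegex: Seq[String] = Seq.empty
  // Streams whose close() was delayed by an injected failure (field name from the diff).
  var delayedStreams: Seq[OutputStream] = Seq.empty
  // Whether renameTempFile may overwrite an existing destination (default assumed here).
  var allowOverwriteInRename: Boolean = true

  // The reset being proposed: call it from beforeEach()/afterEach() so suites stay isolated.
  def clear(): Unit = {
    failPreCopyFromLocalFileNameRegex = Seq.empty
    delayedStreams = Seq.empty
    allowOverwriteInRename = true
  }
}
```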

}
}

/** Contains a list of variables for failure ingestion conditions */
Contributor:
nit: could we add some more details here?

override def open(f: Path, bufferSize: Int): FSDataInputStream = innerFs.open(f, bufferSize)

override def create(
f: Path,
Contributor:
nit: indent seems off?

override def delete(f: Path, recursive: Boolean): Boolean = innerFs.delete(f, recursive)

override def listStatus(f: Path): Array[FileStatus] = {
innerFs.listStatus(f)
Contributor:
Could we move this to the same line to be consistent?
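
As an aside, the PR builds this wrapper by delegating each call to an innerFs field by hand; an alternative sketch (not the PR's code) uses Hadoop's FilterFileSystem, which already forwards every call, so only the operations that need failure injection are overridden.

```scala
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, FilterFileSystem, Path}

// Sketch only: FilterFileSystem delegates everything to the wrapped FileSystem, so the
// failure-injection class only has to override the operations it wants to sabotage.
class FailureInjectingFileSystem(inner: FileSystem, shouldFailCopyTo: Path => Boolean)
  extends FilterFileSystem(inner) {

  override def copyFromLocalFile(delSrc: Boolean, src: Path, dst: Path): Unit = {
    if (shouldFailCopyTo(dst)) {
      throw new IOException(s"injected copyFromLocalFile failure for $dst")
    }
    super.copyFromLocalFile(delSrc, src, dst)
  }
}
```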

/** A rapper RocksDB State Store Provider that replaces FileSystem used in RocksDBFileManager
* to FailureInjectionFileSystem. */
class FailureInjectionRocksDBStateStoreProvider extends RocksDBStateStoreProvider {
override def CreateRocksDB(
Contributor:
nit: createRocksDB

/** RocksDBFieManager is created by RocksDB class where it creates a default FileSystem.
* we made RocksDB create a RocksDBFileManager but a different FileSystem here. */
def createRocksDBWithFaultInjection(
dfsRootDir: String,
Contributor:
nit: indent seems off?
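
For readers, the overall pattern under review looks roughly like this generic, self-contained sketch (stand-in class names, not the PR's RocksDB/RocksDBFileManager signatures): the production class obtains its FileSystem through a protected factory method, and the fault-injection variant overrides that single hook.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Generic sketch of the hook pattern (class names are stand-ins, not the PR's classes).
class DfsBackedManager(rootDir: String, hadoopConf: Configuration) {
  // Production path: resolve the FileSystem from the root directory.
  protected def getFileSystem(dir: String, conf: Configuration): FileSystem =
    new Path(dir).getFileSystem(conf)

  protected lazy val fs: FileSystem = getFileSystem(rootDir, hadoopConf)

  def rootExists(): Boolean = fs.exists(new Path(rootDir))
}

// Test-only variant: override the hook so every internal call goes through the
// failure-injecting FileSystem, without changing any call sites.
class FaultInjectingManager(rootDir: String, hadoopConf: Configuration, injected: FileSystem)
  extends DfsBackedManager(rootDir, hadoopConf) {
  override protected def getFileSystem(dir: String, conf: Configuration): FileSystem = injected
}
```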

} else {
ret
}

Contributor:
nit: extra newline?


override def renameTempFile(srcPath: Path, dstPath: Path,
overwriteIfPossible: Boolean): Unit = {
if (FailureInjectionFileSystem.allowOverwriteInRename || !fs.exists(dstPath)) {
Contributor:
nit: indent seems off?

partitionId = partitionId
) {
override def createFileManager(
dfsRootDir: String,
Contributor:
nit: indent

) {
override def getFileSystem(
myDfsRootDir: String,
myHadoopConf: Configuration): FileSystem = {
Contributor:
nit: indent

object FailureInjectionRocksDBStateStoreProvider {
/**
* RocksDBFieManager is created by RocksDB class where it creates a default FileSystem.
* we made RocksDB create a RocksDBFileManager but a different FileSystem here.
Contributor:
nit: We make RocksDB create a RocksDBFileManager that uses a different FileSystem here


@SlowSQLTest
/**
* Test suite to ingest some failures in RocksDB checkpoint */
Contributor:
nit: inject some failures?

* be killed or write zip file. Only after the later one is successfully committed, it comes back
* and write the zip file.
* */
test("Zip File Overwritten by Previous Task Checkpoint V2") {
Contributor:
Is this largely the same as the previous test, only for checkpoint v2? Should we parameterize them into one test?
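
A minimal ScalaTest sketch of the parameterization being suggested (suite name and body are illustrative, not the PR's code):

```scala
import org.scalatest.funsuite.AnyFunSuite

// Sketch: generate one test per checkpoint format from a single definition.
class CheckpointFormatParamSketch extends AnyFunSuite {
  Seq(false, true).foreach { ifEnableStateStoreCheckpointIds =>
    test("Zip File Overwritten by Previous Task Checkpoint " +
        s"ifEnableStateStoreCheckpointIds = $ifEnableStateStoreCheckpointIds") {
      // Shared body goes here; branch on ifEnableStateStoreCheckpointIds only where the
      // expected behavior differs between checkpoint format V1 and V2.
    }
  }
}
```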

* be killed or write changelog file. Only after the later one is successfully committed, it come
* back and write the changelog file.
* */
test("Changelog File Overwritten by Previous Task With Changelog Checkpoint V2") {
Contributor:
Same here?

* 2. The batch eventually failed
* 3. Query is retried and moved forward
* 4. The snapshot checkpoint succeeded
* In checkpoint V2, this snapshot shouldn't take effective. Otherwise, it will break the strong
Contributor:
nit: shouldn't take effect

}
}

test("Basic RocksDB Zip File Upload Failure Handling") {
Contributor:
Don't we need to run the test with both checkpoint v1 and v2?


/**
* This test is to simulate the case where a previous task had connectivity problem that couldn't
* be killed or write changelog file. Only after the later one is successfully committed, it come
Contributor:
nit: comes back and writes the

db.put("foo", "bar")
checkpointId3 = commitAndGetCheckpointId(db)

db2.doMaintenance()
Contributor:
Do we have a test with varying maintenance intervals, i.e. where maintenance is configured to run very often vs. very rarely? Maybe not for each case, but for some of the most common test cases here?

Contributor Author:
We don't have one. Maybe we can do it as a follow-up when we have time later.
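
If that follow-up happens, a small config-scoping helper is one way to run the common scenarios with very different intervals; a sketch assuming Spark's standard spark.sql.streaming.stateStore.maintenanceInterval conf is the knob (the helper itself is hypothetical, not part of this PR):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: run a block with the maintenance interval overridden, then restore it.
object MaintenanceIntervalSketch {
  private val Key = "spark.sql.streaming.stateStore.maintenanceInterval"

  def withMaintenanceInterval[T](spark: SparkSession, interval: String)(body: => T): T = {
    val previous = spark.conf.getOption(Key)
    spark.conf.set(Key, interval)
    try {
      body
    } finally {
      previous match {
        case Some(value) => spark.conf.set(Key, value)
        case None => spark.conf.unset(Key)
      }
    }
  }
}
```

Running the most common scenarios under, say, "100ms" and "1h" would cover the "very often vs. very rarely" cases mentioned above.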

}

/**
* An integreated test to cover this scenario:
Contributor:
nit: An integrated test

}

/**
* A wrapper checkpoint file manager that might inject functions in some function calls.
Contributor:
nit: inject failures in some function calls?

* This can be put into SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS to provide failure
* injection behavior.
*
* @param path
@anishshri-db (Contributor), Apr 5, 2025:
nit: you need to add comments for the args?

@anishshri-db (Contributor) left a comment:
lgtm pending comments being addressed. Thx

@siying (Contributor Author) commented Apr 6, 2025

Thanks @anishshri-db for review and approval. @HeartSaVioR can you help review and merge it?

@HeartSaVioR (Contributor) left a comment:
Will review from L385 of the last test suite file.

* RocksDBFieManager is created by RocksDB class where it creates a default FileSystem.
* We make RocksDB create a RocksDBFileManager that uses a different FileSystem here.
* */
def createRocksDBWithFaultInjection(
Contributor:
nit: any reason this has to be placed in a different object/method? If it is only used once in createRocksDB, I'm not sure this is much different from inlining.

Contributor Author:
This is actually used twice: once in FailureInjectionRocksDBStateStoreProvider.createRocksDB() and once in withDBT

object FailureInjectionFileSystem {
// File names matching this regex will cause the copyFromLocalFile to fail
var failPreCopyFromLocalFileNameRegex: Seq[String] = Seq.empty
// File names matching this regex will cause the createAtomic to fail and put the streams in
Contributor:
nit: is it close() rather than createAtomic()?

Contributor:
@siying Could you please revisit this?


@SlowSQLTest
/**
* Test suite to inject some failures in RocksDB checkpoint */
Contributor:
nit: please use /** ... */ on the same line for a one-liner classdoc.


override def beforeEach(): Unit = {
super.beforeEach()
FailureInjectionFileSystem.failPreCopyFromLocalFileNameRegex = Seq.empty
Contributor:
nit: like I said, shall we just have a method to clear everything instead of doing it here?
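
Concretely, with a clear() helper like the one sketched earlier in this thread, the per-suite setup shrinks to a single call (suite name is illustrative; clear() is the proposed helper, not necessarily what landed):

```scala
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

// Sketch: reset every failure-injection knob in one place so suites stay isolated.
class FailureInjectionSetupSketch extends AnyFunSuite with BeforeAndAfterEach {
  override def beforeEach(): Unit = {
    super.beforeEach()
    FailureInjectionFileSystem.clear() // proposed reset helper; see the earlier sketch
  }
}
```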

ex,
condition = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
parameters = Map(
"fileToRead" -> s"$remoteDir/2.changelog"
Contributor:
Wait... didn't we disable changelog checkpointing?

Contributor Author:
Yes, but I think that's always the error we throw, even if changelog is disabled.

Contributor:
OK that is odd. @anishshri-db Would you mind taking a look?

}
}

Seq(false, true).foreach { ifEnableStateStoreCheckpointIds =>
Contributor:
This is too similar to the above test, except for the pattern of the files we want to block from uploading. Shall we parameterize and dedup the code?

test(
"Zip File Overwritten by Previous Task Checkpoint " +
s"ifEnableStateStoreCheckpointIds = $ifEnableStateStoreCheckpointIds") {
val fmClass = "org.apache.spark.sql.execution.streaming.state." +
Contributor:
ditto

// This validation isn't necessary here but we just would like to make sure
// FailureInjectionCheckpointFileManager has correct behavior -- allows zip files
// to be delayed to be written, so that the test for
// ifEnableStateStoreCheckpointIds = trrue is valid.
Contributor:
nit: not trrue

* consistency guaranteed by V2.
*/
test("Delay Snapshot V2") {
val fmClass = "org.apache.spark.sql.execution.streaming.state." +
Contributor:
ditto

@HeartSaVioR (Contributor) left a comment:
First pass

*/
Seq(false, true).foreach { ifAllowRenameOverwrite =>
test(s"Job failure with changelog shows up ifAllowRenameOverwrite = $ifAllowRenameOverwrite") {
val fmClass = "org.apache.spark.sql.execution.streaming.state." +
Contributor:
ditto

additionalConfs = Map(
rocksdbChangelogCheckpointingConfKey -> "true",
SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fmClass)),
Contributor:
What is parent here? (I hadn't realized it was being referred to as parent in other occurrences.)

Contributor Author:
I copied it from other test suites. Tests such as CheckpointFileManagerSuite, RocksDBSuite, and several others all do this.

ExpectFailure[SparkException] { ex =>
ex.getCause.getMessage.contains("CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT")
},
AddData(inputData, 3, 1)
Contributor:
Is it intentional to add more data after the query is expected to fail? It could be confusing; you can just call inputData.addData() outside of testStream.

* and validate that the snapshot checkpoint is not used in subsequent query restart.
*/
test("Previous Maintenance Snapshot Checkpoint Overwrite") {
val fmClass = "org.apache.spark.sql.execution.streaming.state." +
Contributor:
ditto

CheckAnswer((3, 1)),
AddData(inputData, 3, 2),
AddData(inputData, 3, 1),
CheckAnswer((3, 1), (3, 3), (2, 1), (1, 1)),
Contributor:
nit: I'd rather use CheckNewAnswer to avoid accumulation of results.

@siying (Contributor Author) left a comment:
Addressed most comments.

@HeartSaVioR (Contributor) left a comment (commits …ng/state/FailureInjectionCheckpointFileManager.scala / review comment, 507c30c, eb3ce1e):

This passed CI, and my new commit does not change anything except a code comment. Hence I'll consider this as CI passed.

@HeartSaVioR (Contributor) left a comment:
+1

@HeartSaVioR changed the title from "[SPARK-51714][SS] Add Failure Ingestion test to test state store checkpoint format V2" to "[SPARK-51714][SS][TESTS] Add Failure Ingestion test to test state store checkpoint format V2" on Apr 9, 2025
@HeartSaVioR (Contributor) commented Apr 9, 2025

Thanks! Merging to master/4.0 (this wouldn't be risky since it's a test-only change).

The main change is the test, but we also change the main code, so I can't say this is not risky.

@HeartSaVioR changed the title from "[SPARK-51714][SS][TESTS] Add Failure Ingestion test to test state store checkpoint format V2" to "[SPARK-51714][SS] Add Failure Ingestion test to test state store checkpoint format V2" on Apr 9, 2025
@siying (Contributor Author) commented Apr 9, 2025

@HeartSaVioR thank you so much for your review and modification before pushing.
