Commit db3e5e2

Rework on KAFKA-3968: fsync the parent directory of a segment file when the file is created (#10680)
(Reverts #10405.) #10405 has several issues, for example:

1. It fails to create a topic with 9000 partitions.
2. It flushes in several unnecessary places.
3. If multiple segments of the same partition are flushed at roughly the same time, we may end up doing multiple unnecessary flushes; the logic for handling the flush in LogSegment.scala is convoluted.

Kafka does not call fsync() on the directory when a new log segment is created and flushed to disk. The problem is that the following sequence of calls does not guarantee file durability:

fd = open("log", O_RDWR | O_CREAT); // suppose open creates "log"
write(fd);
fsync(fd);

If the system crashes after fsync() but before the parent directory has been flushed to disk, the log file can disappear. This PR flushes the directory when flush() is called for the first time.

A performance test showed that this PR has minimal performance impact on Kafka clusters.

Reviewers: Jun Rao <[email protected]>
Parent commit: 29c55fd
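The fix pattern itself is small: after fsync-ing the new file, open its parent directory and fsync that as well so the directory entry survives a crash. Below is a minimal Java sketch of the pattern; the class name, helper name, and path are illustrative assumptions, while the commit's actual helper is Utils.flushDir (see the Utils.java diff further down). Opening a directory as a FileChannel for fsync works on Linux/POSIX filesystems.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class CreateFileDurably {

    // Make both the file contents and the directory entry durable.
    static void writeFileDurably(Path file, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap(data));
            ch.force(true);                       // fsync the file itself
        }
        Path parent = file.toAbsolutePath().normalize().getParent();
        if (parent != null) {
            try (FileChannel dir = FileChannel.open(parent, StandardOpenOption.READ)) {
                dir.force(true);                  // fsync the parent directory entry
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Illustrative path only; any writable directory works on Linux.
        writeFileDurably(Paths.get("/tmp/example.log"),
                "hello".getBytes(StandardCharsets.UTF_8));
    }
}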

File tree: 5 files changed (+25, -42 lines)
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java (-7 lines)

@@ -197,13 +197,6 @@ public void flush() throws IOException {
         channel.force(true);
     }
 
-    /**
-     * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
-     */
-    public void flushParentDir() throws IOException {
-        Utils.flushParentDir(file.toPath());
-    }
-
     /**
      * Close this record set
      */

clients/src/main/java/org/apache/kafka/common/utils/Utils.java (+11, -12 lines)

@@ -902,27 +902,26 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need
             }
         } finally {
             if (needFlushParentDir) {
-                flushParentDir(target);
+                flushDir(target.toAbsolutePath().normalize().getParent());
             }
         }
     }
 
     /**
-     * Flushes the parent directory to guarantee crash consistency.
+     * Flushes dirty directories to guarantee crash consistency.
      *
-     * @throws IOException if flushing the parent directory fails.
+     * @throws IOException if flushing the directory fails.
      */
-    public static void flushParentDir(Path path) throws IOException {
-        FileChannel dir = null;
-        try {
-            Path parent = path.toAbsolutePath().getParent();
-            if (parent != null) {
-                dir = FileChannel.open(parent, StandardOpenOption.READ);
+    public static void flushDir(Path path) throws IOException {
+        if (path != null) {
+            FileChannel dir = null;
+            try {
+                dir = FileChannel.open(path, StandardOpenOption.READ);
                 dir.force(true);
+            } finally {
+                if (dir != null)
+                    dir.close();
             }
-        } finally {
-            if (dir != null)
-                dir.close();
         }
     }
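One consequence of the API change above: the old flushParentDir(path) derived the parent directory internally, while the new flushDir(path) takes the directory itself, so call sites either pass a directory directly or compute the normalized parent first (as the LogManager change below does). A small sketch of both call shapes, using hypothetical paths and the flushDir method added in this commit:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.kafka.common.utils.Utils;

public class FlushDirCallSites {
    public static void main(String[] args) throws IOException {
        // Hypothetical log directory and segment file, for illustration only.
        Path logDir = Paths.get("/tmp/kafka-logs/my-topic-0");
        Files.createDirectories(logDir);
        Path segmentFile = logDir.resolve("00000000000000000000.log");

        // Flush a directory whose contents just changed,
        // e.g. Log.replaceSegments() flushes the partition directory.
        Utils.flushDir(logDir);

        // Flush the parent of a path: the caller computes the parent explicitly,
        // as LogManager does for a newly created data directory.
        Utils.flushDir(segmentFile.toAbsolutePath().normalize().getParent());
    }
}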

core/src/main/scala/kafka/log/Log.scala (+7, -3 lines)

@@ -1676,7 +1676,10 @@ class Log(@volatile private var _dir: File,
       if (offset > this.recoveryPoint) {
         debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
           s"unflushed: $unflushedMessages")
-        logSegments(this.recoveryPoint, offset).foreach(_.flush())
+        val segments = logSegments(this.recoveryPoint, offset)
+        segments.foreach(_.flush())
+        // if there are any new segments, we need to flush the parent directory for crash consistency
+        segments.lastOption.filter(_.baseOffset >= this.recoveryPoint).foreach(_ => Utils.flushDir(dir.toPath))
 
         lock synchronized {
           checkIfMemoryMappedBufferClosed()

@@ -2312,7 +2315,7 @@ object Log extends Logging {
     // need to do this in two phases to be crash safe AND do the delete asynchronously
     // if we crash in the middle of this we complete the swap in loadSegments()
     if (!isRecoveredSwapFile)
-      sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix, false))
+      sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix))
     sortedNewSegments.reverse.foreach(existingSegments.add(_))
     val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet
 
@@ -2335,6 +2338,7 @@
     }
     // okay we are safe now, remove the swap suffix
     sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+    Utils.flushDir(dir.toPath)
   }
 
   /**

@@ -2369,7 +2373,7 @@
                                      scheduler: Scheduler,
                                      logDirFailureChannel: LogDirFailureChannel,
                                      producerStateManager: ProducerStateManager): Unit = {
-    segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix, false))
+    segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
 
     def deleteSegments(): Unit = {
       info(s"Deleting segment files ${segmentsToDelete.mkString(",")}")

core/src/main/scala/kafka/log/LogManager.scala (+3, -2 lines)

@@ -150,7 +150,7 @@ class LogManager(logDirs: Seq[File],
        val created = dir.mkdirs()
        if (!created)
          throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
-       Utils.flushParentDir(dir.toPath)
+       Utils.flushDir(dir.toPath.toAbsolutePath.normalize.getParent)
      }
      if (!dir.isDirectory || !dir.canRead)
        throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.")

@@ -640,6 +640,8 @@ class LogManager(logDirs: Seq[File],
     try {
       recoveryPointCheckpoints.get(logDir).foreach { checkpoint =>
         val recoveryOffsets = logsToCheckpoint.map { case (tp, log) => tp -> log.recoveryPoint }
+        // checkpoint.write calls Utils.atomicMoveWithFallback, which flushes the parent
+        // directory and guarantees crash consistency.
         checkpoint.write(recoveryOffsets)
       }
     } catch {

@@ -867,7 +869,6 @@ class LogManager(logDirs: Seq[File],
      val dir = new File(logDirPath, logDirName)
      try {
        Files.createDirectories(dir.toPath)
-       Utils.flushParentDir(dir.toPath)
        Success(dir)
      } catch {
        case e: IOException =>

core/src/main/scala/kafka/log/LogSegment.scala (+4, -18 lines)

@@ -20,7 +20,6 @@ import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
 import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochFileCache

@@ -51,7 +50,6 @@ import scala.math._
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
  * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
  * @param time The time instance
- * @param needsFlushParentDir Whether or not we need to flush the parent directory during the first flush
  */
 @nonthreadsafe
 class LogSegment private[log] (val log: FileRecords,

@@ -61,8 +59,7 @@ class LogSegment private[log] (val log: FileRecords,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
                                val rollJitterMs: Long,
-                               val time: Time,
-                               val needsFlushParentDir: Boolean = false) extends Logging {
+                               val time: Time) extends Logging {
 
   def offsetIndex: OffsetIndex = lazyOffsetIndex.get
 
@@ -98,9 +95,6 @@ class LogSegment private[log] (val log: FileRecords,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
-  /* whether or not we need to flush the parent dir during the next flush */
-  private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir)
-
   // The timestamp we used for time based log rolling and for ensuring max compaction delay
   // volatile for LogCleaner to see the update
   @volatile private var rollingBasedTimestamp: Option[Long] = None

@@ -478,9 +472,6 @@ class LogSegment private[log] (val log: FileRecords,
       offsetIndex.flush()
       timeIndex.flush()
       txnIndex.flush()
-      // We only need to flush the parent of the log file because all other files share the same parent
-      if (atomicNeedsFlushParentDir.getAndSet(false))
-        log.flushParentDir()
     }
   }
 
@@ -499,14 +490,11 @@ class LogSegment private[log] (val log: FileRecords,
   * Change the suffix for the index and log files for this log segment
   * IOException from this method should be handled by the caller
   */
-  def changeFileSuffixes(oldSuffix: String, newSuffix: String, needsFlushParentDir: Boolean = true): Unit = {
+  def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
     log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
     lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
     lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
     txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
-    // We only need to flush the parent of the log file because all other files share the same parent
-    if (needsFlushParentDir)
-      log.flushParentDir()
   }
 
   /**

@@ -669,8 +657,7 @@ class LogSegment private[log] (val log: FileRecords,
 object LogSegment {
 
   def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
-           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "",
-           needsRecovery: Boolean = false): LogSegment = {
+           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
     val maxIndexSize = config.maxIndexSize
     new LogSegment(
       FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),

@@ -680,8 +667,7 @@ object LogSegment {
       baseOffset,
       indexIntervalBytes = config.indexInterval,
       rollJitterMs = config.randomSegmentJitter,
-      time,
-      needsFlushParentDir = needsRecovery || !fileAlreadyExists)
+      time)
   }
 
   def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {
