@@ -557,7 +557,7 @@ class Log(@volatile private var _dir: File,
557
557
}
558
558
559
559
/**
 * (Re-)create this log's leader epoch cache by delegating to
 * [[Log.maybeCreateLeaderEpochCache]]. The helper may return None (e.g. the static helper
 * deletes a non-empty cache when the message format is incompatible), so the field holds an
 * Option. This log's `logIdent` is passed through so the helper's log messages carry this
 * partition's logging prefix.
 */
private def initializeLeaderEpochCache(): Unit = lock synchronized {
  leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
}
562
562
563
563
private def updateLogEndOffset (offset : Long ): Unit = {
@@ -592,7 +592,7 @@ class Log(@volatile private var _dir: File,
592
592
producerStateManager : ProducerStateManager ): Unit = lock synchronized {
593
593
checkIfMemoryMappedBufferClosed()
594
594
Log .rebuildProducerState(producerStateManager, segments, logStartOffset, lastOffset, recordVersion, time,
595
- reloadFromCleanShutdown = false )
595
+ reloadFromCleanShutdown = false , logIdent )
596
596
}
597
597
598
598
def activeProducers : Seq [DescribeProducersResponseData .ProducerState ] = {
@@ -1888,14 +1888,14 @@ class Log(@volatile private var _dir: File,
1888
1888
1889
1889
/**
 * Delete the files backing the given segments by delegating to the static
 * [[Log.deleteSegmentFiles]] helper. This log's `logIdent` is passed through so the helper's
 * log messages (e.g. "Deleting segment files ...") are prefixed with this partition's identity.
 *
 * @param segments The segments whose backing files should be deleted
 * @param asyncDelete Whether deletion is scheduled asynchronously rather than done inline
 * @param deleteProducerStateSnapshots Whether producer state snapshots associated with these
 *                                     segments should also be deleted (defaults to true)
 */
private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
  Log.deleteSegmentFiles(segments, asyncDelete, deleteProducerStateSnapshots, dir, topicPartition,
    config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
}
1893
1893
1894
1894
/**
 * Swap new segments in for old ones under the log lock, delegating to the static
 * [[Log.replaceSegments]] helper with this log's `logIdent` as the logging prefix.
 *
 * @param newSegments The segments to install in place of the old ones
 * @param oldSegments The segments to remove and delete
 * @param isRecoveredSwapFile Whether this replacement is part of recovering a .swap file
 *                            (defaults to false)
 */
private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
  lock synchronized {
    // Fail fast if the log has already been closed and its memory-mapped buffers released.
    checkIfMemoryMappedBufferClosed()
    Log.replaceSegments(segments, newSegments, oldSegments, isRecoveredSwapFile, dir, topicPartition,
      config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
  }
}
1901
1901
@@ -1937,7 +1937,7 @@ class Log(@volatile private var _dir: File,
1937
1937
}
1938
1938
1939
1939
/**
 * Split a segment that contains messages with offset overflow into one or more new segments,
 * holding the log lock for the duration. Delegates to the static [[Log.splitOverflowedSegment]]
 * helper, passing this log's `logIdent` as the logging prefix for the helper's messages.
 *
 * @param segment The segment with offset overflow to split
 * @return The list of new segments that replace the input segment
 */
private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized {
  Log.splitOverflowedSegment(segment, segments, dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
}
1942
1942
1943
1943
}
@@ -2005,7 +2005,12 @@ object Log extends Logging {
2005
2005
Files .createDirectories(dir.toPath)
2006
2006
val topicPartition = Log .parseTopicPartitionName(dir)
2007
2007
val segments = new LogSegments (topicPartition)
2008
- val leaderEpochCache = Log .maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
2008
+ val leaderEpochCache = Log .maybeCreateLeaderEpochCache(
2009
+ dir,
2010
+ topicPartition,
2011
+ logDirFailureChannel,
2012
+ config.messageFormatVersion.recordVersion,
2013
+ s " [Log partition= $topicPartition, dir= ${dir.getParent}] " )
2009
2014
val producerStateManager = new ProducerStateManager (topicPartition, dir, maxProducerIdExpirationMs)
2010
2015
val offsets = LogLoader .load(LoadLogParams (
2011
2016
dir,
@@ -2226,12 +2231,14 @@ object Log extends Logging {
2226
2231
* @param topicPartition The topic partition
2227
2232
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
2228
2233
* @param recordVersion The record version
2234
+ * @param logPrefix The logging prefix
2229
2235
* @return The new LeaderEpochFileCache instance (if created), none otherwise
2230
2236
*/
2231
2237
def maybeCreateLeaderEpochCache (dir : File ,
2232
2238
topicPartition : TopicPartition ,
2233
2239
logDirFailureChannel : LogDirFailureChannel ,
2234
- recordVersion : RecordVersion ): Option [LeaderEpochFileCache ] = {
2240
+ recordVersion : RecordVersion ,
2241
+ logPrefix : String ): Option [LeaderEpochFileCache ] = {
2235
2242
val leaderEpochFile = LeaderEpochCheckpointFile .newFile(dir)
2236
2243
2237
2244
def newLeaderEpochFileCache (): LeaderEpochFileCache = {
@@ -2246,7 +2253,7 @@ object Log extends Logging {
2246
2253
None
2247
2254
2248
2255
if (currentCache.exists(_.nonEmpty))
2249
- warn(s " Deleting non-empty leader epoch cache due to incompatible message format $recordVersion" )
2256
+ warn(s " ${logPrefix} Deleting non-empty leader epoch cache due to incompatible message format $recordVersion" )
2250
2257
2251
2258
Files .deleteIfExists(leaderEpochFile.toPath)
2252
2259
None
@@ -2293,6 +2300,7 @@ object Log extends Logging {
2293
2300
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
2294
2301
* @param producerStateManager The ProducerStateManager instance (if any) containing state associated
2295
2302
* with the existingSegments
2303
+ * @param logPrefix The logging prefix
2296
2304
*/
2297
2305
private [log] def replaceSegments (existingSegments : LogSegments ,
2298
2306
newSegments : Seq [LogSegment ],
@@ -2303,7 +2311,8 @@ object Log extends Logging {
2303
2311
config : LogConfig ,
2304
2312
scheduler : Scheduler ,
2305
2313
logDirFailureChannel : LogDirFailureChannel ,
2306
- producerStateManager : ProducerStateManager ): Unit = {
2314
+ producerStateManager : ProducerStateManager ,
2315
+ logPrefix : String ): Unit = {
2307
2316
val sortedNewSegments = newSegments.sortBy(_.baseOffset)
2308
2317
// Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
2309
2318
// but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()
@@ -2332,7 +2341,8 @@ object Log extends Logging {
2332
2341
config,
2333
2342
scheduler,
2334
2343
logDirFailureChannel,
2335
- producerStateManager)
2344
+ producerStateManager,
2345
+ logPrefix)
2336
2346
}
2337
2347
// okay we are safe now, remove the swap suffix
2338
2348
sortedNewSegments.foreach(_.changeFileSuffixes(Log .SwapFileSuffix , " " ))
@@ -2359,7 +2369,7 @@ object Log extends Logging {
2359
2369
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
2360
2370
* @param producerStateManager The ProducerStateManager instance (if any) containing state associated
2361
2371
* with the existingSegments
2362
- *
2372
+ * @param logPrefix The logging prefix
2363
2373
* @throws IOException if the file can't be renamed and still exists
2364
2374
*/
2365
2375
private [log] def deleteSegmentFiles (segmentsToDelete : Iterable [LogSegment ],
@@ -2370,11 +2380,12 @@ object Log extends Logging {
2370
2380
config : LogConfig ,
2371
2381
scheduler : Scheduler ,
2372
2382
logDirFailureChannel : LogDirFailureChannel ,
2373
- producerStateManager : ProducerStateManager ): Unit = {
2383
+ producerStateManager : ProducerStateManager ,
2384
+ logPrefix : String ): Unit = {
2374
2385
segmentsToDelete.foreach(_.changeFileSuffixes(" " , Log .DeletedFileSuffix ))
2375
2386
2376
2387
def deleteSegments (): Unit = {
2377
- info(s " Deleting segment files ${segmentsToDelete.mkString(" ," )}" )
2388
+ info(s " ${logPrefix} Deleting segment files ${segmentsToDelete.mkString(" ," )}" )
2378
2389
val parentDir = dir.getParent
2379
2390
maybeHandleIOException(logDirFailureChannel, parentDir, s " Error while deleting segments for $topicPartition in dir $parentDir" ) {
2380
2391
segmentsToDelete.foreach { segment =>
@@ -2429,14 +2440,16 @@ object Log extends Logging {
2429
2440
* @param time The time instance used for checking the clock
2430
2441
* @param reloadFromCleanShutdown True if the producer state is being built after a clean shutdown,
2431
2442
* false otherwise.
2443
+ * @param logPrefix The logging prefix
2432
2444
*/
2433
2445
private [log] def rebuildProducerState (producerStateManager : ProducerStateManager ,
2434
2446
segments : LogSegments ,
2435
2447
logStartOffset : Long ,
2436
2448
lastOffset : Long ,
2437
2449
recordVersion : RecordVersion ,
2438
2450
time : Time ,
2439
- reloadFromCleanShutdown : Boolean ): Unit = {
2451
+ reloadFromCleanShutdown : Boolean ,
2452
+ logPrefix : String ): Unit = {
2440
2453
val allSegments = segments.values
2441
2454
val offsetsToSnapshot =
2442
2455
if (allSegments.nonEmpty) {
@@ -2445,7 +2458,7 @@ object Log extends Logging {
2445
2458
} else {
2446
2459
Seq (Some (lastOffset))
2447
2460
}
2448
- info(s " Loading producer state till offset $lastOffset with message format version ${recordVersion.value}" )
2461
+ info(s " ${logPrefix} Loading producer state till offset $lastOffset with message format version ${recordVersion.value}" )
2449
2462
2450
2463
// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
2451
2464
// upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
@@ -2469,7 +2482,7 @@ object Log extends Logging {
2469
2482
producerStateManager.takeSnapshot()
2470
2483
}
2471
2484
} else {
2472
- info(s " Reloading from producer snapshot and rebuilding producer state from offset $lastOffset" )
2485
+ info(s " ${logPrefix} Reloading from producer snapshot and rebuilding producer state from offset $lastOffset" )
2473
2486
val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
2474
2487
val producerStateLoadStart = time.milliseconds()
2475
2488
producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
@@ -2508,7 +2521,7 @@ object Log extends Logging {
2508
2521
}
2509
2522
producerStateManager.updateMapEndOffset(lastOffset)
2510
2523
producerStateManager.takeSnapshot()
2511
- info(s " Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
2524
+ info(s " ${logPrefix} Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
2512
2525
s " and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset" )
2513
2526
}
2514
2527
}
@@ -2535,6 +2548,7 @@ object Log extends Logging {
2535
2548
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
2536
2549
* @param producerStateManager The ProducerStateManager instance (if any) containing state associated
2537
2550
* with the existingSegments
2551
+ * @param logPrefix The logging prefix
2538
2552
* @return List of new segments that replace the input segment
2539
2553
*/
2540
2554
private [log] def splitOverflowedSegment (segment : LogSegment ,
@@ -2544,11 +2558,12 @@ object Log extends Logging {
2544
2558
config : LogConfig ,
2545
2559
scheduler : Scheduler ,
2546
2560
logDirFailureChannel : LogDirFailureChannel ,
2547
- producerStateManager : ProducerStateManager ): List [LogSegment ] = {
2561
+ producerStateManager : ProducerStateManager ,
2562
+ logPrefix : String ): List [LogSegment ] = {
2548
2563
require(Log .isLogFile(segment.log.file), s " Cannot split file ${segment.log.file.getAbsoluteFile}" )
2549
2564
require(segment.hasOverflow, " Split operation is only permitted for segments with overflow" )
2550
2565
2551
- info(s " Splitting overflowed segment $segment" )
2566
+ info(s " ${logPrefix} Splitting overflowed segment $segment" )
2552
2567
2553
2568
val newSegments = ListBuffer [LogSegment ]()
2554
2569
try {
@@ -2581,9 +2596,9 @@ object Log extends Logging {
2581
2596
s " before: ${segment.log.sizeInBytes} after: $totalSizeOfNewSegments" )
2582
2597
2583
2598
// replace old segment with new ones
2584
- info(s " Replacing overflowed segment $segment with split segments $newSegments" )
2599
+ info(s " ${logPrefix} Replacing overflowed segment $segment with split segments $newSegments" )
2585
2600
replaceSegments(existingSegments, newSegments.toList, List (segment), isRecoveredSwapFile = false ,
2586
- dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager)
2601
+ dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager, logPrefix )
2587
2602
newSegments.toList
2588
2603
} catch {
2589
2604
case e : Exception =>
0 commit comments