Skip to content

Commit c73d97d

Browse files
authored
KAFKA-14523: Move kafka.log.remote classes to storage (apache#19474)
Pretty much a straight forward move of these classes. I just updated `RemoteLogManagerTest` to not use `KafkaConfig` Reviewers: Chia-Ping Tsai <[email protected]>
1 parent bb7d8eb commit c73d97d

21 files changed

+68
-106
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2242,6 +2242,7 @@ project(':storage') {
22422242
testImplementation project(':clients').sourceSets.test.output
22432243
testImplementation project(':core')
22442244
testImplementation project(':core').sourceSets.test.output
2245+
testImplementation project(':storage:storage-api').sourceSets.test.output
22452246
testImplementation project(':test-common:test-common-internal-api')
22462247
testImplementation project(':test-common:test-common-runtime')
22472248
testImplementation project(':test-common:test-common-util')

checkstyle/import-control-core.xml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,20 +66,6 @@
6666
<allow class="com.fasterxml.jackson.annotation.JsonIgnoreProperties" />
6767
</subpackage>
6868

69-
<subpackage name="log.remote">
70-
<allow pkg="org.apache.kafka.server.common" />
71-
<allow pkg="org.apache.kafka.server.log.remote" />
72-
<allow pkg="org.apache.kafka.server.log.remote.quota" />
73-
<allow pkg="org.apache.kafka.server.metrics" />
74-
<allow pkg="org.apache.kafka.storage.internals" />
75-
<allow pkg="org.apache.kafka.storage.log.metrics" />
76-
<allow pkg="kafka.log" />
77-
<allow pkg="kafka.cluster" />
78-
<allow pkg="kafka.server" />
79-
<allow pkg="org.mockito" />
80-
<allow pkg="org.apache.kafka.test" />
81-
</subpackage>
82-
8369
<subpackage name="server">
8470
<allow pkg="kafka" />
8571
<allow pkg="org.apache.kafka" />

checkstyle/import-control-storage.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
<allow pkg="org.junit" />
3030
<allow pkg="org.hamcrest" />
3131
<allow pkg="org.mockito" />
32+
<allow pkg="org.opentest4j" />
3233
<allow pkg="java.security" />
3334
<allow pkg="javax.net.ssl" />
3435
<allow pkg="javax.security" />
@@ -74,8 +75,12 @@
7475
</subpackage>
7576
<subpackage name="storage">
7677
<allow pkg="com.yammer.metrics.core" />
77-
<allow pkg="org.apache.kafka.server.metrics" />
7878
<allow pkg="org.apache.kafka.common.test" />
79+
<allow pkg="org.apache.kafka.server.metrics" />
80+
<allow pkg="org.apache.kafka.server.purgatory" />
81+
<allow pkg="org.apache.kafka.server.quota" />
82+
<allow pkg="org.apache.kafka.server.storage.log" />
83+
<allow pkg="org.apache.kafka.server.util" />
7984
</subpackage>
8085
</subpackage>
8186
</subpackage>

checkstyle/suppressions.xml

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,11 @@
3737

3838
<!-- core -->
3939
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition|SharePartitionManager).java"/>
40-
<suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling|JavaNCSS" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
41-
<suppress checks="MethodLength" files="RemoteLogManager.java"/>
42-
<suppress checks="MethodLength" files="RemoteLogManagerConfig.java"/>
43-
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
4440
<suppress checks="MethodLength"
4541
files="(KafkaClusterTestKit).java"/>
4642
<suppress checks="NPathComplexity" files="TestKitNodes.java"/>
4743
<suppress checks="JavaNCSS"
48-
files="(RemoteLogManagerTest|SharePartitionManagerTest|SharePartitionTest).java"/>
44+
files="(SharePartitionManagerTest|SharePartitionTest).java"/>
4945
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
5046
<suppress checks="CyclomaticComplexity" files="SharePartition.java"/>
5147

@@ -364,7 +360,9 @@
364360
<suppress checks="ParameterNumber"
365361
files="(LogAppendInfo|LogLoader|RemoteLogManagerConfig|UnifiedLog).java"/>
366362
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
367-
files="(UnifiedLog).java"/>
363+
files="(UnifiedLog|RemoteLogManager|RemoteLogManagerTest).java"/>
364+
<suppress checks="MethodLength" files="(RemoteLogManager|RemoteLogManagerConfig).java"/>
365+
<suppress checks="JavaNCSS" files="RemoteLogManagerTest.java"/>
368366

369367
<!-- benchmarks -->
370368
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"

core/src/main/java/kafka/server/TierStateMachine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package kafka.server;
1919

2020
import kafka.cluster.Partition;
21-
import kafka.log.remote.RemoteLogManager;
2221

2322
import org.apache.kafka.common.KafkaException;
2423
import org.apache.kafka.common.TopicPartition;
@@ -29,6 +28,7 @@
2928
import org.apache.kafka.common.utils.Utils;
3029
import org.apache.kafka.server.common.CheckpointFile;
3130
import org.apache.kafka.server.common.OffsetAndEpoch;
31+
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
3232
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
3333
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
3434
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import java.util.Optional
2222
import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList}
2323
import kafka.controller.StateChangeLogger
2424
import kafka.log._
25-
import kafka.log.remote.RemoteLogManager
2625
import kafka.server._
2726
import kafka.server.share.DelayedShareFetch
2827
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
@@ -41,6 +40,7 @@ import org.apache.kafka.common.utils.Time
4140
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache}
4241
import org.apache.kafka.server.common.RequestLocal
4342
import org.apache.kafka.server.log.remote.TopicPartitionLog
43+
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
4444
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
4545
import org.apache.kafka.server.metrics.KafkaMetricsGroup
4646
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package kafka.server
2020
import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter}
2121
import kafka.coordinator.transaction.TransactionCoordinator
2222
import kafka.log.LogManager
23-
import kafka.log.remote.RemoteLogManager
2423
import kafka.network.SocketServer
2524
import kafka.raft.KafkaRaftManager
2625
import kafka.server.metadata._
@@ -48,7 +47,7 @@ import org.apache.kafka.security.CredentialProvider
4847
import org.apache.kafka.server.authorizer.Authorizer
4948
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
5049
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
51-
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
50+
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
5251
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
5352
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
5453
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpStatePersister, Persister, PersisterStateManager}

core/src/main/scala/kafka/server/KafkaBroker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package kafka.server
1919

2020
import kafka.log.LogManager
21-
import kafka.log.remote.RemoteLogManager
2221
import kafka.network.SocketServer
2322
import kafka.utils.Logging
2423
import org.apache.kafka.common.ClusterResource
@@ -32,6 +31,7 @@ import org.apache.kafka.metadata.{BrokerState, MetadataCache}
3231
import org.apache.kafka.security.CredentialProvider
3332
import org.apache.kafka.server.authorizer.Authorizer
3433
import org.apache.kafka.server.common.NodeToControllerChannelManager
34+
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
3535
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
3636
import org.apache.kafka.server.util.Scheduler
3737
import org.apache.kafka.storage.internals.log.LogDirFailureChannel

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import com.yammer.metrics.core.Meter
2020
import kafka.cluster.{Partition, PartitionListener}
2121
import kafka.controller.StateChangeLogger
2222
import kafka.log.LogManager
23-
import kafka.log.remote.RemoteLogManager
2423
import kafka.server.HostedPartition.Online
2524
import kafka.server.QuotaFactory.QuotaManagers
2625
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
@@ -54,6 +53,7 @@ import org.apache.kafka.metadata.MetadataCache
5453
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
5554
import org.apache.kafka.server.log.remote.TopicPartitionLog
5655
import org.apache.kafka.server.config.ReplicationConfigs
56+
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
5757
import org.apache.kafka.server.metrics.KafkaMetricsGroup
5858
import org.apache.kafka.server.network.BrokerEndPoint
5959
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}

core/src/test/scala/unit/kafka/log/LogTestUtils.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package kafka.log
1919

20-
import kafka.log.remote.RemoteLogManager
21-
2220
import java.io.File
2321
import java.util.Properties
2422
import kafka.utils.TestUtils
@@ -34,6 +32,7 @@ import org.apache.kafka.common.config.TopicConfig
3432
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3533
import org.apache.kafka.server.common.RequestLocal
3634
import org.apache.kafka.server.config.ServerLogConfigs
35+
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
3736
import org.apache.kafka.server.storage.log.FetchIsolation
3837
import org.apache.kafka.server.util.Scheduler
3938
import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package kafka.log
1919

20-
import kafka.log.remote.RemoteLogManager
2120
import kafka.server.KafkaConfig
2221
import kafka.utils.TestUtils
2322
import org.apache.kafka.common.compress.Compression
@@ -36,7 +35,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3635
import org.apache.kafka.server.common.RequestLocal
3736
import org.apache.kafka.server.config.KRaftConfigs
3837
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
39-
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
38+
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManager, RemoteLogManagerConfig}
4039
import org.apache.kafka.server.metrics.KafkaYammerMetrics
4140
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, DelayedRemoteListOffsets}
4241
import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException}

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import java.util.{Optional, Properties, Map => JMap}
2222
import java.util.concurrent.{CompletionStage, TimeUnit}
2323
import java.util.concurrent.atomic.AtomicReference
2424
import kafka.log.LogManager
25-
import kafka.log.remote.RemoteLogManager
2625
import kafka.network.{DataPlaneAcceptor, SocketServer}
2726
import kafka.utils.TestUtils
2827
import org.apache.kafka.common.{Endpoint, Reconfigurable}
@@ -37,7 +36,7 @@ import org.apache.kafka.network.SocketServerConfigs
3736
import org.apache.kafka.server.DynamicThreadPool
3837
import org.apache.kafka.server.authorizer._
3938
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
40-
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
39+
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
4140
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
4241
import org.apache.kafka.server.util.KafkaScheduler
4342
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, ProducerStateManagerConfig}

core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package kafka.server
1818

1919
import kafka.cluster.Partition
2020
import kafka.integration.KafkaServerTestHarness
21-
import kafka.log.remote.RemoteLogManager
2221
import kafka.utils.TestUtils.random
2322
import kafka.utils._
2423
import org.apache.kafka.clients.CommonClientConfigs
@@ -36,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupConfig
3635
import org.apache.kafka.metadata.MetadataCache
3736
import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
3837
import org.apache.kafka.server.log.remote.TopicPartitionLog
38+
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
3939
import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
4040
import org.apache.kafka.test.TestUtils.assertFutureThrows
4141
import org.junit.jupiter.api.Assertions._

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import com.yammer.metrics.core.{Gauge, Meter, Timer}
2121
import kafka.cluster.PartitionTest.MockPartitionListener
2222
import kafka.cluster.Partition
2323
import kafka.log.LogManager
24-
import kafka.log.remote.RemoteLogManager
2524
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
2625
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
2726
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}

core/src/main/java/kafka/log/remote/RemoteLogManager.java renamed to storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package kafka.log.remote;
17+
package org.apache.kafka.server.log.remote.storage;
1818

1919
import org.apache.kafka.common.Endpoint;
2020
import org.apache.kafka.common.KafkaException;
@@ -50,18 +50,7 @@
5050
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
5151
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
5252
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics;
53-
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
54-
import org.apache.kafka.server.log.remote.storage.CustomMetadataSizeLimitExceededException;
55-
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
56-
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
57-
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
58-
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
59-
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
6053
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
61-
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
62-
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
63-
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
64-
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
6554
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
6655
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
6756
import org.apache.kafka.server.purgatory.DelayedRemoteListOffsets;
@@ -194,7 +183,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
194183
// topic ids that are received on leadership changes, this map is cleared on stop partitions
195184
private final ConcurrentMap<TopicPartition, Uuid> topicIdByPartitionMap = new ConcurrentHashMap<>();
196185
private final String clusterId;
197-
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
186+
// For compatibility, metrics are defined to be under the `kafka.log.remote.RemoteLogManager` class
187+
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteLogManager");
198188

199189
// The endpoint for remote log metadata manager to connect to
200190
private Optional<Endpoint> endpoint = Optional.empty();

core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java renamed to storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package kafka.log.remote;
17+
package org.apache.kafka.server.log.remote.storage;
1818

1919
import org.apache.kafka.common.TopicPartition;
2020
import org.apache.kafka.common.record.FileRecords;

core/src/main/java/kafka/log/remote/RemoteLogReader.java renamed to storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package kafka.log.remote;
17+
package org.apache.kafka.server.log.remote.storage;
1818

1919
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
2020
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;

0 commit comments

Comments
 (0)