17
17
18
18
package org .apache .kafka .shell ;
19
19
20
- import kafka .raft .KafkaRaftManager ;
21
20
import kafka .tools .TerseFailure ;
22
21
23
22
import org .apache .kafka .common .utils .Exit ;
24
23
import org .apache .kafka .common .utils .Utils ;
25
24
import org .apache .kafka .image .loader .MetadataLoader ;
26
25
import org .apache .kafka .metadata .util .SnapshotFileReader ;
27
- import org .apache .kafka .server .common .ApiMessageAndVersion ;
28
26
import org .apache .kafka .server .fault .FaultHandler ;
29
27
import org .apache .kafka .server .fault .LoggingFaultHandler ;
30
28
import org .apache .kafka .server .util .FileLock ;
@@ -60,17 +58,9 @@ public final class MetadataShell {
60
58
private static final Logger log = LoggerFactory .getLogger (MetadataShell .class );
61
59
62
60
public static class Builder {
63
- private KafkaRaftManager <ApiMessageAndVersion > raftManager = null ;
64
61
private String snapshotPath = null ;
65
62
private FaultHandler faultHandler = new LoggingFaultHandler ("shell" , () -> { });
66
63
67
- // Note: we assume that we have already taken the lock on the log directory before calling
68
- // this method.
69
- public Builder setRaftManager (KafkaRaftManager <ApiMessageAndVersion > raftManager ) {
70
- this .raftManager = raftManager ;
71
- return this ;
72
- }
73
-
74
64
public Builder setSnapshotPath (String snapshotPath ) {
75
65
this .snapshotPath = snapshotPath ;
76
66
return this ;
@@ -82,9 +72,7 @@ public Builder setFaultHandler(FaultHandler faultHandler) {
82
72
}
83
73
84
74
public MetadataShell build () {
85
- return new MetadataShell (raftManager ,
86
- snapshotPath ,
87
- faultHandler );
75
+ return new MetadataShell (snapshotPath , faultHandler );
88
76
}
89
77
}
90
78
@@ -96,7 +84,7 @@ static File parent(File file) {
96
84
File parent = file .getParentFile ();
97
85
return parent == null ? file : parent ;
98
86
}
99
-
87
+
100
88
static File parentParent (File file ) {
101
89
return parent (parent (file ));
102
90
}
@@ -136,8 +124,6 @@ static FileLock takeDirectoryLock(File directory) throws IOException {
136
124
137
125
private final MetadataShellState state ;
138
126
139
- private final KafkaRaftManager <ApiMessageAndVersion > raftManager ;
140
-
141
127
private final String snapshotPath ;
142
128
143
129
private final FaultHandler faultHandler ;
@@ -151,29 +137,17 @@ static FileLock takeDirectoryLock(File directory) throws IOException {
151
137
private MetadataLoader loader ;
152
138
153
139
public MetadataShell (
154
- KafkaRaftManager <ApiMessageAndVersion > raftManager ,
155
140
String snapshotPath ,
156
141
FaultHandler faultHandler
157
142
) {
158
143
this .state = new MetadataShellState ();
159
- this .raftManager = raftManager ;
160
144
this .snapshotPath = snapshotPath ;
161
145
this .faultHandler = faultHandler ;
162
146
this .publisher = new MetadataShellPublisher (state );
163
147
this .fileLock = null ;
164
148
this .snapshotFileReader = null ;
165
149
}
166
150
167
- private void initializeWithRaftManager () {
168
- raftManager .startup ();
169
- this .loader = new MetadataLoader .Builder ().
170
- setFaultHandler (faultHandler ).
171
- setNodeId (-1 ).
172
- setHighWaterMarkAccessor (() -> raftManager .client ().highWatermark ()).
173
- build ();
174
- raftManager .register (loader );
175
- }
176
-
177
151
private void initializeWithSnapshotFileReader () throws Exception {
178
152
this .fileLock = takeDirectoryLockIfExists (parentParent (new File (snapshotPath )));
179
153
this .loader = new MetadataLoader .Builder ().
@@ -186,18 +160,7 @@ private void initializeWithSnapshotFileReader() throws Exception {
186
160
}
187
161
188
162
public void run (List <String > args ) throws Exception {
189
- if (raftManager != null ) {
190
- if (snapshotPath != null ) {
191
- throw new RuntimeException ("Can't specify both a raft manager and " +
192
- "snapshot file reader." );
193
- }
194
- initializeWithRaftManager ();
195
- } else if (snapshotPath != null ) {
196
- initializeWithSnapshotFileReader ();
197
- } else {
198
- throw new RuntimeException ("You must specify either a raft manager or a " +
199
- "snapshot file reader." );
200
- }
163
+ initializeWithSnapshotFileReader ();
201
164
loader .installPublishers (Collections .singletonList (publisher )).get (15 , TimeUnit .MINUTES );
202
165
if (args == null || args .isEmpty ()) {
203
166
// Interactive mode.
@@ -222,14 +185,7 @@ public void run(List<String> args) throws Exception {
222
185
223
186
public void close () {
224
187
Utils .closeQuietly (loader , "loader" );
225
- if (raftManager != null ) {
226
- try {
227
- raftManager .shutdown ();
228
- } catch (Exception e ) {
229
- log .error ("Error shutting down RaftManager" , e );
230
- }
231
- }
232
- Utils .closeQuietly (snapshotFileReader , "raftManager" );
188
+ Utils .closeQuietly (snapshotFileReader , "snapshotFileReader" );
233
189
if (fileLock != null ) {
234
190
try {
235
191
fileLock .destroy ();
@@ -248,7 +204,8 @@ public static void main(String[] args) {
248
204
.description ("The Apache Kafka metadata shell" );
249
205
parser .addArgument ("--snapshot" , "-s" )
250
206
.type (String .class )
251
- .help ("The snapshot file to read." );
207
+ .required (true )
208
+ .help ("The metadata snapshot file to read." );
252
209
parser .addArgument ("command" )
253
210
.nargs ("*" )
254
211
.help ("The command to run." );
0 commit comments