Skip to content

Commit 228b325

Browse files
KAFKA-17921 Support SASL_PLAINTEXT protocol with java.security.auth.login.config (apache#17671)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 9d93a4f commit 228b325

File tree

8 files changed

+341
-15
lines changed

8 files changed

+341
-15
lines changed

checkstyle/import-control-test-common.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@
2424
<allow pkg="org" />
2525
<allow pkg="kafka" />
2626
<allow pkg="scala.jdk.javaapi" />
27+
<allow pkg="javax.security" />
2728
</import-control>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.common.test;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
import java.util.stream.Collectors;
22+
23+
public record JaasModule(String name, boolean debug, Map<String, String> entries) {
24+
25+
public static JaasModule plainLoginModule(String username, String password, boolean debug, Map<String, String> validUsers) {
26+
String name = "org.apache.kafka.common.security.plain.PlainLoginModule";
27+
28+
Map<String, String> entries = new HashMap<>();
29+
entries.put("username", username);
30+
entries.put("password", password);
31+
validUsers.forEach((user, pass) -> entries.put("user_" + user, pass));
32+
33+
return new JaasModule(
34+
name,
35+
debug,
36+
entries
37+
);
38+
}
39+
40+
@Override
41+
public String toString() {
42+
return String.format("%s required%n debug=%b%n %s;%n", name, debug, entries.entrySet().stream()
43+
.map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
44+
.collect(Collectors.joining("\n ")));
45+
}
46+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.common.test;
18+
19+
import java.io.File;
20+
import java.io.FileOutputStream;
21+
import java.io.IOException;
22+
import java.io.OutputStreamWriter;
23+
import java.nio.charset.StandardCharsets;
24+
import java.util.List;
25+
import java.util.Set;
26+
import java.util.stream.Collectors;
27+
28+
import javax.security.auth.login.Configuration;
29+
30+
public class JaasUtils {
31+
public record JaasSection(String contextName, List<JaasModule> modules) {
32+
@Override
33+
public String toString() {
34+
return String.format(
35+
"%s {%n %s%n};%n",
36+
contextName,
37+
modules.stream().map(Object::toString).collect(Collectors.joining("\n "))
38+
);
39+
}
40+
}
41+
42+
public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer";
43+
44+
public static final String KAFKA_PLAIN_USER1 = "plain-user1";
45+
public static final String KAFKA_PLAIN_USER1_PASSWORD = "plain-user1-secret";
46+
public static final String KAFKA_PLAIN_ADMIN = "plain-admin";
47+
public static final String KAFKA_PLAIN_ADMIN_PASSWORD = "plain-admin-secret";
48+
49+
public static File writeJaasContextsToFile(Set<JaasSection> jaasSections) throws IOException {
50+
File jaasFile = TestUtils.tempFile();
51+
try (FileOutputStream fileStream = new FileOutputStream(jaasFile);
52+
OutputStreamWriter writer = new OutputStreamWriter(fileStream, StandardCharsets.UTF_8);) {
53+
writer.write(String.join("", jaasSections.stream().map(Object::toString).toArray(String[]::new)));
54+
}
55+
return jaasFile;
56+
}
57+
58+
public static void refreshJavaLoginConfigParam(File file) {
59+
System.setProperty(org.apache.kafka.common.security.JaasUtils.JAVA_LOGIN_CONFIG_PARAM, file.getAbsolutePath());
60+
// This will cause a reload of the Configuration singleton when `getConfiguration` is called
61+
Configuration.setConfiguration(null);
62+
}
63+
}

test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@
2727

2828
import org.apache.kafka.clients.CommonClientConfigs;
2929
import org.apache.kafka.clients.admin.AdminClientConfig;
30+
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
3031
import org.apache.kafka.common.metrics.Metrics;
3132
import org.apache.kafka.common.network.ListenerName;
33+
import org.apache.kafka.common.security.auth.SecurityProtocol;
3234
import org.apache.kafka.common.utils.ThreadUtils;
3335
import org.apache.kafka.common.utils.Time;
3436
import org.apache.kafka.common.utils.Utils;
3537
import org.apache.kafka.controller.Controller;
38+
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
3639
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
3740
import org.apache.kafka.metadata.storage.Formatter;
3841
import org.apache.kafka.network.SocketServerConfigs;
@@ -63,6 +66,7 @@
6366
import java.util.Map.Entry;
6467
import java.util.Optional;
6568
import java.util.Properties;
69+
import java.util.Set;
6670
import java.util.concurrent.CompletableFuture;
6771
import java.util.concurrent.ExecutionException;
6872
import java.util.concurrent.ExecutorService;
@@ -138,6 +142,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
138142
if (controllerNode != null) {
139143
props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG,
140144
controllerNode.metadataDirectory());
145+
setSecurityProtocolProps(props, controllerSecurityProtocol);
141146
} else {
142147
props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG,
143148
node.metadataDirectory());
@@ -146,6 +151,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
146151
// Set the log.dirs according to the broker node setting (if there is a broker node)
147152
props.put(LOG_DIRS_CONFIG,
148153
String.join(",", brokerNode.logDataDirectories()));
154+
setSecurityProtocolProps(props, brokerSecurityProtocol);
149155
} else {
150156
// Set log.dirs equal to the metadata directory if there is just a controller.
151157
props.put(LOG_DIRS_CONFIG,
@@ -189,11 +195,40 @@ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
189195
return new KafkaConfig(props, false);
190196
}
191197

198+
private void setSecurityProtocolProps(Map<String, Object> props, String securityProtocol) {
199+
if (securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
200+
props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN");
201+
props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN");
202+
props.putIfAbsent(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN");
203+
props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, StandardAuthorizer.class.getName());
204+
props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
205+
props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG, "User:" + JaasUtils.KAFKA_PLAIN_ADMIN);
206+
}
207+
}
208+
192209
public KafkaClusterTestKit build() throws Exception {
193210
Map<Integer, ControllerServer> controllers = new HashMap<>();
194211
Map<Integer, BrokerServer> brokers = new HashMap<>();
195212
Map<Integer, SharedServer> jointServers = new HashMap<>();
196213
File baseDirectory = null;
214+
File jaasFile = null;
215+
216+
if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
217+
jaasFile = JaasUtils.writeJaasContextsToFile(Set.of(
218+
new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
219+
List.of(
220+
JaasModule.plainLoginModule(
221+
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
222+
true,
223+
Map.of(
224+
JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD,
225+
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD)
226+
)
227+
)
228+
)
229+
));
230+
JaasUtils.refreshJavaLoginConfigParam(jaasFile);
231+
}
197232

198233
try {
199234
baseDirectory = new File(nodes.baseDirectory());
@@ -272,7 +307,8 @@ public KafkaClusterTestKit build() throws Exception {
272307
brokers,
273308
baseDirectory,
274309
faultHandlerFactory,
275-
socketFactoryManager);
310+
socketFactoryManager,
311+
jaasFile == null ? Optional.empty() : Optional.of(jaasFile));
276312
}
277313

278314
private String listeners(int node) {
@@ -316,14 +352,16 @@ private static void setupNodeDirectories(File baseDirectory,
316352
private final SimpleFaultHandlerFactory faultHandlerFactory;
317353
private final PreboundSocketFactoryManager socketFactoryManager;
318354
private final String controllerListenerName;
355+
private final Optional<File> jaasFile;
319356

320357
private KafkaClusterTestKit(
321358
TestKitNodes nodes,
322359
Map<Integer, ControllerServer> controllers,
323360
Map<Integer, BrokerServer> brokers,
324361
File baseDirectory,
325362
SimpleFaultHandlerFactory faultHandlerFactory,
326-
PreboundSocketFactoryManager socketFactoryManager
363+
PreboundSocketFactoryManager socketFactoryManager,
364+
Optional<File> jaasFile
327365
) {
328366
/*
329367
Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
@@ -339,6 +377,7 @@ private KafkaClusterTestKit(
339377
this.faultHandlerFactory = faultHandlerFactory;
340378
this.socketFactoryManager = socketFactoryManager;
341379
this.controllerListenerName = nodes.controllerListenerName().value();
380+
this.jaasFile = jaasFile;
342381
}
343382

344383
public void format() throws Exception {
@@ -602,6 +641,9 @@ public void close() throws Exception {
602641
waitForAllFutures(futureEntries);
603642
futureEntries.clear();
604643
Utils.delete(baseDirectory);
644+
if (jaasFile.isPresent()) {
645+
Utils.delete(jaasFile.get());
646+
}
605647
} catch (Exception e) {
606648
for (Entry<String, Future<?>> entry : futureEntries) {
607649
entry.getValue().cancel(true);

test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,9 @@ public TestKitNodes build() {
158158
throw new IllegalArgumentException("Invalid value for numDisksPerBroker");
159159
}
160160
// TODO: remove this assertion after https://issues.apache.org/jira/browse/KAFKA-16680 is finished
161-
if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT || controllerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
162-
throw new IllegalArgumentException("Currently only support PLAINTEXT security protocol");
161+
if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT) ||
162+
(controllerSecurityProtocol != SecurityProtocol.PLAINTEXT && controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT)) {
163+
throw new IllegalArgumentException("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol");
163164
}
164165
if (baseDirectory == null) {
165166
this.baseDirectory = TestUtils.tempDirectory().toPath();

test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ public class TestKitNodeTest {
3232
@ParameterizedTest
3333
@EnumSource(SecurityProtocol.class)
3434
public void testSecurityProtocol(SecurityProtocol securityProtocol) {
35-
if (securityProtocol != SecurityProtocol.PLAINTEXT) {
36-
assertEquals("Currently only support PLAINTEXT security protocol",
35+
if (securityProtocol != SecurityProtocol.PLAINTEXT && securityProtocol != SecurityProtocol.SASL_PLAINTEXT) {
36+
assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol",
3737
assertThrows(IllegalArgumentException.class,
3838
() -> new TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
39-
assertEquals("Currently only support PLAINTEXT security protocol",
39+
assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol",
4040
assertThrows(IllegalArgumentException.class,
4141
() -> new TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage());
4242
}

test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,12 @@
3838
import org.apache.kafka.common.TopicPartition;
3939
import org.apache.kafka.common.acl.AccessControlEntry;
4040
import org.apache.kafka.common.acl.AclBindingFilter;
41+
import org.apache.kafka.common.config.SaslConfigs;
4142
import org.apache.kafka.common.network.ListenerName;
43+
import org.apache.kafka.common.security.auth.SecurityProtocol;
4244
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
4345
import org.apache.kafka.common.serialization.ByteArraySerializer;
46+
import org.apache.kafka.common.test.JaasUtils;
4447
import org.apache.kafka.common.test.TestUtils;
4548
import org.apache.kafka.server.authorizer.Authorizer;
4649
import org.apache.kafka.server.fault.FaultHandlerException;
@@ -164,7 +167,7 @@ default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
164167
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
165168
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
166169
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
167-
return new KafkaProducer<>(props);
170+
return new KafkaProducer<>(setClientSaslConfig(props));
168171
}
169172

170173
default <K, V> Producer<K, V> producer() {
@@ -178,7 +181,7 @@ default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
178181
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
179182
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
180183
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
181-
return new KafkaConsumer<>(props);
184+
return new KafkaConsumer<>(setClientSaslConfig(props));
182185
}
183186

184187
default <K, V> Consumer<K, V> consumer() {
@@ -194,7 +197,23 @@ default Admin admin(Map<String, Object> configs, boolean usingBootstrapControlle
194197
props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
195198
props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
196199
}
197-
return Admin.create(props);
200+
return Admin.create(setClientSaslConfig(props));
201+
}
202+
203+
default Map<String, Object> setClientSaslConfig(Map<String, Object> configs) {
204+
Map<String, Object> props = new HashMap<>(configs);
205+
if (config().brokerSecurityProtocol() == SecurityProtocol.SASL_PLAINTEXT) {
206+
props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
207+
props.putIfAbsent(SaslConfigs.SASL_MECHANISM, "PLAIN");
208+
props.putIfAbsent(
209+
SaslConfigs.SASL_JAAS_CONFIG,
210+
String.format(
211+
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
212+
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD
213+
)
214+
);
215+
}
216+
return props;
198217
}
199218

200219
default Admin admin(Map<String, Object> configs) {

0 commit comments

Comments
 (0)