Skip to content

Remove buildConsumerProperties(SslBundles) from KafkaProperties #45723

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,21 +187,8 @@ private Map<String, Object> buildCommonProperties(SslBundles sslBundles) {
* instance
*/
public Map<String, Object> buildConsumerProperties() {
return buildConsumerProperties(null);
}

/**
* Create an initial map of consumer properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default {@code kafkaConsumerFactory} bean.
* @param sslBundles bundles providing SSL trust material
* @return the consumer properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildConsumerProperties(SslBundles sslBundles) {
Map<String, Object> properties = buildCommonProperties(sslBundles);
properties.putAll(this.consumer.buildProperties(sslBundles));
Map<String, Object> properties = buildCommonProperties(null);
properties.putAll(this.consumer.buildProperties(null));
return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,13 @@
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
import org.springframework.boot.ssl.DefaultSslBundleRegistry;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.ContainerProperties;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.mock;

/**
* Tests for {@link KafkaProperties}.
Expand All @@ -47,8 +44,6 @@
*/
class KafkaPropertiesTests {

private final SslBundle sslBundle = mock(SslBundle.class);

@Test
void isolationLevelEnumConsistentWithKafkaVersion() {
org.apache.kafka.common.IsolationLevel[] original = org.apache.kafka.common.IsolationLevel.values();
Expand Down Expand Up @@ -101,15 +96,6 @@ void sslPemConfigurationWithEmptyBundle() {
"-----BEGINchain");
}

@Test
void sslBundleConfiguration() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
Map<String, Object> consumerProperties = properties
.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle));
assertThat(consumerProperties).doesNotContainKey(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG);
}

@Test
void sslPropertiesWhenKeyStoreLocationAndKeySetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
Expand All @@ -128,42 +114,6 @@ void sslPropertiesWhenTrustStoreLocationAndCertificatesSetShouldThrowException()
.isThrownBy(properties::buildConsumerProperties);
}

@Test
void sslPropertiesWhenKeyStoreLocationAndBundleSetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setKeyStoreLocation(new ClassPathResource("ksLoc"));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
}

@Test
void sslPropertiesWhenKeyStoreKeyAndBundleSetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setKeyStoreKey("-----BEGIN");
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
}

@Test
void sslPropertiesWhenTrustStoreLocationAndBundleSetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setTrustStoreLocation(new ClassPathResource("tsLoc"));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
}

@Test
void sslPropertiesWhenTrustStoreCertificatesAndBundleSetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setTrustStoreCertificates("-----BEGIN");
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
}

@Test
void cleanupConfigDefaultValuesAreConsistent() {
CleanupConfig cleanupConfig = new CleanupConfig();
Expand Down