diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index 0ae7dcada93..f30dd2d8b38 100644 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -41,6 +41,8 @@ import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CL import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM; import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG; @@ -63,7 +65,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton { private String sslKeystoreLocation; private String sslKeystorePassword; private String saslKerberosServiceName; + private String saslMechanism; private String clientJaasConfPath; + private String clientJaasConf; private String kerb5ConfPath; private Integer maxBlockMs; @@ -210,6 +214,22 @@ public class KafkaLog4jAppender extends AppenderSkeleton { return clientJaasConfPath; } + public void setSaslMechanism(String saslMechanism) { + this.saslMechanism = saslMechanism; + } + + public String getSaslMechanism() { + return this.saslMechanism; + } + + public void setClientJaasConf(final String clientJaasConf) { + this.clientJaasConf = clientJaasConf; + } + + public String getClientJaasConf() { + return this.clientJaasConf; + } + public String getKerb5ConfPath() { return kerb5ConfPath; } @@ -257,9 +277,15 @@ public class KafkaLog4jAppender extends AppenderSkeleton { if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) { props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName); System.setProperty("java.security.auth.login.config", clientJaasConfPath); - if (kerb5ConfPath != null) { - System.setProperty("java.security.krb5.conf", kerb5ConfPath); - } + } + if (kerb5ConfPath != null) { + System.setProperty("java.security.krb5.conf", kerb5ConfPath); + } + if (saslMechanism != null) { + props.put(SASL_MECHANISM, saslMechanism); + } + if (clientJaasConf != null) { + props.put(SASL_JAAS_CONFIG, clientJaasConf); } if (maxBlockMs != null) { props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs); diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java index d5342e8f9e3..4616c980478 100644 --- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java +++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java @@ -16,9 +16,14 @@ */ package org.apache.kafka.log4jappender; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.not; + import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.helpers.LogLog; @@ -77,6 +82,45 @@ public class KafkaLog4jAppenderTest { } } + @Test + public void testSetSaslMechanism() { + Properties props = getLog4jConfig(false); + props.put("log4j.appender.KAFKA.SaslMechanism", "PLAIN"); + PropertyConfigurator.configure(props); + + MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender(); + Assert.assertThat( + mockKafkaLog4jAppender.getProducerProperties().getProperty(SaslConfigs.SASL_MECHANISM), + equalTo("PLAIN")); + } + + @Test + public void testSaslMechanismNotSet() { + testProducerPropertyNotSet(SaslConfigs.SASL_MECHANISM); + } + + @Test + public void testSetJaasConfig() { + Properties props = getLog4jConfig(false); + props.put("log4j.appender.KAFKA.ClientJaasConf", "jaas-config"); + PropertyConfigurator.configure(props); + + MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender(); + Assert.assertThat( + mockKafkaLog4jAppender.getProducerProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG), + equalTo("jaas-config")); + } + + @Test + public void testJaasConfigNotSet() { + testProducerPropertyNotSet(SaslConfigs.SASL_JAAS_CONFIG); + } + + private void testProducerPropertyNotSet(String name) { + PropertyConfigurator.configure(getLog4jConfig(false)); + MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender(); + Assert.assertThat(mockKafkaLog4jAppender.getProducerProperties().stringPropertyNames(), not(hasItem(name))); + } @Test public void testLog4jAppends() { diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java index a9eb5fb0721..b699fa9516d 100644 --- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java +++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java @@ -29,8 +29,11 @@ public class MockKafkaLog4jAppender extends KafkaLog4jAppender { private MockProducer mockProducer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + private Properties producerProperties; + @Override protected Producer getKafkaProducer(Properties props) { + producerProperties = props; return mockProducer; } @@ -49,4 +52,8 @@ public class MockKafkaLog4jAppender extends KafkaLog4jAppender { List> getHistory() { return mockProducer.history(); } + + public Properties getProducerProperties() { + return producerProperties; + } }