Browse Source

KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)

Reviewers: David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>
pull/11948/head
Liam Clarke-Hutchinson 3 years ago committed by GitHub
parent
commit
e8f09007e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
  2. 1
      core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

22
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

@ -19,6 +19,7 @@ package kafka.server @@ -19,6 +19,7 @@ package kafka.server
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogConfig, LogManager}
@ -201,8 +202,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging @@ -201,8 +202,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
private val dynamicBrokerConfigs = mutable.Map[String, String]()
private val dynamicDefaultConfigs = mutable.Map[String, String]()
private val reconfigurables = mutable.Buffer[Reconfigurable]()
private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
// Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these
// collections, while another thread is iterating over them.
private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig: KafkaConfig = null
private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
@ -259,16 +263,16 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging @@ -259,16 +263,16 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
reconfigurables += reconfigurable
reconfigurables.add(reconfigurable)
}
def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = CoreUtils.inWriteLock(lock) {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
brokerReconfigurables += reconfigurable
brokerReconfigurables.add(reconfigurable)
}
def removeReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
reconfigurables -= reconfigurable
reconfigurables.remove(reconfigurable)
}
private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) {
@ -320,7 +324,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging @@ -320,7 +324,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
* changes are processed. At the moment, only listener configs are considered for reloading.
*/
private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
reconfigurables
reconfigurables.asScala
.filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
.foreach {
case reconfigurable: ListenerReconfigurable =>
@ -535,8 +539,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging @@ -535,8 +539,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
try {
val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs
newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
reconfigurables.foreach {
newConfig.valuesFromThisConfig.keySet.forEach(k => customConfigs.remove(k))
reconfigurables.forEach {
case listenerReconfigurable: ListenerReconfigurable =>
processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false)
case reconfigurable =>
@ -546,7 +550,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging @@ -546,7 +550,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
// BrokerReconfigurable updates are processed after config is updated. Only do the validation here.
val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
brokerReconfigurables.foreach { reconfigurable =>
brokerReconfigurables.forEach { reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) {
reconfigurable.validateReconfiguration(newConfig)
if (!validateOnly)

1
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

@ -728,7 +728,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup @@ -728,7 +728,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
@Test
@Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13672)
def testThreadPoolResize(): Unit = {
val requestHandlerPrefix = "data-plane-kafka-request-handler-"
val networkThreadPrefix = "data-plane-kafka-network-thread-"

Loading…
Cancel
Save