|
|
|
@ -78,9 +78,11 @@ class MetadataCacheControllerNodeProvider(
@@ -78,9 +78,11 @@ class MetadataCacheControllerNodeProvider(
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
object RaftControllerNodeProvider { |
|
|
|
|
def apply(raftManager: RaftManager[ApiMessageAndVersion], |
|
|
|
|
def apply( |
|
|
|
|
raftManager: RaftManager[ApiMessageAndVersion], |
|
|
|
|
config: KafkaConfig, |
|
|
|
|
controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = { |
|
|
|
|
controllerQuorumVoterNodes: Seq[Node] |
|
|
|
|
): RaftControllerNodeProvider = { |
|
|
|
|
val controllerListenerName = new ListenerName(config.controllerListenerNames.head) |
|
|
|
|
val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value())) |
|
|
|
|
val controllerSaslMechanism = config.saslMechanismControllerProtocol |
|
|
|
@ -98,12 +100,13 @@ object RaftControllerNodeProvider {
@@ -98,12 +100,13 @@ object RaftControllerNodeProvider {
|
|
|
|
|
* Finds the controller node by checking the metadata log manager. |
|
|
|
|
* This provider is used when we are using a Raft-based metadata quorum. |
|
|
|
|
*/ |
|
|
|
|
class RaftControllerNodeProvider(val raftManager: RaftManager[ApiMessageAndVersion], |
|
|
|
|
class RaftControllerNodeProvider( |
|
|
|
|
val raftManager: RaftManager[ApiMessageAndVersion], |
|
|
|
|
controllerQuorumVoterNodes: Seq[Node], |
|
|
|
|
val listenerName: ListenerName, |
|
|
|
|
val securityProtocol: SecurityProtocol, |
|
|
|
|
val saslMechanism: String |
|
|
|
|
) extends ControllerNodeProvider with Logging { |
|
|
|
|
) extends ControllerNodeProvider with Logging { |
|
|
|
|
val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap |
|
|
|
|
|
|
|
|
|
override def get(): Option[Node] = { |
|
|
|
@ -133,7 +136,6 @@ object BrokerToControllerChannelManager {
@@ -133,7 +136,6 @@ object BrokerToControllerChannelManager {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
trait BrokerToControllerChannelManager { |
|
|
|
|
def start(): Unit |
|
|
|
|
def shutdown(): Unit |
|
|
|
@ -144,7 +146,6 @@ trait BrokerToControllerChannelManager {
@@ -144,7 +146,6 @@ trait BrokerToControllerChannelManager {
|
|
|
|
|
): Unit |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* This class manages the connection between a broker and the controller. It runs a single |
|
|
|
|
* [[BrokerToControllerRequestThread]] which uses the broker's metadata cache as its own metadata to find |
|
|
|
@ -250,13 +251,14 @@ class BrokerToControllerChannelManagerImpl(
@@ -250,13 +251,14 @@ class BrokerToControllerChannelManagerImpl(
|
|
|
|
|
)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def controllerApiVersions(): Option[NodeApiVersions] = |
|
|
|
|
requestThread.activeControllerAddress().flatMap( |
|
|
|
|
activeController => if (activeController.id() == config.brokerId) |
|
|
|
|
def controllerApiVersions(): Option[NodeApiVersions] = { |
|
|
|
|
requestThread.activeControllerAddress().flatMap { activeController => |
|
|
|
|
if (activeController.id == config.brokerId) |
|
|
|
|
Some(currentNodeApiVersions) |
|
|
|
|
else |
|
|
|
|
Option(apiVersions.get(activeController.idString())) |
|
|
|
|
) |
|
|
|
|
Option(apiVersions.get(activeController.idString)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
abstract class ControllerRequestCompletionHandler extends RequestCompletionHandler { |
|
|
|
@ -351,10 +353,10 @@ class BrokerToControllerRequestThread(
@@ -351,10 +353,10 @@ class BrokerToControllerRequestThread(
|
|
|
|
|
requestQueue.putFirst(queueItem) |
|
|
|
|
} else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) { |
|
|
|
|
// just close the controller connection and wait for metadata cache update in doWork |
|
|
|
|
activeControllerAddress().foreach { controllerAddress => { |
|
|
|
|
activeControllerAddress().foreach { controllerAddress => |
|
|
|
|
networkClient.disconnect(controllerAddress.idString) |
|
|
|
|
updateControllerAddress(null) |
|
|
|
|
}} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
requestQueue.putFirst(queueItem) |
|
|
|
|
} else { |
|
|
|
|