Browse Source

Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305)

* Fix for KAFKA-7974: Avoid calling disconnect() when not connecting

* Resolve host only when currentAddress() is called

Moves away from automatically resolving the host when the connection entry is constructed, which can leave ClusterConnectionStates in a confused state.
Instead, resolution is done on demand, ensuring that the entry in the connection list is present even if the resolution failed.

* Add Javadoc to ClusterConnectionStates.connecting()
pull/6390/head
Nicholas Parker 6 years ago committed by Colin Patrick McCabe
parent
commit
7f6bf95c1e
  1. 68
      clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
  2. 10
      clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
  3. 14
      clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
  4. 8
      clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java

68
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java

@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -108,16 +109,18 @@ final class ClusterConnectionStates {
}
/**
* Enter the connecting state for the given connection.
* Enter the connecting state for the given connection, moving to a new resolved address if necessary.
* @param id the id of the connection
* @param now the current time
* @throws UnknownHostException
* @param now the current time in ms
* @param host the host of the connection, to be resolved internally if needed
* @param clientDnsLookup the mode of DNS lookup to use when resolving the {@code host}
*/
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
NodeConnectionState connectionState = nodeState.get(id);
if (connectionState != null && connectionState.host().equals(host)) {
connectionState.lastConnectAttemptMs = now;
connectionState.state = ConnectionState.CONNECTING;
// Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
connectionState.moveToNextAddress();
return;
} else if (connectionState != null) {
@@ -130,14 +133,19 @@ final class ClusterConnectionStates {
this.reconnectBackoffInitMs, host, clientDnsLookup));
}
public InetAddress currentAddress(String id) {
return nodeState.get(id).currentAddress();
/**
* Returns a resolved address for the given connection, resolving it if necessary.
* The lookup is delegated to the connection's node state, so the host is only resolved when an address is
* actually requested.
* @param id the id of the connection
* @return the address currently selected for the connection
* @throws UnknownHostException if the address was not resolvable
*/
public InetAddress currentAddress(String id) throws UnknownHostException {
return nodeState(id).currentAddress();
}
/**
* Enter the disconnected state for the given node.
* @param id the connection we have disconnected
* @param now the current time
* @param now the current time in ms
*/
public void disconnected(String id, long now) {
NodeConnectionState nodeState = nodeState(id);
@@ -212,7 +220,7 @@ final class ClusterConnectionStates {
/**
* Enter the authentication failed state for the given node.
* @param id the connection identifier
* @param now the current time
* @param now the current time in ms
* @param exception the authentication exception
*/
public void authenticationFailed(String id, long now, AuthenticationException exception) {
@@ -227,7 +235,7 @@ final class ClusterConnectionStates {
* Return true if the connection is in the READY state and currently not throttled.
*
* @param id the connection identifier
* @param now the current time
* @param now the current time in ms
*/
public boolean isReady(String id, long now) {
return isReady(nodeState.get(id), now);
@@ -241,7 +249,7 @@ final class ClusterConnectionStates {
* Return true if there is at least one node with connection in the READY state and not throttled. Returns false
* otherwise.
*
* @param now the current time
* @param now the current time in ms
*/
public boolean hasReadyNodes(long now) {
for (Map.Entry<String, NodeConnectionState> entry : nodeState.entrySet()) {
@@ -353,14 +361,15 @@ final class ClusterConnectionStates {
// Connection is being throttled if current time < throttleUntilTimeMs.
long throttleUntilTimeMs;
private List<InetAddress> addresses;
private int index = 0;
private int addressIndex;
private final String host;
private final ClientDnsLookup clientDnsLookup;
public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
String host, ClientDnsLookup clientDnsLookup) {
this.state = state;
this.addresses = ClientUtils.resolve(host, clientDnsLookup);
this.addresses = Collections.emptyList();
this.addressIndex = -1;
this.authenticationException = null;
this.lastConnectAttemptMs = lastConnectAttempt;
this.failedAttempts = 0;
@@ -374,17 +383,32 @@ final class ClusterConnectionStates {
return host;
}
public InetAddress currentAddress() {
return addresses.get(index);
/**
* Fetches the current selected IP address for this node, resolving {@link #host()} if necessary.
* Resolution is lazy: it only happens when the address list is empty, in which case the host is resolved
* and the first resolved address becomes the selected one.
* @return the selected address
* @throws UnknownHostException if resolving {@link #host()} fails
*/
private InetAddress currentAddress() throws UnknownHostException {
if (addresses.isEmpty()) {
// (Re-)initialize list: resolve the host and start again from the first address
addresses = ClientUtils.resolve(host, clientDnsLookup);
addressIndex = 0;
}
return addresses.get(addressIndex);
}
/*
* implementing a ring buffer with the addresses
/**
* Jumps to the next available resolved address for this node. If no other addresses are available, marks the
* list to be refreshed on the next {@link #currentAddress()} call.
*/
public void moveToNextAddress() throws UnknownHostException {
index = (index + 1) % addresses.size();
if (index == 0)
addresses = ClientUtils.resolve(host, clientDnsLookup);
private void moveToNextAddress() {
// Steps to the next resolved address; never resolves the host itself, so it cannot fail on a DNS error.
if (addresses.isEmpty())
return; // Avoid div0. List will initialize on next currentAddress() call
// Advance by one, wrapping back to index 0 once the last address has been used
addressIndex = (addressIndex + 1) % addresses.size();
if (addressIndex == 0)
addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call
}
public String toString() {

10
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

@@ -697,7 +697,7 @@ public class NetworkClient implements KafkaClient {
* @param responses The list of responses to update
* @param nodeId Id of the node to be disconnected
* @param now The current time
* @param disconnectState The state of the disconnected channel
* @param disconnectState The state of the disconnected channel
*/
private void processDisconnection(List<ClientResponse> responses,
String nodeId,
@@ -910,23 +910,25 @@ public class NetworkClient implements KafkaClient {
/**
* Initiate a connection to the given node
* @param node the node to connect to
* @param now current time in epoch milliseconds
*/
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
this.connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
InetAddress address = this.connectionStates.currentAddress(nodeConnectionId);
connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
InetAddress address = connectionStates.currentAddress(nodeConnectionId);
log.debug("Initiating connection to node {} using address {}", node, address);
selector.connect(nodeConnectionId,
new InetSocketAddress(address, node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
log.warn("Error connecting to node {}", node, e);
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(nodeConnectionId, now);
/* maybe the problem is our metadata, update it */
metadataUpdater.requestUpdate();
log.warn("Error connecting to node {}", node, e);
}
}

14
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java

@@ -53,7 +53,7 @@ public class ClusterConnectionStatesTest {
}
@Test
public void testClusterConnectionStateChanges() throws UnknownHostException {
public void testClusterConnectionStateChanges() {
assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
// Start connecting to Node and check state
@@ -97,7 +97,7 @@ public class ClusterConnectionStatesTest {
}
@Test
public void testMultipleNodeConnectionStates() throws UnknownHostException {
public void testMultipleNodeConnectionStates() {
// Check initial state, allowed to connect to all nodes, but no nodes shown as ready
assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
assertTrue(connectionStates.canConnect(nodeId2, time.milliseconds()));
@@ -135,7 +135,7 @@ public class ClusterConnectionStatesTest {
}
@Test
public void testAuthorizationFailed() throws UnknownHostException {
public void testAuthorizationFailed() {
// Try connecting
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
@@ -156,7 +156,7 @@ public class ClusterConnectionStatesTest {
}
@Test
public void testRemoveNode() throws UnknownHostException {
public void testRemoveNode() {
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
time.sleep(1000);
connectionStates.ready(nodeId1);
@@ -171,7 +171,7 @@ public class ClusterConnectionStatesTest {
}
@Test
public void testMaxReconnectBackoff() throws UnknownHostException {
public void testMaxReconnectBackoff() {
long effectiveMaxReconnectBackoff = Math.round(reconnectBackoffMax * (1 + reconnectBackoffJitter));
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
time.sleep(1000);
@@ -191,7 +191,7 @@ public class ClusterConnectionStatesTest {
}
@Test
public void testExponentialReconnectBackoff() throws UnknownHostException {
public void testExponentialReconnectBackoff() {
// Calculate fixed components for backoff process
final int reconnectBackoffExpBase = 2;
double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
@@ -211,7 +211,7 @@ public class ClusterConnectionStatesTest {
}
@Test
public void testThrottled() throws UnknownHostException {
public void testThrottled() {
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
time.sleep(1000);
connectionStates.ready(nodeId1);

8
clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java

@@ -82,7 +82,7 @@ public class NetworkClientTest {
private NetworkClient createNetworkClientWithNoVersionDiscovery() {
return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext());
}
@@ -116,6 +116,12 @@ public class NetworkClientTest {
checkSimpleRequestResponse(clientWithNoVersionDiscovery);
}
@Test
public void testDnsLookupFailure() {
/* Fail cleanly when the node has a bad hostname: ready() must report false rather than throw */
assertFalse(client.ready(new Node(1234, "badhost", 1234), time.milliseconds()));
}
@Test
public void testClose() {
client.ready(node, time.milliseconds());

Loading…
Cancel
Save