Browse Source

KAFKA-12193: Re-resolve IPs after a client disconnects (#9902) (#10067) (#10108)

This patch changes the NetworkClient behavior to resolve the target node's hostname after disconnecting from an established connection, rather than waiting until the previously-resolved addresses are exhausted. This is to handle the scenario when the node's IP addresses have changed during the lifetime of the connection, and means that the client does not have to try to connect to invalid IP addresses until it has tried each address.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Satish Duggana <satishd@apache.org>, David Jacot <djacot@confluent.io>
2.1
Bob Barrett 4 years ago committed by GitHub
parent
commit
9216968ef6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
  2. 27
      clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
  3. 29
      clients/src/main/java/org/apache/kafka/clients/DefaultHostResolver.java
  4. 26
      clients/src/main/java/org/apache/kafka/clients/HostResolver.java
  5. 73
      clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
  6. 49
      clients/src/test/java/org/apache/kafka/clients/AddressChangeHostResolver.java
  7. 7
      clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
  8. 70
      clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
  9. 157
      clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
  10. 11
      clients/src/test/java/org/apache/kafka/test/MockSelector.java

12
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java

@ -96,8 +96,9 @@ public final class ClientUtils { @@ -96,8 +96,9 @@ public final class ClientUtils {
clientSaslMechanism, true);
}
static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
InetAddress[] addresses = InetAddress.getAllByName(host);
static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup,
HostResolver hostResolver) throws UnknownHostException {
InetAddress[] addresses = hostResolver.resolve(host);
if (ClientDnsLookup.USE_ALL_DNS_IPS == clientDnsLookup) {
return filterPreferredAddresses(addresses);
} else {
@ -105,6 +106,13 @@ public final class ClientUtils { @@ -105,6 +106,13 @@ public final class ClientUtils {
}
}
/**
* Return a list containing the first address in `allAddresses` and subsequent addresses
* that are a subtype of the first address.
*
* The outcome is that all returned addresses are either IPv4 or IPv6 (InetAddress has two
* subclasses: Inet4Address and Inet6Address).
*/
static List<InetAddress> filterPreferredAddresses(InetAddress[] allAddresses) {
List<InetAddress> preferredAddresses = new ArrayList<>();
Class<? extends InetAddress> clazz = null;

27
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java

@ -40,13 +40,16 @@ final class ClusterConnectionStates { @@ -40,13 +40,16 @@ final class ClusterConnectionStates {
private final double reconnectBackoffMaxExp;
private final Map<String, NodeConnectionState> nodeState;
private final Logger log;
private final HostResolver hostResolver;
public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs, LogContext logContext) {
public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
LogContext logContext, HostResolver hostResolver) {
this.log = logContext.logger(ClusterConnectionStates.class);
this.reconnectBackoffInitMs = reconnectBackoffMs;
this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
this.reconnectBackoffMaxExp = Math.log(this.reconnectBackoffMaxMs / (double) Math.max(reconnectBackoffMs, 1)) / Math.log(RECONNECT_BACKOFF_EXP_BASE);
this.nodeState = new HashMap<>();
this.hostResolver = hostResolver;
}
/**
@ -130,7 +133,7 @@ final class ClusterConnectionStates { @@ -130,7 +133,7 @@ final class ClusterConnectionStates {
// Create a new NodeConnectionState if nodeState does not already contain one
// for the specified id or if the hostname associated with the node id changed.
nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
this.reconnectBackoffInitMs, host, clientDnsLookup));
this.reconnectBackoffInitMs, host, clientDnsLookup, hostResolver));
}
/**
@ -149,9 +152,14 @@ final class ClusterConnectionStates { @@ -149,9 +152,14 @@ final class ClusterConnectionStates {
*/
public void disconnected(String id, long now) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.DISCONNECTED;
nodeState.lastConnectAttemptMs = now;
updateReconnectBackoff(nodeState);
if (nodeState.state.isConnected()) {
// If a connection had previously been established, clear the addresses to trigger a new DNS resolution
// because the node IPs may have changed
nodeState.clearAddresses();
}
nodeState.state = ConnectionState.DISCONNECTED;
}
/**
@ -364,9 +372,10 @@ final class ClusterConnectionStates { @@ -364,9 +372,10 @@ final class ClusterConnectionStates {
private int addressIndex;
private final String host;
private final ClientDnsLookup clientDnsLookup;
private final HostResolver hostResolver;
private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
String host, ClientDnsLookup clientDnsLookup) {
String host, ClientDnsLookup clientDnsLookup, HostResolver hostResolver) {
this.state = state;
this.addresses = Collections.emptyList();
this.addressIndex = -1;
@ -377,6 +386,7 @@ final class ClusterConnectionStates { @@ -377,6 +386,7 @@ final class ClusterConnectionStates {
this.throttleUntilTimeMs = 0;
this.host = host;
this.clientDnsLookup = clientDnsLookup;
this.hostResolver = hostResolver;
}
public String host() {
@ -391,7 +401,7 @@ final class ClusterConnectionStates { @@ -391,7 +401,7 @@ final class ClusterConnectionStates {
private InetAddress currentAddress() throws UnknownHostException {
if (addresses.isEmpty()) {
// (Re-)initialize list
addresses = ClientUtils.resolve(host, clientDnsLookup);
addresses = ClientUtils.resolve(host, clientDnsLookup, hostResolver);
addressIndex = 0;
}
@ -411,6 +421,13 @@ final class ClusterConnectionStates { @@ -411,6 +421,13 @@ final class ClusterConnectionStates {
addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call
}
/**
* Clears the resolved addresses in order to trigger re-resolving on the next {@link #currentAddress()} call.
*/
private void clearAddresses() {
addresses = Collections.emptyList();
}
public String toString() {
return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")";
}

29
clients/src/main/java/org/apache/kafka/clients/DefaultHostResolver.java

@ -0,0 +1,29 @@ @@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class DefaultHostResolver implements HostResolver {
@Override
public InetAddress[] resolve(String host) throws UnknownHostException {
return InetAddress.getAllByName(host);
}
}

26
clients/src/main/java/org/apache/kafka/clients/HostResolver.java

@ -0,0 +1,26 @@ @@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import java.net.InetAddress;
import java.net.UnknownHostException;
public interface HostResolver {
InetAddress[] resolve(String host) throws UnknownHostException;
}

73
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

@ -138,9 +138,8 @@ public class NetworkClient implements KafkaClient { @@ -138,9 +138,8 @@ public class NetworkClient implements KafkaClient {
boolean discoverBrokerVersions,
ApiVersions apiVersions,
LogContext logContext) {
this(null,
this(selector,
metadata,
selector,
clientId,
maxInFlightRequestsPerConnection,
reconnectBackoffMs,
@ -157,20 +156,20 @@ public class NetworkClient implements KafkaClient { @@ -157,20 +156,20 @@ public class NetworkClient implements KafkaClient {
}
public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
Metadata metadata,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
this(null,
metadata,
selector,
@ -186,7 +185,8 @@ public class NetworkClient implements KafkaClient { @@ -186,7 +185,8 @@ public class NetworkClient implements KafkaClient {
discoverBrokerVersions,
apiVersions,
throttleTimeSensor,
logContext);
logContext,
new DefaultHostResolver());
}
public NetworkClient(Selectable selector,
@ -218,25 +218,27 @@ public class NetworkClient implements KafkaClient { @@ -218,25 +218,27 @@ public class NetworkClient implements KafkaClient {
discoverBrokerVersions,
apiVersions,
null,
logContext);
logContext,
new DefaultHostResolver());
}
private NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
public NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext,
HostResolver hostResolver) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
@ -251,7 +253,8 @@ public class NetworkClient implements KafkaClient { @@ -251,7 +253,8 @@ public class NetworkClient implements KafkaClient {
this.selector = selector;
this.clientId = clientId;
this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, logContext);
this.connectionStates = new ClusterConnectionStates(
reconnectBackoffMs, reconnectBackoffMax, logContext, hostResolver);
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;

49
clients/src/test/java/org/apache/kafka/clients/AddressChangeHostResolver.java

@ -0,0 +1,49 @@ @@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import java.net.InetAddress;
class AddressChangeHostResolver implements HostResolver {
private boolean useNewAddresses;
private InetAddress[] initialAddresses;
private InetAddress[] newAddresses;
private int resolutionCount = 0;
public AddressChangeHostResolver(InetAddress[] initialAddresses, InetAddress[] newAddresses) {
this.initialAddresses = initialAddresses;
this.newAddresses = newAddresses;
}
@Override
public InetAddress[] resolve(String host) {
++resolutionCount;
return useNewAddresses ? newAddresses : initialAddresses;
}
public void changeAddresses() {
useNewAddresses = true;
}
public boolean useNewAddresses() {
return useNewAddresses;
}
public int resolutionCount() {
return resolutionCount;
}
}

7
clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java

@ -31,6 +31,7 @@ import java.util.stream.Collectors; @@ -31,6 +31,7 @@ import java.util.stream.Collectors;
public class ClientUtilsTest {
private HostResolver hostResolver = new DefaultHostResolver();
@Test
public void testParseAndValidateAddresses() throws UnknownHostException {
@ -98,17 +99,17 @@ public class ClientUtilsTest { @@ -98,17 +99,17 @@ public class ClientUtilsTest {
@Test(expected = UnknownHostException.class)
public void testResolveUnknownHostException() throws UnknownHostException {
ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.DEFAULT);
ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.DEFAULT, hostResolver);
}
@Test
public void testResolveDnsLookup() throws UnknownHostException {
assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.DEFAULT).size());
assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.DEFAULT, hostResolver).size());
}
@Test
public void testResolveDnsLookupAllIps() throws UnknownHostException {
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver).size() > 1);
}
private List<InetSocketAddress> checkWithoutLookup(String... url) {

70
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java

@ -23,12 +23,13 @@ import static org.junit.Assert.assertNotSame; @@ -23,12 +23,13 @@ import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@ -37,19 +38,47 @@ import org.junit.Test; @@ -37,19 +38,47 @@ import org.junit.Test;
public class ClusterConnectionStatesTest {
private static ArrayList<InetAddress> initialAddresses;
private static ArrayList<InetAddress> newAddresses;
static {
try {
initialAddresses = new ArrayList<>(Arrays.asList(
InetAddress.getByName("10.200.20.100"),
InetAddress.getByName("10.200.20.101"),
InetAddress.getByName("10.200.20.102")
));
newAddresses = new ArrayList<>(Arrays.asList(
InetAddress.getByName("10.200.20.103"),
InetAddress.getByName("10.200.20.104"),
InetAddress.getByName("10.200.20.105")
));
} catch (UnknownHostException e) {
fail("Attempted to create an invalid InetAddress, this should not happen");
}
}
private final MockTime time = new MockTime();
private final long reconnectBackoffMs = 10 * 1000;
private final long reconnectBackoffMax = 60 * 1000;
private final double reconnectBackoffJitter = 0.2;
private final String nodeId1 = "1001";
private final String nodeId2 = "2002";
private final String hostTwoIps = "kafka.apache.org";
private final String nodeId3 = "3003";
private final String hostTwoIps = "multiple.ip.address";
private ClusterConnectionStates connectionStates;
// For testing nodes with a single IP address, use localhost and default DNS resolution
private DefaultHostResolver singleIPHostResolver = new DefaultHostResolver();
// For testing nodes with multiple IP addresses, mock DNS resolution to get consistent results
private AddressChangeHostResolver multipleIPHostResolver = new AddressChangeHostResolver(
initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0]));;
@Before
public void setup() {
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, new LogContext());
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,
new LogContext(), this.singleIPHostResolver);
}
@Test
@ -246,7 +275,7 @@ public class ClusterConnectionStatesTest { @@ -246,7 +275,7 @@ public class ClusterConnectionStatesTest {
@Test
public void testSingleIPWithUseAll() throws UnknownHostException {
assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.USE_ALL_DNS_IPS).size());
assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.USE_ALL_DNS_IPS, singleIPHostResolver).size());
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.USE_ALL_DNS_IPS);
InetAddress currAddress = connectionStates.currentAddress(nodeId1);
@ -256,7 +285,9 @@ public class ClusterConnectionStatesTest { @@ -256,7 +285,9 @@ public class ClusterConnectionStatesTest {
@Test
public void testMultipleIPsWithDefault() throws UnknownHostException {
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
setupMultipleIPs();
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, multipleIPHostResolver).size() > 1);
connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
InetAddress currAddress = connectionStates.currentAddress(nodeId1);
@ -266,7 +297,9 @@ public class ClusterConnectionStatesTest { @@ -266,7 +297,9 @@ public class ClusterConnectionStatesTest {
@Test
public void testMultipleIPsWithUseAll() throws UnknownHostException {
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
setupMultipleIPs();
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, multipleIPHostResolver).size() > 1);
connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
InetAddress addr1 = connectionStates.currentAddress(nodeId1);
@ -280,19 +313,14 @@ public class ClusterConnectionStatesTest { @@ -280,19 +313,14 @@ public class ClusterConnectionStatesTest {
@Test
public void testHostResolveChange() throws UnknownHostException, ReflectiveOperationException {
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
setupMultipleIPs();
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, multipleIPHostResolver).size() > 1);
connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
InetAddress addr1 = connectionStates.currentAddress(nodeId1);
// reflection to simulate host change in DNS lookup
Method nodeStateMethod = connectionStates.getClass().getDeclaredMethod("nodeState", String.class);
nodeStateMethod.setAccessible(true);
Object nodeState = nodeStateMethod.invoke(connectionStates, nodeId1);
Field hostField = nodeState.getClass().getDeclaredField("host");
hostField.setAccessible(true);
hostField.set(nodeState, "localhost");
multipleIPHostResolver.changeAddresses();
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
InetAddress addr2 = connectionStates.currentAddress(nodeId1);
@ -301,12 +329,20 @@ public class ClusterConnectionStatesTest { @@ -301,12 +329,20 @@ public class ClusterConnectionStatesTest {
@Test
public void testNodeWithNewHostname() throws UnknownHostException {
setupMultipleIPs();
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
InetAddress addr1 = connectionStates.currentAddress(nodeId1);
this.multipleIPHostResolver.changeAddresses();
connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
InetAddress addr2 = connectionStates.currentAddress(nodeId1);
assertNotSame(addr1, addr2);
}
private void setupMultipleIPs() {
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,
new LogContext(), this.multipleIPHostResolver);
}
}

157
clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java

@ -38,17 +38,21 @@ import org.apache.kafka.test.TestUtils; @@ -38,17 +38,21 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class NetworkClientTest {
@ -67,6 +71,26 @@ public class NetworkClientTest { @@ -67,6 +71,26 @@ public class NetworkClientTest {
private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery();
private static ArrayList<InetAddress> initialAddresses;
private static ArrayList<InetAddress> newAddresses;
static {
try {
initialAddresses = new ArrayList<>(Arrays.asList(
InetAddress.getByName("10.200.20.100"),
InetAddress.getByName("10.200.20.101"),
InetAddress.getByName("10.200.20.102")
));
newAddresses = new ArrayList<>(Arrays.asList(
InetAddress.getByName("10.200.20.103"),
InetAddress.getByName("10.200.20.104"),
InetAddress.getByName("10.200.20.105")
));
} catch (UnknownHostException e) {
fail("Attempted to create an invalid InetAddress, this should not happen");
}
}
private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
@ -535,6 +559,139 @@ public class NetworkClientTest { @@ -535,6 +559,139 @@ public class NetworkClientTest {
assertTrue(client.canConnect(node, time.milliseconds()));
}
@Test
public void testReconnectAfterAddressChange() {
AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver(
initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0]));
AtomicInteger initialAddressConns = new AtomicInteger();
AtomicInteger newAddressConns = new AtomicInteger();
MockSelector selector = new MockSelector(this.time, inetSocketAddress -> {
InetAddress inetAddress = inetSocketAddress.getAddress();
if (initialAddresses.contains(inetAddress)) {
initialAddressConns.incrementAndGet();
} else if (newAddresses.contains(inetAddress)) {
newAddressConns.incrementAndGet();
}
return (mockHostResolver.useNewAddresses() && newAddresses.contains(inetAddress)) ||
(!mockHostResolver.useNewAddresses() && initialAddresses.contains(inetAddress));
});
NetworkClient client = new NetworkClient(new ManualMetadataUpdater(Arrays.asList(node)), null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new ApiVersions(),
null, new LogContext(), mockHostResolver);
// Connect to one the initial addresses, then change the addresses and disconnect
client.ready(node, time.milliseconds());
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
mockHostResolver.changeAddresses();
selector.serverDisconnect(node.idString());
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
time.sleep(reconnectBackoffMaxMsTest);
client.ready(node, time.milliseconds());
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
// We should have tried to connect to one initial address and one new address, and resolved DNS twice
assertEquals(1, initialAddressConns.get());
assertEquals(1, newAddressConns.get());
assertEquals(2, mockHostResolver.resolutionCount());
}
@Test
public void testFailedConnectionToFirstAddress() {
AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver(
initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0]));
AtomicInteger initialAddressConns = new AtomicInteger();
AtomicInteger newAddressConns = new AtomicInteger();
MockSelector selector = new MockSelector(this.time, inetSocketAddress -> {
InetAddress inetAddress = inetSocketAddress.getAddress();
if (initialAddresses.contains(inetAddress)) {
initialAddressConns.incrementAndGet();
} else if (newAddresses.contains(inetAddress)) {
newAddressConns.incrementAndGet();
}
// Refuse first connection attempt
return initialAddressConns.get() > 1;
});
NetworkClient client = new NetworkClient(new ManualMetadataUpdater(Arrays.asList(node)), null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new ApiVersions(),
null, new LogContext(), mockHostResolver);
// First connection attempt should fail
client.ready(node, time.milliseconds());
// Simulate a failed connection (this will process the state change without the connection having been established)
selector.serverDisconnect(node.idString());
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
// Second connection attempt should succeed
time.sleep(reconnectBackoffMaxMsTest);
client.ready(node, time.milliseconds());
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
// We should have tried to connect to two of the initial addresses, none of the new address, and should
// only have resolved DNS once
assertEquals(2, initialAddressConns.get());
assertEquals(0, newAddressConns.get());
assertEquals(1, mockHostResolver.resolutionCount());
}
@Test
public void testFailedConnectionToFirstAddressAfterReconnect() {
AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver(
initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0]));
AtomicInteger initialAddressConns = new AtomicInteger();
AtomicInteger newAddressConns = new AtomicInteger();
MockSelector selector = new MockSelector(this.time, inetSocketAddress -> {
InetAddress inetAddress = inetSocketAddress.getAddress();
if (initialAddresses.contains(inetAddress)) {
initialAddressConns.incrementAndGet();
} else if (newAddresses.contains(inetAddress)) {
newAddressConns.incrementAndGet();
}
// Refuse first connection attempt to the new addresses
return initialAddresses.contains(inetAddress) || newAddressConns.get() > 1;
});
NetworkClient client = new NetworkClient(new ManualMetadataUpdater(Arrays.asList(node)), null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new ApiVersions(),
null, new LogContext(), mockHostResolver);
// Connect to one the initial addresses, then change the addresses and disconnect
client.ready(node, time.milliseconds());
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
mockHostResolver.changeAddresses();
selector.serverDisconnect(node.idString());
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
// First connection attempt to new addresses should fail
time.sleep(reconnectBackoffMaxMsTest);
client.ready(node, time.milliseconds());
client.poll(0, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
// Second connection attempt to new addresses should succeed
time.sleep(reconnectBackoffMaxMsTest);
client.ready(node, time.milliseconds());
client.poll(0, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()));
// We should have tried to connect to one of the initial addresses and two of the new addresses (the first one
// failed), and resolved DNS twice, once for each set of addresses
assertEquals(1, initialAddressConns.get());
assertEquals(2, newAddressConns.get());
assertEquals(2, mockHostResolver.resolutionCount());
}
private void awaitInFlightApiVersionRequest() throws Exception {
client.ready(node, time.milliseconds());
TestUtils.waitForCondition(new TestCondition() {

11
clients/src/test/java/org/apache/kafka/test/MockSelector.java

@ -32,6 +32,7 @@ import java.util.HashMap; @@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
/**
* A fake selector to use for testing
@ -45,14 +46,22 @@ public class MockSelector implements Selectable { @@ -45,14 +46,22 @@ public class MockSelector implements Selectable {
private final Map<String, ChannelState> disconnected = new HashMap<>();
private final List<String> connected = new ArrayList<>();
private final List<DelayedReceive> delayedReceives = new ArrayList<>();
private final Predicate<InetSocketAddress> canConnect;
public MockSelector(Time time) {
this(time, null);
}
public MockSelector(Time time, Predicate<InetSocketAddress> canConnect) {
this.time = time;
this.canConnect = canConnect;
}
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
this.connected.add(id);
if (canConnect == null || canConnect.test(address)) {
this.connected.add(id);
}
}
@Override

Loading…
Cancel
Save