diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 8c1c5439b6a..d4c40690211 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -33,10 +33,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Random; import java.util.Set; @@ -54,14 +52,9 @@ public class NetworkClient implements KafkaClient { private final Selectable selector; private final MetadataUpdater metadataUpdater; - - /* a list of nodes we've connected to in the past */ - private final List nodesEverSeen; - private final Map nodesEverSeenById; - /* random offset into nodesEverSeen list */ private final Random randOffset; - + /* the state of each node's connection */ private final ClusterConnectionStates connectionStates; @@ -142,9 +135,6 @@ public class NetworkClient implements KafkaClient { this.correlation = 0; this.randOffset = new Random(); this.requestTimeoutMs = requestTimeoutMs; - this.nodesEverSeen = new ArrayList<>(); - this.nodesEverSeenById = new HashMap<>(); - this.time = time; } @@ -374,19 +364,6 @@ public class NetworkClient implements KafkaClient { found = node; } } - - // if we found no node in the current list, try one from the nodes seen before - if (found == null && nodesEverSeen.size() > 0) { - offset = randOffset.nextInt(nodesEverSeen.size()); - for (int i = 0; i < nodesEverSeen.size(); i++) { - int idx = (offset + i) % nodesEverSeen.size(); - Node node = nodesEverSeenById.get(nodesEverSeen.get(idx)); - log.debug("No node found. Trying previously-seen node with ID {}", node.id()); - if (!this.connectionStates.isBlackedOut(node.idString(), now)) { - found = node; - } - } - } return found; } @@ -596,28 +573,6 @@ public class NetworkClient implements KafkaClient { this.metadata.requestUpdate(); } - /* - * Keep track of any nodes we've ever seen. Add current - * alive nodes to this tracking list. - * @param nodes Current alive nodes - */ - private void updateNodesEverSeen(List nodes) { - for (Node n : nodes) { - Node existing = nodesEverSeenById.get(n.id()); - if (existing == null) { - nodesEverSeenById.put(n.id(), n); - log.debug("Adding node {} to nodes ever seen", n.id()); - nodesEverSeen.add(n.id()); - } else { - // check if the nodes are really equal. There could be a case - // where node.id() is the same but node has moved to different host - if (!existing.equals(n)) { - nodesEverSeenById.put(n.id(), n); - } - } - } - } - private void handleResponse(RequestHeader header, Struct body, long now) { this.metadataFetchInProgress = false; MetadataResponse response = new MetadataResponse(body); @@ -630,7 +585,6 @@ public class NetworkClient implements KafkaClient { // created which means we will get errors and no nodes until it exists if (cluster.nodes().size() > 0) { this.metadata.update(cluster, now); - this.updateNodesEverSeen(cluster.nodes()); } else { log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); this.metadata.failedUpdate(now);