Browse Source

KAFKA-3068: Remove retry with nodesEverSeen

ewencp ijuma if this looks good please merge when you can. Thanks.

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #823 from enothereska/kafka-3068-alt
pull/823/merge
Eno Thereska 9 years ago committed by Ewen Cheslack-Postava
parent
commit
c4f32c53ed
  1. 48
      clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

48
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

@ -33,10 +33,8 @@ import org.slf4j.LoggerFactory; @@ -33,10 +33,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@ -54,14 +52,9 @@ public class NetworkClient implements KafkaClient { @@ -54,14 +52,9 @@ public class NetworkClient implements KafkaClient {
private final Selectable selector;
private final MetadataUpdater metadataUpdater;
/* a list of nodes we've connected to in the past */
private final List<Integer> nodesEverSeen;
private final Map<Integer, Node> nodesEverSeenById;
/* random offset into nodesEverSeen list */
private final Random randOffset;
/* the state of each node's connection */
private final ClusterConnectionStates connectionStates;
@ -142,9 +135,6 @@ public class NetworkClient implements KafkaClient { @@ -142,9 +135,6 @@ public class NetworkClient implements KafkaClient {
this.correlation = 0;
this.randOffset = new Random();
this.requestTimeoutMs = requestTimeoutMs;
this.nodesEverSeen = new ArrayList<>();
this.nodesEverSeenById = new HashMap<>();
this.time = time;
}
@ -374,19 +364,6 @@ public class NetworkClient implements KafkaClient { @@ -374,19 +364,6 @@ public class NetworkClient implements KafkaClient {
found = node;
}
}
// if we found no node in the current list, try one from the nodes seen before
if (found == null && nodesEverSeen.size() > 0) {
offset = randOffset.nextInt(nodesEverSeen.size());
for (int i = 0; i < nodesEverSeen.size(); i++) {
int idx = (offset + i) % nodesEverSeen.size();
Node node = nodesEverSeenById.get(nodesEverSeen.get(idx));
log.debug("No node found. Trying previously-seen node with ID {}", node.id());
if (!this.connectionStates.isBlackedOut(node.idString(), now)) {
found = node;
}
}
}
return found;
}
@ -596,28 +573,6 @@ public class NetworkClient implements KafkaClient { @@ -596,28 +573,6 @@ public class NetworkClient implements KafkaClient {
this.metadata.requestUpdate();
}
/*
* Keep track of any nodes we've ever seen. Add current
* alive nodes to this tracking list.
* @param nodes Current alive nodes
*/
private void updateNodesEverSeen(List<Node> nodes) {
for (Node n : nodes) {
Node existing = nodesEverSeenById.get(n.id());
if (existing == null) {
nodesEverSeenById.put(n.id(), n);
log.debug("Adding node {} to nodes ever seen", n.id());
nodesEverSeen.add(n.id());
} else {
// check if the nodes are really equal. There could be a case
// where node.id() is the same but node has moved to different host
if (!existing.equals(n)) {
nodesEverSeenById.put(n.id(), n);
}
}
}
}
private void handleResponse(RequestHeader header, Struct body, long now) {
this.metadataFetchInProgress = false;
MetadataResponse response = new MetadataResponse(body);
@ -630,7 +585,6 @@ public class NetworkClient implements KafkaClient { @@ -630,7 +585,6 @@ public class NetworkClient implements KafkaClient {
// created which means we will get errors and no nodes until it exists
if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, now);
this.updateNodesEverSeen(cluster.nodes());
} else {
log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
this.metadata.failedUpdate(now);

Loading…
Cancel
Save