|
|
|
@ -14,6 +14,7 @@ package org.apache.kafka.clients.producer.internals;
@@ -14,6 +14,7 @@ package org.apache.kafka.clients.producer.internals;
|
|
|
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.HashMap; |
|
|
|
|
import java.util.Iterator; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Set; |
|
|
|
@ -146,9 +147,11 @@ public class Sender implements Runnable {
@@ -146,9 +147,11 @@ public class Sender implements Runnable {
|
|
|
|
|
Set<Node> ready = this.accumulator.ready(cluster, now); |
|
|
|
|
|
|
|
|
|
// remove any nodes we aren't ready to send to
|
|
|
|
|
for (Node node : ready) { |
|
|
|
|
Iterator<Node> iter = ready.iterator(); |
|
|
|
|
while (iter.hasNext()) { |
|
|
|
|
Node node = iter.next(); |
|
|
|
|
if (!this.client.ready(node, now)) |
|
|
|
|
ready.remove(node); |
|
|
|
|
iter.remove(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// create produce requests
|
|
|
|
|