From d0718718aecdb9030205fde30846b0b6481c762b Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 14 Feb 2019 17:59:36 +0000 Subject: [PATCH] MINOR: Make MockClient#poll() more thread-safe (#5942) It used to preallocate an array of responses and then complete each response from the original collection sequentially. The problem was that the original collection could have been modified (another thread completing the response) while this was hapenning --- .../src/test/java/org/apache/kafka/clients/MockClient.java | 3 ++- .../apache/kafka/clients/admin/AdminClientUnitTestEnv.java | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 0dd09d2b4d8..0dd42f8fac3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -276,7 +276,6 @@ public class MockClient implements KafkaClient { maybeAwaitWakeup(); checkTimeoutOfPendingRequests(now); - List copy = new ArrayList<>(this.responses); // We skip metadata updates if all nodes are currently blacked out if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) { MetadataUpdate metadataUpdate = metadataUpdates.poll(); @@ -287,9 +286,11 @@ public class MockClient implements KafkaClient { } } + List copy = new ArrayList<>(); ClientResponse response; while ((response = this.responses.poll()) != null) { response.onComplete(); + copy.add(response); } return copy; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java index 89f5fdec2a5..6023c63022b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java @@ -35,6 +35,10 @@ import java.util.Map; * To use in a test, create an instance and prepare its {@link #kafkaClient() MockClient} with the expected responses * for the {@link AdminClient}. Then, use the {@link #adminClient() AdminClient} in the test, which will then use the MockClient * and receive the responses you provided. + * + * Since {@link #kafkaClient() MockClient} is not thread-safe, + * users should be wary of calling its methods after the {@link #adminClient() AdminClient} is instantiated. + * *

* When finished, be sure to {@link #close() close} the environment object. */