Browse Source

MINOR: Make MockClient#poll() more thread-safe (#5942)

It used to preallocate an array of responses and then complete each response from the original collection sequentially. The problem was that the original collection could have been modified (another thread completing the response) while this was hapenning
pull/6271/head
Stanislav Kozlovski 6 years ago committed by Jason Gustafson
parent
commit
d0718718ae
  1. 3
      clients/src/test/java/org/apache/kafka/clients/MockClient.java
  2. 4
      clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java

3
clients/src/test/java/org/apache/kafka/clients/MockClient.java

@ -276,7 +276,6 @@ public class MockClient implements KafkaClient {
maybeAwaitWakeup(); maybeAwaitWakeup();
checkTimeoutOfPendingRequests(now); checkTimeoutOfPendingRequests(now);
List<ClientResponse> copy = new ArrayList<>(this.responses);
// We skip metadata updates if all nodes are currently blacked out // We skip metadata updates if all nodes are currently blacked out
if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) { if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) {
MetadataUpdate metadataUpdate = metadataUpdates.poll(); MetadataUpdate metadataUpdate = metadataUpdates.poll();
@ -287,9 +286,11 @@ public class MockClient implements KafkaClient {
} }
} }
List<ClientResponse> copy = new ArrayList<>();
ClientResponse response; ClientResponse response;
while ((response = this.responses.poll()) != null) { while ((response = this.responses.poll()) != null) {
response.onComplete(); response.onComplete();
copy.add(response);
} }
return copy; return copy;

4
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java

@ -35,6 +35,10 @@ import java.util.Map;
* To use in a test, create an instance and prepare its {@link #kafkaClient() MockClient} with the expected responses * To use in a test, create an instance and prepare its {@link #kafkaClient() MockClient} with the expected responses
* for the {@link AdminClient}. Then, use the {@link #adminClient() AdminClient} in the test, which will then use the MockClient * for the {@link AdminClient}. Then, use the {@link #adminClient() AdminClient} in the test, which will then use the MockClient
* and receive the responses you provided. * and receive the responses you provided.
*
* Since {@link #kafkaClient() MockClient} is not thread-safe,
* users should be wary of calling its methods after the {@link #adminClient() AdminClient} is instantiated.
*
* <p> * <p>
* When finished, be sure to {@link #close() close} the environment object. * When finished, be sure to {@link #close() close} the environment object.
*/ */

Loading…
Cancel
Save