Browse Source

KAFKA-5611; AbstractCoordinator should handle wakeup raised from onJoinComplete

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3571 from hachikuji/KAFKA-5611
pull/3554/merge
Jason Gustafson 7 years ago
parent
commit
3620035c45
  1. 16
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
  2. 13
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
  3. 32
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java

16
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java

@ -87,8 +87,16 @@ public interface ConsumerRebalanceListener { @@ -87,8 +87,16 @@ public interface ConsumerRebalanceListener {
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
* <p>
* <b>NOTE:</b> This method is only called before rebalances. It is not called prior to {@link KafkaConsumer#close()}.
* <p>
* It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible
* for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
* to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
* invocation of {@link KafkaConsumer#poll(long)} in which this callback is being executed. This means it is not
* necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.
*
* @param partitions The list of partitions that were assigned to the consumer on the last rebalance
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
*/
void onPartitionsRevoked(Collection<TopicPartition> partitions);
@ -100,9 +108,17 @@ public interface ConsumerRebalanceListener { @@ -100,9 +108,17 @@ public interface ConsumerRebalanceListener {
* It is guaranteed that all the processes in a consumer group will execute their
* {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
* {@link #onPartitionsAssigned(Collection)} callback.
* <p>
* It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible
* for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
* to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
* invocation of {@link KafkaConsumer#poll(long)} in which this callback is being executed. This means it is not
* necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.
*
* @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
* assigned to the consumer)
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
*/
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

13
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

@ -182,7 +182,10 @@ public abstract class AbstractCoordinator implements Closeable { @@ -182,7 +182,10 @@ public abstract class AbstractCoordinator implements Closeable {
Map<String, ByteBuffer> allMemberMetadata);
/**
* Invoked when a group member has successfully joined a group.
* Invoked when a group member has successfully joined a group. If this call is woken up (i.e.
* if the invocation raises {@link org.apache.kafka.common.errors.WakeupException}), then it
* will be retried on the next call to {@link #ensureActiveGroup()}.
*
* @param generation The generation that was joined
* @param memberId The identifier for the local member in the group
* @param protocol The protocol selected by the coordinator
@ -360,12 +363,16 @@ public abstract class AbstractCoordinator implements Closeable { @@ -360,12 +363,16 @@ public abstract class AbstractCoordinator implements Closeable {
RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future);
resetJoinGroupFuture();
if (future.succeeded()) {
needsJoinPrepare = true;
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
// We reset the join group future only after the completion callback returns. This ensures
// that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
resetJoinGroupFuture();
needsJoinPrepare = true;
} else {
resetJoinGroupFuture();
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||

32
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java

@ -479,7 +479,36 @@ public class AbstractCoordinatorTest { @@ -479,7 +479,36 @@ public class AbstractCoordinatorTest {
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
coordinator.ensureActiveGroup();
assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(1, coordinator.onJoinCompleteInvokes);
awaitFirstHeartbeat(heartbeatReceived);
}
@Test
public void testWakeupInOnJoinComplete() throws Exception {
setupCoordinator(RETRY_BACKOFF_MS);
coordinator.wakeupOnJoinComplete = true;
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
} catch (WakeupException e) {
}
assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
// the join group completes in this poll()
coordinator.wakeupOnJoinComplete = false;
consumerClient.poll(0);
coordinator.ensureActiveGroup();
@ -534,6 +563,7 @@ public class AbstractCoordinatorTest { @@ -534,6 +563,7 @@ public class AbstractCoordinatorTest {
private int onJoinPrepareInvokes = 0;
private int onJoinCompleteInvokes = 0;
private boolean wakeupOnJoinComplete = false;
public DummyCoordinator(ConsumerNetworkClient client,
Metrics metrics,
@ -567,6 +597,8 @@ public class AbstractCoordinatorTest { @@ -567,6 +597,8 @@ public class AbstractCoordinatorTest {
@Override
protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
    // When the test has requested it, simulate a wakeup raised from inside the
    // completion callback; the invocation counter is deliberately left untouched
    // so tests can tell an interrupted callback from a completed one.
    if (wakeupOnJoinComplete) {
        throw new WakeupException();
    }
    onJoinCompleteInvokes += 1;
}
}

Loading…
Cancel
Save