Browse Source

KAFKA-8569: integrate warning message under static membership (#6972)

Static members never leave the group, so potentially we could log a flooding number of warning messages in the hb thread. The solution is to only log as warning when we are on dynamic membership.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/6982/head
Boyang Chen 5 years ago committed by Guozhang Wang
parent
commit
03d61ebfb9
  1. 2
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  2. 23
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
  3. 6
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
  4. 2
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
  5. 4
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
  6. 2
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java

2
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@ -1053,7 +1053,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -1053,7 +1053,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
this.subscriptions.unsubscribe();
if (this.coordinator != null)
this.coordinator.maybeLeaveGroup();
this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
log.info("Unsubscribed all topics or patterns and assigned partitions");
} finally {
release();

23
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

@ -823,7 +823,7 @@ public abstract class AbstractCoordinator implements Closeable { @@ -823,7 +823,7 @@ public abstract class AbstractCoordinator implements Closeable {
// needs this lock to complete and terminate after close flag is set.
synchronized (this) {
if (rebalanceConfig.leaveGroupOnClose) {
maybeLeaveGroup();
maybeLeaveGroup("the consumer is being closed");
}
// At this point, there may be pending commits (async commits or sync commits that were
@ -840,8 +840,9 @@ public abstract class AbstractCoordinator implements Closeable { @@ -840,8 +840,9 @@ public abstract class AbstractCoordinator implements Closeable {
/**
* Leave the current group and reset local generation/memberId.
* @param leaveReason reason to attempt leaving the group
*/
public synchronized void maybeLeaveGroup() {
public synchronized void maybeLeaveGroup(String leaveReason) {
// Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
// consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
// and the membership expiration is only controlled by session timeout.
@ -849,7 +850,8 @@ public abstract class AbstractCoordinator implements Closeable { @@ -849,7 +850,8 @@ public abstract class AbstractCoordinator implements Closeable {
state != MemberState.UNJOINED && generation.hasMemberId()) {
// this is a minimal effort attempt to leave the group. we do not
// attempt any resending if the request fails or times out.
log.info("Member {} sending LeaveGroup request to coordinator {}", generation.memberId, coordinator);
log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
generation.memberId, coordinator, leaveReason);
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(new LeaveGroupRequestData()
.setGroupId(rebalanceConfig.groupId).setMemberId(generation.memberId));
client.send(coordinator, request)
@ -1088,14 +1090,13 @@ public abstract class AbstractCoordinator implements Closeable { @@ -1088,14 +1090,13 @@ public abstract class AbstractCoordinator implements Closeable {
markCoordinatorUnknown();
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
log.warn("This member will leave the group because consumer poll timeout has expired. This " +
"means the time between subsequent calls to poll() was longer than the configured " +
"max.poll.interval.ms, which typically implies that the poll loop is spending too " +
"much time processing messages. You can address this either by increasing " +
"max.poll.interval.ms or by reducing the maximum size of batches returned in poll() " +
"with max.poll.records.");
maybeLeaveGroup();
// in between calls to poll().
String leaveReason = "consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
"was longer than the configured max.poll.interval.ms, which typically implies that " +
"the poll loop is spending too much time processing messages. " +
"You can address this either by increasing max.poll.interval.ms or by reducing " +
"the maximum size of batches returned in poll() with max.poll.records.";
maybeLeaveGroup(leaveReason);
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected

6
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

@ -835,7 +835,7 @@ public class ConsumerCoordinatorTest { @@ -835,7 +835,7 @@ public class ConsumerCoordinatorTest {
leaveRequest.data().groupId().equals(groupId);
}
}, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
coordinator.maybeLeaveGroup();
coordinator.maybeLeaveGroup("test maybe leave group");
assertTrue(received.get());
AbstractCoordinator.Generation generation = coordinator.generation();
@ -873,7 +873,7 @@ public class ConsumerCoordinatorTest { @@ -873,7 +873,7 @@ public class ConsumerCoordinatorTest {
}
}, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
coordinator.maybeLeaveGroup();
coordinator.maybeLeaveGroup("pending member leaves");
assertTrue(received.get());
}
@ -1509,7 +1509,7 @@ public class ConsumerCoordinatorTest { @@ -1509,7 +1509,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())));
subscriptions.unsubscribe();
coordinator.maybeLeaveGroup();
coordinator.maybeLeaveGroup("test commit after leave");
subscriptions.assignFromUser(singleton(t1p));
// the client should not reuse generation/memberId from auto-subscribed generation

2
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java

@ -977,7 +977,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @@ -977,7 +977,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// in case reading the log takes too long, leave the group to ensure a quick rebalance (although by default we should be out of the group already)
// and back off to avoid a tight loop of rejoin-attempt-to-catch-up-leave
log.warn("Didn't reach end of config log quickly enough", e);
member.maybeLeaveGroup();
member.maybeLeaveGroup("taking too long to read the log");
backoff(workerUnsyncBackoffMs);
return false;
}

4
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java

@ -183,8 +183,8 @@ public class WorkerGroupMember { @@ -183,8 +183,8 @@ public class WorkerGroupMember {
coordinator.requestRejoin();
}
public void maybeLeaveGroup() {
coordinator.maybeLeaveGroup();
public void maybeLeaveGroup(String leaveReason) {
coordinator.maybeLeaveGroup(leaveReason);
}
public String ownerUrl(String connector) {

2
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java

@ -1442,7 +1442,7 @@ public class DistributedHerderTest { @@ -1442,7 +1442,7 @@ public class DistributedHerderTest {
// Reading to end of log times out
configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall().andThrow(new TimeoutException());
member.maybeLeaveGroup();
member.maybeLeaveGroup("test join leader catch up fails");
EasyMock.expectLastCall();
PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
member.requestRejoin();

Loading…
Cancel
Save