From 03d61ebfb93aab53b0b0ecdfc77175d18a58e861 Mon Sep 17 00:00:00 2001 From: Boyang Chen Date: Thu, 20 Jun 2019 13:56:00 -0700 Subject: [PATCH] KAFKA-8569: integrate warning message under static membership (#6972) Static members never leave the group, so potentially we could log a flooding number of warning messages in the hb thread. The solution is to only log as warning when we are on dynamic membership. Reviewers: Guozhang Wang --- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../internals/AbstractCoordinator.java | 23 ++++++++++--------- .../internals/ConsumerCoordinatorTest.java | 6 ++--- .../distributed/DistributedHerder.java | 2 +- .../distributed/WorkerGroupMember.java | 4 ++-- .../distributed/DistributedHerderTest.java | 2 +- 6 files changed, 20 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 79afafa3886..fa5cc99a188 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1053,7 +1053,7 @@ public class KafkaConsumer implements Consumer { fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet()); this.subscriptions.unsubscribe(); if (this.coordinator != null) - this.coordinator.maybeLeaveGroup(); + this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics"); log.info("Unsubscribed all topics or patterns and assigned partitions"); } finally { release(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index efe6cb75ce5..5a5444e99c7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -823,7 +823,7 @@ public abstract class AbstractCoordinator implements Closeable { // needs this lock to complete and terminate after close flag is set. synchronized (this) { if (rebalanceConfig.leaveGroupOnClose) { - maybeLeaveGroup(); + maybeLeaveGroup("the consumer is being closed"); } // At this point, there may be pending commits (async commits or sync commits that were @@ -840,8 +840,9 @@ public abstract class AbstractCoordinator implements Closeable { /** * Leave the current group and reset local generation/memberId. + * @param leaveReason reason to attempt leaving the group */ - public synchronized void maybeLeaveGroup() { + public synchronized void maybeLeaveGroup(String leaveReason) { // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker, // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup, // and the membership expiration is only controlled by session timeout. @@ -849,7 +850,8 @@ public abstract class AbstractCoordinator implements Closeable { state != MemberState.UNJOINED && generation.hasMemberId()) { // this is a minimal effort attempt to leave the group. we do not // attempt any resending if the request fails or times out. - log.info("Member {} sending LeaveGroup request to coordinator {}", generation.memberId, coordinator); + log.info("Member {} sending LeaveGroup request to coordinator {} due to {}", + generation.memberId, coordinator, leaveReason); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(new LeaveGroupRequestData() .setGroupId(rebalanceConfig.groupId).setMemberId(generation.memberId)); client.send(coordinator, request) @@ -1088,14 +1090,13 @@ public abstract class AbstractCoordinator implements Closeable { markCoordinatorUnknown(); } else if (heartbeat.pollTimeoutExpired(now)) { // the poll timeout has expired, which means that the foreground thread has stalled - // in between calls to poll(), so we explicitly leave the group. - log.warn("This member will leave the group because consumer poll timeout has expired. This " + - "means the time between subsequent calls to poll() was longer than the configured " + - "max.poll.interval.ms, which typically implies that the poll loop is spending too " + - "much time processing messages. You can address this either by increasing " + - "max.poll.interval.ms or by reducing the maximum size of batches returned in poll() " + - "with max.poll.records."); - maybeLeaveGroup(); + // in between calls to poll(). + String leaveReason = "consumer poll timeout has expired. This means the time between subsequent calls to poll() " + + "was longer than the configured max.poll.interval.ms, which typically implies that " + + "the poll loop is spending too much time processing messages. " + + "You can address this either by increasing max.poll.interval.ms or by reducing " + + "the maximum size of batches returned in poll() with max.poll.records."; + maybeLeaveGroup(leaveReason); } else if (!heartbeat.shouldHeartbeat(now)) { // poll again after waiting for the retry backoff in case the heartbeat failed or the // coordinator disconnected diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index f4b87d2f16e..1dc1780cc1b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -835,7 +835,7 @@ public class ConsumerCoordinatorTest { leaveRequest.data().groupId().equals(groupId); } }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()))); - coordinator.maybeLeaveGroup(); + coordinator.maybeLeaveGroup("test maybe leave group"); assertTrue(received.get()); AbstractCoordinator.Generation generation = coordinator.generation(); @@ -873,7 +873,7 @@ public class ConsumerCoordinatorTest { } }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()))); - coordinator.maybeLeaveGroup(); + coordinator.maybeLeaveGroup("pending member leaves"); assertTrue(received.get()); } @@ -1509,7 +1509,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.NONE.code()))); subscriptions.unsubscribe(); - coordinator.maybeLeaveGroup(); + coordinator.maybeLeaveGroup("test commit after leave"); subscriptions.assignFromUser(singleton(t1p)); // the client should not reuse generation/memberId from auto-subscribed generation diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 8e59d87153a..b1a41c0b4ba 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -977,7 +977,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { // in case reading the log takes too long, leave the group to ensure a quick rebalance (although by default we should be out of the group already) // and back off to avoid a tight loop of rejoin-attempt-to-catch-up-leave log.warn("Didn't reach end of config log quickly enough", e); - member.maybeLeaveGroup(); + member.maybeLeaveGroup("taking too long to read the log"); backoff(workerUnsyncBackoffMs); return false; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 94cf97df1fa..4819db5da74 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -183,8 +183,8 @@ public class WorkerGroupMember { coordinator.requestRejoin(); } - public void maybeLeaveGroup() { - coordinator.maybeLeaveGroup(); + public void maybeLeaveGroup(String leaveReason) { + coordinator.maybeLeaveGroup(leaveReason); } public String ownerUrl(String connector) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index b03ddf341ed..f6cbfd6fd48 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -1442,7 +1442,7 @@ public class DistributedHerderTest { // Reading to end of log times out configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); EasyMock.expectLastCall().andThrow(new TimeoutException()); - member.maybeLeaveGroup(); + member.maybeLeaveGroup("test join leader catch up fails"); EasyMock.expectLastCall(); PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT); member.requestRejoin();