Browse Source

KAFKA-8677: Simplify the best-effort network client poll to never throw exception (#7613)

Within KafkaConsumer.poll, we have an optimization to try to send the next fetch request before returning the data, in order to pipeline the fetch requests; however, this pollNoWakeup should NOT throw any exceptions, since at this point the fetch position has been updated. If an exception is thrown and the callers decide to capture it and continue, those records would never be returned again, causing data loss.

Also fix the flaky test itself.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jason Gustafson <jason@confluent.io>
pull/7669/head
Guozhang Wang 5 years ago committed by GitHub
parent
commit
6df058ec15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  2. 21
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  3. 29
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
  4. 3
      core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala

2
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -1249,7 +1249,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.pollNoWakeup();
client.transmitSends();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));

21
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java

@@ -303,6 +303,27 @@ public class ConsumerNetworkClient implements Closeable {
poll(time.timer(0), null, true);
}
/**
 * Best-effort poll for network IO that only tries to transmit requests which are
 * already ready to send. It deliberately skips the checks for pending requests,
 * disconnects, and metadata errors, so it never throws an exception, never
 * triggers a wakeup, and never raises an InterruptedException.
 */
public void transmitSends() {
    Timer pollTimer = time.timer(0);

    lock.lock();
    try {
        // Queue up whatever is ready right now, then drive the actual network
        // send with a single non-blocking poll; try once and return immediately
        // without handling disconnects or previous request failures.
        trySend(pollTimer.currentTimeMs());
        client.poll(0, pollTimer.currentTimeMs());
    } finally {
        lock.unlock();
    }
}
/**
* Block until all pending requests from the given node have finished.
* @param node The node to await requests from

29
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java

@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -34,7 +33,6 @@ import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
@@ -498,33 +496,6 @@ public class AbstractCoordinatorTest {
assertTrue(leaveGroupFuture.exception() instanceof UnknownMemberIdException);
}
@Test
public void testHandleSingleLeaveGroupRequest() {
    setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, Optional.empty());

    // Pin the LeaveGroup API to version 2, which carries a single member id
    // rather than a member list.
    mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.LEAVE_GROUP.id, (short) 2, (short) 2));

    LeaveGroupResponse leaveResponse = leaveGroupResponse(Collections.singletonList(
        new MemberResponse()
            .setErrorCode(Errors.NONE.code())
            .setMemberId(memberId)));

    // Walk the coordinator through discovery, join, and sync before the leave.
    mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));
    mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
    mockClient.prepareResponse(body -> {
        if (!(body instanceof LeaveGroupRequest)) {
            return false;
        }
        // The v2 request must carry only the single member id, no member list.
        LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
        return leaveRequest.data().memberId().equals(memberId)
            && leaveRequest.data().members().isEmpty();
    }, leaveResponse);

    coordinator.ensureActiveGroup();
    RequestFuture<Void> leaveGroupFuture = coordinator.maybeLeaveGroup("test single leave group");
    assertTrue(leaveGroupFuture.succeeded());
}
private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse leaveGroupResponse) {
setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, Optional.empty());

3
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala

@@ -181,6 +181,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
this.serverConfig.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
this.serverConfig.setProperty(KafkaConfig.ConnectionsMaxReauthMsProp, "1500")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group")
this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1500")
/**
* Starts MiniKDC and only then sets up the parent trait.
@@ -363,7 +364,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
// Add ACLs and verify successful produce/consume/describe on first topic
setReadAndWriteAcls(tp)
consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset = numRecords, topic2)
consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset = 1, topic2)
sendRecords(producer, numRecords, tp)
consumeRecords(consumer, numRecords, topic = topic)
val describeResults2 = adminClient.describeTopics(Set(topic, topic2).asJava).values

Loading…
Cancel
Save