Browse Source

KAFKA-7311: Reset next batch expiry time on each poll loop

Sender/RecordAccumulator never resets the next batch expiry time. Its always computed as the min of the current value and the expiry time for all batches being processed. This means that its always set to the expiry time of the first batch, and once that time has passed Sender starts spinning on epoll with a timeout of 0, which consumes a lot of CPU. This patch updates Sender to reset the next batch expiry time on each poll loop so that a new value reflecting the expiry time for the current set of batches is computed.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/5540/head
Rohan 6 years ago committed by Guozhang Wang
parent
commit
ea4078e72a
  1. 4
      clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  2. 1
      clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  3. 43
      clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

4
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

@ -273,6 +273,10 @@ public final class RecordAccumulator { @@ -273,6 +273,10 @@ public final class RecordAccumulator {
return result;
}
public void resetNextBatchExpiryTime() {
nextBatchExpiryTimeMs = Long.MAX_VALUE;
}
public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch) {
if (batch.createdMs + deliveryTimeoutMs > 0) {
// the non-negative check is to guard us against potential overflow due to setting

1
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

@ -351,6 +351,7 @@ public class Sender implements Runnable { @@ -351,6 +351,7 @@ public class Sender implements Runnable {
}
}
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

43
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

@ -85,6 +85,18 @@ import org.junit.After; @@ -85,6 +85,18 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.geq;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
@ -2023,6 +2035,37 @@ public class SenderTest { @@ -2023,6 +2035,37 @@ public class SenderTest {
assertEquals(0, sender.inFlightBatches(tp0).size());
}
@Test
public void testResetNextBatchExpiry() throws Exception {
MockClient delegateClient = new MockClient(time);
client = mock(MockClient.class);
expect(client.ready(anyObject(), anyLong())).andDelegateTo(delegateClient).anyTimes();
expect(
client.newClientRequest(
anyString(), anyObject(), anyLong(), anyBoolean(), anyInt(), anyObject()))
.andDelegateTo(delegateClient).anyTimes();
client.send(anyObject(), anyLong());
expectLastCall().andDelegateTo(delegateClient).anyTimes();
expect(client.poll(eq(0L), anyLong())).andDelegateTo(delegateClient).times(1);
expect(client.poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong()))
.andDelegateTo(delegateClient)
.times(1);
expect(client.poll(geq(1L), anyLong())).andDelegateTo(delegateClient).times(1);
replay(client);
setupWithTransactionState(null);
accumulator.append(
tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
sender.run(time.milliseconds());
sender.run(time.milliseconds());
time.setCurrentTimeMs(time.milliseconds() + accumulator.getDeliveryTimeoutMs() + 1);
sender.run(time.milliseconds());
verify(client);
}
private class MatchingBufferPool extends BufferPool {
IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;

Loading…
Cancel
Save