diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 964ac3c1558..80a9d0c2bd8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -273,6 +273,10 @@ public final class RecordAccumulator { return result; } + public void resetNextBatchExpiryTime() { + nextBatchExpiryTimeMs = Long.MAX_VALUE; + } + public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch) { if (batch.createdMs + deliveryTimeoutMs > 0) { // the non-negative check is to guard us against potential overflow due to setting diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 7077f154c6f..7640377ac17 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -351,6 +351,7 @@ public class Sender implements Runnable { } } + accumulator.resetNextBatchExpiryTime(); List expiredInflightBatches = getExpiredInflightBatches(now); List expiredBatches = this.accumulator.expiredBatches(now); expiredBatches.addAll(expiredInflightBatches); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 2fbe3df2067..1a6e7788e33 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -85,6 +85,18 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.easymock.EasyMock.anyBoolean; +import static org.easymock.EasyMock.anyInt; +import static org.easymock.EasyMock.anyLong; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.geq; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; @@ -2023,6 +2035,37 @@ public class SenderTest { assertEquals(0, sender.inFlightBatches(tp0).size()); } + @Test + public void testResetNextBatchExpiry() throws Exception { + MockClient delegateClient = new MockClient(time); + client = mock(MockClient.class); + expect(client.ready(anyObject(), anyLong())).andDelegateTo(delegateClient).anyTimes(); + expect( + client.newClientRequest( + anyString(), anyObject(), anyLong(), anyBoolean(), anyInt(), anyObject())) + .andDelegateTo(delegateClient).anyTimes(); + client.send(anyObject(), anyLong()); + expectLastCall().andDelegateTo(delegateClient).anyTimes(); + expect(client.poll(eq(0L), anyLong())).andDelegateTo(delegateClient).times(1); + expect(client.poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong())) + .andDelegateTo(delegateClient) + .times(1); + expect(client.poll(geq(1L), anyLong())).andDelegateTo(delegateClient).times(1); + replay(client); + + setupWithTransactionState(null); + + accumulator.append( + tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT); + + sender.run(time.milliseconds()); + sender.run(time.milliseconds()); + time.setCurrentTimeMs(time.milliseconds() + accumulator.getDeliveryTimeoutMs() + 1); + sender.run(time.milliseconds()); + + verify(client); + } + private class MatchingBufferPool extends BufferPool { IdentityHashMap allocatedBuffers;