Browse Source

HOTFIX: Try to complete Send even if no bytes were written (#7622)

If there are pending bytes in the transport layer, we may
complete a send even if no bytes were recorded as written.
We assume bytes are written when they are in the netWriteBuffer,
but we only consider the send as completed when it's in
the socket channel buffer.

This fixes a regression introduced via 0971f66ff5. The impact is
that we would sometimes throw the following exception in
`MultiRecordsSend.writeTo`:

```java
if (completed())
    throw new KafkaException("This operation cannot be invoked on a complete request.");
```

Added unit test verifying the bug fix. While in the area, I simplified one of the
`SslSelectorTest` methods.

Reviewers: Jun Rao <junrao@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
pull/6329/merge
Ismael Juma 5 years ago committed by GitHub
parent
commit
7bdbdf1900
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 52
      clients/src/main/java/org/apache/kafka/common/network/Selector.java
  2. 11
      clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
  3. 16
      clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java

52
clients/src/main/java/org/apache/kafka/common/network/Selector.java

@ -584,26 +584,11 @@ public class Selector implements Selectable, AutoCloseable {
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */ /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos; long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
if (channel.hasSend() try {
&& channel.ready() attemptWrite(key, channel, nowNanos);
&& key.isWritable() } catch (Exception e) {
&& !channel.maybeBeginClientReauthentication(() -> nowNanos)) { sendFailed = true;
try { throw e;
long bytesSent = channel.write();
if (bytesSent > 0) {
long currentTimeMs = time.milliseconds();
this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
Send send = channel.maybeCompleteSend();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
}
}
} catch (Exception e) {
sendFailed = true;
throw e;
}
} }
/* cancel any defunct sockets */ /* cancel any defunct sockets */
@ -639,6 +624,33 @@ public class Selector implements Selectable, AutoCloseable {
} }
} }
private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
if (channel.hasSend()
&& channel.ready()
&& key.isWritable()
&& !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
write(channel);
}
}
// package-private for testing
void write(KafkaChannel channel) throws IOException {
String nodeId = channel.id();
long bytesSent = channel.write();
Send send = channel.maybeCompleteSend();
// We may complete the send with bytesSent < 1 if `TransportLayer.hasPendingWrites` was true and `channel.write()`
// caused the pending writes to be written to the socket channel buffer
if (bytesSent > 0 || send != null) {
long currentTimeMs = time.milliseconds();
if (bytesSent > 0)
this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
if (send != null) {
this.completedSends.add(send);
this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
}
}
}
private Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> selectionKeys) { private Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> selectionKeys) {
//it is possible that the iteration order over selectionKeys is the same every invocation. //it is possible that the iteration order over selectionKeys is the same every invocation.
//this may cause starvation of reads when memory is low. to address this we shuffle the keys if memory is low. //this may cause starvation of reads when memory is low. to address this we shuffle the keys if memory is low.

11
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java

@ -53,6 +53,7 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -773,6 +774,16 @@ public class SelectorTest {
assertEquals(1, metrics.metrics().size()); assertEquals(1, metrics.metrics().size());
} }
@Test
public void testWriteCompletesSendWithNoBytesWritten() throws IOException {
KafkaChannel channel = mock(KafkaChannel.class);
when(channel.id()).thenReturn("1");
when(channel.write()).thenReturn(0L);
ByteBufferSend send = new ByteBufferSend("destination", ByteBuffer.allocate(0));
when(channel.maybeCompleteSend()).thenReturn(send);
selector.write(channel);
assertEquals(asList(send), selector.completedSends());
}
private String blockingRequest(String node, String s) throws IOException { private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s)); selector.send(createSend(node, s));

16
clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java

@ -31,7 +31,6 @@ import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.After; import org.junit.After;
@ -157,15 +156,12 @@ public class SslSelectorTest extends SelectorTest {
} }
private void waitForBytesBuffered(Selector selector, String node) throws Exception { private void waitForBytesBuffered(Selector selector, String node) throws Exception {
TestUtils.waitForCondition(new TestCondition() { TestUtils.waitForCondition(() -> {
@Override try {
public boolean conditionMet() { selector.poll(0L);
try { return selector.channel(node).hasBytesBuffered();
selector.poll(0L); } catch (IOException e) {
return selector.channel(node).hasBytesBuffered(); throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
}, 2000L, "Failed to reach socket state with bytes buffered"); }, 2000L, "Failed to reach socket state with bytes buffered");
} }

Loading…
Cancel
Save