Browse Source

KAFKA-7012: Don't process SSL channels without data to process (#5237)

Avoid unnecessary processing of SSL channels when there are some bytes buffered, but not enough to make progress.

Reviewers: Radai Rosenblatt <radai.rosenblatt@gmail.com>, Jun Rao <junrao@gmail.com>
pull/5251/head
Rajini Sivaram 7 years ago committed by GitHub
parent
commit
e8955f731e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      clients/src/main/java/org/apache/kafka/common/network/Selector.java
  2. 27
      clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
  3. 1
      clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
  4. 91
      clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java

4
clients/src/main/java/org/apache/kafka/common/network/Selector.java

@ -498,7 +498,9 @@ public class Selector implements Selectable, AutoCloseable { @@ -498,7 +498,9 @@ public class Selector implements Selectable, AutoCloseable {
//this channel has bytes enqueued in intermediary buffers that we could not read
//(possibly because no memory). it may be the case that the underlying socket will
//not come up in the next poll() and so we need to remember this channel for the
//next poll call otherwise data may be stuck in said buffers forever.
//next poll call otherwise data may be stuck in said buffers forever. If we attempt
//to process buffered data and no progress is made, the channel buffered status is
//cleared to avoid the overhead of checking every time.
keysWithBufferedRead.add(key);
}

27
clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java

@ -64,6 +64,7 @@ public class SslTransportLayer implements TransportLayer { @@ -64,6 +64,7 @@ public class SslTransportLayer implements TransportLayer {
private ByteBuffer netReadBuffer;
private ByteBuffer netWriteBuffer;
private ByteBuffer appReadBuffer;
private boolean hasBytesBuffered;
private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
@ -503,13 +504,17 @@ public class SslTransportLayer implements TransportLayer { @@ -503,13 +504,17 @@ public class SslTransportLayer implements TransportLayer {
read = readFromAppBuffer(dst);
}
boolean readFromNetwork = false;
boolean isClosed = false;
// Each loop reads at most once from the socket.
while (dst.remaining() > 0) {
int netread = 0;
netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
if (netReadBuffer.remaining() > 0)
if (netReadBuffer.remaining() > 0) {
netread = readFromSocketChannel();
if (netread > 0)
readFromNetwork = true;
}
while (netReadBuffer.position() > 0) {
netReadBuffer.flip();
@ -563,6 +568,7 @@ public class SslTransportLayer implements TransportLayer { @@ -563,6 +568,7 @@ public class SslTransportLayer implements TransportLayer {
if (netread <= 0 || isClosed)
break;
}
updateBytesBuffered(readFromNetwork || read > 0);
// If data has been read and unwrapped, return the data even if end-of-stream, channel will be closed
// on a subsequent poll.
return read;
@ -793,6 +799,11 @@ public class SslTransportLayer implements TransportLayer { @@ -793,6 +799,11 @@ public class SslTransportLayer implements TransportLayer {
return netReadBuffer;
}
// Visibility for testing
protected ByteBuffer appReadBuffer() {
return appReadBuffer;
}
/**
* SSL exceptions are propagated as authentication failures so that clients can avoid
* retries and report the failure. If `flush` is true, exceptions are propagated after
@ -826,12 +837,22 @@ public class SslTransportLayer implements TransportLayer { @@ -826,12 +837,22 @@ public class SslTransportLayer implements TransportLayer {
@Override
public boolean hasBytesBuffered() {
return netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
return hasBytesBuffered;
}
// Update `hasBytesBuffered` status. If any bytes were read from the network or
// if data was returned from read, `hasBytesBuffered` is set to true if any buffered
// data is still remaining. If not, `hasBytesBuffered` is set to false since no progress
// can be made until more data is available to read from the network.
private void updateBytesBuffered(boolean madeProgress) {
if (madeProgress)
hasBytesBuffered = netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
else
hasBytesBuffered = false;
}
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, this);
}
}

1
clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java

@ -94,6 +94,7 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan @@ -94,6 +94,7 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
/**
* @return true if channel has bytes to be read in any intermediate buffers
* which may be processed without reading additional data from the network.
*/
boolean hasBytesBuffered();

91
clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java

@ -42,6 +42,9 @@ import java.util.Collections; @@ -42,6 +42,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -96,6 +99,13 @@ public class SslSelectorTest extends SelectorTest { @@ -96,6 +99,13 @@ public class SslSelectorTest extends SelectorTest {
connect(node, new InetSocketAddress("localhost", server.port));
selector.send(createSend(node, request));
waitForBytesBuffered(selector, node);
selector.close(node);
verifySelectorEmpty();
}
private void waitForBytesBuffered(Selector selector, String node) throws Exception {
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
@ -107,8 +117,72 @@ public class SslSelectorTest extends SelectorTest { @@ -107,8 +117,72 @@ public class SslSelectorTest extends SelectorTest {
}
}
}, 2000L, "Failed to reach socket state with bytes buffered");
}
selector.close(node);
@Test
public void testBytesBufferedChannelWithNoIncomingBytes() throws Exception {
verifyNoUnnecessaryPollWithBytesBuffered(key ->
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ));
}
@Test
public void testBytesBufferedChannelAfterMute() throws Exception {
verifyNoUnnecessaryPollWithBytesBuffered(key -> ((KafkaChannel) key.attachment()).mute());
}
private void verifyNoUnnecessaryPollWithBytesBuffered(Consumer<SelectionKey> disableRead)
throws Exception {
this.selector.close();
String node1 = "1";
String node2 = "2";
final AtomicInteger node1Polls = new AtomicInteger();
this.channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
this.channelBuilder.configure(sslClientConfigs);
this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {
@Override
void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
for (SelectionKey key : selectionKeys) {
KafkaChannel channel = (KafkaChannel) key.attachment();
if (channel != null && channel.id().equals(node1))
node1Polls.incrementAndGet();
}
super.pollSelectionKeys(selectionKeys, isImmediatelyConnected, currentTimeNanos);
}
};
// Get node1 into bytes buffered state and then disable read on the socket.
// Truncate the read buffers to ensure that there is buffered data, but not enough to make progress.
int largeRequestSize = 100 * 1024;
connect(node1, new InetSocketAddress("localhost", server.port));
selector.send(createSend(node1, TestUtils.randomString(largeRequestSize)));
waitForBytesBuffered(selector, node1);
TestSslChannelBuilder.TestSslTransportLayer.transportLayers.get(node1).truncateReadBuffer();
disableRead.accept(selector.channel(node1).selectionKey());
// Clear poll count and count the polls from now on
node1Polls.set(0);
// Process sends and receives on node2. Test verifies that we don't process node1
// unnecessarily on each of these polls.
connect(node2, new InetSocketAddress("localhost", server.port));
int received = 0;
String request = TestUtils.randomString(10);
selector.send(createSend(node2, request));
while (received < 100) {
received += selector.completedReceives().size();
if (!selector.completedSends().isEmpty()) {
selector.send(createSend(node2, request));
}
selector.poll(5);
}
// Verify that pollSelectionKeys was invoked once to process buffered data
// but not again since there isn't sufficient data to process.
assertEquals(1, node1Polls.get());
selector.close(node1);
selector.close(node2);
verifySelectorEmpty();
}
@ -252,22 +326,33 @@ public class SslSelectorTest extends SelectorTest { @@ -252,22 +326,33 @@ public class SslSelectorTest extends SelectorTest {
* TestSslTransportLayer will read from socket once every two tries. This increases
* the chance that there will be bytes buffered in the transport layer after read().
*/
class TestSslTransportLayer extends SslTransportLayer {
static class TestSslTransportLayer extends SslTransportLayer {
static Map<String, TestSslTransportLayer> transportLayers = new HashMap<>();
boolean muteSocket = false;
public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
super(channelId, key, sslEngine);
transportLayers.put(channelId, this);
}
@Override
protected int readFromSocketChannel() throws IOException {
if (muteSocket) {
muteSocket = false;
if ((selectionKey().interestOps() & SelectionKey.OP_READ) != 0)
muteSocket = false;
return 0;
}
muteSocket = true;
return super.readFromSocketChannel();
}
// Leave one byte in network read buffer so that some buffered bytes are present,
// but not enough to make progress on a read.
void truncateReadBuffer() throws Exception {
netReadBuffer().position(1);
appReadBuffer().position(0);
muteSocket = true;
}
}
}

Loading…
Cancel
Save