From e8955f731eff61417ec04253547f125cb921a59a Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Tue, 19 Jun 2018 15:16:16 +0100 Subject: [PATCH] KAFKA-7012: Don't process SSL channels without data to process (#5237) Avoid unnecessary processing of SSL channels when there are some bytes buffered, but not enough to make progress. Reviewers: Radai Rosenblatt , Jun Rao --- .../apache/kafka/common/network/Selector.java | 4 +- .../common/network/SslTransportLayer.java | 27 +++++- .../kafka/common/network/TransportLayer.java | 1 + .../kafka/common/network/SslSelectorTest.java | 91 ++++++++++++++++++- 4 files changed, 116 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 334ca79f035..a269f0fd604 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -498,7 +498,9 @@ public class Selector implements Selectable, AutoCloseable { //this channel has bytes enqueued in intermediary buffers that we could not read //(possibly because no memory). it may be the case that the underlying socket will //not come up in the next poll() and so we need to remember this channel for the - //next poll call otherwise data may be stuck in said buffers forever. + //next poll call otherwise data may be stuck in said buffers forever. If we attempt + //to process buffered data and no progress is made, the channel buffered status is + //cleared to avoid the overhead of checking every time. keysWithBufferedRead.add(key); } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 704a19818e2..06e7e937886 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -64,6 +64,7 @@ public class SslTransportLayer implements TransportLayer { private ByteBuffer netReadBuffer; private ByteBuffer netWriteBuffer; private ByteBuffer appReadBuffer; + private boolean hasBytesBuffered; private ByteBuffer emptyBuf = ByteBuffer.allocate(0); public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { @@ -503,13 +504,17 @@ public class SslTransportLayer implements TransportLayer { read = readFromAppBuffer(dst); } + boolean readFromNetwork = false; boolean isClosed = false; // Each loop reads at most once from the socket. while (dst.remaining() > 0) { int netread = 0; netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize()); - if (netReadBuffer.remaining() > 0) + if (netReadBuffer.remaining() > 0) { netread = readFromSocketChannel(); + if (netread > 0) + readFromNetwork = true; + } while (netReadBuffer.position() > 0) { netReadBuffer.flip(); @@ -563,6 +568,7 @@ public class SslTransportLayer implements TransportLayer { if (netread <= 0 || isClosed) break; } + updateBytesBuffered(readFromNetwork || read > 0); // If data has been read and unwrapped, return the data even if end-of-stream, channel will be closed // on a subsequent poll. return read; @@ -793,6 +799,11 @@ public class SslTransportLayer implements TransportLayer { return netReadBuffer; } + // Visibility for testing + protected ByteBuffer appReadBuffer() { + return appReadBuffer; + } + /** * SSL exceptions are propagated as authentication failures so that clients can avoid * retries and report the failure. If `flush` is true, exceptions are propagated after @@ -826,12 +837,22 @@ public class SslTransportLayer implements TransportLayer { @Override public boolean hasBytesBuffered() { - return netReadBuffer.position() != 0 || appReadBuffer.position() != 0; + return hasBytesBuffered; + } + + // Update `hasBytesBuffered` status. If any bytes were read from the network or + // if data was returned from read, `hasBytesBuffered` is set to true if any buffered + // data is still remaining. If not, `hasBytesBuffered` is set to false since no progress + // can be made until more data is available to read from the network. + private void updateBytesBuffered(boolean madeProgress) { + if (madeProgress) + hasBytesBuffered = netReadBuffer.position() != 0 || appReadBuffer.position() != 0; + else + hasBytesBuffered = false; } @Override public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException { return fileChannel.transferTo(position, count, this); } - } diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java index 3673d21dae6..a8a4b873028 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java @@ -94,6 +94,7 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan /** * @return true if channel has bytes to be read in any intermediate buffers + * which may be processed without reading additional data from the network. */ boolean hasBytesBuffered(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 1d78e5aa8e1..3bdb07a87c3 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -42,6 +42,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -96,6 +99,13 @@ public class SslSelectorTest extends SelectorTest { connect(node, new InetSocketAddress("localhost", server.port)); selector.send(createSend(node, request)); + waitForBytesBuffered(selector, node); + + selector.close(node); + verifySelectorEmpty(); + } + + private void waitForBytesBuffered(Selector selector, String node) throws Exception { TestUtils.waitForCondition(new TestCondition() { @Override public boolean conditionMet() { @@ -107,8 +117,72 @@ public class SslSelectorTest extends SelectorTest { } } }, 2000L, "Failed to reach socket state with bytes buffered"); + } - selector.close(node); + @Test + public void testBytesBufferedChannelWithNoIncomingBytes() throws Exception { + verifyNoUnnecessaryPollWithBytesBuffered(key -> + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ)); + } + + @Test + public void testBytesBufferedChannelAfterMute() throws Exception { + verifyNoUnnecessaryPollWithBytesBuffered(key -> ((KafkaChannel) key.attachment()).mute()); + } + + private void verifyNoUnnecessaryPollWithBytesBuffered(Consumer disableRead) + throws Exception { + this.selector.close(); + + String node1 = "1"; + String node2 = "2"; + final AtomicInteger node1Polls = new AtomicInteger(); + + this.channelBuilder = new TestSslChannelBuilder(Mode.CLIENT); + this.channelBuilder.configure(sslClientConfigs); + this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) { + @Override + void pollSelectionKeys(Set selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { + for (SelectionKey key : selectionKeys) { + KafkaChannel channel = (KafkaChannel) key.attachment(); + if (channel != null && channel.id().equals(node1)) + node1Polls.incrementAndGet(); + } + super.pollSelectionKeys(selectionKeys, isImmediatelyConnected, currentTimeNanos); + } + }; + + // Get node1 into bytes buffered state and then disable read on the socket. + // Truncate the read buffers to ensure that there is buffered data, but not enough to make progress. + int largeRequestSize = 100 * 1024; + connect(node1, new InetSocketAddress("localhost", server.port)); + selector.send(createSend(node1, TestUtils.randomString(largeRequestSize))); + waitForBytesBuffered(selector, node1); + TestSslChannelBuilder.TestSslTransportLayer.transportLayers.get(node1).truncateReadBuffer(); + disableRead.accept(selector.channel(node1).selectionKey()); + + // Clear poll count and count the polls from now on + node1Polls.set(0); + + // Process sends and receives on node2. Test verifies that we don't process node1 + // unnecessarily on each of these polls. + connect(node2, new InetSocketAddress("localhost", server.port)); + int received = 0; + String request = TestUtils.randomString(10); + selector.send(createSend(node2, request)); + while (received < 100) { + received += selector.completedReceives().size(); + if (!selector.completedSends().isEmpty()) { + selector.send(createSend(node2, request)); + } + selector.poll(5); + } + + // Verify that pollSelectionKeys was invoked once to process buffered data + // but not again since there isn't sufficient data to process. + assertEquals(1, node1Polls.get()); + selector.close(node1); + selector.close(node2); verifySelectorEmpty(); } @@ -252,22 +326,33 @@ public class SslSelectorTest extends SelectorTest { * TestSslTransportLayer will read from socket once every two tries. This increases * the chance that there will be bytes buffered in the transport layer after read(). */ - class TestSslTransportLayer extends SslTransportLayer { + static class TestSslTransportLayer extends SslTransportLayer { + static Map transportLayers = new HashMap<>(); boolean muteSocket = false; public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { super(channelId, key, sslEngine); + transportLayers.put(channelId, this); } @Override protected int readFromSocketChannel() throws IOException { if (muteSocket) { - muteSocket = false; + if ((selectionKey().interestOps() & SelectionKey.OP_READ) != 0) + muteSocket = false; return 0; } muteSocket = true; return super.readFromSocketChannel(); } + + // Leave one byte in network read buffer so that some buffered bytes are present, + // but not enough to make progress on a read. + void truncateReadBuffer() throws Exception { + netReadBuffer().position(1); + appReadBuffer().position(0); + muteSocket = true; + } } }