Browse Source

MINOR: Use enum for close mode in Selector instead of two booleans (#4559)

pull/4571/head
Rajini Sivaram 7 years ago committed by Jason Gustafson
parent
commit
d030caf956
  1. 45
      clients/src/main/java/org/apache/kafka/common/network/Selector.java

45
clients/src/main/java/org/apache/kafka/common/network/Selector.java

@ -87,6 +87,18 @@ public class Selector implements Selectable, AutoCloseable { @@ -87,6 +87,18 @@ public class Selector implements Selectable, AutoCloseable {
public static final long NO_IDLE_TIMEOUT_MS = -1;
private enum CloseMode {
GRACEFUL(true), // process outstanding staged receives, notify disconnect
NOTIFY_ONLY(true), // discard any outstanding receives, notify disconnect
DISCARD_NO_NOTIFY(false); // discard any outstanding receives, no disconnect notification
boolean notifyDisconnect;
CloseMode(boolean notifyDisconnect) {
this.notifyDisconnect = notifyDisconnect;
}
}
private final Logger log;
private final java.nio.channels.Selector nioSelector;
private final Map<String, KafkaChannel> channels;
@ -327,7 +339,7 @@ public class Selector implements Selectable, AutoCloseable { @@ -327,7 +339,7 @@ public class Selector implements Selectable, AutoCloseable {
channel.state(ChannelState.FAILED_SEND);
// ensure notification via `disconnected` when `failedSends` are processed in the next poll
this.failedSends.add(connectionId);
close(channel, false, false);
close(channel, CloseMode.DISCARD_NO_NOTIFY);
if (!(e instanceof CancelledKeyException)) {
log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
connectionId, e);
@ -507,7 +519,7 @@ public class Selector implements Selectable, AutoCloseable { @@ -507,7 +519,7 @@ public class Selector implements Selectable, AutoCloseable {
/* cancel any defunct sockets */
if (!key.isValid())
close(channel, true, true);
close(channel, CloseMode.GRACEFUL);
} catch (Exception e) {
String desc = channel.socketDescription();
@ -517,7 +529,7 @@ public class Selector implements Selectable, AutoCloseable { @@ -517,7 +529,7 @@ public class Selector implements Selectable, AutoCloseable {
log.debug("Connection with {} disconnected due to authentication exception", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel, !sendFailed, true);
close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
} finally {
maybeRecordTimePerConnection(channel, channelStartTimeNanos);
}
@ -627,7 +639,7 @@ public class Selector implements Selectable, AutoCloseable { @@ -627,7 +639,7 @@ public class Selector implements Selectable, AutoCloseable {
log.trace("About to close the idle connection from {} due to being idle for {} millis",
connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
channel.state(ChannelState.EXPIRED);
close(channel, true, true);
close(channel, CloseMode.GRACEFUL);
}
}
}
@ -681,7 +693,7 @@ public class Selector implements Selectable, AutoCloseable { @@ -681,7 +693,7 @@ public class Selector implements Selectable, AutoCloseable {
// There is no disconnect notification for local close, but updating
// channel state here anyway to avoid confusion.
channel.state(ChannelState.LOCAL_CLOSE);
close(channel, false, false);
close(channel, CloseMode.DISCARD_NO_NOTIFY);
} else {
KafkaChannel closingChannel = this.closingChannels.remove(id);
// Close any closing channel, leave the channel in the state in which closing was triggered
@ -692,20 +704,15 @@ public class Selector implements Selectable, AutoCloseable { @@ -692,20 +704,15 @@ public class Selector implements Selectable, AutoCloseable {
/**
* Begin closing this connection.
* If 'closeMode' is `CloseMode.GRACEFUL`, the channel is disconnected here, but staged receives
* are processed. The channel is closed when there are no outstanding receives or if a send is
* requested. For other values of `closeMode`, outstanding receives are discarded and the channel
* is closed immediately.
*
* If 'processOutstanding' is true, the channel is disconnected here, but staged receives are
* processed. The channel is closed when there are no outstanding receives or if a send
* is requested. The channel will be added to disconnect list when it is actually closed.
*
* If 'processOutstanding' is false, outstanding receives are discarded and the channel is
* closed immediately. The channel will not be added to disconnected list and it is the
* responsibility of the caller to handle disconnect notifications.
* The channel will be added to disconnect list when it is actually closed if `closeMode.notifyDisconnect`
* is true.
*/
private void close(KafkaChannel channel, boolean processOutstanding, boolean notifyDisconnect) {
if (processOutstanding && !notifyDisconnect)
throw new IllegalStateException("Disconnect notification required for remote disconnect after processing outstanding requests");
private void close(KafkaChannel channel, CloseMode closeMode) {
channel.disconnect();
// Ensure that `connected` does not have closed channels. This could happen if `prepare` throws an exception
@ -719,12 +726,12 @@ public class Selector implements Selectable, AutoCloseable { @@ -719,12 +726,12 @@ public class Selector implements Selectable, AutoCloseable {
// a send fails or all outstanding receives are processed. Mute state of disconnected channels
// are tracked to ensure that requests are processed one-by-one by the broker to preserve ordering.
Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
if (processOutstanding && deque != null && !deque.isEmpty()) {
if (closeMode == CloseMode.GRACEFUL && deque != null && !deque.isEmpty()) {
// stagedReceives will be moved to completedReceives later along with receives from other channels
closingChannels.put(channel.id(), channel);
log.debug("Tracking closing connection {} to process outstanding requests", channel.id());
} else
doClose(channel, notifyDisconnect);
doClose(channel, closeMode.notifyDisconnect);
this.channels.remove(channel.id());
if (idleExpiryManager != null)

Loading…
Cancel
Save