Browse Source

KafkaController.RequestSendThread can throw exception on broker socket; patched by Yang Ye; reviewed by Jun Rao; KAFKA-459, KAFKA-460

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1373448 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
0fe89f7a49
  1. 4
      core/src/main/scala/kafka/log/Log.scala
  2. 78
      core/src/main/scala/kafka/server/KafkaController.scala

4
core/src/main/scala/kafka/log/Log.scala

@ -438,8 +438,8 @@ private[kafka] class Log( val dir: File, val maxSize: Long, val flushInterval: I @@ -438,8 +438,8 @@ private[kafka] class Log( val dir: File, val maxSize: Long, val flushInterval: I
segment.truncateTo(targetOffset)
info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
case None =>
assert(targetOffset <= segments.view.last.absoluteEndOffset, "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
error("Cannot truncate log to %d since the log start offset is %d and end offset is %d".format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
if(targetOffset > segments.view.last.absoluteEndOffset)
error("Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
}
val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)

78
core/src/main/scala/kafka/server/KafkaController.scala

@ -61,25 +61,24 @@ class RequestSendThread(val controllerId: Int, @@ -61,25 +61,24 @@ class RequestSendThread(val controllerId: Int,
lock synchronized {
channel.send(request)
receive = channel.receive()
var response: RequestOrResponse = null
request.requestId.get match {
case RequestKeys.LeaderAndISRRequest =>
response = LeaderAndISRResponse.readFrom(receive.buffer)
case RequestKeys.StopReplicaRequest =>
response = StopReplicaResponse.readFrom(receive.buffer)
}
trace("got a response %s".format(controllerId, response, toBrokerId))
if(callback != null){
callback(response)
}
}
} catch {
case e =>
// log it and let it go. Let controller shut it down.
debug("Exception occurs", e)
}
var response: RequestOrResponse = null
request.requestId.get match {
case RequestKeys.LeaderAndISRRequest =>
response = LeaderAndISRResponse.readFrom(receive.buffer)
case RequestKeys.StopReplicaRequest =>
response = StopReplicaResponse.readFrom(receive.buffer)
}
trace("got a response %s".format(controllerId, response, toBrokerId))
if(callback != null){
callback(response)
}
}
} catch{
case e: InterruptedException => warn("intterrupted. Shutting down")
@ -94,6 +93,7 @@ class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) ex @@ -94,6 +93,7 @@ class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) ex
private val messageChannels = new HashMap[Int, BlockingChannel]
private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
private val messageThreads = new HashMap[Int, RequestSendThread]
private val lock = new Object()
this.logIdent = "Channel manager on controller " + config.brokerId + ", "
for(broker <- allBrokers){
brokers.put(broker.id, broker)
@ -117,8 +117,10 @@ class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) ex @@ -117,8 +117,10 @@ class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) ex
}
def shutDown() = {
for((brokerId, broker) <- brokers){
removeBroker(brokerId)
lock synchronized {
for((brokerId, broker) <- brokers){
removeBroker(brokerId)
}
}
}
@ -127,30 +129,34 @@ class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) ex @@ -127,30 +129,34 @@ class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) ex
}
def addBroker(broker: Broker){
brokers.put(broker.id, broker)
messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
val channel = new BlockingChannel(broker.host, broker.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
channel.connect()
messageChannels.put(broker.id, channel)
val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
thread.setDaemon(false)
thread.start()
messageThreads.put(broker.id, thread)
lock synchronized {
brokers.put(broker.id, broker)
messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
val channel = new BlockingChannel(broker.host, broker.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
channel.connect()
messageChannels.put(broker.id, channel)
val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
thread.setDaemon(false)
thread.start()
messageThreads.put(broker.id, thread)
}
}
def removeBroker(brokerId: Int){
brokers.remove(brokerId)
try {
messageChannels(brokerId).disconnect()
messageChannels.remove(brokerId)
messageQueues.remove(brokerId)
messageThreads(brokerId).shutDown()
messageThreads.remove(brokerId)
}catch {
case e => error("Error while removing broker by the controller", e)
lock synchronized {
brokers.remove(brokerId)
try {
messageChannels(brokerId).disconnect()
messageChannels.remove(brokerId)
messageQueues.remove(brokerId)
messageThreads(brokerId).shutDown()
messageThreads.remove(brokerId)
}catch {
case e => error("Error while removing broker by the controller", e)
}
}
}
}

Loading…
Cancel
Save