
kafka-649; Cleanup log4j logging; patched by Jun Rao; reviewed by Jay Kreps

0.8.0-beta1-candidate1
Jun Rao 12 years ago
commit ae362b0864
  1. core/src/main/scala/kafka/cluster/Partition.scala (5 changed lines)
  2. core/src/main/scala/kafka/consumer/ConsumerIterator.scala (2 changed lines)
  3. core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (10 changed lines)
  4. core/src/main/scala/kafka/log/FileMessageSet.scala (2 changed lines)
  5. core/src/main/scala/kafka/log/Log.scala (2 changed lines)
  6. core/src/main/scala/kafka/log/OffsetIndex.scala (2 changed lines)
  7. core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (2 changed lines)
  8. core/src/main/scala/kafka/server/KafkaApis.scala (12 changed lines)
  9. core/src/main/scala/kafka/server/ReplicaManager.scala (6 changed lines)
  10. core/src/main/scala/kafka/utils/ZkUtils.scala (8 changed lines)
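
All of the call sites touched below go through Kafka's logging mixin rather than a raw log4j Logger, which is why the cleanup is mostly a matter of swapping info/debug/warn/error calls and enriching the format strings. For orientation, here is a minimal sketch of the general shape of such a helper; it is an illustrative approximation, not the actual kafka.utils.Logging source, and the trait name and signatures are assumptions.

```scala
import org.apache.log4j.Logger

// Minimal sketch of a log4j-backed logging mixin in the style of kafka.utils.Logging.
// The trait name, method set, and signatures here are assumptions for illustration only.
trait SimpleLogging {
  private val logger = Logger.getLogger(getClass.getName)

  // By-name parameters: the format/concatenation work is skipped when the level is disabled,
  // so demoting chatty messages from info to debug keeps them cheap as well as quiet.
  def trace(msg: => String): Unit = if (logger.isTraceEnabled) logger.trace(msg)
  def debug(msg: => String): Unit = if (logger.isDebugEnabled) logger.debug(msg)
  def info(msg: => String): Unit  = if (logger.isInfoEnabled) logger.info(msg)
  def warn(msg: => String): Unit  = logger.warn(msg)
  def error(msg: => String, t: Throwable): Unit = logger.error(msg, t)
}

// Example call site mirroring the pattern used in the diffs below.
object LoggingExample extends SimpleLogging {
  def main(args: Array[String]): Unit = {
    val topic = "test"
    val partitionId = 0
    debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, "1,2,3"))
  }
}
```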

core/src/main/scala/kafka/cluster/Partition.scala (5 changed lines)

@@ -220,7 +220,8 @@ class Partition(val topic: String,
if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
// expand ISR
val newInSyncReplicas = inSyncReplicas + replica
- info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(", ")))
+ info("Expanding ISR for topic %s partition %d from %s to %s"
+   .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in ZK and cache
updateIsr(newInSyncReplicas)
replicaManager.isrExpandRate.mark()
@@ -315,7 +316,7 @@ class Partition(val topic: String,
}
private def updateIsr(newIsr: Set[Replica]) {
- info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", ")))
+ debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(",")))
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
// use the epoch of the controller that made the leadership decision, instead of the current controller epoch
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,

core/src/main/scala/kafka/consumer/ConsumerIterator.scala (2 changed lines)

@@ -109,7 +109,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
def clearCurrentChunk() {
try {
- info("Clearing the current data chunk for this consumer iterator")
+ debug("Clearing the current data chunk for this consumer iterator")
current.set(null)
}
}

core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (10 changed lines)

@@ -53,8 +53,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
case true =>
- debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
-   .format(liveAssignedReplicasToThisPartition.mkString(",")))
+ debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
+   .format(topicAndPartition, liveAssignedReplicasToThisPartition.mkString(",")))
liveAssignedReplicasToThisPartition.isEmpty match {
case true =>
throw new NoReplicaOnlineException(("No replica for partition " +
@@ -63,13 +63,13 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
case false =>
ControllerStats.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicasToThisPartition.head
- warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
-   "There's potential data loss")
+ warn("No broker in ISR is alive for %s. Elect leader from broker %s. There's potential data loss."
+   .format(topicAndPartition, newLeader))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
}
case false =>
val newLeader = liveBrokersInIsr.head
debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
debug("Some broker in ISR is alive for %s. Select %d from ISR to be the leader.".format(topicAndPartition, newLeader))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
}
info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))

core/src/main/scala/kafka/log/FileMessageSet.scala (2 changed lines)

@@ -44,7 +44,7 @@ class FileMessageSet private[kafka](val file: File,
private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
if (initChannelPositionToEnd) {
- info("Creating or reloading log segment %s".format(file.getAbsolutePath))
+ debug("Creating or reloading log segment %s".format(file.getAbsolutePath))
/* set the file position to the last byte in the file */
channel.position(channel.size)
}

core/src/main/scala/kafka/log/Log.scala (2 changed lines)

@@ -127,7 +127,7 @@ private[kafka] class Log(val dir: File,
/* Calculate the offset of the next message */
private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
- debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))
+ info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
newGauge(name + "-" + "NumLogSegments",
new Gauge[Int] { def getValue = numberOfSegments })

core/src/main/scala/kafka/log/OffsetIndex.scala (2 changed lines)

@@ -90,7 +90,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
/* the last offset in the index */
var lastOffset = readLastOffset()
- info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
+ debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
.format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
/* the maximum number of entries this index can hold */

core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (2 changed lines)

@@ -135,7 +135,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
} else {
// currently, if in async mode, we just log the serialization error. We need to revisit
// this when doing kafka-496
- error("Error serializing message ", t)
+ error("Error serializing message for topic %s".format(e.topic), t)
}
}
}

core/src/main/scala/kafka/server/KafkaApis.scala (12 changed lines)

@@ -205,10 +205,10 @@ class KafkaApis(val requestChannel: RequestChannel,
Runtime.getRuntime.halt(1)
null
case utpe: UnknownTopicOrPartitionException =>
- warn(utpe.getMessage)
+ warn("Produce request: " + utpe.getMessage)
new ProduceResult(topicAndPartition, utpe)
case nle: NotLeaderForPartitionException =>
- warn(nle.getMessage)
+ warn("Produce request: " + nle.getMessage)
new ProduceResult(topicAndPartition, nle)
case e =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
@@ -291,15 +291,17 @@ class KafkaApis(val requestChannel: RequestChannel,
// since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
// for a partition it is the leader for
case utpe: UnknownTopicOrPartitionException =>
- warn(utpe.getMessage)
+ warn("Fetch request: " + utpe.getMessage)
new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
case nle: NotLeaderForPartitionException =>
- warn(nle.getMessage)
+ warn("Fetch request: " + nle.getMessage)
new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
case t =>
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
- error("error when processing request " + (topic, partition, offset, fetchSize), t)
+ error("Error when processing fetch request for topic %s partition %d offset %d from %s with correlation id %d"
+   .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId),
+   t)
new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
}
(TopicAndPartition(topic, partition), partitionData)

core/src/main/scala/kafka/server/ReplicaManager.scala (6 changed lines)

@@ -131,7 +131,7 @@ class ReplicaManager(val config: KafkaConfig,
def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
- stateChangeLogger.error("Broker %d received stop replica request from an old controller epoch %d."
+ stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
.format(localBrokerId, stopReplicaRequest.controllerEpoch) +
" Latest known controller epoch is %d " + controllerEpoch)
(responseMap, ErrorMapping.StaleControllerEpochCode)
@@ -196,14 +196,14 @@ class ReplicaManager(val config: KafkaConfig,
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
leaderAndISRRequest.partitionStateInfos.foreach(p =>
- stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
+ stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]"
.format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2)))
info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest))
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
- stateChangeLogger.error("Broker %d received LeaderAndIsr request correlationId %d with an old controllerEpoch %d, latest known controllerEpoch is %d"
+ stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d. Latest known controller epoch is %d"
.format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch))
(responseMap, ErrorMapping.StaleControllerEpochCode)
}else {

core/src/main/scala/kafka/utils/ZkUtils.scala (8 changed lines)

@@ -328,12 +328,12 @@ object ZkUtils extends Logging {
def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
try {
val stat = client.writeData(path, data, expectVersion)
- info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
+ debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
.format(path, data, expectVersion, stat.getVersion))
(true, stat.getVersion)
} catch {
case e: Exception =>
- error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path, data,
+ error("Conditional update of path %s with data %s and expected version %d failed".format(path, data,
expectVersion), e)
(false, -1)
}
@@ -346,13 +346,13 @@ object ZkUtils extends Logging {
def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
try {
val stat = client.writeData(path, data, expectVersion)
- info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
+ debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
.format(path, data, expectVersion, stat.getVersion))
(true, stat.getVersion)
} catch {
case nne: ZkNoNodeException => throw nne
case e: Exception =>
- error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path, data,
+ error("Conditional update of path %s with data %s and expected version %d failed".format(path, data,
expectVersion), e)
(false, -1)
}
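
For context on the two ZkUtils hunks above: callers such as Partition.updateIsr (see the Partition.scala hunk at the top) destructure the (Boolean, Int) result of the conditional update. The snippet below is a hedged sketch of that calling pattern, not code from this commit; the method, path, and payload names are placeholders.

```scala
import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZkUtils

// Hypothetical caller of the conditional-update helper shown above.
object ConditionalUpdateExample {
  def updateIsrPath(zkClient: ZkClient, path: String, newIsrJson: String, expectedZkVersion: Int): Int = {
    val (updateSucceeded, newVersion) =
      ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newIsrJson, expectedZkVersion)
    // On success the caller keeps the new ZK version for its next conditional update;
    // on failure the helper returns (false, -1) and the caller falls back to its own recovery logic.
    if (updateSucceeded) newVersion else expectedZkVersion
  }
}
```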
