@ -29,6 +29,7 @@ import kafka.server.checkpoints.OffsetCheckpoints
@@ -29,6 +29,7 @@ import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils.CoreUtils. { inReadLock , inWriteLock }
import kafka.utils._
import kafka.zk. { AdminZkClient , KafkaZkClient }
import kafka.zookeeper.ZooKeeperClientException
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors
@ -499,7 +500,15 @@ class Partition(val topicPartition: TopicPartition,
@@ -499,7 +500,15 @@ class Partition(val topicPartition: TopicPartition,
addingReplicas = addingReplicas ,
removingReplicas = removingReplicas
)
createLogIfNotExists ( partitionState . isNew , isFutureReplica = false , highWatermarkCheckpoints )
try {
createLogIfNotExists ( partitionState . isNew , isFutureReplica = false , highWatermarkCheckpoints )
} catch {
case e : ZooKeeperClientException =>
stateChangeLogger . error ( s" A ZooKeeper client exception has occurred and makeLeader will be skipping the " +
s" state change for the partition $topicPartition with leader epoch: $leaderEpoch " , e )
return false
}
val leaderLog = localLogOrException
val leaderEpochStartOffset = leaderLog . logEndOffset
@ -570,7 +579,15 @@ class Partition(val topicPartition: TopicPartition,
@@ -570,7 +579,15 @@ class Partition(val topicPartition: TopicPartition,
addingReplicas = partitionState . addingReplicas . asScala . map ( _ . toInt ) ,
removingReplicas = partitionState . removingReplicas . asScala . map ( _ . toInt )
)
createLogIfNotExists ( partitionState . isNew , isFutureReplica = false , highWatermarkCheckpoints )
try {
createLogIfNotExists ( partitionState . isNew , isFutureReplica = false , highWatermarkCheckpoints )
} catch {
case e : ZooKeeperClientException =>
stateChangeLogger . error ( s" A ZooKeeper client exception has occurred. makeFollower will be skipping the " +
s" state change for the partition $topicPartition with leader epoch: $leaderEpoch . " , e )
return false
}
val followerLog = localLogOrException
val leaderEpochEndOffset = followerLog . logEndOffset