|
|
|
@ -29,7 +29,7 @@ import kafka.server._
@@ -29,7 +29,7 @@ import kafka.server._
|
|
|
|
|
import kafka.utils.{CoreUtils, MockScheduler, MockTime, TestUtils} |
|
|
|
|
import kafka.zk.KafkaZkClient |
|
|
|
|
import org.apache.kafka.common.TopicPartition |
|
|
|
|
import org.apache.kafka.common.errors.{ApiException, LeaderNotAvailableException, OffsetNotAvailableException, ReplicaNotAvailableException} |
|
|
|
|
import org.apache.kafka.common.errors.{ApiException, OffsetNotAvailableException, ReplicaNotAvailableException} |
|
|
|
|
import org.apache.kafka.common.metrics.Metrics |
|
|
|
|
import org.apache.kafka.common.protocol.Errors |
|
|
|
|
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset |
|
|
|
@ -405,9 +405,6 @@ class PartitionTest {
@@ -405,9 +405,6 @@ class PartitionTest {
|
|
|
|
|
val batch2 = TestUtils.records(records = List(new SimpleRecord("k3".getBytes, "v1".getBytes), |
|
|
|
|
new SimpleRecord(20,"k4".getBytes, "v2".getBytes), |
|
|
|
|
new SimpleRecord(21,"k5".getBytes, "v3".getBytes))) |
|
|
|
|
val batch3 = TestUtils.records(records = List( |
|
|
|
|
new SimpleRecord(30,"k6".getBytes, "v1".getBytes), |
|
|
|
|
new SimpleRecord(31,"k7".getBytes, "v2".getBytes))) |
|
|
|
|
|
|
|
|
|
val partition = Partition(topicPartition, time, replicaManager) |
|
|
|
|
assertTrue("Expected first makeLeader() to return 'leader changed'", |
|
|
|
@ -421,7 +418,7 @@ class PartitionTest {
@@ -421,7 +418,7 @@ class PartitionTest {
|
|
|
|
|
val follower2Replica = partition.getReplica(follower2).get |
|
|
|
|
|
|
|
|
|
// append records with initial leader epoch |
|
|
|
|
val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, isFromClient = true).lastOffset |
|
|
|
|
partition.appendRecordsToLeader(batch1, isFromClient = true) |
|
|
|
|
partition.appendRecordsToLeader(batch2, isFromClient = true) |
|
|
|
|
assertEquals("Expected leader's HW not move", leaderReplica.logStartOffset, leaderReplica.highWatermark.messageOffset) |
|
|
|
|
|
|
|
|
|