
KAFKA-15410: Reassign replica expand, move and shrink integration tests (2/4) (#14328)

- Updated the log-start-offset to the correct value while building the replica state in ReplicaFetcherTierStateMachine#buildRemoteLogAuxState

Integration tests added:
1. ReassignReplicaExpandTest
2. ReassignReplicaMoveTest
3. ReassignReplicaShrinkTest

Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
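
For orientation, the core change threads the leader's log-start-offset through the truncation path instead of letting it default to the next fetch offset. A minimal Scala sketch of the new call shape, assuming the patched Partition API shown in the diff below; the surrounding helper method and its name are illustrative, not part of the commit:

import kafka.cluster.Partition

// Illustrative helper (not actual broker code): shows how the follower-side
// fetch path is expected to call the patched API when rebuilding tiered state.
def rebuildFollowerState(partition: Partition,
                         nextOffset: Long,
                         leaderLogStartOffset: Long): Unit = {
  // Before this fix the call stopped at (nextOffset, isFuture = false), so the
  // rebuilt replica reported nextOffset as its log start offset.
  partition.truncateFullyAndStartAt(nextOffset, isFuture = false,
    logStartOffsetOpt = Some(leaderLogStartOffset))
}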
Kamal Chandraprakash, 1 year ago (committed via GitHub), commit 9f2ac375c2
Files changed:
1. core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java (2 changed lines)
2. core/src/main/scala/kafka/cluster/Partition.scala (7 changed lines)
3. core/src/main/scala/kafka/log/LogManager.scala (8 changed lines)
4. core/src/main/scala/kafka/log/UnifiedLog.scala (8 changed lines)
5. core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala (48 changed lines)
6. storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java (100 changed lines)
7. storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java (3 changed lines)
8. storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java (32 changed lines)
9. storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java (32 changed lines)
10. storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java (106 changed lines)

core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java (2 changed lines)

@@ -227,7 +227,7 @@ public class ReplicaFetcherTierStateMachine implements TierStateMachine {
// Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
Partition partition = replicaMgr.getPartitionOrException(topicPartition);
- partition.truncateFullyAndStartAt(nextOffset, false);
+ partition.truncateFullyAndStartAt(nextOffset, false, Option.apply(leaderLogStartOffset));
// Build leader epoch cache.
unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);

core/src/main/scala/kafka/cluster/Partition.scala (7 changed lines)

@@ -1646,12 +1646,15 @@ class Partition(val topicPartition: TopicPartition,
*
* @param newOffset The new offset to start the log with
* @param isFuture True iff the truncation should be performed on the future log of this partition
+ * @param logStartOffsetOpt The log start offset to set for the log. If None, the new offset will be used.
*/
- def truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean): Unit = {
+ def truncateFullyAndStartAt(newOffset: Long,
+                             isFuture: Boolean,
+                             logStartOffsetOpt: Option[Long] = None): Unit = {
// The read lock is needed to prevent the follower replica from being truncated while ReplicaAlterDirThread
// is executing maybeReplaceCurrentWithFutureReplica() to replace follower replica with the future replica.
inReadLock(leaderIsrUpdateLock) {
- logManager.truncateFullyAndStartAt(topicPartition, newOffset, isFuture = isFuture)
+ logManager.truncateFullyAndStartAt(topicPartition, newOffset, isFuture = isFuture, logStartOffsetOpt)
}
}
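
Since logStartOffsetOpt defaults to None, existing callers of Partition#truncateFullyAndStartAt are source-compatible; only the tiered-storage follower path passes an explicit value. A small usage sketch against the patched API, with made-up offsets for illustration:

def truncateExamples(partition: kafka.cluster.Partition): Unit = {
  // Old two-argument form still compiles: with None, the log start offset
  // falls back to newOffset further down the call chain.
  partition.truncateFullyAndStartAt(newOffset = 100L, isFuture = false)

  // Follower rebuilding tiered-storage state: keep the leader's log start
  // offset (40) even though the local log is truncated to start at 100.
  partition.truncateFullyAndStartAt(newOffset = 100L, isFuture = false,
    logStartOffsetOpt = Some(40L))
}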

core/src/main/scala/kafka/log/LogManager.scala (8 changed lines)

@@ -705,8 +705,12 @@ class LogManager(logDirs: Seq[File],
* @param topicPartition The partition whose log needs to be truncated
* @param newOffset The new offset to start the log with
* @param isFuture True iff the truncation should be performed on the future log of the specified partition
+ * @param logStartOffsetOpt The log start offset to set for the log. If None, the new offset will be used.
*/
- def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long, isFuture: Boolean): Unit = {
+ def truncateFullyAndStartAt(topicPartition: TopicPartition,
+                             newOffset: Long,
+                             isFuture: Boolean,
+                             logStartOffsetOpt: Option[Long] = None): Unit = {
val log = {
if (isFuture)
futureLogs.get(topicPartition)
@@ -719,7 +723,7 @@ class LogManager(logDirs: Seq[File],
if (!isFuture)
abortAndPauseCleaning(topicPartition)
try {
- log.truncateFullyAndStartAt(newOffset)
+ log.truncateFullyAndStartAt(newOffset, logStartOffsetOpt)
if (!isFuture)
maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset(log, topicPartition)
} finally {

core/src/main/scala/kafka/log/UnifiedLog.scala (8 changed lines)

@@ -1745,15 +1745,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* Delete all data in the log and start at the new offset
*
* @param newOffset The new offset to start the log with
+ * @param logStartOffsetOpt The log start offset to set for the log. If None, the new offset will be used.
*/
- def truncateFullyAndStartAt(newOffset: Long): Unit = {
+ def truncateFullyAndStartAt(newOffset: Long,
+                             logStartOffsetOpt: Option[Long] = None): Unit = {
maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
debug(s"Truncate and start at offset $newOffset")
debug(s"Truncate and start at offset $newOffset, logStartOffset: ${logStartOffsetOpt.getOrElse(newOffset)}")
lock synchronized {
localLog.truncateFullyAndStartAt(newOffset)
leaderEpochCache.foreach(_.clearAndFlush())
producerStateManager.truncateFullyAndStartAt(newOffset)
- logStartOffset = newOffset
+ logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
rebuildProducerState(newOffset, producerStateManager)
updateHighWatermark(localLog.logEndOffsetMetadata)
}
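
The resolution rule inside UnifiedLog is simply logStartOffset = logStartOffsetOpt.getOrElse(newOffset); the new unit test added below exercises it with newOffset = 2 and an explicit log start offset of 1. A tiny standalone sketch of that resolution, values only (helper name and object are illustrative):

object LogStartOffsetResolution {
  // Mirrors the getOrElse fallback used in truncateFullyAndStartAt.
  def resolve(newOffset: Long, logStartOffsetOpt: Option[Long]): Long =
    logStartOffsetOpt.getOrElse(newOffset)

  def main(args: Array[String]): Unit = {
    // Follower path: leader says the log starts at 1, local log restarts at 2.
    println(resolve(newOffset = 2L, logStartOffsetOpt = Some(1L))) // prints 1
    // Plain truncation: no explicit value, so the log start offset becomes 2.
    println(resolve(newOffset = 2L, logStartOffsetOpt = None))     // prints 2
  }
}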

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala (48 changed lines)

@@ -171,15 +171,55 @@ class UnifiedLogTest {
@Test
def testTruncateBelowFirstUnstableOffset(): Unit = {
- testTruncateBelowFirstUnstableOffset(_.truncateTo)
+ testTruncateBelowFirstUnstableOffset((log, targetOffset) => log.truncateTo(targetOffset))
}
@Test
def testTruncateFullyAndStartBelowFirstUnstableOffset(): Unit = {
- testTruncateBelowFirstUnstableOffset(_.truncateFullyAndStartAt)
+ testTruncateBelowFirstUnstableOffset((log, targetOffset) => log.truncateFullyAndStartAt(targetOffset))
}
- private def testTruncateBelowFirstUnstableOffset(truncateFunc: UnifiedLog => (Long => Unit)): Unit = {
+ @Test
+ def testTruncateFullyAndStart(): Unit = {
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+ val log = createLog(logDir, logConfig)
+ val producerId = 17L
+ val producerEpoch: Short = 10
+ val sequence = 0
+ log.appendAsLeader(TestUtils.records(List(
+ new SimpleRecord("0".getBytes),
+ new SimpleRecord("1".getBytes),
+ new SimpleRecord("2".getBytes)
+ )), leaderEpoch = 0)
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+ CompressionType.NONE,
+ producerId,
+ producerEpoch,
+ sequence,
+ new SimpleRecord("3".getBytes),
+ new SimpleRecord("4".getBytes)
+ ), leaderEpoch = 0)
+ assertEquals(Some(3L), log.firstUnstableOffset)
+ // We close and reopen the log to ensure that the first unstable offset segment
+ // position will be undefined when we truncate the log.
+ log.close()
+ val reopened = createLog(logDir, logConfig)
+ assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)
+ reopened.truncateFullyAndStartAt(2L, Some(1L))
+ assertEquals(None, reopened.firstUnstableOffset)
+ assertEquals(java.util.Collections.emptyMap(), reopened.producerStateManager.activeProducers)
+ assertEquals(1L, reopened.logStartOffset)
+ assertEquals(2L, reopened.logEndOffset)
+ }
+ private def testTruncateBelowFirstUnstableOffset(truncateFunc: (UnifiedLog, Long) => Unit): Unit = {
// Verify that truncation below the first unstable offset correctly
// resets the producer state. Specifically we are testing the case when
// the segment position of the first unstable offset is unknown.
@@ -215,7 +255,7 @@ class UnifiedLogTest {
val reopened = createLog(logDir, logConfig)
assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)
- truncateFunc(reopened)(0L)
+ truncateFunc(reopened, 0L)
assertEquals(None, reopened.firstUnstableOffset)
assertEquals(java.util.Collections.emptyMap(), reopened.producerStateManager.activeProducers)
}

storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java (new file, 100 lines)

@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
public abstract class BaseReassignReplicaTest extends TieredStorageTestHarness {
protected final Integer broker0 = 0;
protected final Integer broker1 = 1;
/**
* Cluster of two brokers
* @return number of brokers in the cluster
*/
@Override
public int brokerCount() {
return 2;
}
/**
* Number of partitions in the '__remote_log_metadata' topic
* @return number of partitions in the '__remote_log_metadata' topic
*/
@Override
public int numRemoteLogMetadataPartitions() {
return 2;
}
@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final String topicA = "topicA";
final String topicB = "topicB";
final Integer p0 = 0;
final Integer partitionCount = 5;
final Integer replicationFactor = 2;
final Integer maxBatchCountPerSegment = 1;
final Map<Integer, List<Integer>> replicaAssignment = null;
final boolean enableRemoteLogStorage = true;
final List<Integer> metadataPartitions = new ArrayList<>();
for (int i = 0; i < numRemoteLogMetadataPartitions(); i++) {
metadataPartitions.add(i);
}
builder
// create topicA with 5 partitions, 2 RF and ensure that the user-topic-partitions are mapped to
// metadata partitions
.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment,
replicaAssignment, enableRemoteLogStorage)
.expectUserTopicMappedToMetadataPartitions(topicA, metadataPartitions)
// create topicB with 1 partition and 1 RF
.createTopic(topicB, 1, 1, maxBatchCountPerSegment,
mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
// send records to partition 0
.expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
.produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// The newly created replica gets mapped to one of the metadata partitions, which is actively
// consumed by both brokers
.reassignReplica(topicB, p0, replicaIds())
.expectLeader(topicB, p0, broker1, true)
// produce some more events and verify the earliest local offset
.expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
.produce(topicB, p0, new KeyValueSpec("k3", "v3"))
// consume from the beginning of the topic to read data from local and remote storage
.expectFetchFromTieredStorage(broker1, topicB, p0, 3)
.consume(topicB, p0, 0L, 4, 3);
}
/**
* Replicas of the topic
* @return the replica-ids of the topic
*/
protected abstract List<Integer> replicaIds();
}

storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java (3 changed lines)

@@ -83,7 +83,8 @@ public final class PartitionsExpandTest extends TieredStorageTestHarness {
new KeyValueSpec("k2", "v2"))
// produce some more events to partition 0 and expect the segments to be offloaded
- // NOTE: Support needs to be added to capture the offloaded segment event for already sent message (k2, v2)
+ // KAFKA-15431: Support needs to be added to capture the offloaded segment event for already sent
+ // message (k2, v2)
// .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new KeyValueSpec("k3", "v3"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 4, new KeyValueSpec("k4", "v4"))

storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java (new file, 32 lines)

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import java.util.Arrays;
import java.util.List;
public final class ReassignReplicaExpandTest extends BaseReassignReplicaTest {
/**
* Expand the replication factor of the topic by changing the replica list from 0 to 0, 1
* @return the replica-ids of the topic
*/
@Override
protected List<Integer> replicaIds() {
return Arrays.asList(broker0, broker1);
}
}

storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java (new file, 32 lines)

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import java.util.Collections;
import java.util.List;
public final class ReassignReplicaMoveTest extends BaseReassignReplicaTest {
/**
* Move the replica of the topic from broker0 to broker1
* @return the replica-ids of the topic
*/
@Override
protected List<Integer> replicaIds() {
return Collections.singletonList(broker1);
}
}

storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java (new file, 106 lines)

@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
public final class ReassignReplicaShrinkTest extends TieredStorageTestHarness {
/**
* Cluster of two brokers
* @return number of brokers in the cluster
*/
@Override
public int brokerCount() {
return 2;
}
/**
* Number of partitions in the '__remote_log_metadata' topic
* @return number of partitions in the '__remote_log_metadata' topic
*/
@Override
public int numRemoteLogMetadataPartitions() {
return 2;
}
@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final Integer broker0 = 0;
final Integer broker1 = 1;
final String topicA = "topicA";
final Integer p0 = 0;
final Integer p1 = 1;
final Integer partitionCount = 2;
final Integer replicationFactor = 2;
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = true;
final Map<Integer, List<Integer>> replicaAssignment = mkMap(
mkEntry(p0, Arrays.asList(broker0, broker1)),
mkEntry(p1, Arrays.asList(broker1, broker0))
);
builder
// create topicA with 2 partitions and 2 RF
.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment,
replicaAssignment, enableRemoteLogStorage)
// send records to partition 0, expect that the segments are uploaded and removed from local log dir
.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// send records to partition 1, expect that the segments are uploaded and removed from local log dir
.expectSegmentToBeOffloaded(broker1, topicA, p1, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker1, topicA, p1, 1, new KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L)
.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// shrink the replication factor to 1
.shrinkReplica(topicA, p0, Collections.singletonList(broker1))
.shrinkReplica(topicA, p1, Collections.singletonList(broker0))
.expectLeader(topicA, p0, broker1, false)
.expectLeader(topicA, p1, broker0, false)
// produce some more events to partition 0
// KAFKA-15431: Support needs to be added to capture the offloaded segment event for already sent
// message (k2, v2)
// .expectSegmentToBeOffloaded(broker1, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
.produce(topicA, p0, new KeyValueSpec("k3", "v3"))
// produce some more events to partition 1
// KAFKA-15431: Support needs to be added to capture the offloaded segment event for already sent
// message (k2, v2)
// .expectSegmentToBeOffloaded(broker0, topicA, p1, 2, new KeyValueSpec("k2", "v2"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 3L)
.produce(topicA, p1, new KeyValueSpec("k3", "v3"))
// consume from the beginning of the topic to read data from local and remote storage
.expectFetchFromTieredStorage(broker1, topicA, p0, 3)
.consume(topicA, p0, 0L, 4, 3)
.expectFetchFromTieredStorage(broker0, topicA, p1, 3)
.consume(topicA, p1, 0L, 4, 3);
}
}