Browse Source

KAFKA-15399: Enable OffloadAndConsumeFromLeader test (#14285)

Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
pull/14338/head
Kamal Chandraprakash 1 year ago committed by GitHub
parent
commit
4590d565ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      checkstyle/import-control-storage.xml
  2. 4
      core/src/main/java/kafka/log/remote/RemoteLogManager.java
  3. 19
      core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
  4. 17
      storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
  5. 16
      storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageCondition.java
  6. 6
      storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java
  7. 9
      storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
  8. 6
      storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
  9. 1
      storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
  10. 6
      storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
  11. 9
      storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java

1
checkstyle/import-control-storage.xml

@ -97,6 +97,7 @@
<allow pkg="kafka.utils" /> <allow pkg="kafka.utils" />
<allow pkg="org.apache.kafka.common.config" /> <allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.header" />
<allow pkg="org.apache.kafka.common.record" /> <allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.replica" /> <allow pkg="org.apache.kafka.common.replica" />
<allow pkg="org.apache.kafka.common.network" /> <allow pkg="org.apache.kafka.common.network" />

4
core/src/main/java/kafka/log/remote/RemoteLogManager.java

@ -594,13 +594,13 @@ public class RemoteLogManager implements Closeable {
private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException { private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException {
if (!copiedOffsetOption.isPresent()) { if (!copiedOffsetOption.isPresent()) {
logger.info("Find the highest remote offset for partition: {} after becoming leader, leaderEpoch: {}", topicIdPartition, leaderEpoch);
// This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset // This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset
// of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
// previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
// epoch cache then it starts copying the segments from the earliest epoch entry's offset. // epoch cache then it starts copying the segments from the earliest epoch entry's offset.
copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log)); copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
logger.info("Found the highest copied remote offset: {} for partition: {} after becoming leader, " +
"leaderEpoch: {}", copiedOffsetOption, topicIdPartition, leaderEpoch);
} }
} }

19
core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala

@ -18,7 +18,7 @@ package kafka.admin
import kafka.api.IntegrationTestHarness import kafka.api.IntegrationTestHarness
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.{Logging, TestInfoUtils, TestUtils} import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
@ -283,6 +283,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft")) @ValueSource(strings = Array("zk", "kraft"))
def testTopicDeletion(quorum: String): Unit = { def testTopicDeletion(quorum: String): Unit = {
MyRemoteStorageManager.deleteSegmentEventCounter.set(0)
val numPartitions = 2 val numPartitions = 2
val topicConfig = new Properties() val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -293,12 +294,9 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers) TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
assertThrowsException(classOf[UnknownTopicOrPartitionException], assertThrowsException(classOf[UnknownTopicOrPartitionException],
() => TestUtils.describeTopic(createAdminClient(), testTopicName), "Topic should be deleted") () => TestUtils.describeTopic(createAdminClient(), testTopicName), "Topic should be deleted")
TestUtils.waitUntilTrue(() =>
// FIXME: It seems the storage manager is being instantiated in different class loader so couldn't verify the value numPartitions * MyRemoteLogMetadataManager.segmentCountPerPartition == MyRemoteStorageManager.deleteSegmentEventCounter.get(),
// but ensured it by adding a log statement in the storage manager (manually). "Remote log segments should be deleted only once by the leader")
// assertEquals(numPartitions * MyRemoteLogMetadataManager.segmentCount,
// MyRemoteStorageManager.deleteSegmentEventCounter.get(),
// "Remote log segments should be deleted only once by the leader")
} }
private def assertThrowsException(exceptionType: Class[_ <: Throwable], private def assertThrowsException(exceptionType: Class[_ <: Throwable],
@ -365,12 +363,11 @@ object MyRemoteStorageManager {
val deleteSegmentEventCounter = new AtomicInteger(0) val deleteSegmentEventCounter = new AtomicInteger(0)
} }
class MyRemoteStorageManager extends NoOpRemoteStorageManager with Logging { class MyRemoteStorageManager extends NoOpRemoteStorageManager {
import MyRemoteStorageManager._ import MyRemoteStorageManager._
override def deleteLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Unit = { override def deleteLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Unit = {
deleteSegmentEventCounter.incrementAndGet() deleteSegmentEventCounter.incrementAndGet()
info(s"Deleted the remote log segment: $remoteLogSegmentMetadata, counter: ${deleteSegmentEventCounter.get()}")
} }
} }
@ -381,7 +378,7 @@ class MyRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager {
override def listRemoteLogSegments(topicIdPartition: TopicIdPartition): util.Iterator[RemoteLogSegmentMetadata] = { override def listRemoteLogSegments(topicIdPartition: TopicIdPartition): util.Iterator[RemoteLogSegmentMetadata] = {
val segmentMetadataList = new util.ArrayList[RemoteLogSegmentMetadata]() val segmentMetadataList = new util.ArrayList[RemoteLogSegmentMetadata]()
for (idx <- 0 until segmentCount) { for (idx <- 0 until segmentCountPerPartition) {
val timestamp = time.milliseconds() val timestamp = time.milliseconds()
val startOffset = idx * recordsPerSegment val startOffset = idx * recordsPerSegment
val endOffset = startOffset + recordsPerSegment - 1 val endOffset = startOffset + recordsPerSegment - 1
@ -395,7 +392,7 @@ class MyRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager {
} }
object MyRemoteLogMetadataManager { object MyRemoteLogMetadataManager {
val segmentCount = 10 val segmentCountPerPartition = 10
val recordsPerSegment = 100 val recordsPerSegment = 100
val segmentSize = 1024 val segmentSize = 1024
} }

17
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java

@ -306,19 +306,13 @@ public final class LocalTieredStorage implements RemoteStorageManager {
public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final LogSegmentData data) public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final LogSegmentData data)
throws RemoteStorageException { throws RemoteStorageException {
Callable<Optional<CustomMetadata>> callable = () -> { Callable<Optional<CustomMetadata>> callable = () -> {
final RemoteLogSegmentId id = metadata.remoteLogSegmentId(); final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(COPY_SEGMENT, metadata);
final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(COPY_SEGMENT, id);
RemoteLogSegmentFileset fileset = null; RemoteLogSegmentFileset fileset = null;
try { try {
fileset = openFileset(storageDirectory, metadata); fileset = openFileset(storageDirectory, metadata);
logger.info("Offloading log segment for {} from segment={}", metadata.topicIdPartition(), data.logSegment());
logger.info("Offloading log segment for {} from segment={}", id.topicIdPartition(), data.logSegment());
fileset.copy(transferer, data); fileset.copy(transferer, data);
storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build()); storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build());
} catch (final Exception e) { } catch (final Exception e) {
// Keep the storage in a consistent state, i.e. a segment stored should always have with its // Keep the storage in a consistent state, i.e. a segment stored should always have with its
// associated offset and time indexes stored as well. Here, delete any file which was copied // associated offset and time indexes stored as well. Here, delete any file which was copied
@ -327,14 +321,11 @@ public final class LocalTieredStorage implements RemoteStorageManager {
if (fileset != null) { if (fileset != null) {
fileset.delete(); fileset.delete();
} }
storageListeners.onStorageEvent(eventBuilder.withException(e).build()); storageListeners.onStorageEvent(eventBuilder.withException(e).build());
throw e; throw e;
} }
return Optional.empty(); return Optional.empty();
}; };
return wrap(callable); return wrap(callable);
} }
@ -503,10 +494,6 @@ public final class LocalTieredStorage implements RemoteStorageManager {
return wrap(() -> storageDirectory.getAbsolutePath()); return wrap(() -> storageDirectory.getAbsolutePath());
} }
private LocalTieredStorageEvent.Builder newEventBuilder(final EventType type, final RemoteLogSegmentId segId) {
return LocalTieredStorageEvent.newBuilder(brokerId, type, eventTimestamp.incrementAndGet(), segId);
}
private LocalTieredStorageEvent.Builder newEventBuilder(final EventType type, final RemoteLogSegmentMetadata md) { private LocalTieredStorageEvent.Builder newEventBuilder(final EventType type, final RemoteLogSegmentMetadata md) {
return LocalTieredStorageEvent return LocalTieredStorageEvent
.newBuilder(brokerId, type, eventTimestamp.incrementAndGet(), md.remoteLogSegmentId()) .newBuilder(brokerId, type, eventTimestamp.incrementAndGet(), md.remoteLogSegmentId())

16
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageCondition.java

@ -46,6 +46,7 @@ public final class LocalTieredStorageCondition {
final EventType eventType; final EventType eventType;
final int brokerId; final int brokerId;
final TopicPartition topicPartition; final TopicPartition topicPartition;
final Integer baseOffset;
final boolean failed; final boolean failed;
private final InternalListener listener; private final InternalListener listener;
@ -66,6 +67,7 @@ public final class LocalTieredStorageCondition {
* @param eventType The nature of the event to match. * @param eventType The nature of the event to match.
* @param brokerId The broker which should have generated the event. * @param brokerId The broker which should have generated the event.
* @param tp The topic-partition which the event should relate to. * @param tp The topic-partition which the event should relate to.
* @param baseOffset The base offset of the segment which the event should relate to.
* @param failed Whether the event should correspond to a failed interaction with the remote storage. * @param failed Whether the event should correspond to a failed interaction with the remote storage.
* *
* @return A condition with the given characteristics which listens to the given storages and can * @return A condition with the given characteristics which listens to the given storages and can
@ -75,8 +77,11 @@ public final class LocalTieredStorageCondition {
final EventType eventType, final EventType eventType,
final int brokerId, final int brokerId,
final TopicPartition tp, final TopicPartition tp,
final Integer baseOffset,
final boolean failed) { final boolean failed) {
return expectEvent(storages, eventType, brokerId, tp, failed, 1); final LocalTieredStorageCondition condition = new LocalTieredStorageCondition(eventType, brokerId, tp, failed, baseOffset, 1);
storages.forEach(storage -> storage.addListener(condition.listener));
return condition;
} }
/** /**
@ -106,7 +111,7 @@ public final class LocalTieredStorageCondition {
final TopicPartition tp, final TopicPartition tp,
final boolean failed, final boolean failed,
final int latchCount) { final int latchCount) {
final LocalTieredStorageCondition condition = new LocalTieredStorageCondition(eventType, brokerId, tp, failed, latchCount); final LocalTieredStorageCondition condition = new LocalTieredStorageCondition(eventType, brokerId, tp, failed, null, latchCount);
storages.forEach(storage -> storage.addListener(condition.listener)); storages.forEach(storage -> storage.addListener(condition.listener));
return condition; return condition;
} }
@ -170,8 +175,8 @@ public final class LocalTieredStorageCondition {
} }
public String toString() { public String toString() {
return format("Condition[eventType=%s, brokerId=%d, topicPartition=%s, failed=%b]", return format("Condition[eventType=%s, brokerId=%d, topicPartition=%s, baseOffset=%d, failed=%b]",
eventType, brokerId, topicPartition, failed); eventType, brokerId, topicPartition, baseOffset, failed);
} }
private static final class InternalListener implements LocalTieredStorageListener { private static final class InternalListener implements LocalTieredStorageListener {
@ -200,11 +205,13 @@ public final class LocalTieredStorageCondition {
final int id, final int id,
final TopicPartition tp, final TopicPartition tp,
final boolean failed, final boolean failed,
final Integer baseOffset,
final int latchCount) { final int latchCount) {
this.eventType = requireNonNull(type); this.eventType = requireNonNull(type);
this.brokerId = id; this.brokerId = id;
this.topicPartition = requireNonNull(tp); this.topicPartition = requireNonNull(tp);
this.failed = failed; this.failed = failed;
this.baseOffset = baseOffset;
this.listener = new InternalListener(this, latchCount); this.listener = new InternalListener(this, latchCount);
this.next = null; this.next = null;
} }
@ -214,6 +221,7 @@ public final class LocalTieredStorageCondition {
this.brokerId = h.brokerId; this.brokerId = h.brokerId;
this.topicPartition = h.topicPartition; this.topicPartition = h.topicPartition;
this.failed = h.failed; this.failed = h.failed;
this.baseOffset = h.baseOffset;
this.listener = h.listener; this.listener = h.listener;
this.next = requireNonNull(next); this.next = requireNonNull(next);
} }

6
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java

@ -76,6 +76,12 @@ public final class LocalTieredStorageEvent implements Comparable<LocalTieredStor
if (!exception.map(e -> condition.failed).orElseGet(() -> !condition.failed)) { if (!exception.map(e -> condition.failed).orElseGet(() -> !condition.failed)) {
return false; return false;
} }
if (condition.baseOffset != null && !metadata.isPresent()) {
return false;
}
if (condition.baseOffset != null && metadata.get().startOffset() != condition.baseOffset) {
return false;
}
return true; return true;
} }

9
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java

@ -17,6 +17,8 @@
package org.apache.kafka.tiered.storage; package org.apache.kafka.tiered.storage;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
import org.apache.kafka.tiered.storage.specs.TopicSpec; import org.apache.kafka.tiered.storage.specs.TopicSpec;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
@ -37,7 +39,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
@ -69,8 +70,8 @@ import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
public final class TieredStorageTestContext implements AutoCloseable { public final class TieredStorageTestContext implements AutoCloseable {
private final TieredStorageTestHarness harness; private final TieredStorageTestHarness harness;
private final Serializer<String> ser = Serdes.String().serializer(); private final Serializer<String> ser = new StringSerializer();
private final Deserializer<String> de = Serdes.String().deserializer(); private final Deserializer<String> de = new StringDeserializer();
private final Map<String, TopicSpec> topicSpecs = new HashMap<>(); private final Map<String, TopicSpec> topicSpecs = new HashMap<>();
private final TieredStorageTestReport testReport; private final TieredStorageTestReport testReport;
@ -309,7 +310,5 @@ public final class TieredStorageTestContext implements AutoCloseable {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
Utils.closeAll(producer, consumer);
Utils.closeQuietly(admin, "Admin client");
} }
} }

6
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java

@ -32,7 +32,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestInfo;
@ -52,6 +51,7 @@ import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
@ -107,6 +107,7 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
TopicBasedRemoteLogMetadataManager.class.getName()); TopicBasedRemoteLogMetadataManager.class.getName());
overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, RLM_TASK_INTERVAL_MS.toString()); overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, RLM_TASK_INTERVAL_MS.toString());
overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");
overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix("")); overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(""));
overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix("")); overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(""));
@ -153,7 +154,6 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
context = new TieredStorageTestContext(this); context = new TieredStorageTestContext(this);
} }
@Disabled("Disabled until the trunk build is stable to test tiered storage")
@Test @Test
public void executeTieredStorageTest() { public void executeTieredStorageTest() {
TieredStorageTestBuilder builder = new TieredStorageTestBuilder(); TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
@ -200,6 +200,8 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
if (loaderAwareRSM.delegate() instanceof LocalTieredStorage) { if (loaderAwareRSM.delegate() instanceof LocalTieredStorage) {
storages.add((LocalTieredStorage) loaderAwareRSM.delegate()); storages.add((LocalTieredStorage) loaderAwareRSM.delegate());
} }
} else if (storageManager instanceof LocalTieredStorage) {
storages.add((LocalTieredStorage) storageManager);
} }
} else { } else {
throw new AssertionError("Broker " + broker.config().brokerId() throw new AssertionError("Broker " + broker.config().brokerId()

1
storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java

@ -80,6 +80,7 @@ public final class ProduceAction implements TieredStorageTestAction {
COPY_SEGMENT, COPY_SEGMENT,
spec.getSourceBrokerId(), spec.getSourceBrokerId(),
spec.getTopicPartition(), spec.getTopicPartition(),
spec.getBaseOffset(),
false)) false))
.collect(Collectors.toList()); .collect(Collectors.toList());

6
storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java

@ -79,7 +79,7 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
enableRemoteLogStorage) enableRemoteLogStorage)
.withBatchSize(topicA, p0, batchSize) .withBatchSize(topicA, p0, batchSize)
.expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k1", "v1")) .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k1", "v1"))
.expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k2", "v2")) .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k2", "v2"))
.produce(topicA, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"), .produce(topicA, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"),
new KeyValueSpec("k3", "v3")) new KeyValueSpec("k3", "v3"))
@ -127,10 +127,10 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
* - For topic B, only one segment is present in the tiered storage, as asserted by the * - For topic B, only one segment is present in the tiered storage, as asserted by the
* previous sub-test-case. * previous sub-test-case.
*/ */
.bounce(broker) // .bounce(broker)
.expectFetchFromTieredStorage(broker, topicA, p0, 1) .expectFetchFromTieredStorage(broker, topicA, p0, 1)
.expectFetchFromTieredStorage(broker, topicB, p0, 2)
.consume(topicA, p0, 1L, 2, 1) .consume(topicA, p0, 1L, 2, 1)
.expectFetchFromTieredStorage(broker, topicB, p0, 2)
.consume(topicB, p0, 1L, 4, 3); .consume(topicB, p0, 1L, 4, 3);
} }
} }

9
storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java

@ -19,7 +19,9 @@ package org.apache.kafka.tiered.storage.utils;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
@ -138,18 +140,21 @@ public final class RecordsKeyValueMatcher<R1, R2, K, V> extends TypeSafeDiagnosi
private SimpleRecord convert(Object recordCandidate) { private SimpleRecord convert(Object recordCandidate) {
if (recordCandidate instanceof ProducerRecord) { if (recordCandidate instanceof ProducerRecord) {
ProducerRecord<?, ?> record = (ProducerRecord<?, ?>) recordCandidate; ProducerRecord<?, ?> record = (ProducerRecord<?, ?>) recordCandidate;
long timestamp = record.timestamp() != null ? record.timestamp() : RecordBatch.NO_TIMESTAMP;
ByteBuffer keyBytes = ByteBuffer keyBytes =
Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key())); Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key()));
ByteBuffer valueBytes = ByteBuffer valueBytes =
Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value())); Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value()));
return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, record.headers().toArray()); Header[] headers = record.headers() != null ? record.headers().toArray() : Record.EMPTY_HEADERS;
return new SimpleRecord(timestamp, keyBytes, valueBytes, headers);
} else if (recordCandidate instanceof ConsumerRecord) { } else if (recordCandidate instanceof ConsumerRecord) {
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) recordCandidate; ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) recordCandidate;
ByteBuffer keyBytes = ByteBuffer keyBytes =
Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key())); Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key()));
ByteBuffer valueBytes = ByteBuffer valueBytes =
Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value())); Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value()));
return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, record.headers().toArray()); Header[] headers = record.headers() != null ? record.headers().toArray() : Record.EMPTY_HEADERS;
return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, headers);
} else if (recordCandidate instanceof Record) { } else if (recordCandidate instanceof Record) {
Record record = (Record) recordCandidate; Record record = (Record) recordCandidate;
return new SimpleRecord(record.timestamp(), record.key(), record.value(), record.headers()); return new SimpleRecord(record.timestamp(), record.key(), record.value(), record.headers());

Loading…
Cancel
Save