diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml
index 6da7ac68849..2e0b85dcf76 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -97,6 +97,7 @@
+
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index f84fd2f80f7..8122d7992da 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -594,13 +594,13 @@ public class RemoteLogManager implements Closeable {
private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException {
if (!copiedOffsetOption.isPresent()) {
- logger.info("Find the highest remote offset for partition: {} after becoming leader, leaderEpoch: {}", topicIdPartition, leaderEpoch);
-
// This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset
// of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
// previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
// epoch cache then it starts copying the segments from the earliest epoch entry's offset.
copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
+ logger.info("Found the highest copied remote offset: {} for partition: {} after becoming leader, " +
+ "leaderEpoch: {}", copiedOffsetOption, topicIdPartition, leaderEpoch);
}
}
diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index bd81b40fa8e..23439e120cf 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -18,7 +18,7 @@ package kafka.admin
import kafka.api.IntegrationTestHarness
import kafka.server.KafkaConfig
-import kafka.utils.{Logging, TestInfoUtils, TestUtils}
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
@@ -283,6 +283,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testTopicDeletion(quorum: String): Unit = {
+ MyRemoteStorageManager.deleteSegmentEventCounter.set(0)
val numPartitions = 2
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@@ -293,12 +294,9 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
assertThrowsException(classOf[UnknownTopicOrPartitionException],
() => TestUtils.describeTopic(createAdminClient(), testTopicName), "Topic should be deleted")
-
- // FIXME: It seems the storage manager is being instantiated in different class loader so couldn't verify the value
- // but ensured it by adding a log statement in the storage manager (manually).
- // assertEquals(numPartitions * MyRemoteLogMetadataManager.segmentCount,
- // MyRemoteStorageManager.deleteSegmentEventCounter.get(),
- // "Remote log segments should be deleted only once by the leader")
+ TestUtils.waitUntilTrue(() =>
+ numPartitions * MyRemoteLogMetadataManager.segmentCountPerPartition == MyRemoteStorageManager.deleteSegmentEventCounter.get(),
+ "Remote log segments should be deleted only once by the leader")
}
private def assertThrowsException(exceptionType: Class[_ <: Throwable],
@@ -365,12 +363,11 @@ object MyRemoteStorageManager {
val deleteSegmentEventCounter = new AtomicInteger(0)
}
-class MyRemoteStorageManager extends NoOpRemoteStorageManager with Logging {
+class MyRemoteStorageManager extends NoOpRemoteStorageManager {
import MyRemoteStorageManager._
override def deleteLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Unit = {
deleteSegmentEventCounter.incrementAndGet()
- info(s"Deleted the remote log segment: $remoteLogSegmentMetadata, counter: ${deleteSegmentEventCounter.get()}")
}
}
@@ -381,7 +378,7 @@ class MyRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager {
override def listRemoteLogSegments(topicIdPartition: TopicIdPartition): util.Iterator[RemoteLogSegmentMetadata] = {
val segmentMetadataList = new util.ArrayList[RemoteLogSegmentMetadata]()
- for (idx <- 0 until segmentCount) {
+ for (idx <- 0 until segmentCountPerPartition) {
val timestamp = time.milliseconds()
val startOffset = idx * recordsPerSegment
val endOffset = startOffset + recordsPerSegment - 1
@@ -395,7 +392,7 @@ class MyRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager {
}
object MyRemoteLogMetadataManager {
- val segmentCount = 10
+ val segmentCountPerPartition = 10
val recordsPerSegment = 100
val segmentSize = 1024
}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index a8847e5bba9..43c09ccd908 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -306,19 +306,13 @@ public final class LocalTieredStorage implements RemoteStorageManager {
public Optional copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final LogSegmentData data)
throws RemoteStorageException {
Callable> callable = () -> {
- final RemoteLogSegmentId id = metadata.remoteLogSegmentId();
- final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(COPY_SEGMENT, id);
+ final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(COPY_SEGMENT, metadata);
RemoteLogSegmentFileset fileset = null;
-
try {
fileset = openFileset(storageDirectory, metadata);
-
- logger.info("Offloading log segment for {} from segment={}", id.topicIdPartition(), data.logSegment());
-
+ logger.info("Offloading log segment for {} from segment={}", metadata.topicIdPartition(), data.logSegment());
fileset.copy(transferer, data);
-
storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build());
-
} catch (final Exception e) {
// Keep the storage in a consistent state, i.e. a segment stored should always have with its
// associated offset and time indexes stored as well. Here, delete any file which was copied
@@ -327,14 +321,11 @@ public final class LocalTieredStorage implements RemoteStorageManager {
if (fileset != null) {
fileset.delete();
}
-
storageListeners.onStorageEvent(eventBuilder.withException(e).build());
throw e;
}
-
return Optional.empty();
};
-
return wrap(callable);
}
@@ -503,10 +494,6 @@ public final class LocalTieredStorage implements RemoteStorageManager {
return wrap(() -> storageDirectory.getAbsolutePath());
}
- private LocalTieredStorageEvent.Builder newEventBuilder(final EventType type, final RemoteLogSegmentId segId) {
- return LocalTieredStorageEvent.newBuilder(brokerId, type, eventTimestamp.incrementAndGet(), segId);
- }
-
private LocalTieredStorageEvent.Builder newEventBuilder(final EventType type, final RemoteLogSegmentMetadata md) {
return LocalTieredStorageEvent
.newBuilder(brokerId, type, eventTimestamp.incrementAndGet(), md.remoteLogSegmentId())
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageCondition.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageCondition.java
index 6039e18a5db..2bee267e7f6 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageCondition.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageCondition.java
@@ -46,6 +46,7 @@ public final class LocalTieredStorageCondition {
final EventType eventType;
final int brokerId;
final TopicPartition topicPartition;
+ final Integer baseOffset;
final boolean failed;
private final InternalListener listener;
@@ -66,6 +67,7 @@ public final class LocalTieredStorageCondition {
* @param eventType The nature of the event to match.
* @param brokerId The broker which should have generated the event.
* @param tp The topic-partition which the event should relate to.
+ * @param baseOffset The base offset of the segment which the event should relate to.
* @param failed Whether the event should correspond to a failed interaction with the remote storage.
*
* @return A condition with the given characteristics which listens to the given storages and can
@@ -75,8 +77,11 @@ public final class LocalTieredStorageCondition {
final EventType eventType,
final int brokerId,
final TopicPartition tp,
+ final Integer baseOffset,
final boolean failed) {
- return expectEvent(storages, eventType, brokerId, tp, failed, 1);
+ final LocalTieredStorageCondition condition = new LocalTieredStorageCondition(eventType, brokerId, tp, failed, baseOffset, 1);
+ storages.forEach(storage -> storage.addListener(condition.listener));
+ return condition;
}
/**
@@ -106,7 +111,7 @@ public final class LocalTieredStorageCondition {
final TopicPartition tp,
final boolean failed,
final int latchCount) {
- final LocalTieredStorageCondition condition = new LocalTieredStorageCondition(eventType, brokerId, tp, failed, latchCount);
+ final LocalTieredStorageCondition condition = new LocalTieredStorageCondition(eventType, brokerId, tp, failed, null, latchCount);
storages.forEach(storage -> storage.addListener(condition.listener));
return condition;
}
@@ -170,8 +175,8 @@ public final class LocalTieredStorageCondition {
}
public String toString() {
- return format("Condition[eventType=%s, brokerId=%d, topicPartition=%s, failed=%b]",
- eventType, brokerId, topicPartition, failed);
+ return format("Condition[eventType=%s, brokerId=%d, topicPartition=%s, baseOffset=%d, failed=%b]",
+ eventType, brokerId, topicPartition, baseOffset, failed);
}
private static final class InternalListener implements LocalTieredStorageListener {
@@ -200,11 +205,13 @@ public final class LocalTieredStorageCondition {
final int id,
final TopicPartition tp,
final boolean failed,
+ final Integer baseOffset,
final int latchCount) {
this.eventType = requireNonNull(type);
this.brokerId = id;
this.topicPartition = requireNonNull(tp);
this.failed = failed;
+ this.baseOffset = baseOffset;
this.listener = new InternalListener(this, latchCount);
this.next = null;
}
@@ -214,6 +221,7 @@ public final class LocalTieredStorageCondition {
this.brokerId = h.brokerId;
this.topicPartition = h.topicPartition;
this.failed = h.failed;
+ this.baseOffset = h.baseOffset;
this.listener = h.listener;
this.next = requireNonNull(next);
}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java
index 2538eebec5c..1180163a889 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java
@@ -76,6 +76,12 @@ public final class LocalTieredStorageEvent implements Comparable condition.failed).orElseGet(() -> !condition.failed)) {
return false;
}
+ if (condition.baseOffset != null && !metadata.isPresent()) {
+ return false;
+ }
+ if (condition.baseOffset != null && metadata.get().startOffset() != condition.baseOffset) {
+ return false;
+ }
return true;
}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index d38df06e5ae..593d69cb38c 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -17,6 +17,8 @@
package org.apache.kafka.tiered.storage;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
import org.apache.kafka.tiered.storage.specs.TopicSpec;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
@@ -37,7 +39,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
@@ -69,8 +70,8 @@ import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
public final class TieredStorageTestContext implements AutoCloseable {
private final TieredStorageTestHarness harness;
- private final Serializer ser = Serdes.String().serializer();
- private final Deserializer de = Serdes.String().deserializer();
+ private final Serializer ser = new StringSerializer();
+ private final Deserializer de = new StringDeserializer();
private final Map topicSpecs = new HashMap<>();
private final TieredStorageTestReport testReport;
@@ -309,7 +310,5 @@ public final class TieredStorageTestContext implements AutoCloseable {
@Override
public void close() throws IOException {
- Utils.closeAll(producer, consumer);
- Utils.closeQuietly(admin, "Admin client");
}
}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index da3762c5e9f..bed5452bdf5 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -32,7 +32,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -52,6 +51,7 @@ import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
@@ -107,6 +107,7 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
TopicBasedRemoteLogMetadataManager.class.getName());
overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, RLM_TASK_INTERVAL_MS.toString());
+ overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");
overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(""));
overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(""));
@@ -153,7 +154,6 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
context = new TieredStorageTestContext(this);
}
- @Disabled("Disabled until the trunk build is stable to test tiered storage")
@Test
public void executeTieredStorageTest() {
TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
@@ -200,6 +200,8 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
if (loaderAwareRSM.delegate() instanceof LocalTieredStorage) {
storages.add((LocalTieredStorage) loaderAwareRSM.delegate());
}
+ } else if (storageManager instanceof LocalTieredStorage) {
+ storages.add((LocalTieredStorage) storageManager);
}
} else {
throw new AssertionError("Broker " + broker.config().brokerId()
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
index 9b20b3016ff..56287cebdcb 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
@@ -80,6 +80,7 @@ public final class ProduceAction implements TieredStorageTestAction {
COPY_SEGMENT,
spec.getSourceBrokerId(),
spec.getTopicPartition(),
+ spec.getBaseOffset(),
false))
.collect(Collectors.toList());
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
index 93f2b0338c9..b5da2308d14 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
@@ -79,7 +79,7 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
enableRemoteLogStorage)
.withBatchSize(topicA, p0, batchSize)
.expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k1", "v1"))
- .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k2", "v2"))
+ .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k2", "v2"))
.produce(topicA, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"),
new KeyValueSpec("k3", "v3"))
@@ -127,10 +127,10 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
* - For topic B, only one segment is present in the tiered storage, as asserted by the
* previous sub-test-case.
*/
- .bounce(broker)
+ // .bounce(broker)
.expectFetchFromTieredStorage(broker, topicA, p0, 1)
- .expectFetchFromTieredStorage(broker, topicB, p0, 2)
.consume(topicA, p0, 1L, 2, 1)
+ .expectFetchFromTieredStorage(broker, topicB, p0, 2)
.consume(topicB, p0, 1L, 4, 3);
}
}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java
index 5fa7749f74a..902b5c2d713 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java
@@ -19,7 +19,9 @@ package org.apache.kafka.tiered.storage.utils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
@@ -138,18 +140,21 @@ public final class RecordsKeyValueMatcher extends TypeSafeDiagnosi
private SimpleRecord convert(Object recordCandidate) {
if (recordCandidate instanceof ProducerRecord) {
ProducerRecord, ?> record = (ProducerRecord, ?>) recordCandidate;
+ long timestamp = record.timestamp() != null ? record.timestamp() : RecordBatch.NO_TIMESTAMP;
ByteBuffer keyBytes =
Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key()));
ByteBuffer valueBytes =
Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value()));
- return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, record.headers().toArray());
+ Header[] headers = record.headers() != null ? record.headers().toArray() : Record.EMPTY_HEADERS;
+ return new SimpleRecord(timestamp, keyBytes, valueBytes, headers);
} else if (recordCandidate instanceof ConsumerRecord) {
ConsumerRecord, ?> record = (ConsumerRecord, ?>) recordCandidate;
ByteBuffer keyBytes =
Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key()));
ByteBuffer valueBytes =
Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value()));
- return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, record.headers().toArray());
+ Header[] headers = record.headers() != null ? record.headers().toArray() : Record.EMPTY_HEADERS;
+ return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, headers);
} else if (recordCandidate instanceof Record) {
Record record = (Record) recordCandidate;
return new SimpleRecord(record.timestamp(), record.key(), record.value(), record.headers());