
KAFKA-9555 Added default RLMM implementation based on internal topic storage. (#10579)

This is the initial version of the default RLMM implementation.
It includes the default RLMM configs, the RLMM implementation, and the producer/consumer managers.
Introduced TopicBasedRemoteLogMetadataManagerHarness, which takes care of bringing up a Kafka cluster, creating the remote log metadata topic, and initializing TopicBasedRemoteLogMetadataManager.
Refactored the existing RemoteLogMetadataCacheTest into RemoteLogSegmentLifecycleTest, with parameterized tests that run against both RemoteLogMetadataCache and TopicBasedRemoteLogMetadataManager.
Refactored the existing InmemoryRemoteLogMetadataManagerTest and RemoteLogMetadataManagerTest, with parameterized tests that run against both InmemoryRemoteLogMetadataManager and TopicBasedRemoteLogMetadataManager.

This is part of tiered storage KIP-405 efforts.

Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Cong Ding <cong@ccding.com>, Jun Rao <junrao@gmail.com>
pull/11081/head
Satish Duggana committed via GitHub (parent commit e8ce93bd53)
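
Before the file-by-file diff, a minimal usage sketch of the new manager. This is hypothetical wiring, not part of the commit: the property keys below are placeholders, and the real names are defined in TopicBasedRemoteLogMetadataManagerConfig later in this diff.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;

public class RlmmUsageSketch {
    public static void main(String[] args) {
        TopicBasedRemoteLogMetadataManager rlmm = new TopicBasedRemoteLogMetadataManager();
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092"); // placeholder key
        configs.put("broker.id", 0);                        // placeholder key
        // configure() starts an initialization thread that creates the metadata topic
        // and the producer/consumer managers, retrying until success or timeout.
        rlmm.configure(configs);
    }
}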
  1. build.gradle (9)
  2. checkstyle/import-control.xml (3)
  3. storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadata.java (6)
  4. storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java (6)
  5. storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java (6)
  6. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java (130)
  7. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java (247)
  8. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java (113)
  9. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java (59)
  10. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java (43)
  11. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java (143)
  12. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java (484)
  13. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java (228)
  14. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java (315)
  15. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java (5)
  16. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleManager.java (63)
  17. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java (516)
  18. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java (142)
  19. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java (109)
  20. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java (141)
  21. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java (98)
  22. storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManagerTest.java (130)
  23. storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java (152)
  24. storage/src/test/resources/log4j.properties (1)

9  build.gradle

@@ -1505,8 +1505,11 @@ project(':storage') {
testImplementation project(':clients')
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':core').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.bcpkix
testRuntimeOnly libs.slf4jlog4j
}
@@ -1561,6 +1564,12 @@ project(':storage') {
}
}
test {
useJUnitPlatform {
includeEngines 'junit-jupiter'
}
}
clean.doFirst {
delete "$buildDir/kafka/"
}

3  checkstyle/import-control.xml

@@ -297,6 +297,9 @@
<subpackage name="log">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.api" />
<allow pkg="kafka.utils" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.log" />
<allow pkg="org.apache.kafka.test" />

6  storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadata.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
@@ -53,4 +54,9 @@ public abstract class RemoteLogMetadata {
public int brokerId() {
return brokerId;
}
/**
* @return TopicIdPartition for which this event is generated.
*/
public abstract TopicIdPartition topicIdPartition();
}

6  storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collections;
@@ -217,6 +218,11 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
segmentSizeInBytes, rlsmUpdate.state(), segmentLeaderEpochs);
}
@Override
public TopicIdPartition topicIdPartition() {
return remoteLogSegmentId.topicIdPartition();
}
@Override
public boolean equals(Object o) {
if (this == o) {

6  storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Objects;
@@ -65,6 +66,11 @@ public class RemoteLogSegmentMetadataUpdate extends RemoteLogMetadata {
return state;
}
@Override
public TopicIdPartition topicIdPartition() {
return remoteLogSegmentId.topicIdPartition();
}
@Override
public boolean equals(Object o) {
if (this == o) {

130  storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java

@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
/**
* This class manages the consumer thread, viz. {@link ConsumerTask}, which polls messages from the assigned metadata topic partitions.
* It also provides a way to wait until a given record has been received by the consumer, timing out after
* {@link TopicBasedRemoteLogMetadataManagerConfig#consumeWaitMs()}.
*/
public class ConsumerManager implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;
private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
private final Time time;
private final ConsumerTask consumerTask;
private final Thread consumerTaskThread;
public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner,
Time time) {
this.rlmmConfig = rlmmConfig;
this.time = time;
// Create a task to consume messages and submit the respective events to RemotePartitionMetadataEventHandler.
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(rlmmConfig.consumerProperties());
consumerTask = new ConsumerTask(consumer, remotePartitionMetadataEventHandler, rlmmTopicPartitioner);
consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", consumerTask);
}
public void startConsumerThread() {
try {
// Start a thread to continuously consume records from topic partitions.
consumerTaskThread.start();
} catch (Exception e) {
throw new KafkaException("Error encountered while initializing and scheduling ConsumerTask thread", e);
}
}
/**
* Wait until the consumption reaches the offset of the metadata partition for the given {@code recordMetadata}.
*
* @param recordMetadata record metadata to be checked for consumption.
*/
public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) {
final int partition = recordMetadata.partition();
// If the current assignment does not have the subscription for this partition then return immediately.
if (!consumerTask.isPartitionAssigned(partition)) {
throw new KafkaException("This consumer is not subscribed to the target partition " + partition + " on which message is produced.");
}
final long offset = recordMetadata.offset();
long startTimeMs = time.milliseconds();
while (true) {
long receivedOffset = consumerTask.receivedOffsetForPartition(partition).orElse(-1L);
if (receivedOffset >= offset) {
break;
}
log.debug("Committed offset [{}] for partition [{}], but the target offset: [{}], Sleeping for [{}] to retry again",
offset, partition, receivedOffset, CONSUME_RECHECK_INTERVAL_MS);
if (time.milliseconds() - startTimeMs > rlmmConfig.consumeWaitMs()) {
log.warn("Committed offset for partition:[{}] is : [{}], but the target offset: [{}] ",
partition, receivedOffset, offset);
throw new TimeoutException("Timed out in catching up with the expected offset by consumer.");
}
time.sleep(CONSUME_RECHECK_INTERVAL_MS);
}
}
@Override
public void close() throws IOException {
// Closing the consumer task will internally close all of its resources, including the consumer.
Utils.closeQuietly(consumerTask, "ConsumerTask");
// Wait until the consumer thread finishes.
try {
consumerTaskThread.join();
} catch (Exception e) {
log.error("Encountered error while waiting for consumerTaskThread to finish.", e);
}
}
public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
consumerTask.addAssignmentsForPartitions(partitions);
}
public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
consumerTask.removeAssignmentsForPartitions(partitions);
}
public Optional<Long> receivedOffsetForPartition(int metadataPartition) {
return consumerTask.receivedOffsetForPartition(metadataPartition);
}
}
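
The waitTillConsumptionCatchesUp() method above is what gives the manager its read-after-write semantics. A minimal sketch of the pattern, mirroring the doPublishMetadata() sequence that appears later in this diff (the types are the ones introduced by this commit):

// Publish a metadata event, then block until the local ConsumerTask has consumed
// up to the produced offset, so that subsequent local reads observe this write.
private void publishAndWait(ProducerManager producerManager,
                            ConsumerManager consumerManager,
                            RemoteLogMetadata remoteLogMetadata) {
    RecordMetadata recordMetadata = producerManager.publishMessage(remoteLogMetadata);
    // Throws TimeoutException if the consumer does not catch up within
    // remote.log.metadata.consume.wait.ms.
    consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
}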

247  storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java

@@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
/**
* This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
* partitions and maintaining the state of the remote log segment metadata. It provides an API to add and remove
* the topic partitions whose metadata should be consumed by this instance, using
* {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
* <p>
* When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for, along with the
* partitions to be deleted. This class receives those notifications through
* {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} and assigns the consumer to the
* respective remote log metadata partitions computed by {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
* Later leadership changes flow through the same APIs. Partitions deleted from this broker, received through
* {@link #removeAssignmentsForPartitions(Set)}, are removed from the assignment.
* <p>
* After receiving these events, it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
* which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
*/
class ConsumerTask implements Runnable, Closeable {
private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
private static final long POLL_INTERVAL_MS = 100L;
private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
private final KafkaConsumer<byte[], byte[]> consumer;
private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
private final RemoteLogMetadataTopicPartitioner topicPartitioner;
// It indicates whether the closing process has been started or not. If it is set as true,
// consumer will stop consuming messages and it will not allow partition assignments to be updated.
private volatile boolean closing = false;
// It indicates whether the consumer needs to assign the partitions or not. This is set when it is
// determined that the consumer needs to be assigned with the updated partitions.
private volatile boolean assignPartitions = false;
private final Object assignPartitionsLock = new Object();
// Remote log metadata topic partitions that consumer is assigned to.
private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
// User topic partitions that this broker is a leader/follower for.
private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
// Map of remote log metadata topic partition to consumed offsets.
private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
RemoteLogMetadataTopicPartitioner topicPartitioner) {
Objects.requireNonNull(consumer);
Objects.requireNonNull(remotePartitionMetadataEventHandler);
Objects.requireNonNull(topicPartitioner);
this.consumer = consumer;
this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
this.topicPartitioner = topicPartitioner;
}
@Override
public void run() {
log.info("Started Consumer task thread.");
try {
while (!closing) {
maybeWaitForPartitionsAssignment();
log.info("Polling consumer to receive remote log metadata topic records");
ConsumerRecords<byte[], byte[]> consumerRecords
= consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
handleRemoteLogMetadata(serde.deserialize(record.value()));
partitionToConsumedOffsets.put(record.partition(), record.offset());
}
}
} catch (Exception e) {
log.error("Error occurred in consumer task, close:[{}]", closing, e);
} finally {
closeConsumer();
log.info("Exiting from consumer task thread");
}
}
private void closeConsumer() {
log.info("Closing the consumer instance");
try {
consumer.close(Duration.ofSeconds(30));
} catch (Exception e) {
log.error("Error encountered while closing the consumer", e);
}
}
private void maybeWaitForPartitionsAssignment() {
Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
synchronized (assignPartitionsLock) {
// If it is closing, return immediately. This check should be inside the assignPartitionsLock, as `closing` is updated
// within the same lock in the close() method to avoid any race conditions.
if (closing) {
return;
}
while (assignedMetaPartitions.isEmpty()) {
// If no partitions are assigned, wait until they are assigned.
log.debug("Waiting for assigned remote log metadata partitions..");
try {
// No timeout is set here, as the lock is always notified. Even on close, a race can happen
// between the thread calling this method and the thread calling close(), so we check `closing`
// after waking up, since close() may have set it and notified on assignPartitionsLock.
assignPartitionsLock.wait();
if (closing) {
return;
}
} catch (InterruptedException e) {
throw new KafkaException(e);
}
}
if (assignPartitions) {
assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions);
assignPartitions = false;
}
}
if (!assignedMetaPartitionsSnapshot.isEmpty()) {
executeReassignment(assignedMetaPartitionsSnapshot);
}
}
private void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
} else {
log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata);
}
}
private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) {
Set<TopicPartition> assignedMetaTopicPartitions = assignedMetaPartitionsSnapshot.stream()
.map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
.collect(Collectors.toSet());
log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions);
consumer.assign(assignedMetaTopicPartitions);
}
public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
updateAssignmentsForPartitions(partitions, Collections.emptySet());
}
public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
updateAssignmentsForPartitions(Collections.emptySet(), partitions);
}
private void updateAssignmentsForPartitions(Set<TopicIdPartition> addedPartitions,
Set<TopicIdPartition> removedPartitions) {
log.info("Updating assignments for addedPartitions: {} and removedPartition: {}", addedPartitions, removedPartitions);
Objects.requireNonNull(addedPartitions, "addedPartitions must not be null");
Objects.requireNonNull(removedPartitions, "removedPartitions must not be null");
if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
return;
}
synchronized (assignPartitionsLock) {
Set<TopicIdPartition> updatedReassignedPartitions = new HashSet<>(assignedTopicPartitions);
updatedReassignedPartitions.addAll(addedPartitions);
updatedReassignedPartitions.removeAll(removedPartitions);
Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
for (TopicIdPartition tp : updatedReassignedPartitions) {
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
}
assignedTopicPartitions = Collections.unmodifiableSet(updatedReassignedPartitions);
log.debug("Assigned topic partitions: {}", assignedTopicPartitions);
if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) {
assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions);
log.debug("Assigned metadata topic partitions: {}", assignedMetaPartitions);
assignPartitions = true;
assignPartitionsLock.notifyAll();
} else {
log.debug("No change in assigned metadata topic partitions: {}", assignedMetaPartitions);
}
}
}
public Optional<Long> receivedOffsetForPartition(int partition) {
return Optional.ofNullable(partitionToConsumedOffsets.get(partition));
}
public boolean isPartitionAssigned(int partition) {
return assignedMetaPartitions.contains(partition);
}
public void close() {
if (!closing) {
synchronized (assignPartitionsLock) {
// Closing should be updated only after acquiring the lock to avoid race in
// maybeWaitForPartitionsAssignment() where it waits on assignPartitionsLock. It should not wait
// if the closing is already set.
closing = true;
consumer.wakeup();
assignPartitionsLock.notifyAll();
}
}
}
}
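
As the class javadoc explains, partition assignments drive which metadata partitions the underlying consumer reads. A small sketch with hypothetical values ("orders"-0 and a random topic id); Uuid here is org.apache.kafka.common.Uuid:

private static void exampleAssignments(ConsumerTask consumerTask) {
    Set<TopicIdPartition> partitions = Collections.singleton(
            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 0)));
    // Wakes the task; it recomputes the metadata partitions and calls consumer.assign(...).
    consumerTask.addAssignmentsForPartitions(partitions);
    // Later, e.g. when the partition is deleted from this broker:
    consumerTask.removeAssignmentsForPartitions(partitions);
}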

113  storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.time.Duration;
/**
* This class is responsible for publishing messages into the remote log metadata topic partitions.
*
* Callers of this class should take care not to send messages once closing of this instance has been initiated.
*/
public class ProducerManager implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);
private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
private final KafkaProducer<byte[], byte[]> producer;
private final RemoteLogMetadataTopicPartitioner topicPartitioner;
private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner) {
this.rlmmConfig = rlmmConfig;
this.producer = new KafkaProducer<>(rlmmConfig.producerProperties());
topicPartitioner = rlmmTopicPartitioner;
}
public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException {
TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition();
int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition);
log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
topicIdPartition, metadataPartitionNum, remoteLogMetadata);
if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
// This should never occur as long as the metadata partition count remains the same.
throw new KafkaException("Chosen partition number " + metadataPartitionNum +
" must be less than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());
}
ProducerCallback callback = new ProducerCallback();
try {
producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null,
serde.serialize(remoteLogMetadata)), callback).get();
} catch (KafkaException e) {
throw e;
} catch (Exception e) {
throw new KafkaException("Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, e);
}
if (callback.exception() == null) {
return callback.recordMetadata();
} else {
Exception ex = callback.exception();
if (ex instanceof KafkaException) {
throw (KafkaException) ex;
} else {
throw new KafkaException(ex);
}
}
}
public void close() {
try {
producer.close(Duration.ofSeconds(30));
} catch (Exception e) {
log.error("Error encountered while closing the producer", e);
}
}
private static class ProducerCallback implements Callback {
private volatile RecordMetadata recordMetadata;
private volatile Exception exception;
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
this.recordMetadata = recordMetadata;
this.exception = exception;
}
public RecordMetadata recordMetadata() {
return recordMetadata;
}
public Exception exception() {
return exception;
}
}
}
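
A hypothetical end-to-end publish. The values are illustrative, and the RemoteLogSegmentMetadata constructor shape is assumed from the storage API in this repository:

private static void examplePublish(ProducerManager producerManager) {
    TopicIdPartition userPartition =
            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 0));
    RemoteLogSegmentId segmentId = new RemoteLogSegmentId(userPartition, Uuid.randomUuid());
    long now = System.currentTimeMillis();
    // Assumed constructor: (id, startOffset, endOffset, maxTimestampMs, brokerId,
    // eventTimestampMs, segmentSizeInBytes, leaderEpoch -> startOffset map).
    RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId,
            0L, 999L, now, 0, now, 1024, Collections.singletonMap(0, 0L));
    RecordMetadata recordMetadata = producerManager.publishMessage(metadata);
    System.out.println("Published to metadata partition " + recordMetadata.partition()
            + " at offset " + recordMetadata.offset());
}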

59  storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
public class RemoteLogMetadataTopicPartitioner {
public static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataTopicPartitioner.class);
private final int numMetadataTopicPartitions;
public RemoteLogMetadataTopicPartitioner(int numMetadataTopicPartitions) {
this.numMetadataTopicPartitions = numMetadataTopicPartitions;
}
public int metadataPartition(TopicIdPartition topicIdPartition) {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
int partitionNum = Utils.toPositive(Utils.murmur2(toBytes(topicIdPartition))) % numMetadataTopicPartitions;
log.debug("No of partitions [{}], partitionNum: [{}] for given topic: [{}]", numMetadataTopicPartitions, partitionNum, topicIdPartition);
return partitionNum;
}
private byte[] toBytes(TopicIdPartition topicIdPartition) {
// We do not want to depend upon hash code generation of Uuid as that may change.
int hash = Objects.hash(topicIdPartition.topicId().getLeastSignificantBits(),
topicIdPartition.topicId().getMostSignificantBits(),
topicIdPartition.topicPartition().partition());
return toBytes(hash);
}
private byte[] toBytes(int n) {
return new byte[]{
(byte) (n >> 24),
(byte) (n >> 16),
(byte) (n >> 8),
(byte) n
};
}
}
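
Because the mapping is a pure function of the topic id and partition number, every broker computes the same metadata partition for a given user partition, so producers and consumers agree on where its metadata lives. A small sketch with hypothetical values:

private static void examplePartitioning() {
    // 50 matches DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS in the config class below.
    RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(50);
    TopicIdPartition userPartition =
            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 0));
    int metadataPartition = partitioner.metadataPartition(userPartition);
    System.out.println("orders-0 -> __remote_log_metadata-" + metadataPartition);
}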

43  storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
public abstract class RemotePartitionMetadataEventHandler {
public void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
if (remoteLogMetadata instanceof RemoteLogSegmentMetadata) {
handleRemoteLogSegmentMetadata((RemoteLogSegmentMetadata) remoteLogMetadata);
} else if (remoteLogMetadata instanceof RemoteLogSegmentMetadataUpdate) {
handleRemoteLogSegmentMetadataUpdate((RemoteLogSegmentMetadataUpdate) remoteLogMetadata);
} else if (remoteLogMetadata instanceof RemotePartitionDeleteMetadata) {
handleRemotePartitionDeleteMetadata((RemotePartitionDeleteMetadata) remoteLogMetadata);
} else {
throw new IllegalArgumentException("remoteLogMetadata: " + remoteLogMetadata + " is not supported.");
}
}
protected abstract void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata);
protected abstract void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate);
protected abstract void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata);
}

143  storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java

@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* This class represents a store to maintain the {@link RemotePartitionDeleteMetadata} and {@link RemoteLogMetadataCache} for each topic partition.
*/
public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHandler implements Closeable {
private static final Logger log = LoggerFactory.getLogger(RemotePartitionMetadataStore.class);
private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
new ConcurrentHashMap<>();
private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache =
new ConcurrentHashMap<>();
@Override
public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
idToRemoteLogMetadataCache
.computeIfAbsent(remoteLogSegmentId.topicIdPartition(), id -> new RemoteLogMetadataCache())
.addCopyInProgressSegment(remoteLogSegmentMetadata);
}
@Override
public void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate rlsmUpdate) {
log.debug("Updating remote log segment: [{}]", rlsmUpdate);
RemoteLogSegmentId remoteLogSegmentId = rlsmUpdate.remoteLogSegmentId();
TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition();
RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
if (remoteLogMetadataCache != null) {
try {
remoteLogMetadataCache.updateRemoteLogSegmentMetadata(rlsmUpdate);
} catch (RemoteResourceNotFoundException e) {
log.error("Error occurred while updating the remote log segment.");
}
} else {
log.error("No partition metadata found for : " + topicIdPartition);
}
}
@Override
public void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
log.debug("Received partition delete state with: [{}]", remotePartitionDeleteMetadata);
TopicIdPartition topicIdPartition = remotePartitionDeleteMetadata.topicIdPartition();
idToPartitionDeleteMetadata.put(topicIdPartition, remotePartitionDeleteMetadata);
// There will be a trigger that receives the delete partition marker and acts on it to delete all the segments.
if (remotePartitionDeleteMetadata.state() == RemotePartitionDeleteState.DELETE_PARTITION_FINISHED) {
// remove the association for the partition.
idToRemoteLogMetadataCache.remove(topicIdPartition);
idToPartitionDeleteMetadata.remove(topicIdPartition);
}
}
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
throws RemoteStorageException {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
return getRemoteLogMetadataCache(topicIdPartition).listAllRemoteLogSegments();
}
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
throws RemoteStorageException {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
return getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
}
private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
throws RemoteResourceNotFoundException {
RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
if (remoteLogMetadataCache == null) {
throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
}
return remoteLogMetadataCache;
}
public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
long offset,
int epochForOffset)
throws RemoteStorageException {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
return getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset, offset);
}
public Optional<Long> highestLogOffset(TopicIdPartition topicIdPartition,
int leaderEpoch) throws RemoteStorageException {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
return getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch);
}
@Override
public void close() throws IOException {
log.info("Clearing the entries from the store.");
// Clear the entries by replacing them with unmodifiable empty maps.
// In practice, a closed instance is not used again.
idToPartitionDeleteMetadata = Collections.emptyMap();
idToRemoteLogMetadataCache = Collections.emptyMap();
}
}
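
A sketch of how consumed events drive the store, assuming a COPY_SEGMENT_STARTED event and a matching state-transition update for the same segment are passed in:

private static void exampleStoreLifecycle(RemoteLogSegmentMetadata copyStarted,
                                          RemoteLogSegmentMetadataUpdate stateUpdate)
        throws RemoteStorageException {
    RemotePartitionMetadataStore store = new RemotePartitionMetadataStore();
    // A COPY_SEGMENT_STARTED event creates the per-partition RemoteLogMetadataCache.
    store.handleRemoteLogSegmentMetadata(copyStarted);
    // A later update (e.g. COPY_SEGMENT_FINISHED) transitions the segment's state.
    store.handleRemoteLogSegmentMetadataUpdate(stateUpdate);
    // Reads are then served from the in-memory cache.
    Iterator<RemoteLogSegmentMetadata> segments =
            store.listRemoteLogSegments(copyStarted.topicIdPartition());
    segments.forEachRemaining(segment -> System.out.println(segment.remoteLogSegmentId()));
}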

484  storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java

@@ -0,0 +1,484 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This is a {@link RemoteLogMetadataManager} implementation that uses an internal topic named {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME} as its storage.
* It is used to publish and fetch {@link RemoteLogMetadata} for the user topic partitions registered with
* {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker has an instance of this class, and it subscribes
* to metadata updates for the registered user topic partitions.
*/
public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
private volatile boolean configured = false;
// Indicates whether the closing process of this instance has been started via the #close() method.
// AtomicBoolean is used instead of volatile to avoid http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD
// when the field is read but not updated in a spin loop, as in the #initializeResources() method.
private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final Time time = Time.SYSTEM;
private Thread initializationThread;
private volatile ProducerManager producerManager;
private volatile ConsumerManager consumerManager;
// This allows gracefully closing this instance via the {@link #close()} method while there are pending or new
// requests calling methods that use resources like the producer/consumer managers.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final RemotePartitionMetadataStore remotePartitionMetadataStore = new RemotePartitionMetadataStore();
private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
private volatile RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
private volatile boolean initializationFailed;
@Override
public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
throws RemoteStorageException {
Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
// This allows gracefully rejecting the requests while closing of this instance is in progress, which triggers
// closing the producer/consumer manager instances.
lock.readLock().lock();
try {
ensureInitializedAndNotClosed();
// This method is only allowed to add a remote log segment with the initial state (RemoteLogSegmentState.COPY_SEGMENT_STARTED),
// not to update existing remote log segment metadata.
if (remoteLogSegmentMetadata.state() != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
throw new IllegalArgumentException(
"Given remoteLogSegmentMetadata should have state as " + RemoteLogSegmentState.COPY_SEGMENT_STARTED
+ " but it contains state as: " + remoteLogSegmentMetadata.state());
}
// Publish the message to the topic.
doPublishMetadata(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition(),
remoteLogSegmentMetadata);
} finally {
lock.readLock().unlock();
}
}
@Override
public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate)
throws RemoteStorageException {
Objects.requireNonNull(segmentMetadataUpdate, "segmentMetadataUpdate can not be null");
lock.readLock().lock();
try {
ensureInitializedAndNotClosed();
// Callers should use addRemoteLogSegmentMetadata to add RemoteLogSegmentMetadata with state as
// RemoteLogSegmentState.COPY_SEGMENT_STARTED.
if (segmentMetadataUpdate.state() == RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
throw new IllegalArgumentException("Given remoteLogSegmentMetadata should not have the state as: "
+ RemoteLogSegmentState.COPY_SEGMENT_STARTED);
}
// Publish the message to the topic.
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate);
} finally {
lock.readLock().unlock();
}
}
@Override
public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
throws RemoteStorageException {
Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null");
lock.readLock().lock();
try {
ensureInitializedAndNotClosed();
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata);
} finally {
lock.readLock().unlock();
}
}
private void doPublishMetadata(TopicIdPartition topicIdPartition, RemoteLogMetadata remoteLogMetadata)
throws RemoteStorageException {
log.debug("Publishing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata);
try {
// Publish the message to the topic.
RecordMetadata recordMetadata = producerManager.publishMessage(remoteLogMetadata);
// Wait until the consumer catches up with this offset. This will ensure read-after-write consistency
// semantics.
consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
} catch (KafkaException e) {
if (e instanceof RetriableException) {
throw e;
} else {
throw new RemoteStorageException(e);
}
}
}
@Override
public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
int epochForOffset,
long offset)
throws RemoteStorageException {
lock.readLock().lock();
try {
ensureInitializedAndNotClosed();
return remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset, epochForOffset);
} finally {
lock.readLock().unlock();
}
}
@Override
public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
int leaderEpoch)
throws RemoteStorageException {
lock.readLock().lock();
try {
ensureInitializedAndNotClosed();
return remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch);
} finally {
lock.readLock().unlock();
}
}
@Override
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
throws RemoteStorageException {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
lock.readLock().lock();
try {
ensureInitializedAndNotClosed();
return remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition);
} finally {
lock.readLock().unlock();
}
}
@Override
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
throws RemoteStorageException {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
lock.readLock().lock();
try {
ensureInitializedAndNotClosed();
return remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch);
} finally {
lock.readLock().unlock();
}
}
public int metadataPartition(TopicIdPartition topicIdPartition) {
return rlmmTopicPartitioner.metadataPartition(topicIdPartition);
}
// Visible For Testing
public Optional<Long> receivedOffsetForPartition(int metadataPartition) {
return consumerManager.receivedOffsetForPartition(metadataPartition);
}
@Override
public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
Set<TopicIdPartition> followerPartitions) {
Objects.requireNonNull(leaderPartitions, "leaderPartitions can not be null");
Objects.requireNonNull(followerPartitions, "followerPartitions can not be null");
log.info("Received leadership notifications with leader partitions {} and follower partitions {}",
leaderPartitions, followerPartitions);
HashSet<TopicIdPartition> allPartitions = new HashSet<>(leaderPartitions);
allPartitions.addAll(followerPartitions);
lock.readLock().lock();
try {
if (closing.get()) {
throw new IllegalStateException("This instance is in closing state");
}
if (!initialized.get()) {
// If it is not yet initialized, then keep them as pending partitions and assign them
// when it is initialized successfully in initializeResources().
this.pendingAssignPartitions.addAll(allPartitions);
} else {
consumerManager.addAssignmentsForPartitions(allPartitions);
}
} finally {
lock.readLock().unlock();
}
}
@Override
public void onStopPartitions(Set<TopicIdPartition> partitions) {
lock.readLock().lock();
try {
if (closing.get()) {
throw new IllegalStateException("This instance is in closing state");
}
if (!initialized.get()) {
// If it is not yet initialized, then remove them from the pending partitions if any.
if (!pendingAssignPartitions.isEmpty()) {
pendingAssignPartitions.removeAll(partitions);
}
} else {
consumerManager.removeAssignmentsForPartitions(partitions);
}
} finally {
lock.readLock().unlock();
}
}
@Override
public void configure(Map<String, ?> configs) {
Objects.requireNonNull(configs, "configs can not be null.");
lock.writeLock().lock();
try {
if (configured) {
log.info("Skipping configure as it is already configured.");
return;
}
log.info("Started initializing with configs: {}", configs);
rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
rlmmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
configured = true;
log.info("Successfully initialized with rlmmConfig: {}", rlmmConfig);
// Schedule initialization of the producer/consumer managers in a separate thread, as the required
// resources may not be available yet. The thread retries at regular intervals until the
// initialization succeeds.
initializationThread = KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources);
initializationThread.start();
} finally {
lock.writeLock().unlock();
}
}
private void initializeResources() {
log.info("Initializing the resources.");
final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest();
boolean topicCreated = false;
long startTimeMs = time.milliseconds();
AdminClient adminClient = null;
try {
adminClient = AdminClient.create(rlmmConfig.producerProperties());
// Stop if it is already initialized or closing.
while (!(initialized.get() || closing.get())) {
// If it is timed out then raise an error to exit.
if (time.milliseconds() - startTimeMs > rlmmConfig.initializationRetryMaxTimeoutMs()) {
log.error("Timed out in initializing the resources, retried to initialize the resource for [{}] ms.",
rlmmConfig.initializationRetryMaxTimeoutMs());
initializationFailed = true;
return;
}
if (!topicCreated) {
topicCreated = createTopic(adminClient, remoteLogMetadataTopicRequest);
}
if (!topicCreated) {
// Sleep for INITIALIZATION_RETRY_INTERVAL_MS before trying to create the topic again.
log.info("Sleep for : {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
continue;
} else {
// If topic is already created, validate the existing topic partitions.
try {
String topicName = remoteLogMetadataTopicRequest.name();
// If the existing topic partition count is not the same as configured, mark initialization as failed and exit.
if (!isPartitionsCountSameAsConfigured(adminClient, topicName)) {
initializationFailed = true;
}
} catch (Exception e) {
log.info("Sleep for : {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
continue;
}
}
// Create producer and consumer managers.
lock.writeLock().lock();
try {
producerManager = new ProducerManager(rlmmConfig, rlmmTopicPartitioner);
consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, rlmmTopicPartitioner, time);
consumerManager.startConsumerThread();
if (!pendingAssignPartitions.isEmpty()) {
consumerManager.addAssignmentsForPartitions(pendingAssignPartitions);
pendingAssignPartitions.clear();
}
initialized.set(true);
log.info("Initialized resources successfully.");
} catch (Exception e) {
log.error("Encountered error while initializing producer/consumer", e);
return;
} finally {
lock.writeLock().unlock();
}
}
} finally {
if (adminClient != null) {
try {
adminClient.close(Duration.ofSeconds(10));
} catch (Exception e) {
// Ignore the error.
log.debug("Error occurred while closing the admin client", e);
}
}
}
}
private boolean isPartitionsCountSameAsConfigured(AdminClient adminClient,
String topicName) throws InterruptedException, ExecutionException {
log.debug("Getting topic details to check for partition count and replication factor.");
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topicName))
.values().get(topicName).get();
int expectedPartitions = rlmmConfig.metadataTopicPartitionsCount();
int topicPartitionsSize = topicDescription.partitions().size();
if (topicPartitionsSize != expectedPartitions) {
log.error("Existing topic partition count [{}] is not same as the expected partition count [{}]",
topicPartitionsSize, expectedPartitions);
return false;
}
return true;
}
private NewTopic createRemoteLogMetadataTopicRequest() {
Map<String, String> topicConfigs = new HashMap<>();
topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(rlmmConfig.metadataTopicRetentionMs()));
topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
return new NewTopic(rlmmConfig.remoteLogMetadataTopicName(),
rlmmConfig.metadataTopicPartitionsCount(),
rlmmConfig.metadataTopicReplicationFactor()).configs(topicConfigs);
}
/**
* @param adminClient admin client to be used for creating the topic.
* @param topic       topic to be created.
* @return true if the topic already exists or was created successfully.
*/
private boolean createTopic(AdminClient adminClient, NewTopic topic) {
boolean topicCreated = false;
try {
adminClient.createTopics(Collections.singleton(topic)).all().get();
topicCreated = true;
} catch (Exception e) {
if (e.getCause() instanceof TopicExistsException) {
log.info("Topic [{}] already exists", topic.name());
topicCreated = true;
} else {
log.error("Encountered error while creating remote log metadata topic.", e);
}
}
return topicCreated;
}
public boolean isInitialized() {
return initialized.get();
}
private void ensureInitializedAndNotClosed() {
if (initializationFailed) {
// If initialization failed, shut down the broker.
throw new FatalExitError();
}
if (closing.get() || !initialized.get()) {
throw new IllegalStateException("This instance is in an invalid state, initialized: " + initialized +
" close: " + closing);
}
}
@Override
public void close() throws IOException {
// Close all the resources.
log.info("Closing the resources.");
if (closing.compareAndSet(false, true)) {
lock.writeLock().lock();
try {
if (initializationThread != null) {
try {
initializationThread.join();
} catch (InterruptedException e) {
log.error("Initialization thread was interrupted while waiting to join on close.", e);
}
}
Utils.closeQuietly(producerManager, "ProducerTask");
Utils.closeQuietly(consumerManager, "RLMMConsumerManager");
Utils.closeQuietly(remotePartitionMetadataStore, "RemotePartitionMetadataStore");
} finally {
lock.writeLock().unlock();
log.info("Closed the resources.");
}
}
}
}
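
Putting it together, a hypothetical broker-side sequence: register partitions on a leadership change, then look up segment metadata once initialization completes (epoch/offset values are illustrative):

private static void exampleLeadershipChange(TopicBasedRemoteLogMetadataManager rlmm,
                                            TopicIdPartition leaderPartition,
                                            TopicIdPartition followerPartition)
        throws RemoteStorageException {
    // If initialization is still in progress, these partitions are parked in
    // pendingAssignPartitions and assigned once initializeResources() succeeds.
    rlmm.onPartitionLeadershipChanges(Collections.singleton(leaderPartition),
            Collections.singleton(followerPartition));
    // Served from the local RemotePartitionMetadataStore: the segment containing
    // offset 100 under leader epoch 0.
    Optional<RemoteLogSegmentMetadata> segment =
            rlmm.remoteLogSegmentMetadata(leaderPartition, 0, 100L);
    segment.ifPresent(s -> System.out.println("Found segment: " + s.remoteLogSegmentId()));
}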

228  storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java

@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
/**
* This class defines the configuration for the topic-based {@link org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager} implementation.
*/
public final class TopicBasedRemoteLogMetadataManagerConfig {
public static final String REMOTE_LOG_METADATA_TOPIC_NAME = "__remote_log_metadata";
public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = "remote.log.metadata.topic.replication.factor";
public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = "remote.log.metadata.topic.num.partitions";
public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP = "remote.log.metadata.topic.retention.ms";
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = "remote.log.metadata.consume.wait.ms";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP = "remote.log.metadata.initialization.retry.max.timeout.ms";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP = "remote.log.metadata.initialization.retry.interval.ms";
public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS = -1L;
public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 2 * 60 * 1000L;
public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS = 2 * 60 * 1000L;
public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS = 5 * 1000L;
public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor of the remote log metadata topic.";
public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC = "The number of partitions of the remote log metadata topic.";
public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC = "Retention of the remote log metadata topic in milliseconds. " +
"Default: -1, which means unlimited. Users can configure this value based on their use cases. " +
"To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with " +
"tiered storage in the cluster.";
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The amount of time in milliseconds to wait for the local consumer to " +
"receive the published event.";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC = "The interval in milliseconds between retries of " +
"RemoteLogMetadataManager resource initialization.";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds " +
"to retry RemoteLogMetadataManager resource initialization. When the total retry time reaches this timeout, initialization " +
"is considered failed and the broker starts shutting down.";
public static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX = "remote.log.metadata.common.client.";
public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX = "remote.log.metadata.producer.";
public static final String REMOTE_LOG_METADATA_CONSUMER_PREFIX = "remote.log.metadata.consumer.";
public static final String BROKER_ID = "broker.id";
private static final String REMOTE_LOG_METADATA_CLIENT_PREFIX = "__remote_log_metadata_client";
private static final ConfigDef CONFIG = new ConfigDef();
static {
CONFIG.define(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, SHORT, DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR, atLeast(1), LOW,
REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC)
.define(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, INT, DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS, atLeast(1), LOW,
REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC)
.define(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS, LOW,
REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC)
.define(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS, atLeast(0), LOW,
REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC)
.define(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP, LONG,
DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS, atLeast(0), LOW,
REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_DOC)
.define(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP, LONG,
DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS, atLeast(0), LOW,
REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC);
}
private final String clientIdPrefix;
private final int metadataTopicPartitionsCount;
private final long consumeWaitMs;
private final long metadataTopicRetentionMs;
private final short metadataTopicReplicationFactor;
private final long initializationRetryMaxTimeoutMs;
private final long initializationRetryIntervalMs;
private Map<String, Object> consumerProps;
private Map<String, Object> producerProps;
public TopicBasedRemoteLogMetadataManagerConfig(Map<String, ?> props) {
Objects.requireNonNull(props, "props cannot be null");
Map<String, Object> parsedConfigs = CONFIG.parse(props);
metadataTopicPartitionsCount = (int) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP);
metadataTopicReplicationFactor = (short) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP);
metadataTopicRetentionMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP);
if (metadataTopicRetentionMs != -1 && metadataTopicRetentionMs <= 0) {
throw new IllegalArgumentException("Invalid metadata topic retention in millis: " + metadataTopicRetentionMs);
}
consumeWaitMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP);
initializationRetryIntervalMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP);
initializationRetryMaxTimeoutMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP);
clientIdPrefix = REMOTE_LOG_METADATA_CLIENT_PREFIX + "_" + props.get(BROKER_ID);
initializeProducerConsumerProperties(props);
}
private void initializeProducerConsumerProperties(Map<String, ?> configs) {
Map<String, Object> commonClientConfigs = new HashMap<>();
Map<String, Object> producerOnlyConfigs = new HashMap<>();
Map<String, Object> consumerOnlyConfigs = new HashMap<>();
for (Map.Entry<String, ?> entry : configs.entrySet()) {
String key = entry.getKey();
if (key.startsWith(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX)) {
commonClientConfigs.put(key.substring(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX.length()), entry.getValue());
} else if (key.startsWith(REMOTE_LOG_METADATA_PRODUCER_PREFIX)) {
producerOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_PRODUCER_PREFIX.length()), entry.getValue());
} else if (key.startsWith(REMOTE_LOG_METADATA_CONSUMER_PREFIX)) {
consumerOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_CONSUMER_PREFIX.length()), entry.getValue());
}
}
HashMap<String, Object> allProducerConfigs = new HashMap<>(commonClientConfigs);
allProducerConfigs.putAll(producerOnlyConfigs);
producerProps = createProducerProps(allProducerConfigs);
HashMap<String, Object> allConsumerConfigs = new HashMap<>(commonClientConfigs);
allConsumerConfigs.putAll(consumerOnlyConfigs);
consumerProps = createConsumerProps(allConsumerConfigs);
}
public String remoteLogMetadataTopicName() {
return REMOTE_LOG_METADATA_TOPIC_NAME;
}
public int metadataTopicPartitionsCount() {
return metadataTopicPartitionsCount;
}
public short metadataTopicReplicationFactor() {
return metadataTopicReplicationFactor;
}
public long metadataTopicRetentionMs() {
return metadataTopicRetentionMs;
}
public long consumeWaitMs() {
return consumeWaitMs;
}
public long initializationRetryMaxTimeoutMs() {
return initializationRetryMaxTimeoutMs;
}
public long initializationRetryIntervalMs() {
return initializationRetryIntervalMs;
}
public Map<String, Object> consumerProperties() {
return consumerProps;
}
public Map<String, Object> producerProperties() {
return producerProps;
}
private Map<String, Object> createConsumerProps(HashMap<String, Object> allConsumerConfigs) {
Map<String, Object> props = new HashMap<>(allConsumerConfigs);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientIdPrefix + "_consumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
return props;
}
private Map<String, Object> createProducerProps(HashMap<String, Object> allProducerConfigs) {
Map<String, Object> props = new HashMap<>(allProducerConfigs);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientIdPrefix + "_producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
return Collections.unmodifiableMap(props);
}
@Override
public String toString() {
return "TopicBasedRemoteLogMetadataManagerConfig{" +
"clientIdPrefix='" + clientIdPrefix + '\'' +
", metadataTopicPartitionsCount=" + metadataTopicPartitionsCount +
", consumeWaitMs=" + consumeWaitMs +
", metadataTopicRetentionMs=" + metadataTopicRetentionMs +
", metadataTopicReplicationFactor=" + metadataTopicReplicationFactor +
", initializationRetryMaxTimeoutMs=" + initializationRetryMaxTimeoutMs +
", initializationRetryIntervalMs=" + initializationRetryIntervalMs +
", consumerProps=" + consumerProps +
", producerProps=" + producerProps +
'}';
}
}
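To make the prefix resolution above concrete, here is a small hedged sketch (the class name RlmmConfigSketch and the property values are illustrative): common-client keys apply to both internal clients, while producer- or consumer-prefixed keys are layered on top and win for that client.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;

public class RlmmConfigSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("broker.id", 0);
        // Shared by both internal clients via the common client prefix.
        props.put("remote.log.metadata.common.client.bootstrap.servers", "localhost:9092");
        // Producer-only and consumer-only keys are merged on top of the common ones,
        // so a prefixed producer/consumer value overrides the common value for that client.
        props.put("remote.log.metadata.producer.retries", 10);
        props.put("remote.log.metadata.consumer.fetch.max.wait.ms", 500);

        TopicBasedRemoteLogMetadataManagerConfig config = new TopicBasedRemoteLogMetadataManagerConfig(props);
        // Contains bootstrap.servers and retries, plus the fixed overrides
        // (acks=all, max.in.flight.requests.per.connection=1, byte-array serializers).
        System.out.println(config.producerProperties());
        // Contains bootstrap.servers and fetch.max.wait.ms, plus the fixed overrides
        // (auto commit disabled, earliest offset reset, byte-array deserializers).
        System.out.println(config.consumerProperties());
    }
}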

315
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java

@@ -26,24 +26,13 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
public class RemoteLogMetadataCacheTest {
private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCacheTest.class);
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
new TopicPartition("foo", 0));
@@ -53,253 +42,6 @@ public class RemoteLogMetadataCacheTest {
private final Time time = new MockTime(1);
@Test
public void testSegmentsLifeCycleInCache() throws Exception {
RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
// Create remote log segment metadata and add them to RemoteLogMetadataCache.
// segment 0
// offsets: [0-100]
// leader epochs (0,0), (1,20), (2,80)
Map<Integer, Long> segment0LeaderEpochs = new HashMap<>();
segment0LeaderEpochs.put(0, 0L);
segment0LeaderEpochs.put(1, 20L);
segment0LeaderEpochs.put(2, 80L);
RemoteLogSegmentId segment0Id = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segment0Metadata = new RemoteLogSegmentMetadata(segment0Id, 0L, 100L,
-1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segment0LeaderEpochs);
cache.addCopyInProgressSegment(segment0Metadata);
// We should not get this as the segment is still getting copied and it is not yet considered successful until
// it reaches RemoteLogSegmentState.COPY_SEGMENT_FINISHED.
Assertions.assertFalse(cache.remoteLogSegmentMetadata(40, 1).isPresent());
// Check that these leader epochs are not considered for highest offsets as they are still getting copied and
// they did not reach the COPY_SEGMENT_FINISHED state.
Stream.of(0, 1, 2).forEach(epoch -> Assertions.assertFalse(cache.highestOffsetForEpoch(epoch).isPresent()));
RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate(
segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
cache.updateRemoteLogSegmentMetadata(segment0Update);
RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update);
// segment 1
// offsets: [101 - 200]
// no changes in leadership within this segment
// leader epochs (2, 101)
Map<Integer, Long> segment1LeaderEpochs = Collections.singletonMap(2, 101L);
RemoteLogSegmentMetadata segment1Metadata = createSegmentUpdateWithState(cache, segment1LeaderEpochs, 101L, 200L,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// segment 2
// offsets: [201 - 300]
// moved to epoch 3 in between
// leader epochs (2, 201), (3, 240)
Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
segment2LeaderEpochs.put(2, 201L);
segment2LeaderEpochs.put(3, 240L);
RemoteLogSegmentMetadata segment2Metadata = createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201L, 300L,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// segment 3
// offsets: [250 - 400]
// leader epochs (3, 250), (4, 370)
Map<Integer, Long> segment3LeaderEpochs = new HashMap<>();
segment3LeaderEpochs.put(3, 250L);
segment3LeaderEpochs.put(4, 370L);
RemoteLogSegmentMetadata segment3Metadata = createSegmentUpdateWithState(cache, segment3LeaderEpochs, 250L, 400L,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
//////////////////////////////////////////////////////////////////////////////////////////
// Four segments are added with different boundaries and leader epochs.
// Search for cache.remoteLogSegmentMetadata(leaderEpoch, offset) for different
// epochs and offsets
//////////////////////////////////////////////////////////////////////////////////////////
HashMap<EpochOffset, RemoteLogSegmentMetadata> expectedEpochOffsetToSegmentMetadata = new HashMap<>();
// Existing metadata entries.
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 40), expectedSegment0Metadata);
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(2, 110), segment1Metadata);
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 240), segment2Metadata);
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 250), segment3Metadata);
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 375), segment3Metadata);
// Non existing metadata entries.
// Search for offset 110, epoch 1, and it should not exist.
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 110), null);
// Search for non existing offset 401, epoch 4.
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 401), null);
// Search for non existing epoch 5.
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(5, 301), null);
for (Map.Entry<EpochOffset, RemoteLogSegmentMetadata> entry : expectedEpochOffsetToSegmentMetadata.entrySet()) {
EpochOffset epochOffset = entry.getKey();
Optional<RemoteLogSegmentMetadata> segmentMetadata = cache.remoteLogSegmentMetadata(epochOffset.epoch, epochOffset.offset);
RemoteLogSegmentMetadata expectedSegmentMetadata = entry.getValue();
log.debug("Searching for {} , result: {}, expected: {} ", epochOffset, segmentMetadata,
expectedSegmentMetadata);
if (expectedSegmentMetadata != null) {
Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadata);
} else {
Assertions.assertFalse(segmentMetadata.isPresent());
}
}
// Update segment with state as DELETE_SEGMENT_STARTED.
// It should not be available when we search for that segment.
cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 10).isPresent());
// Update segment with state as DELETE_SEGMENT_FINISHED.
// It should not be available when we search for that segment.
cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 10).isPresent());
//////////////////////////////////////////////////////////////////////////////////////////
// Search for cache.highestLogOffset(leaderEpoch) for all the leader epochs
//////////////////////////////////////////////////////////////////////////////////////////
Map<Integer, Long> expectedEpochToHighestOffset = new HashMap<>();
expectedEpochToHighestOffset.put(0, 19L);
expectedEpochToHighestOffset.put(1, 79L);
expectedEpochToHighestOffset.put(2, 239L);
expectedEpochToHighestOffset.put(3, 369L);
expectedEpochToHighestOffset.put(4, 400L);
for (Map.Entry<Integer, Long> entry : expectedEpochToHighestOffset.entrySet()) {
Integer epoch = entry.getKey();
Long expectedOffset = entry.getValue();
Optional<Long> offset = cache.highestOffsetForEpoch(epoch);
log.debug("Fetching highest offset for epoch: {} , returned: {} , expected: {}", epoch, offset, expectedOffset);
Assertions.assertEquals(Optional.of(expectedOffset), offset);
}
// Search for non existing leader epoch
Optional<Long> highestOffsetForEpoch5 = cache.highestOffsetForEpoch(5);
Assertions.assertFalse(highestOffsetForEpoch5.isPresent());
}
private RemoteLogSegmentMetadata createSegmentUpdateWithState(RemoteLogMetadataCache cache,
Map<Integer, Long> segmentLeaderEpochs,
long startOffset,
long endOffset,
RemoteLogSegmentState state)
throws RemoteResourceNotFoundException {
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L,
BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
cache.addCopyInProgressSegment(segmentMetadata);
RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId,
time.milliseconds(), state, BROKER_ID_1);
cache.updateRemoteLogSegmentMetadata(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
}
@Test
public void testCacheSegmentWithCopySegmentStartedState() throws Exception {
RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
// Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the
// segments.
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
cache.addCopyInProgressSegment(segmentMetadata);
// This segment should not be available as the state has not reached COPY_SEGMENT_FINISHED.
Optional<RemoteLogSegmentMetadata> segMetadataForOffset0Epoch0 = cache.remoteLogSegmentMetadata(0, 0);
Assertions.assertFalse(segMetadataForOffset0Epoch0.isPresent());
// cache.listRemoteLogSegments APIs should contain the above segment.
checkListSegments(cache, 0, segmentMetadata);
}
@Test
public void testCacheSegmentWithCopySegmentFinishedState() throws Exception {
RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
// Create a segment and move it to state COPY_SEGMENT_FINISHED, and check for searching that segment and
// listing the segments.
RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 101L),
101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// Search should return the above segment.
Optional<RemoteLogSegmentMetadata> segMetadataForOffset150 = cache.remoteLogSegmentMetadata(0, 150);
Assertions.assertEquals(Optional.of(segmentMetadata), segMetadataForOffset150);
// cache.listRemoteLogSegments should contain the above segments.
checkListSegments(cache, 0, segmentMetadata);
}
@Test
public void testCacheSegmentWithDeleteSegmentStartedState() throws Exception {
RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
// Create a segment and move it to state DELETE_SEGMENT_STARTED, and check for searching that segment and
// listing the segments.
RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 201L),
201L, 300L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
// Search should not return the above segment as its leader epoch state is cleared.
Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset250Epoch0 = cache.remoteLogSegmentMetadata(0, 250);
Assertions.assertFalse(segmentMetadataForOffset250Epoch0.isPresent());
checkListSegments(cache, 0, segmentMetadata);
}
@Test
public void testCacheSegmentsWithDeleteSegmentFinishedState() throws Exception {
RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
// Create a segment and move it to state DELETE_SEGMENT_FINISHED, and check for searching that segment and
// listing the segments.
RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 301L),
301L, 400L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
// Search should not return the above segment as its leader epoch state is cleared.
Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 350).isPresent());
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1);
cache.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
// listRemoteLogSegments(0) and listAllRemoteLogSegments() should not contain the above segment.
Assertions.assertFalse(cache.listRemoteLogSegments(0).hasNext());
Assertions.assertFalse(cache.listAllRemoteLogSegments().hasNext());
}
@Test
public void testCacheListSegments() throws Exception {
RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
// Create a few segments and add them to the cache.
RemoteLogSegmentMetadata segment0 = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 0L), 0, 100,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
RemoteLogSegmentMetadata segment1 = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 101L), 101, 200,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
segment2LeaderEpochs.put(0, 201L);
segment2LeaderEpochs.put(1, 301L);
RemoteLogSegmentMetadata segment2 = createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201, 400,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// listRemoteLogSegments(0) and listAllRemoteLogSegments() should contain all the above segments.
List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch0 = Arrays.asList(segment0, segment1, segment2);
Assertions.assertTrue(TestUtils.sameElementsWithOrder(cache.listRemoteLogSegments(0),
expectedSegmentsForEpoch0.iterator()));
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(cache.listAllRemoteLogSegments(),
expectedSegmentsForEpoch0.iterator()));
// listRemoteLogSegments(1) should contain only segment2.
List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch1 = Collections.singletonList(segment2);
Assertions.assertTrue(TestUtils.sameElementsWithOrder(cache.listRemoteLogSegments(1),
expectedSegmentsForEpoch1.iterator()));
}
@Test
public void testAPIsWithInvalidArgs() {
RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
@@ -337,51 +79,22 @@ public class RemoteLogMetadataCacheTest {
});
}
private void checkListSegments(RemoteLogMetadataCache cache,
int leaderEpoch,
RemoteLogSegmentMetadata expectedSegment)
throws RemoteResourceNotFoundException {
// cache.listRemoteLogSegments(leaderEpoch) should contain the above segment.
Iterator<RemoteLogSegmentMetadata> segmentsIter = cache.listRemoteLogSegments(leaderEpoch);
Assertions.assertTrue(segmentsIter.hasNext() && Objects.equals(segmentsIter.next(), expectedSegment));
// cache.listAllRemoteLogSegments() should contain the above segment.
Iterator<RemoteLogSegmentMetadata> allSegmentsIter = cache.listAllRemoteLogSegments();
Assertions.assertTrue(allSegmentsIter.hasNext() && Objects.equals(allSegmentsIter.next(), expectedSegment));
}
private static class EpochOffset {
final int epoch;
final long offset;
private EpochOffset(int epoch, long offset) {
this.epoch = epoch;
this.offset = offset;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EpochOffset that = (EpochOffset) o;
return epoch == that.epoch && offset == that.offset;
}
@Override
public int hashCode() {
return Objects.hash(epoch, offset);
}
@Override
public String toString() {
return "EpochOffset{" +
"epoch=" + epoch +
", offset=" + offset +
'}';
}
}
private RemoteLogSegmentMetadata createSegmentUpdateWithState(RemoteLogMetadataCache cache,
Map<Integer, Long> segmentLeaderEpochs,
long startOffset,
long endOffset,
RemoteLogSegmentState state)
throws RemoteResourceNotFoundException {
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L,
BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
cache.addCopyInProgressSegment(segmentMetadata);
RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId,
time.milliseconds(), state, BROKER_ID_1);
cache.updateRemoteLogSegmentMetadata(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
}
}

5
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java

@@ -106,6 +106,11 @@ public class RemoteLogMetadataSerdeTest {
public InvalidRemoteLogMetadata(int brokerId, long eventTimestampMs) {
super(brokerId, eventTimestampMs);
}
@Override
public TopicIdPartition topicIdPartition() {
throw new UnsupportedOperationException();
}
}
}

63
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleManager.java

@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
/**
* This interface defines the lifecycle methods for {@code RemoteLogSegmentMetadata}. {@link RemoteLogSegmentLifecycleTest} tests
* different implementations of this interface. An implementation is responsible for managing all the segments for a given {@code topicIdPartition}
* registered with {@link #initialize(TopicIdPartition)}.
*
* @see org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentLifecycleTest.RemoteLogMetadataCacheWrapper
* @see org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentLifecycleTest.TopicBasedRemoteLogMetadataManagerWrapper
*/
public interface RemoteLogSegmentLifecycleManager extends Closeable {
/**
* Initialize the resources for this instance and register the given {@code topicIdPartition}.
*
* @param topicIdPartition topic partition to be registered with this instance.
*/
default void initialize(TopicIdPartition topicIdPartition) {
}
@Override
default void close() throws IOException {
}
void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata) throws RemoteStorageException;
void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) throws RemoteStorageException;
Optional<Long> highestOffsetForEpoch(int epoch) throws RemoteStorageException;
Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch,
long offset) throws RemoteStorageException;
Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int leaderEpoch) throws RemoteStorageException;
Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() throws RemoteStorageException;
}
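As a quick illustration of this contract, the hedged sketch below (the helper name LifecycleContractSketch is illustrative; any implementation, such as the wrappers in RemoteLogSegmentLifecycleTest, can be passed in) walks one segment through the copy lifecycle: it is invisible to offset lookups while in COPY_SEGMENT_STARTED and becomes visible once promoted to COPY_SEGMENT_FINISHED.

import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;

final class LifecycleContractSketch {
    static Optional<RemoteLogSegmentMetadata> copyOneSegment(RemoteLogSegmentLifecycleManager manager) throws Exception {
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        manager.initialize(tp);
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tp, Uuid.randomUuid());
        // Segment covering offsets [0, 100], written entirely under leader epoch 0.
        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L, -1L, 0,
                System.currentTimeMillis(), 1024, Collections.singletonMap(0, 0L));
        manager.addRemoteLogSegmentMetadata(segmentMetadata); // state: COPY_SEGMENT_STARTED
        // Not visible yet: remoteLogSegmentMetadata(0, 50) would return Optional.empty() here.
        manager.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(segmentId,
                System.currentTimeMillis(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1));
        // Visible now that the copy is finished.
        return manager.remoteLogSegmentMetadata(0, 50L);
    }
}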

516
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java

@@ -0,0 +1,516 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
public class RemoteLogSegmentLifecycleTest {
private static final Logger log = LoggerFactory.getLogger(RemoteLogSegmentLifecycleTest.class);
private static final int SEG_SIZE = 1024 * 1024;
private static final int BROKER_ID_0 = 0;
private static final int BROKER_ID_1 = 1;
private final TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
private final Time time = new MockTime(1);
@ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
@MethodSource("remoteLogSegmentLifecycleManagers")
public void testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
try {
remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
// segment 0
// offsets: [0-100]
// leader epochs (0,0), (1,20), (2,80)
Map<Integer, Long> segment0LeaderEpochs = new HashMap<>();
segment0LeaderEpochs.put(0, 0L);
segment0LeaderEpochs.put(1, 20L);
segment0LeaderEpochs.put(2, 80L);
RemoteLogSegmentId segment0Id = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
RemoteLogSegmentMetadata segment0Metadata = new RemoteLogSegmentMetadata(segment0Id, 0L, 100L,
-1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE,
segment0LeaderEpochs);
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segment0Metadata);
// We should not get this as the segment is still getting copied and it is not yet considered successful until
// it reaches RemoteLogSegmentState.COPY_SEGMENT_FINISHED.
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(40, 1).isPresent());
// Check that these leader epochs are not considered by the highestOffsetForEpoch API as they are still getting copied.
Stream.of(0, 1, 2).forEach(epoch -> {
try {
Assertions.assertFalse(remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch).isPresent());
} catch (RemoteStorageException e) {
Assertions.fail(e);
}
});
RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate(
segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segment0Update);
RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update);
// segment 1
// offsets: [101 - 200]
// no changes in leadership within this segment
// leader epochs (2, 101)
Map<Integer, Long> segment1LeaderEpochs = Collections.singletonMap(2, 101L);
RemoteLogSegmentMetadata segment1Metadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, segment1LeaderEpochs, 101L,
200L,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// segment 2
// offsets: [201 - 300]
// moved to epoch 3 in between
// leader epochs (2, 201), (3, 240)
Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
segment2LeaderEpochs.put(2, 201L);
segment2LeaderEpochs.put(3, 240L);
RemoteLogSegmentMetadata segment2Metadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, segment2LeaderEpochs, 201L,
300L,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// segment 3
// offsets: [250 - 400]
// leader epochs (3, 250), (4, 370)
Map<Integer, Long> segment3LeaderEpochs = new HashMap<>();
segment3LeaderEpochs.put(3, 250L);
segment3LeaderEpochs.put(4, 370L);
RemoteLogSegmentMetadata segment3Metadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, segment3LeaderEpochs, 250L,
400L,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
//////////////////////////////////////////////////////////////////////////////////////////
// Four segments are added with different boundaries and leader epochs.
// Search for cache.remoteLogSegmentMetadata(leaderEpoch, offset) for different
// epochs and offsets
//////////////////////////////////////////////////////////////////////////////////////////
HashMap<EpochOffset, RemoteLogSegmentMetadata> expectedEpochOffsetToSegmentMetadata = new HashMap<>();
// Existing metadata entries.
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 40), expectedSegment0Metadata);
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(2, 110), segment1Metadata);
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 240), segment2Metadata);
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 250), segment3Metadata);
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 375), segment3Metadata);
// Non existing metadata entries.
// Search for offset 110, epoch 1, and it should not exist.
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 110), null);
// Search for non existing offset 401, epoch 4.
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 401), null);
// Search for non existing epoch 5.
expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(5, 301), null);
for (Map.Entry<EpochOffset, RemoteLogSegmentMetadata> entry : expectedEpochOffsetToSegmentMetadata.entrySet()) {
EpochOffset epochOffset = entry.getKey();
Optional<RemoteLogSegmentMetadata> segmentMetadata = remoteLogSegmentLifecycleManager
.remoteLogSegmentMetadata(epochOffset.epoch, epochOffset.offset);
RemoteLogSegmentMetadata expectedSegmentMetadata = entry.getValue();
log.debug("Searching for {} , result: {}, expected: {} ", epochOffset, segmentMetadata,
expectedSegmentMetadata);
if (expectedSegmentMetadata != null) {
Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadata);
} else {
Assertions.assertFalse(segmentMetadata.isPresent());
}
}
// Update segment with state as DELETE_SEGMENT_STARTED.
// It should not be available when we search for that segment.
remoteLogSegmentLifecycleManager
.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
time.milliseconds(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED,
BROKER_ID_1));
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 10).isPresent());
// Update segment with state as DELETE_SEGMENT_FINISHED.
// It should not be available when we search for that segment.
remoteLogSegmentLifecycleManager
.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
time.milliseconds(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
BROKER_ID_1));
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 10).isPresent());
//////////////////////////////////////////////////////////////////////////////////////////
// Search for cache.highestLogOffset(leaderEpoch) for all the leader epochs
//////////////////////////////////////////////////////////////////////////////////////////
Map<Integer, Long> expectedEpochToHighestOffset = new HashMap<>();
expectedEpochToHighestOffset.put(0, 19L);
expectedEpochToHighestOffset.put(1, 79L);
expectedEpochToHighestOffset.put(2, 239L);
expectedEpochToHighestOffset.put(3, 369L);
expectedEpochToHighestOffset.put(4, 400L);
for (Map.Entry<Integer, Long> entry : expectedEpochToHighestOffset.entrySet()) {
Integer epoch = entry.getKey();
Long expectedOffset = entry.getValue();
Optional<Long> offset = remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch);
log.debug("Fetching highest offset for epoch: {} , returned: {} , expected: {}", epoch, offset, expectedOffset);
Assertions.assertEquals(Optional.of(expectedOffset), offset);
}
// Search for non existing leader epoch
Optional<Long> highestOffsetForEpoch5 = remoteLogSegmentLifecycleManager.highestOffsetForEpoch(5);
Assertions.assertFalse(highestOffsetForEpoch5.isPresent());
} finally {
Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
}
}
private RemoteLogSegmentMetadata createSegmentUpdateWithState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager,
Map<Integer, Long> segmentLeaderEpochs,
long startOffset,
long endOffset,
RemoteLogSegmentState state)
throws RemoteStorageException {
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, BROKER_ID_0,
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), state, BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
}
private static class EpochOffset {
final int epoch;
final long offset;
private EpochOffset(int epoch,
long offset) {
this.epoch = epoch;
this.offset = offset;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EpochOffset that = (EpochOffset) o;
return epoch == that.epoch && offset == that.offset;
}
@Override
public int hashCode() {
return Objects.hash(epoch, offset);
}
@Override
public String toString() {
return "EpochOffset{" +
"epoch=" + epoch +
", offset=" + offset +
'}';
}
}
private static Collection<Arguments> remoteLogSegmentLifecycleManagers() {
return Arrays.asList(Arguments.of(new RemoteLogMetadataCacheWrapper()),
Arguments.of(new TopicBasedRemoteLogMetadataManagerWrapper()));
}
private void checkListSegments(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager,
int leaderEpoch,
RemoteLogSegmentMetadata expectedSegment)
throws RemoteStorageException {
// cache.listRemoteLogSegments(leaderEpoch) should contain the above segment.
Iterator<RemoteLogSegmentMetadata> segmentsIter = remoteLogSegmentLifecycleManager.listRemoteLogSegments(leaderEpoch);
Assertions.assertTrue(segmentsIter.hasNext() && Objects.equals(segmentsIter.next(), expectedSegment));
// cache.listAllRemoteLogSegments() should contain the above segment.
Iterator<RemoteLogSegmentMetadata> allSegmentsIter = remoteLogSegmentLifecycleManager.listAllRemoteLogSegments();
Assertions.assertTrue(allSegmentsIter.hasNext() && Objects.equals(allSegmentsIter.next(), expectedSegment));
}
@ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
@MethodSource("remoteLogSegmentLifecycleManagers")
public void testCacheSegmentWithCopySegmentStartedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
try {
remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
// Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the
// segments.
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
// This segment should not be available as the state has not reached COPY_SEGMENT_FINISHED.
Optional<RemoteLogSegmentMetadata> segMetadataForOffset0Epoch0 = remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 0);
Assertions.assertFalse(segMetadataForOffset0Epoch0.isPresent());
// cache.listRemoteLogSegments APIs should contain the above segment.
checkListSegments(remoteLogSegmentLifecycleManager, 0, segmentMetadata);
} finally {
Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
}
}
@ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
@MethodSource("remoteLogSegmentLifecycleManagers")
public void testCacheSegmentWithCopySegmentFinishedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
try {
remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
// Create a segment and move it to state COPY_SEGMENT_FINISHED, and check for searching that segment and
// listing the segments.
RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
Collections.singletonMap(0, 101L),
101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// Search should return the above segment.
Optional<RemoteLogSegmentMetadata> segMetadataForOffset150 = remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 150);
Assertions.assertEquals(Optional.of(segmentMetadata), segMetadataForOffset150);
// cache.listRemoteLogSegments should contain the above segments.
checkListSegments(remoteLogSegmentLifecycleManager, 0, segmentMetadata);
} finally {
Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
}
}
@ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
@MethodSource("remoteLogSegmentLifecycleManagers")
public void testCacheSegmentWithDeleteSegmentStartedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
try {
remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
// Create a segment and move it to state DELETE_SEGMENT_STARTED, and check for searching that segment and
// listing the segments.
RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
Collections.singletonMap(0, 201L),
201L, 300L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
// Search should not return the above segment as its leader epoch state is cleared.
Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset250Epoch0 = remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 250);
Assertions.assertFalse(segmentMetadataForOffset250Epoch0.isPresent());
checkListSegments(remoteLogSegmentLifecycleManager, 0, segmentMetadata);
} finally {
Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
}
}
@ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
@MethodSource("remoteLogSegmentLifecycleManagers")
public void testCacheSegmentsWithDeleteSegmentFinishedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
try {
remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
// Create a segment and move it to state DELETE_SEGMENT_FINISHED, and check for searching that segment and
// listing the segments.
RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
Collections.singletonMap(0, 301L),
301L, 400L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
// Search should not return the above segment as its leader epoch state is cleared.
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 350).isPresent());
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
time.milliseconds(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
// listRemoteLogSegments(0) and listAllRemoteLogSegments() should not contain the above segment.
Assertions.assertFalse(remoteLogSegmentLifecycleManager.listRemoteLogSegments(0).hasNext());
Assertions.assertFalse(remoteLogSegmentLifecycleManager.listAllRemoteLogSegments().hasNext());
} finally {
Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
}
}
@ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
@MethodSource("remoteLogSegmentLifecycleManagers")
public void testCacheListSegments(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
try {
remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
// Create a few segments and add them to the cache.
RemoteLogSegmentMetadata segment0 = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, Collections.singletonMap(0, 0L), 0,
100,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
RemoteLogSegmentMetadata segment1 = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, Collections.singletonMap(0, 101L), 101,
200,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
segment2LeaderEpochs.put(0, 201L);
segment2LeaderEpochs.put(1, 301L);
RemoteLogSegmentMetadata segment2 = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, segment2LeaderEpochs, 201, 400,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// listRemoteLogSegments(0) and listAllRemoteLogSegments() should contain all the above segments.
List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch0 = Arrays.asList(segment0, segment1, segment2);
Assertions.assertTrue(TestUtils.sameElementsWithOrder(remoteLogSegmentLifecycleManager.listRemoteLogSegments(0),
expectedSegmentsForEpoch0.iterator()));
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(remoteLogSegmentLifecycleManager.listAllRemoteLogSegments(),
expectedSegmentsForEpoch0.iterator()));
// listRemoteLogSegments(1) should contain only segment2.
List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch1 = Collections.singletonList(segment2);
Assertions.assertTrue(TestUtils.sameElementsWithOrder(remoteLogSegmentLifecycleManager.listRemoteLogSegments(1),
expectedSegmentsForEpoch1.iterator()));
} finally {
Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
}
}
/**
* This is a wrapper with {@link TopicBasedRemoteLogMetadataManager} implementing {@link RemoteLogSegmentLifecycleManager}.
* This is passed to {@link #testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager)} to test
* {@code TopicBasedRemoteLogMetadataManager} for several lifecycle operations.
* <p>
* This starts a Kafka cluster via {@link #initialize(Set)} with {@link #brokerCount()} servers. It also
* creates the remote log metadata topic required for {@code TopicBasedRemoteLogMetadataManager}. This cluster will
* be stopped by invoking {@link #close()}.
*/
static class TopicBasedRemoteLogMetadataManagerWrapper extends TopicBasedRemoteLogMetadataManagerHarness implements RemoteLogSegmentLifecycleManager {
private TopicIdPartition topicIdPartition;
@Override
public synchronized void initialize(TopicIdPartition topicIdPartition) {
this.topicIdPartition = topicIdPartition;
super.initialize(Collections.singleton(topicIdPartition));
}
@Override
public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata) throws RemoteStorageException {
topicBasedRlmm().addRemoteLogSegmentMetadata(segmentMetadata);
}
@Override
public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) throws RemoteStorageException {
topicBasedRlmm().updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
}
@Override
public Optional<Long> highestOffsetForEpoch(int leaderEpoch) throws RemoteStorageException {
return topicBasedRlmm().highestOffsetForEpoch(topicIdPartition, leaderEpoch);
}
@Override
public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch,
long offset) throws RemoteStorageException {
return topicBasedRlmm().remoteLogSegmentMetadata(topicIdPartition, leaderEpoch, offset);
}
@Override
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int leaderEpoch) throws RemoteStorageException {
return topicBasedRlmm().listRemoteLogSegments(topicIdPartition, leaderEpoch);
}
@Override
public Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() throws RemoteStorageException {
return topicBasedRlmm().listRemoteLogSegments(topicIdPartition);
}
@Override
public void close() throws IOException {
tearDown();
}
@Override
public int brokerCount() {
return 3;
}
}
/**
* This is a wrapper with {@link RemoteLogMetadataCache} implementing {@link RemoteLogSegmentLifecycleManager}.
* This is passed to {@link #testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager)} to test
* {@code RemoteLogMetadataCache} for several lifecycle operations.
*/
static class RemoteLogMetadataCacheWrapper implements RemoteLogSegmentLifecycleManager {
private final RemoteLogMetadataCache metadataCache = new RemoteLogMetadataCache();
@Override
public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) throws RemoteStorageException {
metadataCache.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
}
@Override
public Optional<Long> highestOffsetForEpoch(int epoch) throws RemoteStorageException {
return metadataCache.highestOffsetForEpoch(epoch);
}
@Override
public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch,
long offset) throws RemoteStorageException {
return metadataCache.remoteLogSegmentMetadata(leaderEpoch, offset);
}
@Override
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int leaderEpoch) throws RemoteResourceNotFoundException {
return metadataCache.listRemoteLogSegments(leaderEpoch);
}
@Override
public Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() {
return metadataCache.listAllRemoteLogSegments();
}
@Override
public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata) throws RemoteStorageException {
metadataCache.addCopyInProgressSegment(segmentMetadata);
}
}
}

142
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java

@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
public class TopicBasedRemoteLogMetadataManagerConfigTest {
private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class);
private static final String BOOTSTRAP_SERVERS = "localhost:9091";
@Test
public void testValidConfig() {
Map<String, Object> commonClientConfig = new HashMap<>();
commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10);
commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 1000L);
commonClientConfig.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L);
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Map<String, Object> props = createValidConfigProps(commonClientConfig, producerConfig, consumerConfig);
// Check for topic properties
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props);
Assertions.assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), rlmmConfig.metadataTopicPartitionsCount());
// Check for common client configs.
for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
log.info("Checking config: " + entry.getKey());
Assertions.assertEquals(entry.getValue(),
rlmmConfig.producerProperties().get(entry.getKey()));
Assertions.assertEquals(entry.getValue(),
rlmmConfig.consumerProperties().get(entry.getKey()));
}
// Check for producer configs.
for (Map.Entry<String, Object> entry : producerConfig.entrySet()) {
log.info("Checking config: " + entry.getKey());
Assertions.assertEquals(entry.getValue(),
rlmmConfig.producerProperties().get(entry.getKey()));
}
// Check for consumer configs.
for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
log.info("Checking config: " + entry.getKey());
Assertions.assertEquals(entry.getValue(),
rlmmConfig.consumerProperties().get(entry.getKey()));
}
}
@Test
public void testProducerConsumerOverridesConfig() {
Map.Entry<String, Long> overrideEntry = new AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L);
Map<String, Object> commonClientConfig = new HashMap<>();
commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10);
commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 1000L);
commonClientConfig.put(overrideEntry.getKey(), overrideEntry.getValue());
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.ACKS_CONFIG, -1);
Long overriddenProducerPropValue = overrideEntry.getValue() * 2;
producerConfig.put(overrideEntry.getKey(), overriddenProducerPropValue);
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Long overriddenConsumerPropValue = overrideEntry.getValue() * 3;
consumerConfig.put(overrideEntry.getKey(), overriddenConsumerPropValue);
Map<String, Object> props = createValidConfigProps(commonClientConfig, producerConfig, consumerConfig);
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props);
Assertions.assertEquals(overriddenProducerPropValue,
rlmmConfig.producerProperties().get(overrideEntry.getKey()));
Assertions.assertEquals(overriddenConsumerPropValue,
rlmmConfig.consumerProperties().get(overrideEntry.getKey()));
}
private Map<String, Object> createValidConfigProps(Map<String, Object> commonClientConfig,
Map<String, Object> producerConfig,
Map<String, Object> consumerConfig) {
Map<String, Object> props = new HashMap<>();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put("broker.id", 1);
props.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, (short) 3);
props.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 10);
props.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, 60 * 60 * 1000L);
// common client configs
for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + entry.getKey(), entry.getValue());
}
// producer configs
for (Map.Entry<String, Object> entry : producerConfig.entrySet()) {
props.put(REMOTE_LOG_METADATA_PRODUCER_PREFIX + entry.getKey(), entry.getValue());
}
// consumer configs
for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
props.put(REMOTE_LOG_METADATA_CONSUMER_PREFIX + entry.getKey(), entry.getValue());
}
return props;
}
}
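To make the override resolution this test verifies concrete, here is a minimal usage sketch built only from the constants and keys exercised above; the values are illustrative, and the set of required properties mirrors createValidConfigProps.
Map<String, Object> props = new HashMap<>();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("broker.id", 1);
props.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, (short) 3);
props.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 10);
props.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, 60 * 60 * 1000L);
// Common client value, visible to both producer and consumer properties unless overridden.
props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L);
// Producer-prefixed value overrides the common one, but only in producerProperties().
props.put(REMOTE_LOG_METADATA_PRODUCER_PREFIX + CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 120000L);
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props);
// Expected: producerProperties() resolves 120000L, consumerProperties() resolves 60000L.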

109
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java

@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import kafka.api.IntegrationTestHarness;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHarness {
private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerHarness.class);
protected static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
protected static final short METADATA_TOPIC_REPLICATION_FACTOR = 2;
protected static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 1000L;
private TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager;
protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
return Collections.emptyMap();
}
public void initialize(Set<TopicIdPartition> topicIdPartitions) {
// Call setup to start the cluster.
super.setUp();
topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager();
// Initialize TopicBasedRemoteLogMetadataManager.
Map<String, Object> configs = new HashMap<>();
configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList());
configs.put(BROKER_ID, 0);
configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, METADATA_TOPIC_PARTITIONS_COUNT);
configs.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, METADATA_TOPIC_REPLICATION_FACTOR);
configs.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, METADATA_TOPIC_RETENTION_MS);
log.debug("TopicBasedRemoteLogMetadataManager configs before adding overridden properties: {}", configs);
// Add override properties.
configs.putAll(overrideRemoteLogMetadataManagerProps());
log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs);
topicBasedRemoteLogMetadataManager.configure(configs);
try {
waitUntilInitialized(60_000);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(topicIdPartitions, Collections.emptySet());
}
// Visible for testing.
public void waitUntilInitialized(long waitTimeMs) throws TimeoutException {
long startMs = System.currentTimeMillis();
while (!topicBasedRemoteLogMetadataManager.isInitialized()) {
long currentTimeMs = System.currentTimeMillis();
if (currentTimeMs > startMs + waitTimeMs) {
throw new TimeoutException("Time out reached before it is initialized successfully");
}
Utils.sleep(100);
}
}
@Override
public int brokerCount() {
return 3;
}
protected TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
return topicBasedRemoteLogMetadataManager;
}
public void close() throws IOException {
Utils.closeQuietly(topicBasedRemoteLogMetadataManager, "TopicBasedRemoteLogMetadataManager");
// Stop the servers and ZooKeeper.
tearDown();
}
}
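As an aside, the polling loop in waitUntilInitialized could also be expressed with Kafka's Java test utility; this is a sketch assuming org.apache.kafka.test.TestUtils is available on the test classpath, not a change the patch makes.
// Equivalent readiness wait (waitForCondition throws InterruptedException, so callers would declare it).
org.apache.kafka.test.TestUtils.waitForCondition(
() -> topicBasedRemoteLogMetadataManager.isInitialized(),
60_000L,
"TopicBasedRemoteLogMetadataManager was not initialized within the timeout");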

141
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java

@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP;
public class TopicBasedRemoteLogMetadataManagerTest {
private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerTest.class);
private static final int SEG_SIZE = 1024 * 1024;
private final Time time = new MockTime(1);
private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness() {
@Override
protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
return Collections.singletonMap(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, 5000L);
}
};
@BeforeEach
public void setup() {
// Start the cluster and initialize TopicBasedRemoteLogMetadataManager.
remoteLogMetadataManagerHarness.initialize(Collections.emptySet());
}
@AfterEach
public void teardown() throws IOException {
remoteLogMetadataManagerHarness.close();
}
public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
return remoteLogMetadataManagerHarness.topicBasedRlmm();
}
@Test
public void testNewPartitionUpdates() throws Exception {
final TopicIdPartition newLeaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("new-leader", 0));
final TopicIdPartition newFollowerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("new-follower", 0));
// Adding segments for these partitions should throw an exception, as these partitions have not yet been subscribed.
// The messages would have been published to the respective metadata topic partitions, but the ConsumerManager
// has not subscribed to those partitions because they are not yet registered.
RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
Assertions.assertThrows(RemoteStorageException.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata));
RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
Assertions.assertThrows(RemoteStorageException.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata));
// `listRemoteLogSegments` should also throw an exception as these topic partitions are not yet registered.
Assertions.assertThrows(RemoteStorageException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition));
Assertions.assertThrows(RemoteStorageException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition));
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition),
Collections.singleton(newFollowerTopicIdPartition));
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
waitUntilConsumerCatchesup(newLeaderTopicIdPartition, newFollowerTopicIdPartition, 30_000L);
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
}
private void waitUntilConsumerCatchesup(TopicIdPartition newLeaderTopicIdPartition,
TopicIdPartition newFollowerTopicIdPartition,
long timeoutMs) throws TimeoutException {
int leaderMetadataPartition = topicBasedRlmm().metadataPartition(newLeaderTopicIdPartition);
int followerMetadataPartition = topicBasedRlmm().metadataPartition(newFollowerTopicIdPartition);
log.debug("Metadata partition for newLeaderTopicIdPartition: [{}], is: [{}]", newLeaderTopicIdPartition, leaderMetadataPartition);
log.debug("Metadata partition for newFollowerTopicIdPartition: [{}], is: [{}]", newFollowerTopicIdPartition, followerMetadataPartition);
long sleepMs = 100L;
long startTimeMs = System.currentTimeMillis();
while (true) {
if (System.currentTimeMillis() - startTimeMs > timeoutMs) {
throw new TimeoutException("Timed out after " + timeoutMs + " ms");
}
// If both the leader and the follower partitions are mapped to the same metadata partition, then it should have
// at least 2 messages. That means the received offset should be >= 1 (including duplicate messages, if any).
if (leaderMetadataPartition == followerMetadataPartition) {
if (topicBasedRlmm().receivedOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 1) {
break;
}
} else {
// If the leader partition and the follower partition are mapped to different metadata partitions, then
// each of those metadata partitions will have at least 1 message. That means the received offset should
// be >= 0 (including duplicate messages, if any).
if (topicBasedRlmm().receivedOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 0 ||
topicBasedRlmm().receivedOffsetForPartition(followerMetadataPartition).orElse(-1L) >= 0) {
break;
}
}
log.debug("Sleeping for: " + sleepMs);
Utils.sleep(sleepMs);
}
}
}
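For context on the metadataPartition lookups used above: a minimal sketch of how a TopicIdPartition can be mapped onto the metadata topic's partition space by hashing. The key encoding here is hypothetical; the actual RemoteLogMetadataTopicPartitioner may serialize the key differently.
// Hypothetical mapping: hash the topic id plus partition, then mod by the metadata
// topic's partition count. Utils is org.apache.kafka.common.utils.Utils (imported above).
static int metadataPartition(TopicIdPartition tp, int numMetadataTopicPartitions) {
byte[] key = (tp.topicId() + "-" + tp.topicPartition().partition())
.getBytes(java.nio.charset.StandardCharsets.UTF_8);
return Utils.toPositive(Utils.murmur2(key)) % numMetadataTopicPartitions;
}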

98
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
public class TopicBasedRemoteLogMetadataManagerWrapperWithHarness implements RemoteLogMetadataManager {
private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness();
@Override
public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
remoteLogMetadataManagerHarness.topicBasedRlmm().addRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
}
@Override
public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException {
remoteLogMetadataManagerHarness.topicBasedRlmm().updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate);
}
@Override
public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
int epochForOffset,
long offset) throws RemoteStorageException {
return remoteLogMetadataManagerHarness.topicBasedRlmm().remoteLogSegmentMetadata(topicIdPartition, epochForOffset, offset);
}
@Override
public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
int leaderEpoch) throws RemoteStorageException {
return remoteLogMetadataManagerHarness.topicBasedRlmm().highestOffsetForEpoch(topicIdPartition, leaderEpoch);
}
@Override
public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException {
remoteLogMetadataManagerHarness.topicBasedRlmm().putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata);
}
@Override
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException {
return remoteLogMetadataManagerHarness.topicBasedRlmm().listRemoteLogSegments(topicIdPartition);
}
@Override
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition,
int leaderEpoch) throws RemoteStorageException {
return remoteLogMetadataManagerHarness.topicBasedRlmm().listRemoteLogSegments(topicIdPartition, leaderEpoch);
}
@Override
public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
Set<TopicIdPartition> followerPartitions) {
remoteLogMetadataManagerHarness.topicBasedRlmm().onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
}
@Override
public void onStopPartitions(Set<TopicIdPartition> partitions) {
remoteLogMetadataManagerHarness.topicBasedRlmm().onStopPartitions(partitions);
}
@Override
public void close() throws IOException {
remoteLogMetadataManagerHarness.topicBasedRlmm().close();
}
@Override
public void configure(Map<String, ?> configs) {
// This will make sure the cluster is up and TopicBasedRemoteLogMetadataManager is initialized.
remoteLogMetadataManagerHarness.initialize(Collections.emptySet());
remoteLogMetadataManagerHarness.topicBasedRlmm().configure(configs);
}
}

130
storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManagerTest.java

@@ -1,130 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCacheTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
* This class covers basic unit tests for {@link InmemoryRemoteLogMetadataManager}. InmemoryRemoteLogMetadataManager is
* used only in integration tests but not in production code. It mostly uses {@link RemoteLogMetadataCache} and it has
* broad test coverage with {@link RemoteLogMetadataCacheTest}.
*/
public class InmemoryRemoteLogMetadataManagerTest {
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
new TopicPartition("foo", 0));
private static final int SEG_SIZE = 1024 * 1024;
private static final int BROKER_ID_0 = 0;
private static final int BROKER_ID_1 = 1;
private final Time time = new MockTime(1);
@Test
public void testFetchSegments() throws Exception {
InmemoryRemoteLogMetadataManager rlmm = new InmemoryRemoteLogMetadataManager();
// 1.Create a segment with state COPY_SEGMENT_STARTED, and this segment should not be available.
Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 101L);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0,
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
rlmm.addRemoteLogSegmentMetadata(segmentMetadata);
// Search should not return the above segment.
Assertions.assertFalse(rlmm.remoteLogSegmentMetadata(TP0, 0, 150).isPresent());
// 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available.
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
rlmm.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
// Search should return the above segment.
Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset150 = rlmm.remoteLogSegmentMetadata(TP0, 0, 150);
Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150);
}
@Test
public void testRemotePartitionDeletion() throws Exception {
InmemoryRemoteLogMetadataManager rlmm = new InmemoryRemoteLogMetadataManager();
// Create remote log segment metadata and add them to RLMM.
// segment 0
// offsets: [0-100]
// leader epochs (0,0), (1,20), (2,50), (3,80)
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
segmentLeaderEpochs.put(0, 0L);
segmentLeaderEpochs.put(1, 20L);
segmentLeaderEpochs.put(2, 50L);
segmentLeaderEpochs.put(3, 80L);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L,
-1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
rlmm.addRemoteLogSegmentMetadata(segmentMetadata);
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
segmentId, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
rlmm.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
RemoteLogSegmentMetadata expectedSegMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
// Check that the segment exists in RLMM.
Optional<RemoteLogSegmentMetadata> segMetadataForOffset30Epoch1 = rlmm.remoteLogSegmentMetadata(TP0, 1, 30L);
Assertions.assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1);
// Mark the partition for deletion.
rlmm.putRemotePartitionDeleteMetadata(
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_MARKED));
Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelMark = rlmm.remoteLogSegmentMetadata(TP0,
1, 30L);
Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelMark);
// Set the partition deletion state as started. Partition and segments should still be accessible as they are not
// yet deleted.
rlmm.putRemotePartitionDeleteMetadata(
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_STARTED));
Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelStart = rlmm.remoteLogSegmentMetadata(TP0,
1, 30L);
Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelStart);
// Set the partition deletion state as finished. RLMM should clear all its internal state for that partition.
rlmm.putRemotePartitionDeleteMetadata(
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_FINISHED));
Assertions.assertThrows(RemoteResourceNotFoundException.class,
() -> rlmm.remoteLogSegmentMetadata(TP0, 1, 30L));
}
private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata(RemotePartitionDeleteState state) {
return new RemotePartitionDeleteMetadata(TP0, state, time.milliseconds(), BROKER_ID_0);
}
}

152
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java

@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerWrapperWithHarness;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
* This class covers basic tests for {@link RemoteLogMetadataManager} implementations such as {@link InmemoryRemoteLogMetadataManager}
* and {@link org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager}.
*/
public class RemoteLogMetadataManagerTest {
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
new TopicPartition("foo", 0));
private static final int SEG_SIZE = 1024 * 1024;
private static final int BROKER_ID_0 = 0;
private static final int BROKER_ID_1 = 1;
private final Time time = new MockTime(1);
@ParameterizedTest(name = "remoteLogMetadataManager = {0}")
@MethodSource("remoteLogMetadataManagers")
public void testFetchSegments(RemoteLogMetadataManager remoteLogMetadataManager) throws Exception {
try {
remoteLogMetadataManager.configure(Collections.emptyMap());
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet());
// 1. Create a segment with state COPY_SEGMENT_STARTED, and this segment should not be available.
Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 101L);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0,
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata);
// Search should not return the above segment.
Assertions.assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150).isPresent());
// 2. Move that segment to COPY_SEGMENT_FINISHED state, and this segment should be available.
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
BROKER_ID_1);
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
// Search should return the above segment.
Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset150 = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150);
Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150);
} finally {
Utils.closeQuietly(remoteLogMetadataManager, "RemoteLogMetadataManager");
}
}
@ParameterizedTest(name = "remoteLogMetadataManager = {0}")
@MethodSource("remoteLogMetadataManagers")
public void testRemotePartitionDeletion(RemoteLogMetadataManager remoteLogMetadataManager) throws Exception {
try {
remoteLogMetadataManager.configure(Collections.emptyMap());
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet());
// Create remote log segment metadata and add them to RLMM.
// segment 0
// offsets: [0-100]
// leader epochs (0,0), (1,20), (2,50), (3,80)
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
segmentLeaderEpochs.put(0, 0L);
segmentLeaderEpochs.put(1, 20L);
segmentLeaderEpochs.put(2, 50L);
segmentLeaderEpochs.put(3, 80L);
RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L,
-1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE,
segmentLeaderEpochs);
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata);
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
segmentId, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
RemoteLogSegmentMetadata expectedSegMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
// Check that the segment exists in RLMM.
Optional<RemoteLogSegmentMetadata> segMetadataForOffset30Epoch1 = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L);
Assertions.assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1);
// Mark the partition for deletion.
remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_MARKED));
Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelMark = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0,
1, 30L);
Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelMark);
// Set the partition deletion state as started. Partition and segments should still be accessible as they are not
// yet deleted.
remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_STARTED));
Optional<RemoteLogSegmentMetadata> segmentMetadataAfterDelStart = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0,
1, 30L);
Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelStart);
// Set the partition deletion state as finished. RLMM should clear all its internal state for that partition.
remoteLogMetadataManager.putRemotePartitionDeleteMetadata(
createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_FINISHED));
Assertions.assertThrows(RemoteResourceNotFoundException.class,
() -> remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L));
} finally {
Utils.closeQuietly(remoteLogMetadataManager, "RemoteLogMetadataManager");
}
}
private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata(RemotePartitionDeleteState state) {
return new RemotePartitionDeleteMetadata(TP0, state, time.milliseconds(), BROKER_ID_0);
}
private static Collection<Arguments> remoteLogMetadataManagers() {
return Arrays.asList(Arguments.of(new InmemoryRemoteLogMetadataManager()), Arguments.of(new TopicBasedRemoteLogMetadataManagerWrapperWithHarness()));
}
}
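To spell out the epoch/offset lookup semantics the assertions above rely on, here is a hypothetical helper (not part of the RemoteLogMetadataManager API) that resolves which leader epoch owns an offset, given the segmentLeaderEpochs map built in testRemotePartitionDeletion:
// Pick the epoch whose start offset is the greatest one that is <= the requested offset.
static int epochForOffset(Map<Integer, Long> epochToStartOffset, long offset) {
return epochToStartOffset.entrySet().stream()
.filter(e -> e.getValue() <= offset)
.max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.orElseThrow(() -> new IllegalArgumentException("No epoch covers offset " + offset));
}
// With {0 -> 0L, 1 -> 20L, 2 -> 50L, 3 -> 80L}, epochForOffset(..., 30L) == 1,
// matching the (leaderEpoch = 1, offset = 30) lookups in the test.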

1
storage/src/test/resources/log4j.properties

@@ -19,3 +19,4 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.kafka.server.log.remote.storage=INFO
log4j.logger.org.apache.kafka.server.log.remote.metadata.storage=INFO
