
KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions (#14127)

This PR makes the following changes to the `TopicBasedRemoteLogMetadataManager`:

1. Added a guard in RemoteLogMetadataCache so that an incoming request is served from the cache only if the corresponding user-topic-partition is initialized (see the sketch below)
2. Improved error handling in the ConsumerTask thread so that it is not killed when there are errors reading the internal topic
3. Made ConsumerTask initialization handle the case where there are no records to read
and some other minor changes
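
For reference, a minimal sketch of the initialization guard described in item 1, written as a standalone class for illustration. The latch-based guard mirrors the RemoteLogMetadataCache change in the diff below; the ensureInitialized helper and the class name are hypothetical.

import java.util.concurrent.CountDownLatch;

// Sketch of the guard: reads are served only after the user-topic-partition has
// caught up with the metadata topic; until then callers receive a retriable error.
class RemoteLogMetadataCacheGuardSketch {
    private final CountDownLatch initializedLatch = new CountDownLatch(1);

    // Called once the ConsumerTask has read up to the end offset captured at assignment time.
    public void markInitialized() {
        initializedLatch.countDown();
    }

    public boolean isInitialized() {
        return initializedLatch.getCount() == 0;
    }

    // Hypothetical helper showing how a caller would apply the guard; the actual patch
    // throws the retriable ReplicaNotAvailableException from RemotePartitionMetadataStore.
    public void ensureInitialized() {
        if (!isInitialized()) {
            throw new IllegalStateException("Remote log metadata cache is not initialized yet");
        }
    }
}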

Added unit tests for the changes.

Co-authored-by: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>

Reviewers: Luke Chen <showuon@gmail.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
Abhijeet Kumar committed 1 year ago (via GitHub), commit ff3e6842ff
Changed files (changed-line counts in parentheses):
  1. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java (32)
  2. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java (546)
  3. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java (11)
  4. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java (5)
  5. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java (19)
  6. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java (20)
  7. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java (417)
  8. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java (10)
  9. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java (178)
  10. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java (34)
  11. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java (10)

32
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java

@@ -27,9 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
@@ -41,8 +39,6 @@ import java.util.concurrent.TimeoutException;
*/
public class ConsumerManager implements Closeable {
public static final String COMMITTED_OFFSETS_FILE_NAME = "_rlmm_committed_offsets";
private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;
@@ -60,15 +56,13 @@ public class ConsumerManager implements Closeable {
//Create a task to consume messages and submit the respective events to RemotePartitionMetadataEventHandler.
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(rlmmConfig.consumerProperties());
Path committedOffsetsPath = new File(rlmmConfig.logDir(), COMMITTED_OFFSETS_FILE_NAME).toPath();
consumerTask = new ConsumerTask(
consumer,
rlmmConfig.remoteLogMetadataTopicName(),
remotePartitionMetadataEventHandler,
topicPartitioner,
committedOffsetsPath,
time,
60_000L
remotePartitionMetadataEventHandler,
topicPartitioner,
consumer,
100L,
300_000L,
time
);
consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", consumerTask);
}
@@ -110,7 +104,7 @@ public class ConsumerManager implements Closeable {
log.info("Waiting until consumer is caught up with the target partition: [{}]", partition);
// If the current assignment does not have the subscription for this partition then return immediately.
if (!consumerTask.isPartitionAssigned(partition)) {
if (!consumerTask.isMetadataPartitionAssigned(partition)) {
throw new KafkaException("This consumer is not assigned to the target partition " + partition + ". " +
"Partitions currently assigned: " + consumerTask.metadataPartitionsAssigned());
}
@@ -119,17 +113,17 @@ public class ConsumerManager implements Closeable {
long startTimeMs = time.milliseconds();
while (true) {
log.debug("Checking if partition [{}] is up to date with offset [{}]", partition, offset);
long receivedOffset = consumerTask.receivedOffsetForPartition(partition).orElse(-1L);
if (receivedOffset >= offset) {
long readOffset = consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L);
if (readOffset >= offset) {
return;
}
log.debug("Expected offset [{}] for partition [{}], but the committed offset: [{}], Sleeping for [{}] to retry again",
offset, partition, receivedOffset, consumeCheckIntervalMs);
offset, partition, readOffset, consumeCheckIntervalMs);
if (time.milliseconds() - startTimeMs > timeoutMs) {
log.warn("Expected offset for partition:[{}] is : [{}], but the committed offset: [{}] ",
partition, receivedOffset, offset);
partition, readOffset, offset);
throw new TimeoutException("Timed out in catching up with the expected offset by consumer.");
}
@@ -158,7 +152,7 @@ public class ConsumerManager implements Closeable {
consumerTask.removeAssignmentsForPartitions(partitions);
}
public Optional<Long> receivedOffsetForPartition(int metadataPartition) {
return consumerTask.receivedOffsetForPartition(metadataPartition);
public Optional<Long> readOffsetForPartition(int metadataPartition) {
return consumerTask.readOffsetForMetadataPartition(metadataPartition);
}
}
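
For clarity, a hedged, standalone sketch of the catch-up wait shown in the diff above: the caller polls the read offset reported by the ConsumerTask until it reaches the target offset or the timeout expires. The class name and the functional parameter are illustrative, not part of the patch.

import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.IntFunction;

// Illustrative version of the catch-up wait in ConsumerManager: poll the read offset
// for the metadata partition at a fixed interval until it reaches the target offset,
// and fail with a TimeoutException once the overall timeout is exceeded.
class CatchUpWaitSketch {
    private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;

    static void waitTillCaughtUp(IntFunction<Optional<Long>> readOffsetForMetadataPartition,
                                 int metadataPartition,
                                 long targetOffset,
                                 long timeoutMs) throws TimeoutException, InterruptedException {
        long startTimeMs = System.currentTimeMillis();
        while (true) {
            long readOffset = readOffsetForMetadataPartition.apply(metadataPartition).orElse(-1L);
            if (readOffset >= targetOffset) {
                return; // consumer has caught up with the target offset
            }
            if (System.currentTimeMillis() - startTimeMs > timeoutMs) {
                throw new TimeoutException("Timed out in catching up with the expected offset by consumer.");
            }
            Thread.sleep(CONSUME_RECHECK_INTERVAL_MS);
        }
    }
}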

546
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java

@@ -16,12 +16,13 @@
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
@@ -30,8 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
@@ -64,302 +63,403 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
class ConsumerTask implements Runnable, Closeable {
private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
private static final long POLL_INTERVAL_MS = 100L;
private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
private final KafkaConsumer<byte[], byte[]> consumer;
private final String metadataTopicName;
private final Consumer<byte[], byte[]> consumer;
private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
private final RemoteLogMetadataTopicPartitioner topicPartitioner;
// The timeout for the consumer to poll records from the remote log metadata topic.
private final long pollTimeoutMs;
private final Time time;
// It indicates whether the closing process has been started or not. If it is set as true,
// consumer will stop consuming messages, and it will not allow partition assignments to be updated.
private volatile boolean closing = false;
// It indicates whether the consumer needs to assign the partitions or not. This is set when it is
// determined that the consumer needs to be assigned with the updated partitions.
private volatile boolean assignPartitions = false;
// It indicates whether the ConsumerTask is closed or not.
private volatile boolean isClosed = false;
// It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
// has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
// ones it is no longer assigned to.
// The initial value is set to true to wait for partition assignment on the first execution; otherwise thread will
// be busy without actually doing anything
private volatile boolean hasAssignmentChanged = true;
// It represents a lock for any operations related to the assignedTopicPartitions.
private final Object assignPartitionsLock = new Object();
// Remote log metadata topic partitions that consumer is assigned to.
private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet();
// User topic partitions that this broker is a leader/follower for.
private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap();
private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
// Map of remote log metadata topic partition to consumed offsets. Received consumer records
// may or may not have been processed based on the assigned topic partitions.
private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
private long uninitializedAt;
private boolean isAllUserTopicPartitionsInitialized;
// Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
// Map of remote log metadata topic partition to consumed offsets.
private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>();
private final long committedOffsetSyncIntervalMs;
private CommittedOffsetsFile committedOffsetsFile;
private long lastSyncedTimeMs;
private Map<TopicPartition, StartAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>();
private boolean hasLastOffsetsFetchFailed = false;
private long lastFailedFetchOffsetsTimestamp;
// The interval between retries to fetch the start and end offsets for the metadata partitions after a failed fetch.
private final long offsetFetchRetryIntervalMs;
public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
String metadataTopicName,
RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
RemoteLogMetadataTopicPartitioner topicPartitioner,
Path committedOffsetsPath,
Time time,
long committedOffsetSyncIntervalMs) {
this.consumer = Objects.requireNonNull(consumer);
this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
Consumer<byte[], byte[]> consumer,
long pollTimeoutMs,
long offsetFetchRetryIntervalMs,
Time time) {
this.consumer = consumer;
this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
this.pollTimeoutMs = pollTimeoutMs;
this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
this.time = Objects.requireNonNull(time);
this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
initializeConsumerAssignment(committedOffsetsPath);
}
private void initializeConsumerAssignment(Path committedOffsetsPath) {
try {
committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
} catch (IOException e) {
throw new KafkaException(e);
}
Map<Integer, Long> committedOffsets = Collections.emptyMap();
try {
// Load committed offset and assign them in the consumer.
committedOffsets = committedOffsetsFile.readEntries();
} catch (IOException e) {
// Ignore the error and consumer consumes from the earliest offset.
log.error("Encountered error while building committed offsets from the file. " +
"Consumer will consume from the earliest offset for the assigned partitions.", e);
}
if (!committedOffsets.isEmpty()) {
// Assign topic partitions from the earlier committed offsets file.
Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions);
Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream()
.map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
.collect(Collectors.toSet());
consumer.assign(metadataTopicPartitions);
// Seek to the committed offsets
for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) {
log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey());
partitionToConsumedOffsets.put(entry.getKey(), entry.getValue());
consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue());
}
lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets);
}
this.uninitializedAt = time.milliseconds();
}
@Override
public void run() {
log.info("Started Consumer task thread.");
lastSyncedTimeMs = time.milliseconds();
try {
while (!closing) {
maybeWaitForPartitionsAssignment();
log.info("Starting consumer task thread.");
while (!isClosed) {
try {
if (hasAssignmentChanged) {
maybeWaitForPartitionAssignments();
}
log.trace("Polling consumer to receive remote log metadata topic records");
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs));
for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
processConsumerRecord(record);
}
maybeSyncCommittedDataAndOffsets(false);
maybeMarkUserPartitionsAsReady();
} catch (final WakeupException ex) {
// ignore logging the error
isClosed = true;
} catch (final RetriableException ex) {
log.warn("Retriable error occurred while processing the records. Retrying...", ex);
} catch (final Exception ex) {
isClosed = true;
log.error("Error occurred while processing the records", ex);
}
} catch (Exception e) {
log.error("Error occurred in consumer task, close:[{}]", closing, e);
} finally {
maybeSyncCommittedDataAndOffsets(true);
closeConsumer();
log.info("Exiting from consumer task thread");
}
try {
consumer.close(Duration.ofSeconds(30));
} catch (final Exception e) {
log.error("Error encountered while closing the consumer", e);
}
log.info("Exited from consumer task thread");
}
private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
// Taking assignPartitionsLock here as updateAssignmentsForPartitions changes assignedTopicPartitions
// and also calls remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for the removed
// partitions.
RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
synchronized (assignPartitionsLock) {
if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
} else {
log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata);
}
log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition());
partitionToConsumedOffsets.put(record.partition(), record.offset());
final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
if (shouldProcess(remoteLogMetadata, record.offset())) {
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset());
} else {
log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata);
}
log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition());
readOffsetsByMetadataPartition.put(record.partition(), record.offset());
}
private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
// Return immediately if there is no consumption from last time.
boolean noConsumedOffsetUpdates = partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
log.debug("Skip syncing committed offsets, noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, forceSync);
private boolean shouldProcess(final RemoteLogMetadata metadata, final long recordOffset) {
final TopicIdPartition tpId = metadata.topicIdPartition();
final Long readOffset = readOffsetsByUserTopicPartition.get(tpId);
return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && (readOffset == null || readOffset < recordOffset);
}
private void maybeMarkUserPartitionsAsReady() {
if (isAllUserTopicPartitionsInitialized) {
return;
}
try {
// Need to take lock on assignPartitionsLock as assignedTopicPartitions might
// get updated by other threads.
synchronized (assignPartitionsLock) {
for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
Long offset = partitionToConsumedOffsets.get(metadataPartition);
if (offset != null) {
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset);
maybeFetchStartAndEndOffsets();
boolean isAllInitialized = true;
for (final UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) {
if (utp.isAssigned && !utp.isInitialized) {
final Integer metadataPartition = utp.metadataPartition;
final StartAndEndOffsetHolder holder = offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
// The offset-holder can be null, when the recent assignment wasn't picked up by the consumer.
if (holder != null) {
final Long readOffset = readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
// 1) The end-offset was fetched only once during reassignment. The metadata-partition can receive
// new stream of records, so the consumer can read records more than the last-fetched end-offset.
// 2) When the internal topic becomes empty due to breach by size/time/start-offset, then there
// are no records to read.
if (readOffset + 1 >= holder.endOffset || holder.endOffset.equals(holder.startOffset)) {
markInitialized(utp);
} else {
log.debug("Skipping sync-up of the remote-log-metadata-file for partition: [{}] , with remote log metadata partition{}, and no offset",
topicIdPartition, metadataPartition);
log.debug("The user-topic-partition {} could not be marked initialized since the read-offset is {} " +
"but the end-offset is {} for the metadata-partition {}", utp, readOffset, holder.endOffset,
metadataPartition);
}
} else {
log.debug("The offset-holder is null for the metadata-partition {}. The consumer may not have picked" +
" up the recent assignment", metadataPartition);
}
// Write partitionToConsumedOffsets into committed offsets file as we do not want to process them again
// in case of restarts.
committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
lastSyncedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets);
}
lastSyncedTimeMs = time.milliseconds();
} catch (IOException e) {
throw new KafkaException("Error encountered while writing committed offsets to a local file", e);
isAllInitialized = isAllInitialized && utp.isAssigned && utp.isInitialized;
}
}
private void closeConsumer() {
log.info("Closing the consumer instance");
try {
consumer.close(Duration.ofSeconds(30));
} catch (Exception e) {
log.error("Error encountered while closing the consumer", e);
if (isAllInitialized) {
log.info("Initialized for all the {} assigned user-partitions mapped to the {} meta-partitions in {} ms",
assignedUserTopicIdPartitions.size(), assignedMetadataPartitions.size(),
time.milliseconds() - uninitializedAt);
}
isAllUserTopicPartitionsInitialized = isAllInitialized;
}
private void maybeWaitForPartitionsAssignment() {
Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
void maybeWaitForPartitionAssignments() throws InterruptedException {
// Snapshots of the metadata-partition and user-topic-partition are used to reduce the scope of the
// synchronization block.
// 1) LEADER_AND_ISR and STOP_REPLICA requests adds / removes the user-topic-partitions from the request
// handler threads. Those threads should not be blocked for a long time, therefore scope of the
// synchronization block is reduced to bare minimum.
// 2) Note that the consumer#position, consumer#seekToBeginning, consumer#seekToEnd and the other consumer APIs
// response times are un-predictable. Those should not be kept in the synchronization block.
final Set<Integer> metadataPartitionSnapshot = new HashSet<>();
final Set<UserTopicIdPartition> assignedUserTopicIdPartitionsSnapshot = new HashSet<>();
synchronized (assignPartitionsLock) {
// If it is closing, return immediately. This should be inside the assignPartitionsLock as the closing is updated
// in close() method with in the same lock to avoid any race conditions.
if (closing) {
return;
while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) {
log.debug("Waiting for remote log metadata partitions to be assigned");
assignPartitionsLock.wait();
}
while (assignedMetaPartitions.isEmpty()) {
// If no partitions are assigned, wait until they are assigned.
log.debug("Waiting for assigned remote log metadata partitions..");
try {
// No timeout is set here, as it is always notified. Even when it is closed, the race can happen
// between the thread calling this method and the thread calling close(). We should have a check
// for closing as that might have been set and notified with assignPartitionsLock by `close`
// method.
assignPartitionsLock.wait();
if (closing) {
return;
}
} catch (InterruptedException e) {
throw new KafkaException(e);
}
}
if (assignPartitions) {
assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions);
// Removing unassigned meta partitions from partitionToConsumedOffsets and partitionToCommittedOffsets
partitionToConsumedOffsets.entrySet().removeIf(entry -> !assignedMetaPartitions.contains(entry.getKey()));
assignPartitions = false;
if (!isClosed && hasAssignmentChanged) {
assignedUserTopicIdPartitions.values().forEach(utp -> {
metadataPartitionSnapshot.add(utp.metadataPartition);
assignedUserTopicIdPartitionsSnapshot.add(utp);
});
hasAssignmentChanged = false;
}
}
if (!assignedMetaPartitionsSnapshot.isEmpty()) {
executeReassignment(assignedMetaPartitionsSnapshot);
if (!metadataPartitionSnapshot.isEmpty()) {
final Set<TopicPartition> remoteLogPartitions = toRemoteLogPartitions(metadataPartitionSnapshot);
consumer.assign(remoteLogPartitions);
this.assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot);
// for newly assigned user-partitions, read from the beginning of the corresponding metadata partition
final Set<TopicPartition> seekToBeginOffsetPartitions = assignedUserTopicIdPartitionsSnapshot
.stream()
.filter(utp -> !utp.isAssigned)
.map(utp -> toRemoteLogPartition(utp.metadataPartition))
.collect(Collectors.toSet());
consumer.seekToBeginning(seekToBeginOffsetPartitions);
// for other metadata partitions, read from the offset where the processing left last time.
remoteLogPartitions.stream()
.filter(tp -> !seekToBeginOffsetPartitions.contains(tp) &&
readOffsetsByMetadataPartition.containsKey(tp.partition()))
.forEach(tp -> consumer.seek(tp, readOffsetsByMetadataPartition.get(tp.partition())));
Set<TopicIdPartition> processedAssignmentPartitions = new HashSet<>();
// mark all the user-topic-partitions as assigned to the consumer.
assignedUserTopicIdPartitionsSnapshot.forEach(utp -> {
if (!utp.isAssigned) {
// Note that there can be a race between `remove` and `add` partition assignment. Calling the
// `maybeLoadPartition` here again to be sure that the partition gets loaded on the handler.
remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition);
utp.isAssigned = true;
}
processedAssignmentPartitions.add(utp.topicIdPartition);
});
processedAssignmentOfUserTopicIdPartitions = new HashSet<>(processedAssignmentPartitions);
clearResourcesForUnassignedUserTopicPartitions(processedAssignmentPartitions);
isAllUserTopicPartitionsInitialized = false;
uninitializedAt = time.milliseconds();
fetchStartAndEndOffsets();
}
}
private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) {
Set<TopicPartition> assignedMetaTopicPartitions =
assignedMetaPartitionsSnapshot.stream()
.map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
.collect(Collectors.toSet());
log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions);
consumer.assign(assignedMetaTopicPartitions);
private void clearResourcesForUnassignedUserTopicPartitions(Set<TopicIdPartition> assignedPartitions) {
// Note that there can be previously assigned user-topic-partitions where no records are there to read
// (eg) none of the segments for a partition were uploaded. Those partition resources won't be cleared.
// It can be fixed later when required since they are empty resources.
Set<TopicIdPartition> unassignedPartitions = readOffsetsByUserTopicPartition.keySet()
.stream()
.filter(e -> !assignedPartitions.contains(e))
.collect(Collectors.toSet());
unassignedPartitions.forEach(unassignedPartition -> {
remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition);
readOffsetsByUserTopicPartition.remove(unassignedPartition);
});
log.info("Unassigned user-topic-partitions: {}", unassignedPartitions.size());
}
public void addAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
updateAssignments(Objects.requireNonNull(partitions), Collections.emptySet());
}
public void removeAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
updateAssignments(Collections.emptySet(), Objects.requireNonNull(partitions));
}
public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
updateAssignmentsForPartitions(partitions, Collections.emptySet());
private void updateAssignments(final Set<TopicIdPartition> addedPartitions,
final Set<TopicIdPartition> removedPartitions) {
log.info("Updating assignments for partitions added: {} and removed: {}", addedPartitions, removedPartitions);
if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) {
synchronized (assignPartitionsLock) {
// Make a copy of the existing assignments and update the copy.
final Map<TopicIdPartition, UserTopicIdPartition> updatedUserPartitions = new HashMap<>(assignedUserTopicIdPartitions);
addedPartitions.forEach(tpId -> updatedUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId)));
removedPartitions.forEach(updatedUserPartitions::remove);
if (!updatedUserPartitions.equals(assignedUserTopicIdPartitions)) {
assignedUserTopicIdPartitions = Collections.unmodifiableMap(updatedUserPartitions);
hasAssignmentChanged = true;
log.debug("Assigned user-topic-partitions: {}", assignedUserTopicIdPartitions);
assignPartitionsLock.notifyAll();
}
}
}
}
public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
updateAssignmentsForPartitions(Collections.emptySet(), partitions);
public Optional<Long> readOffsetForMetadataPartition(final int partition) {
return Optional.ofNullable(readOffsetsByMetadataPartition.get(partition));
}
private void updateAssignmentsForPartitions(Set<TopicIdPartition> addedPartitions,
Set<TopicIdPartition> removedPartitions) {
log.info("Updating assignments for addedPartitions: {} and removedPartition: {}", addedPartitions, removedPartitions);
public boolean isMetadataPartitionAssigned(final int partition) {
return assignedMetadataPartitions.contains(partition);
}
Objects.requireNonNull(addedPartitions, "addedPartitions must not be null");
Objects.requireNonNull(removedPartitions, "removedPartitions must not be null");
public boolean isUserPartitionAssigned(final TopicIdPartition partition) {
final UserTopicIdPartition utp = assignedUserTopicIdPartitions.get(partition);
return utp != null && utp.isAssigned;
}
if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
return;
@Override
public void close() {
if (!isClosed) {
log.info("Closing the instance");
synchronized (assignPartitionsLock) {
isClosed = true;
assignedUserTopicIdPartitions.values().forEach(this::markInitialized);
consumer.wakeup();
assignPartitionsLock.notifyAll();
}
}
}
synchronized (assignPartitionsLock) {
Set<TopicIdPartition> updatedReassignedPartitions = new HashSet<>(assignedTopicPartitions);
updatedReassignedPartitions.addAll(addedPartitions);
updatedReassignedPartitions.removeAll(removedPartitions);
Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
for (TopicIdPartition tp : updatedReassignedPartitions) {
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
}
public Set<Integer> metadataPartitionsAssigned() {
return Collections.unmodifiableSet(assignedMetadataPartitions);
}
private void fetchStartAndEndOffsets() {
try {
final Set<TopicPartition> uninitializedPartitions = assignedUserTopicIdPartitions.values().stream()
.filter(utp -> utp.isAssigned && !utp.isInitialized)
.map(utp -> toRemoteLogPartition(utp.metadataPartition))
.collect(Collectors.toSet());
// Removing the previous offset holder if it exists. During reassignment, if the list-offset
// call to `earliest` and `latest` offset fails, then we should not use the previous values.
uninitializedPartitions.forEach(tp -> offsetHolderByMetadataPartition.remove(tp));
if (!uninitializedPartitions.isEmpty()) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(uninitializedPartitions);
Map<TopicPartition, Long> startOffsets = consumer.beginningOffsets(uninitializedPartitions);
offsetHolderByMetadataPartition = endOffsets.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> new StartAndEndOffsetHolder(startOffsets.get(e.getKey()), e.getValue())));
// Clear removed topic partitions from in-memory cache.
for (TopicIdPartition removedPartition : removedPartitions) {
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
}
hasLastOffsetsFetchFailed = false;
} catch (final RetriableException ex) {
// ignore LEADER_NOT_AVAILABLE error, this can happen when the partition leader is not yet assigned.
hasLastOffsetsFetchFailed = true;
lastFailedFetchOffsetsTimestamp = time.milliseconds();
}
}
assignedTopicPartitions = Collections.unmodifiableSet(updatedReassignedPartitions);
log.debug("Assigned topic partitions: {}", assignedTopicPartitions);
private void maybeFetchStartAndEndOffsets() {
// If the leader for a `__remote_log_metadata` partition is not available, then the call to `ListOffsets`
// will fail after the default timeout of 1 min. Added a delay between the retries to prevent the thread from
// aggressively fetching the list offsets. During this time, the recently reassigned user-topic-partitions
// won't be marked as initialized.
if (hasLastOffsetsFetchFailed && lastFailedFetchOffsetsTimestamp + offsetFetchRetryIntervalMs < time.milliseconds()) {
fetchStartAndEndOffsets();
}
}
if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) {
assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions);
log.debug("Assigned metadata topic partitions: {}", assignedMetaPartitions);
private UserTopicIdPartition newUserTopicIdPartition(final TopicIdPartition tpId) {
return new UserTopicIdPartition(tpId, topicPartitioner.metadataPartition(tpId));
}
assignPartitions = true;
assignPartitionsLock.notifyAll();
} else {
log.debug("No change in assigned metadata topic partitions: {}", assignedMetaPartitions);
}
private void markInitialized(final UserTopicIdPartition utp) {
// Silently not initialize the utp
if (!utp.isAssigned) {
log.warn("Tried to initialize a UTP: {} that was not yet assigned!", utp);
return;
}
if (!utp.isInitialized) {
remotePartitionMetadataEventHandler.markInitialized(utp.topicIdPartition);
utp.isInitialized = true;
}
}
public Optional<Long> receivedOffsetForPartition(int partition) {
return Optional.ofNullable(partitionToConsumedOffsets.get(partition));
static Set<TopicPartition> toRemoteLogPartitions(final Set<Integer> partitions) {
return partitions.stream()
.map(ConsumerTask::toRemoteLogPartition)
.collect(Collectors.toSet());
}
public boolean isPartitionAssigned(int partition) {
return assignedMetaPartitions.contains(partition);
static TopicPartition toRemoteLogPartition(int partition) {
return new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partition);
}
public void close() {
if (!closing) {
synchronized (assignPartitionsLock) {
// Closing should be updated only after acquiring the lock to avoid race in
// maybeWaitForPartitionsAssignment() where it waits on assignPartitionsLock. It should not wait
// if the closing is already set.
closing = true;
consumer.wakeup();
assignPartitionsLock.notifyAll();
}
static class UserTopicIdPartition {
private final TopicIdPartition topicIdPartition;
private final Integer metadataPartition;
// The `utp` will be initialized once it reads all the existing events from the remote log metadata topic.
boolean isInitialized;
// denotes whether this `utp` is assigned to the consumer
boolean isAssigned;
/**
* UserTopicIdPartition denotes the user topic-partitions for which this broker acts as a leader/follower of.
*
* @param tpId the unique topic partition identifier
* @param metadataPartition the remote log metadata partition mapped for this user-topic-partition.
*/
public UserTopicIdPartition(final TopicIdPartition tpId, final Integer metadataPartition) {
this.topicIdPartition = Objects.requireNonNull(tpId);
this.metadataPartition = Objects.requireNonNull(metadataPartition);
this.isInitialized = false;
this.isAssigned = false;
}
@Override
public String toString() {
return "UserTopicIdPartition{" +
"topicIdPartition=" + topicIdPartition +
", metadataPartition=" + metadataPartition +
", isInitialized=" + isInitialized +
", isAssigned=" + isAssigned +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UserTopicIdPartition that = (UserTopicIdPartition) o;
return topicIdPartition.equals(that.topicIdPartition) && metadataPartition.equals(that.metadataPartition);
}
@Override
public int hashCode() {
return Objects.hash(topicIdPartition, metadataPartition);
}
}
public Set<Integer> metadataPartitionsAssigned() {
return Collections.unmodifiableSet(assignedMetaPartitions);
static class StartAndEndOffsetHolder {
Long startOffset;
Long endOffset;
public StartAndEndOffsetHolder(Long startOffset, Long endOffset) {
this.startOffset = startOffset;
this.endOffset = endOffset;
}
@Override
public String toString() {
return "StartAndEndOffsetHolder{" +
"startOffset=" + startOffset +
", endOffset=" + endOffset +
'}';
}
}
}
}
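
To summarize the error-handling change in the run loop above: retriable errors keep the thread alive, while a wakeup (triggered by close()) or any other exception ends the loop. A condensed sketch follows, with the per-iteration poll-and-process body abstracted as a Runnable for illustration; the class name is not part of the patch.

import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.WakeupException;

// Condensed sketch of the ConsumerTask error handling: retriable errors are logged and
// retried on the next iteration; a wakeup (from close()) or any unexpected exception
// ends the loop.
class RunLoopSketch {
    private volatile boolean isClosed = false;

    void run(Runnable pollAndProcessOnce) {
        while (!isClosed) {
            try {
                pollAndProcessOnce.run();
            } catch (WakeupException ex) {
                isClosed = true;  // close() called consumer.wakeup()
            } catch (RetriableException ex) {
                // e.g. LeaderNotAvailableException or TimeoutException: keep the thread alive
            } catch (Exception ex) {
                isClosed = true;  // non-retriable: stop consuming
            }
        }
    }
}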

11
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java

@@ -32,6 +32,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
/**
* This class provides an in-memory cache of remote log segment metadata. This maintains the lineage of segments
@@ -104,6 +105,16 @@ public class RemoteLogMetadataCache {
// https://issues.apache.org/jira/browse/KAFKA-12641
protected final ConcurrentMap<Integer, RemoteLogLeaderEpochState> leaderEpochEntries = new ConcurrentHashMap<>();
private final CountDownLatch initializedLatch = new CountDownLatch(1);
public void markInitialized() {
initializedLatch.countDown();
}
public boolean isInitialized() {
return initializedLatch.getCount() == 0;
}
/**
* Returns {@link RemoteLogSegmentMetadata} if it exists for the given leader-epoch containing the offset and with
* {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} state, else returns {@link Optional#empty()}.

5
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java

@@ -50,4 +50,9 @@ public abstract class RemotePartitionMetadataEventHandler {
public abstract void clearTopicPartition(TopicIdPartition topicIdPartition);
public abstract void markInitialized(TopicIdPartition partition);
public abstract boolean isInitialized(TopicIdPartition partition);
public abstract void maybeLoadPartition(TopicIdPartition partition);
}

19
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java

@@ -18,6 +18,7 @@ package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
@@ -151,6 +152,12 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
}
if (!remoteLogMetadataCache.isInitialized()) {
// Throwing a retriable ReplicaNotAvailableException here for clients retry. We can introduce a new more
// appropriate exception with a KIP in the future.
throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition);
}
return remoteLogMetadataCache;
}
@@ -180,9 +187,21 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
idToRemoteLogMetadataCache = Collections.emptyMap();
}
@Override
public void maybeLoadPartition(TopicIdPartition partition) {
idToRemoteLogMetadataCache.computeIfAbsent(partition,
topicIdPartition -> new FileBasedRemoteLogMetadataCache(topicIdPartition, partitionLogDirectory(topicIdPartition.topicPartition())));
}
@Override
public void markInitialized(TopicIdPartition partition) {
idToRemoteLogMetadataCache.get(partition).markInitialized();
log.trace("Remote log components are initialized for user-partition: {}", partition);
}
@Override
public boolean isInitialized(TopicIdPartition topicIdPartition) {
RemoteLogMetadataCache metadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
return metadataCache != null && metadataCache.isInitialized();
}
}
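
Since the store now throws the retriable ReplicaNotAvailableException while a partition's cache is still initializing, callers are expected to retry. A hedged sketch of such a caller-side retry loop follows; the wrapper class, its parameters, and the backoff are illustrative, not part of the patch.

import java.util.function.Supplier;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;

// Illustrative retry wrapper for callers of the metadata store: while the cache for a
// partition is still initializing, the store throws the retriable
// ReplicaNotAvailableException, so the caller backs off and tries again.
class RetryOnUninitializedCacheSketch {
    static <T> T callWithRetry(Supplier<T> storeCall, int maxAttempts, long backoffMs) throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                return storeCall.get();
            } catch (ReplicaNotAvailableException e) {
                if (attempt >= maxAttempts) {
                    throw e;  // give up after the configured number of attempts
                }
                Thread.sleep(backoffMs);
            }
        }
    }
}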

20
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java

@@ -84,7 +84,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
private RemotePartitionMetadataStore remotePartitionMetadataStore;
private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
private volatile RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
private volatile boolean initializationFailed;
@@ -260,12 +260,12 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
}
public int metadataPartition(TopicIdPartition topicIdPartition) {
return rlmmTopicPartitioner.metadataPartition(topicIdPartition);
return rlmTopicPartitioner.metadataPartition(topicIdPartition);
}
// Visible For Testing
public Optional<Long> receivedOffsetForPartition(int metadataPartition) {
return consumerManager.receivedOffsetForPartition(metadataPartition);
public Optional<Long> readOffsetForPartition(int metadataPartition) {
return consumerManager.readOffsetForPartition(metadataPartition);
}
@Override
@@ -357,7 +357,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
log.info("Started configuring topic-based RLMM with configs: {}", configs);
rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
rlmmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
remotePartitionMetadataStore = new RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath());
configured = true;
log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);
@@ -416,8 +416,8 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
// Create producer and consumer managers.
lock.writeLock().lock();
try {
producerManager = new ProducerManager(rlmmConfig, rlmmTopicPartitioner);
consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, rlmmTopicPartitioner, time);
producerManager = new ProducerManager(rlmmConfig, rlmTopicPartitioner);
consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, rlmTopicPartitioner, time);
if (startConsumerThread) {
consumerManager.startConsumerThread();
} else {
@@ -509,10 +509,8 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
}
// Visible for testing.
public void startConsumerThread() {
if (consumerManager != null) {
consumerManager.startConsumerThread();
}
void setRlmTopicPartitioner(RemoteLogMetadataTopicPartitioner rlmTopicPartitioner) {
this.rlmTopicPartitioner = Objects.requireNonNull(rlmTopicPartitioner);
}
@Override

417
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java

@@ -0,0 +1,417 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ConsumerTaskTest {
private final int numMetadataTopicPartitions = 5;
private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
private final DummyEventHandler handler = new DummyEventHandler();
private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed()
.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
private final Uuid topicId = Uuid.randomUuid();
private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
private ConsumerTask consumerTask;
private MockConsumer<byte[], byte[]> consumer;
private Thread thread;
@BeforeEach
public void beforeEach() {
final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
.collect(Collectors.toMap(Function.identity(), e -> 0L));
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.updateBeginningOffsets(offsets);
consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, new SystemTime());
thread = new Thread(consumerTask);
}
@AfterEach
public void afterEach() throws InterruptedException {
if (thread != null) {
assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception");
thread.join(10_000);
assertFalse(thread.isAlive(), "Consumer task thread is still alive");
}
}
/**
* Tests that the consumer task shuts down gracefully when there were no assignments.
*/
@Test
public void testCloseOnNoAssignment() throws InterruptedException {
thread.start();
Thread.sleep(10);
assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception");
}
@Test
public void testIdempotentClose() {
thread.start();
consumerTask.close();
consumerTask.close();
}
@Test
public void testUserTopicIdPartitionEquals() {
final TopicIdPartition tpId = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId));
final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId));
utp1.isInitialized = true;
utp1.isAssigned = true;
assertFalse(utp2.isInitialized);
assertFalse(utp2.isAssigned);
assertEquals(utp1, utp2);
}
@Test
public void testAddAssignmentsForPartitions() throws InterruptedException {
final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3);
final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
.map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
thread.start();
for (final TopicIdPartition idPartition : idPartitions) {
TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
assertTrue(handler.isPartitionLoaded.get(idPartition));
}
}
@Test
public void testRemoveAssignmentsForPartitions() throws InterruptedException {
final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 3);
final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
.map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
thread.start();
final TopicIdPartition tpId = allPartitions.get(0);
TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " to be assigned");
addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
"Couldn't read record");
final Set<TopicIdPartition> removePartitions = Collections.singleton(tpId);
consumerTask.removeAssignmentsForPartitions(removePartitions);
for (final TopicIdPartition idPartition : allPartitions) {
final TestCondition condition = () -> removePartitions.contains(idPartition) == !consumerTask.isUserPartitionAssigned(idPartition);
TestUtils.waitForCondition(condition, "Timed out waiting for " + idPartition + " to be removed");
}
for (TopicIdPartition removePartition : removePartitions) {
TestUtils.waitForCondition(() -> handler.isPartitionCleared.containsKey(removePartition),
"Timed out waiting for " + removePartition + " to be cleared");
}
}
@Test
public void testConcurrentPartitionAssignments() throws InterruptedException, ExecutionException {
final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 100);
final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
.map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
consumer.updateEndOffsets(endOffsets);
final AtomicBoolean isAllPartitionsAssigned = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
Thread assignor = new Thread(() -> {
int partitionsAssigned = 0;
for (TopicIdPartition partition : allPartitions) {
if (partitionsAssigned == 50) {
// Once half the topic partitions are assigned, wait for the consumer to catch up. This ensures
// that the consumer is already running when the rest of the partitions are assigned.
try {
latch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
fail(e.getMessage());
}
}
consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
partitionsAssigned++;
}
isAllPartitionsAssigned.set(true);
});
Runnable consumerRunnable = () -> {
try {
while (!isAllPartitionsAssigned.get()) {
consumerTask.maybeWaitForPartitionAssignments();
latch.countDown();
}
} catch (Exception e) {
fail(e.getMessage());
}
};
ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
Future<?> future = consumerExecutor.submit(consumerRunnable);
assignor.start();
assignor.join();
future.get();
}
@Test
public void testCanProcessRecord() throws InterruptedException {
final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
final TopicIdPartition tpId2 = new TopicIdPartition(topicId, new TopicPartition("sample", 2));
assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1));
assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId2));
final int metadataPartition = partitioner.metadataPartition(tpId0);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L));
final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
consumerTask.addAssignmentsForPartitions(assignments);
thread.start();
TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + " to be assigned");
addRecord(consumer, metadataPartition, tpId0, 0);
addRecord(consumer, metadataPartition, tpId0, 1);
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
assertEquals(2, handler.metadataCounter);
// should only read the tpId1 records
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned");
addRecord(consumer, metadataPartition, tpId1, 2);
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)), "Couldn't read record");
assertEquals(3, handler.metadataCounter);
// shouldn't read tpId2 records because it's not assigned
addRecord(consumer, metadataPartition, tpId2, 3);
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)), "Couldn't read record");
assertEquals(3, handler.metadataCounter);
}
@Test
public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException {
final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 2L));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
thread.start();
TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
assertFalse(handler.isPartitionInitialized.containsKey(tpId));
IntStream.range(0, 5).forEach(offset -> addRecord(consumer, metadataPartition, tpId, offset));
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)), "Couldn't read record");
assertTrue(handler.isPartitionInitialized.get(tpId));
}
@ParameterizedTest
@CsvSource(value = {"0, 0", "500, 500"})
public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset,
long endOffset) throws InterruptedException {
final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), beginOffset));
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), endOffset));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
thread.start();
TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
TestUtils.waitForCondition(() -> handler.isPartitionInitialized.containsKey(tpId),
"should have initialized the partition");
assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
}
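Taken together, the two tests above describe when a user partition may be marked as ready: either the consumer has caught up to the end offset captured at assignment time, or the metadata partition had no backlog to begin with (beginning offset equals end offset). A minimal sketch of that condition (hypothetical names; the real ConsumerTask tracks these offsets per metadata partition):
// Hypothetical sketch of the "ready" condition the two tests above exercise.
final class InitializationCheckSketch {

    // True when there is nothing to read at assignment time, e.g. an empty topic
    // or a metadata partition whose beginning and end offsets are equal (500/500).
    static boolean hasNoBacklog(long beginningOffset, long endOffset) {
        return beginningOffset >= endOffset;
    }

    // True once the last read offset has reached the end offset that existed when
    // the user partition was assigned (the end offset is exclusive).
    static boolean caughtUp(long lastReadOffset, long endOffsetAtAssignment) {
        return lastReadOffset + 1 >= endOffsetAtAssignment;
    }

    static boolean shouldMarkInitialized(long beginningOffset,
                                         long endOffsetAtAssignment,
                                         long lastReadOffset) {
        return hasNoBacklog(beginningOffset, endOffsetAtAssignment)
                || caughtUp(lastReadOffset, endOffsetAtAssignment);
    }
}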
@Test
public void testConcurrentAccess() throws InterruptedException {
thread.start();
final CountDownLatch latch = new CountDownLatch(1);
final TopicIdPartition tpId = getIdPartitions("concurrent", 1).get(0);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(partitioner.metadataPartition(tpId)), 0L));
final Thread assignmentThread = new Thread(() -> {
try {
latch.await();
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
} catch (final InterruptedException e) {
fail("Shouldn't have thrown an exception");
}
});
final Thread closeThread = new Thread(() -> {
try {
latch.await();
consumerTask.close();
} catch (final InterruptedException e) {
fail("Shouldn't have thrown an exception");
}
});
assignmentThread.start();
closeThread.start();
latch.countDown();
assignmentThread.join();
closeThread.join();
}
@Test
public void testConsumerShouldNotCloseOnRetriableError() throws InterruptedException {
final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 1L));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
thread.start();
TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
consumer.setPollException(new LeaderNotAvailableException("leader not available!"));
addRecord(consumer, metadataPartition, tpId, 0);
consumer.setPollException(new TimeoutException("Not able to complete the operation within the timeout"));
addRecord(consumer, metadataPartition, tpId, 1);
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
assertEquals(2, handler.metadataCounter);
}
@Test
public void testConsumerShouldCloseOnNonRetriableError() throws InterruptedException {
final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 1L));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
thread.start();
TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
consumer.setPollException(new AuthorizationException("Unauthorized to read the topic!"));
TestUtils.waitForCondition(() -> consumer.closed(), "Should close the consumer on non-retriable error");
}
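The two error-handling tests above capture the intended poll-loop behaviour: retriable errors such as LeaderNotAvailableException or TimeoutException are swallowed so the task keeps polling, while a non-retriable error such as AuthorizationException stops the task and closes the consumer. A minimal sketch of that pattern (not the actual ConsumerTask code):
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.RetriableException;

// Hypothetical sketch of a poll loop that survives retriable errors and shuts
// down cleanly on anything else.
class PollLoopSketch {
    private final Consumer<byte[], byte[]> consumer;
    private volatile boolean closing = false;

    PollLoopSketch(Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    void run() {
        try {
            while (!closing) {
                try {
                    for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
                        process(record);
                    }
                } catch (RetriableException e) {
                    // e.g. LeaderNotAvailableException, TimeoutException: log and keep polling.
                } catch (RuntimeException e) {
                    // Non-retriable, e.g. AuthorizationException: stop the task.
                    closing = true;
                }
            }
        } finally {
            consumer.close();
        }
    }

    private void process(ConsumerRecord<byte[], byte[]> record) {
        // Deserialize the event and hand it to the metadata event handler (omitted).
    }
}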
private void addRecord(final MockConsumer<byte[], byte[]> consumer,
final int metadataPartition,
final TopicIdPartition idPartition,
final long recordOffset) {
final RemoteLogSegmentId segmentId = new RemoteLogSegmentId(idPartition, Uuid.randomUuid());
final RemoteLogMetadata metadata = new RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, Collections.singletonMap(0, 0L));
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME, metadataPartition, recordOffset, null, serde.serialize(metadata));
consumer.addRecord(record);
}
private List<TopicIdPartition> getIdPartitions(final String topic, final int partitionCount) {
final List<TopicIdPartition> idPartitions = new ArrayList<>();
for (int partition = 0; partition < partitionCount; partition++) {
idPartitions.add(new TopicIdPartition(topicId, new TopicPartition(topic, partition)));
}
return idPartitions;
}
private static class DummyEventHandler extends RemotePartitionMetadataEventHandler {
private int metadataCounter = 0;
private final Map<TopicIdPartition, Boolean> isPartitionInitialized = new HashMap<>();
private final Map<TopicIdPartition, Boolean> isPartitionLoaded = new HashMap<>();
private final Map<TopicIdPartition, Boolean> isPartitionCleared = new HashMap<>();
@Override
protected void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
metadataCounter++;
}
@Override
protected void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) {
}
@Override
protected void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
}
@Override
public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition, int metadataPartition, Long metadataPartitionOffset) {
}
@Override
public void clearTopicPartition(TopicIdPartition topicIdPartition) {
isPartitionCleared.put(topicIdPartition, true);
}
@Override
public void markInitialized(TopicIdPartition partition) {
isPartitionInitialized.put(partition, true);
}
@Override
public boolean isInitialized(TopicIdPartition partition) {
return true;
}
@Override
public void maybeLoadPartition(TopicIdPartition partition) {
isPartitionLoaded.put(partition, true);
}
}
}

10
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java

@@ -63,11 +63,12 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
// Call setup to start the cluster.
super.setUp(new EmptyTestInfo());
initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread);
initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null);
}
public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread) {
boolean startConsumerThread,
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) {
String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread) {
@Override
@@ -104,6 +105,9 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs);
topicBasedRemoteLogMetadataManager.configure(configs);
if (remoteLogMetadataTopicPartitioner != null) {
topicBasedRemoteLogMetadataManager.setRlmTopicPartitioner(remoteLogMetadataTopicPartitioner);
}
try {
waitUntilInitialized(60_000);
} catch (TimeoutException e) {
@@ -145,4 +149,4 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
public void closeRemoteLogMetadataManager() {
Utils.closeQuietly(topicBasedRemoteLogMetadataManager, "TopicBasedRemoteLogMetadataManager");
}
}
}
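The new three-argument overload lets a test inject a custom RemoteLogMetadataTopicPartitioner before the manager starts consuming. A simplified usage snippet (the `harness` variable is hypothetical; the multiple-subscriptions test below passes a similar partitioner):
// Map every user partition to metadata partition 0 so that all of them share a
// single metadata partition (simplified variant of the partitioner used below).
RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(10) {
    @Override
    public int metadataPartition(TopicIdPartition topicIdPartition) {
        return 0;
    }
};
// `harness` is a hypothetical TopicBasedRemoteLogMetadataManagerHarness instance.
harness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, partitioner);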

178
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java

@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import kafka.utils.EmptyTestInfo;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
private static final int SEG_SIZE = 1024 * 1024;
private final Time time = new MockTime(1);
private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness();
private TopicBasedRemoteLogMetadataManager rlmm() {
return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
}
@BeforeEach
public void setup() {
// Start the cluster only.
remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
}
@AfterEach
public void teardown() throws IOException {
remoteLogMetadataManagerHarness.close();
}
@Test
public void testMultiplePartitionSubscriptions() throws Exception {
// Create topics.
String leaderTopic = "leader";
HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new HashMap<>();
List<Object> leaderTopicReplicas = new ArrayList<>();
// Set broker id 0 as the first entry which is taken as the leader.
leaderTopicReplicas.add(0);
leaderTopicReplicas.add(1);
leaderTopicReplicas.add(2);
assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas));
remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
remoteLogMetadataManagerHarness.listenerName());
String followerTopic = "follower";
HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new HashMap<>();
List<Object> followerTopicReplicas = new ArrayList<>();
// Set broker id 1 as the first entry which is taken as the leader.
followerTopicReplicas.add(1);
followerTopicReplicas.add(2);
followerTopicReplicas.add(0);
assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas));
remoteLogMetadataManagerHarness.createTopicWithAssignment(
followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
remoteLogMetadataManagerHarness.listenerName());
String topicWithNoMessages = "no-messages-topic";
HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>();
List<Object> noMessagesTopicReplicas = new ArrayList<>();
// Set broker id 1 as the first entry which is taken as the leader.
noMessagesTopicReplicas.add(1);
noMessagesTopicReplicas.add(2);
noMessagesTopicReplicas.add(0);
assignedTopicReplicas.put(0, JavaConverters.asScalaBuffer(noMessagesTopicReplicas));
remoteLogMetadataManagerHarness.createTopicWithAssignment(
topicWithNoMessages, JavaConverters.mapAsScalaMap(assignedTopicReplicas),
remoteLogMetadataManagerHarness.listenerName());
final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
final TopicIdPartition emptyTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0));
RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(10) {
@Override
public int metadataPartition(TopicIdPartition topicIdPartition) {
// Always return partition 0 except for emptyTopicIdPartition, so that any new user
// partition (other than emptyTopicIdPartition) added to RLMM uses the same metadata partition.
// That will make the secondary consumer assignment.
if (emptyTopicIdPartition.equals(topicIdPartition)) {
return 1;
} else {
return 0;
}
}
};
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, partitioner);
// Add segments for these partitions, but an exception is expected as they have not yet been subscribed.
// The messages would have been published to the respective metadata topic partitions, but the ConsumerManager
// has not yet subscribed to them because the user partitions are not yet registered.
RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Partitions currently assigned: []",
exception.getMessage());
RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
exception = Assertions.assertThrows(ExecutionException.class, () -> rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Partitions currently assigned: []",
exception.getMessage());
// `listRemoteLogSegments` will throw an exception as these topic partitions are not yet registered.
Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(leaderTopicIdPartition));
Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition));
rlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
Collections.emptySet());
// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and building the cache.
waitUntilConsumerCatchesUp(30_000L);
// The leader partition's metadata would have been received as it is registered, but the follower partition is not
// yet registered, hence listing its segments throws an exception.
Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext());
Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition));
// Register follower partition
rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
Collections.singleton(followerTopicIdPartition));
// In this state, all the metadata should be available in RLMM for both leader and follower partitions.
TestUtils.waitForCondition(() -> rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found");
TestUtils.waitForCondition(() -> rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found");
}
private void waitUntilConsumerCatchesUp(long timeoutMs) throws TimeoutException, InterruptedException {
TestUtils.waitForCondition(() -> {
// If both the leader and follower partitions are mapped to the same metadata partition (partition 0), it
// should have at least 2 messages. That means the read offset should be >= 1 (including duplicate messages, if any).
return rlmm().readOffsetForPartition(0).orElse(-1L) >= 1;
}, timeoutMs, "Consumer did not catch up");
}
}
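The `listRemoteLogSegments` assertions above rely on reads being rejected until the user partition has been registered and its metadata has been caught up. A minimal sketch of such a per-partition guard (hypothetical class and message text; the real RemoteLogMetadataCache is more involved):
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

// Hypothetical per-partition initialization guard: reads fail until the consumer
// task has caught up and marked the user partition as initialized.
class InitializationGuardSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    // Called once the consumer task has read up to the end offset captured when
    // the user partition was assigned.
    void markInitialized() {
        initialized.set(true);
    }

    // Called at the start of read operations such as listing remote log segments.
    void ensureInitialized() throws RemoteStorageException {
        if (!initialized.get()) {
            throw new RemoteStorageException("Remote log metadata is not initialized for this partition yet");
        }
    }
}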

34
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java

@@ -31,18 +31,14 @@ import org.junit.jupiter.api.Test;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.COMMITTED_OFFSETS_FILE_NAME;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
@@ -69,7 +65,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
}
private void startTopicBasedRemoteLogMetadataManagerHarness(boolean startConsumerThread) {
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread);
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread, null);
}
@AfterEach
@@ -136,9 +132,8 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
// Stop TopicBasedRemoteLogMetadataManager only.
stopTopicBasedRemoteLogMetadataManagerHarness();
// Start TopicBasedRemoteLogMetadataManager but do not start consumer thread to check whether the stored metadata is
// loaded successfully or not.
startTopicBasedRemoteLogMetadataManagerHarness(false);
// Start TopicBasedRemoteLogMetadataManager
startTopicBasedRemoteLogMetadataManagerHarness(true);
// Register these partitions to RLMM, which loads the respective metadata snapshots.
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition));
@@ -148,29 +143,6 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition)));
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(),
topicBasedRlmm().listRemoteLogSegments(followerTopicIdPartition)));
// Check whether the check-pointed consumer offsets are stored or not.
Path committedOffsetsPath = new File(logDir, COMMITTED_OFFSETS_FILE_NAME).toPath();
Assertions.assertTrue(committedOffsetsPath.toFile().exists());
CommittedOffsetsFile committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
int metadataPartition1 = topicBasedRlmm().metadataPartition(leaderTopicIdPartition);
int metadataPartition2 = topicBasedRlmm().metadataPartition(followerTopicIdPartition);
Optional<Long> receivedOffsetForPartition1 = topicBasedRlmm().receivedOffsetForPartition(metadataPartition1);
Optional<Long> receivedOffsetForPartition2 = topicBasedRlmm().receivedOffsetForPartition(metadataPartition2);
Assertions.assertTrue(receivedOffsetForPartition1.isPresent());
Assertions.assertTrue(receivedOffsetForPartition2.isPresent());
// Make sure these offsets are at least 0.
Assertions.assertTrue(receivedOffsetForPartition1.get() >= 0);
Assertions.assertTrue(receivedOffsetForPartition2.get() >= 0);
// Check the stored entries and the offsets that were set on consumer are the same.
Map<Integer, Long> partitionToOffset = committedOffsetsFile.readEntries();
Assertions.assertEquals(partitionToOffset.get(metadataPartition1), receivedOffsetForPartition1.get());
Assertions.assertEquals(partitionToOffset.get(metadataPartition2), receivedOffsetForPartition2.get());
// Start Consumer thread
topicBasedRlmm().startConsumerThread();
// Add one more segment
RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata(

10
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java

@@ -149,17 +149,17 @@ public class TopicBasedRemoteLogMetadataManagerTest {
}
// If both the leader and follower partitions are mapped to the same metadata partition then it should have at least
// 2 messages. That means, received offset should be >= 1 (including duplicate messages if any).
// 2 messages. That means, read offset should be >= 1 (including duplicate messages if any).
if (leaderMetadataPartition == followerMetadataPartition) {
if (topicBasedRlmm().receivedOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 1) {
if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 1) {
break;
}
} else {
// If the leader partition and the follower partition are mapped to different metadata partitions then
// each of those metadata partitions will have at least 1 message. That means, received offset should
// each of those metadata partitions will have at least 1 message. That means, read offset should
// be >= 0 (including duplicate messages if any).
if (topicBasedRlmm().receivedOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 0 ||
topicBasedRlmm().receivedOffsetForPartition(followerMetadataPartition).orElse(-1L) >= 0) {
if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 0 ||
topicBasedRlmm().readOffsetForPartition(followerMetadataPartition).orElse(-1L) >= 0) {
break;
}
}
