Browse Source

KAFKA-9821: consolidate Streams rebalance triggering mechanisms (#8596)

Persist followup rebalance in assignment and consolidate rebalance triggering mechanisms

Reviewers: John Roesler <vvcephei@apache.org>
pull/8221/head
A. Sophie Blee-Goldman 5 years ago committed by GitHub
parent
commit
58f7a97314
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
  2. 26
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  3. 149
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
  4. 10
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
  5. 4
      streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
  6. 4
      streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
  7. 2
      streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
  8. 2
      streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
  9. 2
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
  10. 50
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
  11. 12
      streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  12. 12
      tests/kafkatest/tests/streams/streams_upgrade_test.py

2
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

@ -870,7 +870,7 @@ public class StreamsConfig extends AbstractConfig { @@ -870,7 +870,7 @@ public class StreamsConfig extends AbstractConfig {
public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__";
public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__";
public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__";
public static final String NEXT_PROBING_REBALANCE_MS = "__next.probing.rebalance.ms__";
public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__";
public static final String TIME = "__time__";
}

26
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -40,7 +40,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener; @@ -40,7 +40,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
@ -242,11 +241,6 @@ public class StreamThread extends Thread { @@ -242,11 +241,6 @@ public class StreamThread extends Thread {
}
}
int getAssignmentErrorCode() {
return assignmentErrorCode.get();
}
private final Time time;
private final Logger log;
private final String logPrefix;
@ -256,7 +250,6 @@ public class StreamThread extends Thread { @@ -256,7 +250,6 @@ public class StreamThread extends Thread {
private final int maxPollTimeMs;
private final String originalReset;
private final TaskManager taskManager;
private final AtomicInteger assignmentErrorCode;
private final AtomicLong nextProbingRebalanceMs;
private final StreamsMetricsImpl streamsMetrics;
@ -366,8 +359,8 @@ public class StreamThread extends Thread { @@ -366,8 +359,8 @@ public class StreamThread extends Thread {
consumerConfigs.put(StreamsConfig.InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
final AtomicInteger assignmentErrorCode = new AtomicInteger();
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_PROBING_REBALANCE_MS, nextProbingRebalanceMs);
final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
String originalReset = null;
if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
@ -392,7 +385,7 @@ public class StreamThread extends Thread { @@ -392,7 +385,7 @@ public class StreamThread extends Thread {
threadId,
logContext,
assignmentErrorCode,
nextProbingRebalanceMs
nextScheduledRebalanceMs
);
return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
@ -475,13 +468,12 @@ public class StreamThread extends Thread { @@ -475,13 +468,12 @@ public class StreamThread extends Thread {
this.builder = builder;
this.logPrefix = logContext.logPrefix();
this.log = logContext.logger(StreamThread.class);
this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log);
this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, assignmentErrorCode);
this.taskManager = taskManager;
this.restoreConsumer = restoreConsumer;
this.mainConsumer = mainConsumer;
this.changelogReader = changelogReader;
this.originalReset = originalReset;
this.assignmentErrorCode = assignmentErrorCode;
this.nextProbingRebalanceMs = nextProbingRebalanceMs;
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
@ -556,14 +548,8 @@ public class StreamThread extends Thread { @@ -556,14 +548,8 @@ public class StreamThread extends Thread {
while (isRunning() || taskManager.isRebalanceInProgress()) {
try {
runOnce();
if (assignmentErrorCode.get() == AssignorError.REBALANCE_NEEDED.code()) {
log.info("Detected that the assignor requested a rebalance. Rejoining the consumer group to " +
"trigger a new rebalance.");
assignmentErrorCode.set(AssignorError.NONE.code());
mainConsumer.enforceRebalance();
} else if (nextProbingRebalanceMs.get() < time.milliseconds()) {
log.info("The probing rebalance interval has elapsed since the last rebalance, triggering a " +
"rebalance to probe for newly caught-up clients");
if (nextProbingRebalanceMs.get() < time.milliseconds()) {
log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
mainConsumer.enforceRebalance();
}
} catch (final TaskCorruptedException e) {

149
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

@ -161,7 +161,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -161,7 +161,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
@SuppressWarnings("deprecation")
private org.apache.kafka.streams.processor.PartitionGrouper partitionGrouper;
private AtomicInteger assignmentErrorCode;
private AtomicLong nextProbingRebalanceMs;
private AtomicLong nextScheduledRebalanceMs;
private Time time;
protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
@ -192,7 +192,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -192,7 +192,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
taskManager = assignorConfiguration.getTaskManager();
streamsMetadataState = assignorConfiguration.getStreamsMetadataState();
assignmentErrorCode = assignorConfiguration.getAssignmentErrorCode(configs);
nextProbingRebalanceMs = assignorConfiguration.getNextProbingRebalanceMs(configs);
nextScheduledRebalanceMs = assignorConfiguration.getNextScheduledRebalanceMs(configs);
time = assignorConfiguration.getTime(configs);
assignmentConfigs = assignorConfiguration.getAssignmentConfigs();
partitionGrouper = assignorConfiguration.getPartitionGrouper();
@ -864,7 +864,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -864,7 +864,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final int minUserMetadataVersion,
final int minSupportedMetadataVersion,
final boolean shouldTriggerProbingRebalance) {
// keep track of whether a 2nd rebalance is unavoidable so we can skip trying to get a completely sticky assignment
boolean rebalanceRequired = shouldTriggerProbingRebalance;
final Map<String, Assignment> assignment = new HashMap<>();
@ -878,8 +877,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -878,8 +877,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// Try to avoid triggering another rebalance by giving active tasks back to their previous owners within a
// client, without violating load balance. If we already know another rebalance will be required, or the
// client had no owned partitions, try to balance the workload as evenly as possible by interleaving the
// tasks among consumers and hopefully spreading the heavier subtopologies evenly across threads.
// client had no owned partitions, try to balance the workload as evenly as possible by interleaving tasks
if (rebalanceRequired || state.ownedPartitions().isEmpty()) {
activeTaskAssignments = interleaveConsumerTasksByGroupId(state.activeTasks(), consumers);
} else if ((activeTaskAssignments = tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, allOwnedPartitions))
@ -894,7 +892,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -894,7 +892,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance
final boolean encodeNextProbingRebalanceTime = clientId.equals(taskManager.processId()) && shouldTriggerProbingRebalance;
addClientAssignments(
final boolean followupRebalanceScheduled = addClientAssignments(
assignment,
clientMetadata,
partitionsForTask,
@ -907,6 +905,17 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -907,6 +905,17 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
minSupportedMetadataVersion,
false,
encodeNextProbingRebalanceTime);
if (followupRebalanceScheduled) {
rebalanceRequired = true;
log.debug("Requested client {} to schedule a followup rebalance", clientId);
}
}
if (rebalanceRequired) {
log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled.");
} else {
log.info("Finished stable assignment of tasks, no followup rebalances required.");
}
return assignment;
@ -955,26 +964,29 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -955,26 +964,29 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
false);
}
log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled due to version probing.");
return assignment;
}
/**
* Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map
* @return true if this client has been told to schedule a followup rebalance
*/
private void addClientAssignments(final Map<String, Assignment> assignment,
final ClientMetadata clientMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
final Set<TopicPartition> allOwnedPartitions,
final Map<String, List<TaskId>> activeTaskAssignments,
final Map<String, List<TaskId>> standbyTaskAssignments,
final int minUserMetadataVersion,
final int minSupportedMetadataVersion,
final boolean versionProbing,
final boolean probingRebalanceNeeded) {
boolean encodeNextRebalanceTime = probingRebalanceNeeded;
boolean stableAssignment = !probingRebalanceNeeded && !versionProbing;
private boolean addClientAssignments(final Map<String, Assignment> assignment,
final ClientMetadata clientMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
final Set<TopicPartition> allOwnedPartitions,
final Map<String, List<TaskId>> activeTaskAssignments,
final Map<String, List<TaskId>> standbyTaskAssignments,
final int minUserMetadataVersion,
final int minSupportedMetadataVersion,
final boolean versionProbing,
final boolean probingRebalanceNeeded) {
boolean rebalanceRequested = probingRebalanceNeeded || versionProbing;
boolean shouldEncodeProbingRebalance = probingRebalanceNeeded;
// Loop through the consumers and build their assignment
for (final String consumer : clientMetadata.consumers) {
@ -984,17 +996,14 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -984,17 +996,14 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final List<TopicPartition> activePartitionsList = new ArrayList<>();
final List<TaskId> assignedActiveList = new ArrayList<>();
if (populateActiveTaskAndPartitionsLists(
activePartitionsList,
assignedActiveList,
consumer,
clientMetadata.state,
activeTasksForConsumer,
partitionsForTask,
allOwnedPartitions)
) {
stableAssignment = false;
}
final boolean tasksRevoked = populateActiveTaskAndPartitionsLists(
activePartitionsList,
assignedActiveList,
consumer,
clientMetadata.state,
activeTasksForConsumer,
partitionsForTask,
allOwnedPartitions);
final Map<TaskId, Set<TopicPartition>> standbyTaskMap =
buildStandbyTaskMap(standbyTaskAssignments.get(consumer), partitionsForTask);
@ -1009,14 +1018,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1009,14 +1018,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
AssignorError.NONE.code()
);
if (encodeNextRebalanceTime) {
if (tasksRevoked) {
// TODO: once KAFKA-9821 is resolved we can leave it to the client to trigger this rebalance
log.debug("Requesting followup rebalance be scheduled immediately due to tasks changing ownership.");
info.setNextRebalanceTime(0L);
rebalanceRequested = true;
} else if (shouldEncodeProbingRebalance) {
final long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs();
log.debug("Requesting followup rebalance be scheduled for {} ms to probe for caught-up replica tasks.", nextRebalanceTimeMs);
info.setNextRebalanceTime(nextRebalanceTimeMs);
log.info("Scheduled a followup probing rebalance for {} ms.", nextRebalanceTimeMs);
encodeNextRebalanceTime = false;
shouldEncodeProbingRebalance = false;
}
// finally, encode the assignment and insert into map with all assignments
assignment.put(
consumer,
new Assignment(
@ -1025,12 +1038,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1025,12 +1038,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
)
);
}
if (stableAssignment) {
log.info("Finished stable assignment of tasks, no followup rebalances required.");
} else {
log.info("Finished unstable assignment of tasks, a followup probing rebalance will be triggered.");
}
return rebalanceRequested;
}
/**
@ -1057,7 +1065,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1057,7 +1065,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final boolean newPartitionForConsumer = oldOwner == null || !oldOwner.equals(consumer);
// If the partition is new to this consumer but is still owned by another, remove from the assignment
// until it has been revoked and can safely be reassigned according the COOPERATIVE protocol
// until it has been revoked and can safely be reassigned according to the COOPERATIVE protocol
if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) {
log.info("Removing task {} from assignment until it is safely revoked in followup rebalance", taskId);
clientState.removeFromAssignment(taskId);
@ -1338,7 +1346,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1338,7 +1346,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
if (info.errCode() != AssignorError.NONE.code()) {
// set flag to shutdown streams app
setAssignmentErrorCode(info.errCode());
assignmentErrorCode.set(info.errCode());
return;
}
/*
@ -1356,18 +1364,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1356,18 +1364,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion);
// Check if this was a version probing rebalance and check the error code to trigger another rebalance if so
if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance.");
setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
}
// version 1 field
final Map<TaskId, Set<TopicPartition>> activeTasks;
// version 2 fields
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo;
final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost;
final long encodedNextScheduledRebalanceMs;
switch (receivedAssignmentMetadataVersion) {
case 1:
@ -1377,6 +1380,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1377,6 +1380,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
partitionsByHost = Collections.emptyMap();
standbyPartitionsByHost = Collections.emptyMap();
topicToPartitionInfo = Collections.emptyMap();
encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
break;
case 2:
case 3:
@ -1388,6 +1392,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1388,6 +1392,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
partitionsByHost = info.partitionsByHost();
standbyPartitionsByHost = Collections.emptyMap();
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
break;
case 6:
validateActiveTaskEncoding(partitions, info, logPrefix);
@ -1396,6 +1401,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1396,6 +1401,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
partitionsByHost = info.partitionsByHost();
standbyPartitionsByHost = info.standbyPartitionByHost();
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
break;
case 7:
validateActiveTaskEncoding(partitions, info, logPrefix);
@ -1404,7 +1410,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1404,7 +1410,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
partitionsByHost = info.partitionsByHost();
standbyPartitionsByHost = info.standbyPartitionByHost();
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
nextProbingRebalanceMs.set(info.nextRebalanceMs());
encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
break;
default:
throw new IllegalStateException(
@ -1413,7 +1419,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1413,7 +1419,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
);
}
verifyHostInfo(partitionsByHost.keySet());
maybeScheduleFollowupRebalance(
encodedNextScheduledRebalanceMs,
receivedAssignmentMetadataVersion,
latestCommonlySupportedVersion,
partitionsByHost.keySet()
);
final Cluster fakeCluster = Cluster.empty().withPartitions(topicToPartitionInfo);
streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fakeCluster);
@ -1423,6 +1434,28 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1423,6 +1434,28 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
taskManager.handleAssignment(activeTasks, info.standbyTasks());
}
private void maybeScheduleFollowupRebalance(final long encodedNextScheduledRebalanceMs,
final int receivedAssignmentMetadataVersion,
final int latestCommonlySupportedVersion,
final Set<HostInfo> groupHostInfo) {
if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
log.info("Requested to schedule immediate rebalance due to version probing.");
nextScheduledRebalanceMs.set(0L);
} else if (!verifyHostInfo(groupHostInfo)) {
log.info("Requested to schedule immediate rebalance to update group with new host endpoint = {}.", userEndPoint);
nextScheduledRebalanceMs.set(0L);
} else if (encodedNextScheduledRebalanceMs == 0L) {
log.info("Requested to schedule immediate rebalance for new tasks to be safely revoked from current owner.");
nextScheduledRebalanceMs.set(0L);
} else if (encodedNextScheduledRebalanceMs < Long.MAX_VALUE) {
log.info("Requested to schedule probing rebalance for {} ms.", encodedNextScheduledRebalanceMs);
nextScheduledRebalanceMs.set(encodedNextScheduledRebalanceMs);
} else {
log.info("No followup rebalance was requested, resetting the rebalance schedule.");
nextScheduledRebalanceMs.set(Long.MAX_VALUE);
}
}
/**
* Verify that this client's host info was included in the map returned in the assignment, and trigger a
* rebalance if not. This may be necessary when using static membership, as a rejoining client will be handed
@ -1430,15 +1463,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1430,15 +1463,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
* to force a rebalance for the other members in the group to get the updated host info for this client.
*
* @param groupHostInfo the HostInfo of all clients in the group
* @return false if the current host info does not match that in the group assignment
*/
private void verifyHostInfo(final Set<HostInfo> groupHostInfo) {
private boolean verifyHostInfo(final Set<HostInfo> groupHostInfo) {
if (userEndPoint != null && !groupHostInfo.isEmpty()) {
final HostInfo myHostInfo = HostInfo.buildFromEndpoint(userEndPoint);
if (!groupHostInfo.contains(myHostInfo)) {
log.info("Triggering a rebalance to update group with new endpoint = {}", userEndPoint);
setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
}
return groupHostInfo.contains(myHostInfo);
} else {
return true;
}
}
@ -1543,10 +1576,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1543,10 +1576,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
}
protected void setAssignmentErrorCode(final Integer errorCode) {
assignmentErrorCode.set(errorCode);
}
// following functions are for test only
void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;

10
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
@ -31,23 +32,26 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener { @@ -31,23 +32,26 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
private final TaskManager taskManager;
private final StreamThread streamThread;
private final Logger log;
private final AtomicInteger assignmentErrorCode;
StreamsRebalanceListener(final Time time,
final TaskManager taskManager,
final StreamThread streamThread,
final Logger log) {
final Logger log,
final AtomicInteger assignmentErrorCode) {
this.time = time;
this.taskManager = taskManager;
this.streamThread = streamThread;
this.log = log;
this.assignmentErrorCode = assignmentErrorCode;
}
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
// NB: all task management is already handled by:
// org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
if (streamThread.getAssignmentErrorCode() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
log.error("Received error code {} - shutdown", streamThread.getAssignmentErrorCode());
if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
log.error("Received error code {} - shutdown", assignmentErrorCode.get());
streamThread.shutdown();
} else {
streamThread.setState(State.PARTITIONS_ASSIGNED);

4
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java

@ -190,8 +190,8 @@ public final class AssignorConfiguration { @@ -190,8 +190,8 @@ public final class AssignorConfiguration {
return (AtomicInteger) ai;
}
public AtomicLong getNextProbingRebalanceMs(final Map<String, ?> configs) {
final Object al = configs.get(InternalConfig.NEXT_PROBING_REBALANCE_MS);
public AtomicLong getNextScheduledRebalanceMs(final Map<String, ?> configs) {
final Object al = configs.get(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS);
if (al == null) {
final KafkaException fatalException = new KafkaException("nextProbingRebalanceMs is not specified");
log.error(fatalException.getMessage(), fatalException);

4
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java

@ -18,9 +18,7 @@ package org.apache.kafka.streams.processor.internals.assignment; @@ -18,9 +18,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
public enum AssignorError {
NONE(0),
INCOMPLETE_SOURCE_TOPIC_METADATA(1),
REBALANCE_NEEDED(2);
INCOMPLETE_SOURCE_TOPIC_METADATA(1);
private final int code;
AssignorError(final int code) {

2
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java

@ -52,6 +52,7 @@ import org.apache.kafka.test.StreamsTestUtils; @@ -52,6 +52,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@ -185,6 +186,7 @@ public class EosBetaUpgradeIntegrationTest { @@ -185,6 +186,7 @@ public class EosBetaUpgradeIntegrationTest {
}
@Test
@Ignore
public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
// We use two KafkaStreams clients that we upgrade from eos-alpha to eos-beta. During the upgrade,
// we ensure that there are pending transaction and verify that data is processed correctly.

2
streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java

@ -123,7 +123,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest { @@ -123,7 +123,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
configurationMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
configurationMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
configurationMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentError);
configurationMap.put(InternalConfig.NEXT_PROBING_REBALANCE_MS, nextProbingRebalanceMs);
configurationMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextProbingRebalanceMs);
configurationMap.put(InternalConfig.TIME, time);
configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName());
return configurationMap;

2
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

@ -541,7 +541,7 @@ public class StreamThreadTest { @@ -541,7 +541,7 @@ public class StreamThreadTest {
}
AtomicLong nextRebalanceMs() {
return (AtomicLong) consumerConfigs.get(StreamsConfig.InternalConfig.NEXT_PROBING_REBALANCE_MS);
return (AtomicLong) consumerConfigs.get(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS);
}
}

50
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java

@ -47,7 +47,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner; @@ -47,7 +47,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
@ -86,6 +85,7 @@ import static java.util.Collections.emptyList; @@ -86,6 +85,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
@ -200,7 +200,7 @@ public class StreamsPartitionAssignorTest { @@ -200,7 +200,7 @@ public class StreamsPartitionAssignorTest {
private final Class<? extends TaskAssignor> taskAssignor;
private final AtomicInteger assignmentError = new AtomicInteger();
private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
private final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
private final MockTime time = new MockTime();
private Map<String, Object> configProps() {
@ -211,7 +211,7 @@ public class StreamsPartitionAssignorTest { @@ -211,7 +211,7 @@ public class StreamsPartitionAssignorTest {
configurationMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
configurationMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
configurationMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentError);
configurationMap.put(InternalConfig.NEXT_PROBING_REBALANCE_MS, nextProbingRebalanceMs);
configurationMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
configurationMap.put(InternalConfig.TIME, time);
configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
return configurationMap;
@ -1360,7 +1360,7 @@ public class StreamsPartitionAssignorTest { @@ -1360,7 +1360,7 @@ public class StreamsPartitionAssignorTest {
}
@Test
public void shouldTriggerRebalanceOnHostInfoChange() {
public void shouldTriggerImmediateRebalanceOnHostInfoChange() {
final Map<HostInfo, Set<TopicPartition>> oldHostState = mkMap(
mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)),
mkEntry(new HostInfo("otherhost", 9090), mkSet(t2p0, t2p1))
@ -1376,14 +1376,44 @@ public class StreamsPartitionAssignorTest { @@ -1376,14 +1376,44 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.onAssignment(createAssignment(oldHostState), null);
assertThat(assignmentError.get(), is(AssignorError.REBALANCE_NEEDED.code()));
assertThat(nextScheduledRebalanceMs.get(), is(0L));
partitionAssignor.setAssignmentErrorCode(AssignorError.NONE.code());
partitionAssignor.onAssignment(createAssignment(newHostState), null);
assertThat(assignmentError.get(), is(AssignorError.NONE.code()));
assertThat(nextScheduledRebalanceMs.get(), is(Long.MAX_VALUE));
}
EasyMock.verify(taskManager);
@Test
public void shouldTriggerImmediateRebalanceOnTasksRevoked() {
builder.addSource(null, "source1", null, null, null, "topic1");
final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
subscriptions.put(CONSUMER_1,
new Subscription(
Collections.singletonList("topic1"),
getInfo(UUID_1, allTasks, EMPTY_TASKS).encode(),
asList(t1p0, t1p1, t1p2))
);
subscriptions.put(CONSUMER_2,
new Subscription(
Collections.singletonList("topic1"),
getInfo(UUID_1, EMPTY_TASKS, allTasks).encode(),
emptyList())
);
createMockTaskManager(allTasks, allTasks);
configurePartitionAssignorWith(singletonMap(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0L));
final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
// Verify at least one partition was revoked
assertThat(assignment.get(CONSUMER_1).partitions(), not(allTasks));
assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList()));
partitionAssignor.onAssignment(assignment.get(CONSUMER_2), null);
assertThat(nextScheduledRebalanceMs.get(), is(0L));
}
@Test
@ -1768,13 +1798,13 @@ public class StreamsPartitionAssignorTest { @@ -1768,13 +1798,13 @@ public class StreamsPartitionAssignorTest {
@Test
public void shouldGetNextProbingRebalanceMs() {
nextProbingRebalanceMs.set(5 * 60 * 1000L);
nextScheduledRebalanceMs.set(5 * 60 * 1000L);
createDefaultMockTaskManager();
final Map<String, Object> props = configProps();
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(props);
assertThat(assignorConfiguration.getNextProbingRebalanceMs(props).get(), equalTo(5 * 60 * 1000L));
assertThat(assignorConfiguration.getNextScheduledRebalanceMs(props).get(), equalTo(5 * 60 * 1000L));
}
@Test

12
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -38,9 +38,11 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; @@ -38,9 +38,11 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.LegacySubscriptionInfoSerde;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@ -57,6 +59,7 @@ import java.util.Properties; @@ -57,6 +59,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
@ -116,8 +119,10 @@ public class StreamsUpgradeTest { @@ -116,8 +119,10 @@ public class StreamsUpgradeTest {
}
public static class FutureStreamsPartitionAssignor extends StreamsPartitionAssignor {
private final Logger log = LoggerFactory.getLogger(FutureStreamsPartitionAssignor.class);
private AtomicInteger usedSubscriptionMetadataVersionPeek;
private AtomicLong nextScheduledRebalanceMs;
public FutureStreamsPartitionAssignor() {
usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION + 1;
@ -133,6 +138,8 @@ public class StreamsUpgradeTest { @@ -133,6 +138,8 @@ public class StreamsUpgradeTest {
usedSubscriptionMetadataVersionPeek = new AtomicInteger();
}
configs.remove("test.future.metadata");
nextScheduledRebalanceMs = new AssignorConfiguration(configs).getNextScheduledRebalanceMs(configs);
super.configure(configs);
}
@ -194,7 +201,8 @@ public class StreamsUpgradeTest { @@ -194,7 +201,8 @@ public class StreamsUpgradeTest {
assignment.userData().putInt(0, LATEST_SUPPORTED_VERSION));
if (maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) {
setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
log.info("Requested to schedule immediate rebalance due to version probing.");
nextScheduledRebalanceMs.set(0L);
usedSubscriptionMetadataVersionPeek.set(usedSubscriptionMetadataVersion);
}

12
tests/kafkatest/tests/streams/streams_upgrade_test.py

@ -528,22 +528,22 @@ class StreamsUpgradeTest(Test): @@ -528,22 +528,22 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
log_monitor.wait_until("Triggering the followup rebalance scheduled for 0 ms.",
timeout_sec=60,
err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
err_msg="Could not detect 'Triggering followup rebalance' at upgrading node " + str(node.account))
else:
first_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.",
timeout_sec=60,
err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(first_other_node.account))
first_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
first_other_monitor.wait_until("Triggering the followup rebalance scheduled for 0 ms.",
timeout_sec=60,
err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
err_msg="Could not detect 'Triggering followup rebalance' at upgrading node " + str(node.account))
second_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.",
timeout_sec=60,
err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(second_other_node.account))
second_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
second_other_monitor.wait_until("Triggering the followup rebalance scheduled for 0 ms.",
timeout_sec=60,
err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
err_msg="Could not detect 'Triggering followup rebalance' at upgrading node " + str(node.account))
# version probing should trigger second rebalance
# now we check that after consecutive rebalances we have synchronized generation

Loading…
Cancel
Save