
KAFKA-10199: Enable state updater by default (#13927)

Now that the implementation of the state updater is complete, we can enable it by default.

This PR enables the state updater by default and fixes code (mainly tests) that made assumptions that no longer hold when the state updater is enabled.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@confluent.io>
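
For illustration, a minimal sketch (not part of this commit) of how a test can still opt out of the new default via the internal config. The constant matches the one shown in the StreamsConfig hunk below; the application id and bootstrap servers are placeholders:

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-updater-test"); // placeholder id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
    // With this commit the state updater defaults to true; tests that still exercise
    // the old restore path must disable it explicitly:
    props.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, false);
    final StreamsConfig config = new StreamsConfig(props);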
Bruno Cadonna committed via GitHub, commit c32d2338a7
  1. streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java (4 changes)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java (2 changes)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java (3 changes)
  4. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (9 changes)
  5. streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java (2 changes)
  6. streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (53 changes)
  7. streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java (3 changes)
  8. streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java (5 changes)
  9. streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java (13 changes)
  10. streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java (85 changes)
  11. streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java (8 changes)
  12. streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java (23 changes)
  13. streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java (4 changes)
  14. streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java (1 change)
  15. streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java (52 changes)
  16. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java (161 changes)
  17. streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java (2 changes)
  18. streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java (51 changes)

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java (4 changes)

@@ -1198,6 +1198,10 @@ public class StreamsConfig extends AbstractConfig {
// Private API to enable the state updater (i.e. state updating on a dedicated thread)
public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
public static boolean getStateUpdaterEnabled(final Map<String, Object> configs) {
return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true);
}
public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
final Object value = configs.getOrDefault(key, defaultValue);
if (value instanceof Boolean) {
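
For reference, a hedged sketch of the read side, mirroring the call sites that appear later in this diff (StoreChangelogReader and StreamThread); config is assumed to be a StreamsConfig instance:

    // Defaults to true after this commit when the key is absent from the config map.
    final boolean stateUpdaterEnabled =
        StreamsConfig.InternalConfig.getStateUpdaterEnabled(config.originals());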

streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java (2 changes)

@@ -211,7 +211,7 @@ public class ReadOnlyTask implements Task {
@Override
public StateStore getStore(final String name) {
throw new UnsupportedOperationException("This task is read-only");
return task.getStore(name);
}
@Override

streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java (3 changes)

@@ -230,8 +230,7 @@ public class StoreChangelogReader implements ChangelogReader {
this.restoreConsumer = restoreConsumer;
this.stateRestoreListener = stateRestoreListener;
this.stateUpdaterEnabled =
InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
this.stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals());
this.groupId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (9 changes)

@@ -374,8 +374,7 @@ public class StreamThread extends Thread {
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
final boolean stateUpdaterEnabled =
InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
final boolean stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals());
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
topologyMetadata,
config,
@@ -558,7 +557,7 @@ public class StreamThread extends Thread {
this.numIterations = 1;
this.eosEnabled = eosEnabled(config);
this.stateUpdaterEnabled = InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
this.stateUpdaterEnabled = InternalConfig.getStateUpdaterEnabled(config.originals());
}
private static final class InternalConsumerConfig extends ConsumerConfig {
@@ -859,7 +858,7 @@ public class StreamThread extends Thread {
if (log.isDebugEnabled()) {
log.debug("Committed all active tasks {} and standby tasks {} in {}ms",
taskManager.activeTaskIds(), taskManager.standbyTaskIds(), commitLatency);
taskManager.activeRunningTaskIds(), taskManager.standbyTaskIds(), commitLatency);
}
}
@@ -1128,7 +1127,7 @@ public class StreamThread extends Thread {
if (now - lastCommitMs > commitTimeMs) {
if (log.isDebugEnabled()) {
log.debug("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)",
taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
taskManager.activeRunningTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
}
committed = taskManager.commit(

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java (2 changes)

@@ -178,7 +178,7 @@ public class TaskExecutor {
final Set<TaskId> corruptedTasks = new HashSet<>();
if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
for (final Task task : taskManager.activeTaskIterable()) {
for (final Task task : taskManager.activeRunningTaskIterable()) {
final Map<TopicPartition, OffsetAndMetadata> taskOffsetsToCommit = offsetsPerTask.getOrDefault(task, emptyMap());
if (!taskOffsetsToCommit.isEmpty() || taskManager.streamsProducerForTask(task.id()).transactionInFlight()) {
try {

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (53 changes)

@@ -572,7 +572,9 @@ public class TaskManager {
while (iter.hasNext()) {
final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
final TaskId taskId = entry.getKey();
if (taskId.topologyName() != null && !topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
|| (stateUpdater != null && stateUpdater.getTasks().stream().anyMatch(task -> task.id() == taskId));
if (taskId.topologyName() != null && !taskIsOwned && !topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
log.info("Cannot create the assigned task {} since it's topology name cannot be recognized, will put it " +
"aside as pending for now and create later when topology metadata gets refreshed", taskId);
pendingTasks.put(taskId, entry.getValue());
@@ -649,7 +651,12 @@ public class TaskManager {
try {
if (oldTask.isActive()) {
final StandbyTask standbyTask = convertActiveToStandby((StreamTask) oldTask, inputPartitions);
if (stateUpdater != null) {
tasks.removeTask(oldTask);
tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
} else {
tasks.replaceActiveWithStandby(standbyTask);
}
} else {
final StreamTask activeTask = convertStandbyToActive((StandbyTask) oldTask, inputPartitions);
tasks.replaceStandbyWithActive(activeTask);
@@ -964,7 +971,7 @@ public class TaskManager {
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (final Task task : activeTaskIterable()) {
for (final Task task : activeRunningTaskIterable()) {
if (remainingRevokedPartitions.containsAll(task.inputPartitions())) {
// when the task input partitions are included in the revoked list,
// this is an active task and should be revoked
@@ -1557,12 +1564,16 @@ public class TaskManager {
.collect(Collectors.toSet());
}
Set<TaskId> standbyTaskIds() {
return standbyTaskStream()
Set<TaskId> activeRunningTaskIds() {
return activeRunningTaskStream()
.map(Task::id)
.collect(Collectors.toSet());
}
Set<TaskId> standbyTaskIds() {
return standbyTaskStream().map(Task::id).collect(Collectors.toSet());
}
Map<TaskId, Task> allTasks() {
// not bothering with an unmodifiable map, since the tasks themselves are mutable, but
// if any outside code modifies the map or the tasks, it would be a severe transgression.
@@ -1615,7 +1626,21 @@ public class TaskManager {
return activeTaskStream().collect(Collectors.toList());
}
List<Task> activeRunningTaskIterable() {
return activeRunningTaskStream().collect(Collectors.toList());
}
private Stream<Task> activeTaskStream() {
if (stateUpdater != null) {
return Stream.concat(
activeRunningTaskStream(),
stateUpdater.getTasks().stream().filter(Task::isActive)
);
}
return activeRunningTaskStream();
}
private Stream<Task> activeRunningTaskStream() {
return tasks.allTasks().stream().filter(Task::isActive);
}
@@ -1628,7 +1653,15 @@ public class TaskManager {
}
private Stream<Task> standbyTaskStream() {
return tasks.allTasks().stream().filter(t -> !t.isActive());
final Stream<Task> standbyTasksInTaskRegistry = tasks.allTasks().stream().filter(t -> !t.isActive());
if (stateUpdater != null) {
return Stream.concat(
stateUpdater.getStandbyTasks().stream(),
standbyTasksInTaskRegistry
);
} else {
return standbyTasksInTaskRegistry;
}
}
// For testing only.
@@ -1704,9 +1737,9 @@ public class TaskManager {
if (rebalanceInProgress) {
return -1;
} else {
for (final Task task : activeTaskIterable()) {
for (final Task task : activeRunningTaskIterable()) {
if (task.commitRequested() && task.commitNeeded()) {
return commit(activeTaskIterable());
return commit(activeRunningTaskIterable());
}
}
return 0;
@@ -1791,7 +1824,7 @@ public class TaskManager {
}
void recordTaskProcessRatio(final long totalProcessLatencyMs, final long now) {
for (final Task task : activeTaskIterable()) {
for (final Task task : activeRunningTaskIterable()) {
task.recordProcessTimeRatioAndBufferSize(totalProcessLatencyMs, now);
}
}
@@ -1815,7 +1848,7 @@ public class TaskManager {
}
final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (final Task task : activeTaskIterable()) {
for (final Task task : activeRunningTaskIterable()) {
for (final Map.Entry<TopicPartition, Long> entry : task.purgeableOffsets().entrySet()) {
recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
}
@@ -1915,7 +1948,7 @@ public class TaskManager {
}
boolean needsInitializationOrRestoration() {
return activeTaskIterable().stream().anyMatch(Task::needsInitializationOrRestoration);
return activeTaskStream().anyMatch(Task::needsInitializationOrRestoration);
}
// for testing only

streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java (3 changes)

@@ -783,7 +783,7 @@ public class EosV2UpgradeIntegrationTest {
commitRequested.set(0);
stateTransitions1.clear();
stateTransitions2.clear();
streams2V2 = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_V2);
streams2V2 = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE_V2);
streams2V2.setStateListener(
(newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState))
);
@@ -1149,7 +1149,6 @@ public class EosV2UpgradeIntegrationTest {
keys.add(row.key);
}
}
return true;
},
MAX_WAIT_TIME_MS,

streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java (5 changes)

@@ -207,11 +207,12 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
});
}
startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
for (final KafkaStreams stream: kafkaStreamsList) {
stream.start();
}
// the streams applications should have shut down into `ERROR` due to the IllegalStateException
waitForApplicationState(Arrays.asList(streams, streamsTwo, streamsThree), KafkaStreams.State.ERROR, ofSeconds(60));
}
private void verifyKTableKTableJoin(final Set<KeyValue<String, String>> expectedResult) throws Exception {

streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java (13 changes)

@@ -274,8 +274,9 @@ public class LagFetchIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Long> t1 = builder.table(inputTopicName, Materialized.as(stateStoreName));
t1.toStream().to(outputTopicName);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final AtomicReference<LagInfo> zeroLagRef = new AtomicReference<>();
try {
// First start up the active.
TestUtils.waitForCondition(() -> streams.allLocalStorePartitionLags().size() == 0,
@@ -291,7 +292,6 @@ public class LagFetchIntegrationTest {
WAIT_TIMEOUT_MS);
// check for proper lag values.
final AtomicReference<LagInfo> zeroLagRef = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
final Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = streams.allLocalStorePartitionLags();
assertThat(offsetLagInfoMap.size(), equalTo(1));
@@ -312,9 +312,14 @@ public class LagFetchIntegrationTest {
Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
} finally {
streams.close();
streams.cleanUp();
}
// wait till the lag goes down to 0
final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);
try {
// set a state restoration listener to track progress of restoration
final CountDownLatch restorationEndLatch = new CountDownLatch(1);
final Map<String, Map<Integer, LagInfo>> restoreStartLagInfo = new HashMap<>();
@@ -356,8 +361,8 @@ public class LagFetchIntegrationTest {
assertThat(restoreEndLagInfo.get(stateStoreName).get(0), equalTo(zeroLagRef.get()));
} finally {
streams.close();
streams.cleanUp();
restartedStreams.close();
restartedStreams.cleanUp();
}
}
}

streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java (85 changes)

@@ -59,6 +59,7 @@ import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
@@ -86,6 +87,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.KeyQueryMetadata.NOT_AVAILABLE;
import static org.apache.kafka.streams.KeyValue.pair;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
@@ -172,9 +174,9 @@ public class NamedTopologyIntegrationTest {
private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching
asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L));
private static final List<KeyValue<String, Long>> SUM_OUTPUT_DATA =
asList(pair("B", 200L), pair("A", 400L), pair("C", 350L)); // output of summation with caching
asList(pair("A", 100L), pair("B", 200L), pair("A", 400L), pair("C", 400L), pair("C", 350L));
private static final String TOPIC_PREFIX = "unique_topic_prefix";
private final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
@@ -207,6 +209,7 @@ public class NamedTopologyIntegrationTest {
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, TOPIC_PREFIX);
return streamsConfiguration;
}
@@ -318,7 +321,7 @@ public class NamedTopologyIntegrationTest {
.toStream().to(OUTPUT_STREAM_1);
streams.addNamedTopology(topology1Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<String, Long>> results = waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
final List<KeyValue<String, Long>> results = waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5);
assertThat(results, equalTo(COUNT_OUTPUT_DATA));
final Set<String> allTopics = CLUSTER.getAllTopicsInCluster();
@@ -336,9 +339,9 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology3Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(CLUSTER.getAllTopicsInCluster().containsAll(asList(changelog1, changelog2, changelog3)), is(true));
}
@@ -365,7 +368,7 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology2Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SINGLE_PARTITION_OUTPUT_STREAM, 3), equalTo(COUNT_OUTPUT_DATA));
final ReadOnlyKeyValueStore<String, Long> store =
@@ -437,7 +440,7 @@ public class NamedTopologyIntegrationTest {
streams.start();
streams.addNamedTopology(topology1Builder.build()).all().get();
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -448,8 +451,10 @@ public class NamedTopologyIntegrationTest {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
streams.addNamedTopology(topology2Builder.build()).all().get();
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -463,9 +468,11 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology3Builder.build()).all().get();
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -481,14 +488,14 @@ public class NamedTopologyIntegrationTest {
streams2.addNamedTopology(topology1Builder2.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, streams2));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
final AddNamedTopologyResult result = streams.addNamedTopology(topology2Builder.build());
final AddNamedTopologyResult result2 = streams2.addNamedTopology(topology2Builder2.build());
result.all().get();
result2.all().get();
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -501,7 +508,7 @@ public class NamedTopologyIntegrationTest {
streams2.addNamedTopology(topology1Builder2.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, streams2));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
final RemoveNamedTopologyResult result = streams.removeNamedTopology(TOPOLOGY_1, true);
streams2.removeNamedTopology(TOPOLOGY_1, true).all().get();
@@ -537,7 +544,7 @@ public class NamedTopologyIntegrationTest {
assertThat(streams.getAllTopologies(), equalTo(singleton(topology2Client1)));
assertThat(streams2.getAllTopologies(), equalTo(singleton(topology2Client2)));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -559,7 +566,7 @@ public class NamedTopologyIntegrationTest {
produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
produceToInputTopics(DELAYED_INPUT_STREAM_2, STANDARD_INPUT_DATA);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1, DELAYED_INPUT_STREAM_2);
}
@@ -578,8 +585,8 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology1Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
streams.removeNamedTopology(TOPOLOGY_1).all().get();
streams.cleanUpNamedTopology(TOPOLOGY_1);
@@ -592,8 +599,8 @@ public class NamedTopologyIntegrationTest {
produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
streams.addNamedTopology(topology1Builder2.build()).all().get();
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1);
@@ -610,9 +617,9 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology3Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -625,9 +632,9 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology3Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -642,8 +649,8 @@ public class NamedTopologyIntegrationTest {
final NamedTopology namedTopology = topology1Builder.build();
streams.addNamedTopology(namedTopology).all().get();
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
streams.removeNamedTopology("topology-1", true).all().get();
streams.cleanUpNamedTopology("topology-1");
@@ -662,8 +669,8 @@ public class NamedTopologyIntegrationTest {
final NamedTopology namedTopologyDup = topology1BuilderDup.build();
streams.addNamedTopology(namedTopologyDup).all().get();
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
}
@@ -680,8 +687,8 @@ public class NamedTopologyIntegrationTest {
final NamedTopology namedTopology = topology1Builder.build();
streams.addNamedTopology(namedTopology).all().get();
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
streams.removeNamedTopology(TOPOLOGY_1, true).all().get();
streams.cleanUpNamedTopology(TOPOLOGY_1);
@@ -700,8 +707,8 @@ public class NamedTopologyIntegrationTest {
final NamedTopology namedTopologyDup = topology1BuilderDup.build();
streams.addNamedTopology(namedTopologyDup).all().get();
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
}
@@ -723,7 +730,7 @@ public class NamedTopologyIntegrationTest {
streams2.addNamedTopology(topology1Builder2.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, streams2));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
@@ -746,7 +753,7 @@ public class NamedTopologyIntegrationTest {
CLUSTER.createTopic(NEW_STREAM, 2, 1);
produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
// Make sure the threads were not actually killed and replaced
assertThat(streams.metadataForLocalThreads().size(), equalTo(2));
@@ -793,7 +800,7 @@ public class NamedTopologyIntegrationTest {
produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
final List<KeyValue<String, Integer>> output =
waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5);
output.retainAll(COUNT_OUTPUT_DATA);
assertThat(output, equalTo(COUNT_OUTPUT_DATA));

streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java (8 changes)

@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -86,7 +87,7 @@ public class PurgeRepartitionTopicIntegrationTest {
}
private final Time time = CLUSTER.time;
private final Time time = new MockTime(1);
private class RepartitionTopicCreatedWithExpectedConfigs implements TestCondition {
@Override
@@ -212,10 +213,11 @@ public class PurgeRepartitionTopicIntegrationTest {
);
// we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
final long waitForPurgeMs = 60000;
TestUtils.waitForCondition(
new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES),
60000,
"Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
waitForPurgeMs,
"Repartition topic " + REPARTITION_TOPIC + " not purged data after " + waitForPurgeMs + " ms."
);
}
}

streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java (23 changes)

@@ -775,6 +775,7 @@ public class IntegrationTestUtils {
/**
* Wait until final key-value mappings have been consumed.
* Duplicate records are not considered in the comparison.
*
* @param consumerConfig Kafka Consumer configuration
* @param topic Kafka topic to consume from
@@ -791,6 +792,7 @@ public class IntegrationTestUtils {
/**
* Wait until final key-value mappings have been consumed.
* Duplicate records are not considered in the comparison.
*
* @param consumerConfig Kafka Consumer configuration
* @param topic Kafka topic to consume from
@@ -807,6 +809,7 @@ public class IntegrationTestUtils {
/**
* Wait until final key-value mappings have been consumed.
* Duplicate records are not considered in the comparison.
*
* @param consumerConfig Kafka Consumer configuration
* @param topic Kafka topic to consume from
@@ -850,15 +853,19 @@ public class IntegrationTestUtils {
// still need to check that for each key, the ordering is expected
final Map<K, List<T>> finalAccumData = new HashMap<>();
for (final T kv : accumulatedActual) {
finalAccumData.computeIfAbsent(
withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key,
key -> new ArrayList<>()).add(kv);
final K key = withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key;
final List<T> records = finalAccumData.computeIfAbsent(key, k -> new ArrayList<>());
if (!records.contains(kv)) {
records.add(kv);
}
}
final Map<K, List<T>> finalExpected = new HashMap<>();
for (final T kv : expectedRecords) {
finalExpected.computeIfAbsent(
withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key,
key -> new ArrayList<>()).add(kv);
final K key = withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key;
final List<T> records = finalExpected.computeIfAbsent(key, k -> new ArrayList<>());
if (!records.contains(kv)) {
records.add(kv);
}
}
// returns true only if the remaining records in both lists are the same and in the same order
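
The accumulation logic changed above now skips duplicate records per key before the ordered comparison. A standalone sketch of that idea with hypothetical data (not taken from the test utils):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PerKeyDedupSketch {
        public static void main(final String[] args) {
            final Map<String, List<String>> byKey = new HashMap<>();
            for (final String record : List.of("A:1", "A:1", "B:2", "A:2")) {
                final String key = record.substring(0, record.indexOf(':'));
                final List<String> records = byKey.computeIfAbsent(key, k -> new ArrayList<>());
                // Drop duplicates, as the updated helper does, so records re-processed
                // under at-least-once semantics do not fail the per-key ordering check.
                if (!records.contains(record)) {
                    records.add(record);
                }
            }
            System.out.println(byKey); // e.g. {A=[A:1, A:2], B=[B:2]}
        }
    }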
@@ -1037,8 +1044,8 @@ public class IntegrationTestUtils {
final long millisRemaining = expectedEnd - System.currentTimeMillis();
if (millisRemaining <= 0) {
fail(
"Application did not reach a RUNNING state for all streams instances. " +
"Non-running instances: " + nonRunningStreams
nonRunningStreams.size() + " out of " + streamsList.size() + " Streams clients did not reach the RUNNING state. " +
"Non-running Streams clients: " + nonRunningStreams
);
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java (4 changes)

@@ -1269,7 +1269,7 @@ class DefaultStateUpdaterTest {
}
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws Exception {
public void shouldHandleTaskCorruptedExceptionAndAddFailedTasksToQueue() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
@@ -1293,6 +1293,8 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks();
verifyRemovedTasks();
verify(changelogReader).unregister(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
verify(task1).markChangelogAsCorrupted(mkSet(TOPIC_PARTITION_A_0));
verify(task2).markChangelogAsCorrupted(mkSet(TOPIC_PARTITION_B_0));
}
@Test

streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java (1 change)

@@ -45,6 +45,7 @@ class ReadOnlyTaskTest {
add("changelogOffsets");
add("state");
add("id");
add("getStore");
}
};

streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java (52 changes)

@@ -390,37 +390,28 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
}
@Test
public void shouldPollWithRightTimeout() {
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes();
EasyMock.replay(stateManager, storeMetadata, store);
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
final StoreChangelogReader changelogReader =
new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
changelogReader.register(tp, stateManager);
if (type == STANDBY) {
changelogReader.transitToUpdateStandby();
public void shouldPollWithRightTimeoutWithStateUpdater() {
shouldPollWithRightTimeout(true);
}
changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class)));
if (type == ACTIVE) {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} else {
assertEquals(Duration.ZERO, consumer.lastPollTimeout());
@Test
public void shouldPollWithRightTimeoutWithoutStateUpdater() {
shouldPollWithRightTimeout(false);
}
private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled) {
final Properties properties = new Properties();
properties.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
shouldPollWithRightTimeout(properties);
}
@Test
public void shouldPollWithRightTimeoutWithStateUpdater() {
public void shouldPollWithRightTimeoutWithStateUpdaterDefault() {
final Properties properties = new Properties();
shouldPollWithRightTimeout(properties);
}
private void shouldPollWithRightTimeout(final Properties properties) {
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
@@ -431,8 +422,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
final Properties properties = new Properties();
properties.put(InternalConfig.STATE_UPDATER_ENABLED, true);
final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader", properties));
final StoreChangelogReader changelogReader =
@@ -445,7 +434,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
}
changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class)));
if (type == ACTIVE) {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} else {
if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
|| (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} else {
assertEquals(Duration.ZERO, consumer.lastPollTimeout());
}
}
}
@Test

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java (161 changes)

@@ -47,8 +47,10 @@ import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -73,7 +75,6 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -130,6 +131,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
@@ -241,6 +243,13 @@ public class StreamThreadTest {
private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId,
final StreamsConfig config,
final boolean eosEnabled) {
return createStreamThread(clientId, config, mockTime, eosEnabled);
}
private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId,
final StreamsConfig config,
final Time time,
final boolean eosEnabled) {
if (eosEnabled) {
clientSupplier.setApplicationIdForProducer(APPLICATION_ID);
}
@@ -251,7 +260,7 @@ public class StreamThreadTest {
metrics,
APPLICATION_ID,
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
mockTime
time
);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
@@ -265,7 +274,7 @@ public class StreamThreadTest {
PROCESS_ID,
clientId,
streamsMetrics,
mockTime,
time,
streamsMetadataState,
0,
stateDirectory,
@@ -334,7 +343,7 @@ public class StreamThreadTest {
@Test
public void shouldChangeStateAtStartClose() throws Exception {
final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
final StreamThread thread = createStreamThread(CLIENT_ID, config, new MockTime(1), false);
final StateListenerStub stateListener = new StateListenerStub();
thread.setStateListener(stateListener);
@@ -634,6 +643,7 @@ public class StreamThreadTest {
@Test
public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing() throws InterruptedException {
final Time mockTime = new MockTime(1);
final StreamsConfig config = new StreamsConfig(configProps(false));
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
@@ -694,6 +704,7 @@ public class StreamThreadTest {
@Test
public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing() throws InterruptedException {
final Time mockTime = new MockTime(1);
final StreamsConfig config = new StreamsConfig(configProps(false));
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
@@ -704,6 +715,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> mockConsumer = EasyMock.createNiceMock(Consumer.class);
expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty());
expect(mockConsumer.assignment()).andStubReturn(emptySet());
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
@@ -1034,6 +1046,7 @@ public class StreamThreadTest {
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
expect(consumer.poll(anyObject())).andStubReturn(new ConsumerRecords<>(Collections.emptyMap()));
expect(consumer.assignment()).andStubReturn(emptySet());
final Task task = niceMock(Task.class);
expect(task.id()).andStubReturn(task1);
expect(task.inputPartitions()).andStubReturn(Collections.singleton(t1p1));
@@ -1046,7 +1059,9 @@ public class StreamThreadTest {
final StandbyTaskCreator standbyTaskCreator = mock(StandbyTaskCreator.class);
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
EasyMock.replay(consumer, consumerGroupMetadata, task, activeTaskCreator, standbyTaskCreator);
EasyMock.replay(consumer, consumerGroupMetadata, activeTaskCreator, standbyTaskCreator);
final StateUpdater stateUpdater = Mockito.mock(StateUpdater.class);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
@@ -1064,7 +1079,7 @@ public class StreamThreadTest {
topologyMetadata,
null,
null,
null
stateUpdater
) {
@Override
int commit(final Collection<Task> tasksToCommit) {
@@ -1246,7 +1261,17 @@ public class StreamThreadTest {
public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
final Properties props = configProps(true);
// The state updater is disabled for this test because the test relies on the fact that
// mainConsumer.resume() is not called. That does not hold when the state updater is enabled, which
// leads to java.lang.IllegalStateException: No current assignment for partition topic1-2.
// Since this test verifies an aspect that is independent of the state updater, it is OK to disable
// the state updater and defer rewriting the test until the code path for the disabled state updater
// is removed.
props.put(InternalConfig.STATE_UPDATER_ENABLED, false);
final StreamThread thread =
createStreamThread(CLIENT_ID, new StreamsConfig(props), new MockTime(1), true);
thread.taskManager().handleRebalanceStart(Collections.singleton(topic1));
@@ -1290,7 +1315,12 @@ public class StreamThreadTest {
public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
final StreamThread thread = createStreamThread(
CLIENT_ID,
new StreamsConfig(configProps(true)),
new MockTime(1),
true
);
thread.start();
TestUtils.waitForCondition(
@@ -1353,42 +1383,68 @@ public class StreamThreadTest {
}
@Test
public void shouldNotReturnDataAfterTaskMigrated() {
public void shouldNotReturnDataAfterTaskMigratedWithStateUpdaterEnabled() {
shouldNotReturnDataAfterTaskMigrated(true);
}
@Test
public void shouldNotReturnDataAfterTaskMigratedWithStateUpdaterDisabled() {
shouldNotReturnDataAfterTaskMigrated(false);
}
private void shouldNotReturnDataAfterTaskMigrated(final boolean stateUpdaterEnabled) {
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
expect(taskManager.allOwnedTasks()).andStubReturn(emptyMap());
final InternalTopologyBuilder internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class);
expect(internalTopologyBuilder.fullSourceTopicNames()).andReturn(Collections.singletonList(topic1)).times(2);
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.subscribe(Collections.singletonList(topic1), new MockRebalanceListener());
consumer.rebalance(Collections.singletonList(t1p1));
consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L));
consumer.seekToEnd(Collections.singletonList(t1p1));
final ChangelogReader changelogReader = new MockChangelogReader() {
final TaskMigratedException taskMigratedException = new TaskMigratedException(
"Changelog restore found task migrated", new RuntimeException("restore task migrated"));
ChangelogReader changelogReader = this.changelogReader;
if (stateUpdaterEnabled) {
expect(taskManager.checkStateUpdater(anyLong(), anyObject())).andAnswer(() -> {
consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0]));
consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0]));
throw taskMigratedException;
});
} else {
changelogReader = new MockChangelogReader() {
@Override
public long restore(final Map<TaskId, Task> tasks) {
consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0]));
consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0]));
throw new TaskMigratedException(
"Changelog restore found task migrated", new RuntimeException("restore task migrated"));
throw taskMigratedException;
}
};
}
taskManager.handleLostAll();
EasyMock.replay(taskManager, internalTopologyBuilder);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final Properties props = configProps(false);
props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
final StreamsConfig config = new StreamsConfig(props);
final StreamThread thread = new StreamThread(
mockTime,
new MockTime(1),
config,
null,
consumer,
consumer,
restoreConsumer,
changelogReader,
null,
taskManager,
@@ -1584,8 +1640,20 @@ public class StreamThreadTest {
}
@Test
public void shouldReinitializeRevivedTasksInAnyState() {
final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)), false);
public void shouldReinitializeRevivedTasksInAnyStateWithStateUpdaterEnabled() throws Exception {
shouldReinitializeRevivedTasksInAnyState(true);
}
@Test
public void shouldReinitializeRevivedTasksInAnyStateWithStateUpdaterDisabled() throws Exception {
shouldReinitializeRevivedTasksInAnyState(false);
}
private void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterEnabled) throws Exception {
final Properties streamsConfigProps = configProps(false);
streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
final StreamsConfig config = new StreamsConfig(streamsConfigProps);
final StreamThread thread = createStreamThread(CLIENT_ID, config, new MockTime(1), false);
final String storeName = "store";
final String storeChangelog = "stream-thread-test-store-changelog";
@@ -1652,12 +1720,30 @@ public class StreamThreadTest {
thread.runOnce();
// the third actually polls, processes the record, and throws the corruption exception
if (stateUpdaterEnabled) {
TestUtils.waitForCondition(
() -> thread.taskManager().checkStateUpdater(
mockTime.milliseconds(),
topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1))
),
10 * 1000,
"State updater never returned tasks.");
}
addRecord(mockConsumer, 0L);
shouldThrow.set(true);
final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, thread::runOnce);
// Now, we can handle the corruption
thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks());
if (stateUpdaterEnabled) {
TestUtils.waitForCondition(
() -> thread.taskManager().checkStateUpdater(
mockTime.milliseconds(),
topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1))
),
10 * 1000,
"State updater never returned tasks.");
}
// again, complete the restoration
thread.runOnce();
@@ -1836,7 +1922,7 @@ public class StreamThreadTest {
.groupByKey().count(Materialized.as("count-one"));
internalStreamsBuilder.buildAndOptimizeTopology();
final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
final StreamThread thread = createStreamThread(CLIENT_ID, config, new MockTime(1), false);
final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
restoreConsumer.updatePartitions(
STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
@@ -1883,6 +1969,10 @@ public class StreamThreadTest {
final String storeName2 = "table-two";
final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
final Properties props = configProps(false);
// Updating standby tasks on the stream thread only happens when the state updater is disabled
props.put(InternalConfig.STATE_UPDATER_ENABLED, false);
final StreamsConfig config = new StreamsConfig(props);
final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -2004,6 +2094,10 @@ public class StreamThreadTest {
final String storeName2 = "table-two";
final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
final Properties props = configProps(false);
// Updating standby tasks on the stream thread only happens when the state updater is disabled
props.put(InternalConfig.STATE_UPDATER_ENABLED, false);
final StreamsConfig config = new StreamsConfig(props);
final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -2243,13 +2337,25 @@ public class StreamThreadTest {
}
@Test
public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestoreWithStateUpdaterEnabled() throws Exception {
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(true);
}
@Test
public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestoreWithStateUpdaterDisabled() throws Exception {
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(false);
}
private void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(final boolean stateUpdaterEnabled) throws Exception {
internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
.groupByKey()
.count(Materialized.as("count"));
internalStreamsBuilder.buildAndOptimizeTopology();
final StreamThread thread = createStreamThread("clientId", config, false);
final Properties props = configProps(false);
props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
final StreamsConfig config = new StreamsConfig(props);
final StreamThread thread = createStreamThread("clientId", config, new MockTime(1), false);
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
final MockConsumer<byte[], byte[]> mockRestoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer();
final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient();
@@ -2276,6 +2382,8 @@ public class StreamThreadTest {
)
);
mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
mockConsumer.subscribe(mkSet(topicPartition.topic()));
mockConsumer.rebalance(Collections.singleton(topicPartition));
mockRestoreConsumer.updatePartitions(
"stream-thread-test-count-changelog",
@@ -2325,6 +2433,12 @@ public class StreamThreadTest {
}
});
// after handling the exception and reviving the task, with the state updater the changelog topic is
// registered again with the changelog reader
TestUtils.waitForCondition(
() -> mockRestoreConsumer.assignment().size() == 1,
"Never get the assignment");
// after handling the exception and reviving the task, the position
// should be reset to the beginning.
TestUtils.waitForCondition(
@@ -2344,12 +2458,18 @@ public class StreamThreadTest {
"K2".getBytes(),
"V2".getBytes()));
if (stateUpdaterEnabled) {
TestUtils.waitForCondition(
() -> mockRestoreConsumer.assignment().size() == 0,
"Never get the assignment");
} else {
TestUtils.waitForCondition(
() -> {
mockRestoreConsumer.assign(changelogPartitionSet);
return mockRestoreConsumer.position(changelogPartition) == 2L;
},
"Never finished restore");
}
} finally {
thread.shutdown();
thread.join(10000);
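The stateUpdaterEnabled branch above captures the observable difference between the two modes: with the state updater, the restore consumer is unassigned once restoration completes, while on the legacy path the stream thread keeps the assignment and advances the position to the end of the changelog. For readers unfamiliar with MockConsumer, a self-contained illustration of the position bookkeeping the else-branch checks (topic name and offsets are arbitrary):

    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Collections;

    public class MockConsumerPositionDemo {
        public static void main(final String[] args) {
            final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            final TopicPartition tp = new TopicPartition("changelog", 0);
            consumer.assign(Collections.singleton(tp));
            consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
            consumer.seekToBeginning(Collections.singleton(tp));
            System.out.println(consumer.position(tp)); // 0: nothing consumed yet
        }
    }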
@@ -3190,6 +3310,7 @@ public class StreamThreadTest {
@Test
public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() {
final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig();
streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false);
final StreamThread streamThread = setUpThread(streamsConfigProps);
streamThread.setState(State.STARTING);
streamThread.setState(State.PARTITIONS_ASSIGNED);
@@ -3320,7 +3441,7 @@ public class StreamThreadTest {
return null;
}
StandbyTask standbyTask(final TaskManager taskManager, final TopicPartition partition) {
final Stream<Task> standbys = taskManager.standbyTaskMap().values().stream();
for (final Task task : (Iterable<Task>) standbys::iterator) {
if (task.inputPartitions().contains(partition)) {
return (StandbyTask) task;
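The helper above now pulls standby tasks from standbyTaskMap() instead of filtering allTasks(); with the state updater, tasks it owns are exposed through read-only views, so the direct map is the reliable way to reach the concrete StandbyTask instances. An illustrative stand-in for the same lookup, assuming only the internal Task types used in the surrounding tests (this is not the TaskManager implementation):

    // Find the standby task reading from the given partition, or null if none.
    static StandbyTask standbyTaskFor(final Map<TaskId, Task> standbyTasks,
                                      final TopicPartition partition) {
        for (final Task task : standbyTasks.values()) {
            if (task.inputPartitions().contains(partition)) {
                return (StandbyTask) task;
            }
        }
        return null;
    }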

2
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java

@@ -72,7 +72,7 @@ public class TaskExecutorTest {
final Tasks tasks = mock(Tasks.class);
final ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class);
final TaskManager taskManager = mock(TaskManager.class);
when(taskManager.activeRunningTaskIterable()).thenReturn(Collections.singletonList(task));
when(taskManager.consumerGroupMetadata()).thenReturn(groupMetadata);
final StreamsProducer producer = mock(StreamsProducer.class);
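The stub switches from activeTaskIterable() to activeRunningTaskIterable(): once restoration happens off the stream thread, an active task may still be restoring, and the task executor should only process tasks that are actually running. A sketch of the distinction, assuming the Task interface's isActive() and state() accessors (this is not the TaskManager code):

    // Keep only active tasks that have finished restoring.
    static List<Task> activeRunningTasks(final Collection<Task> allTasks) {
        final List<Task> running = new ArrayList<>();
        for (final Task task : allTasks) {
            if (task.isActive() && task.state() == Task.State.RUNNING) {
                running.add(task); // restoring actives are deliberately skipped
            }
        }
        return running;
    }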

51
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@@ -590,28 +590,49 @@ public class TaskManagerTest {
}
@Test
public void shouldAddRecycledStandbyTaskFromActiveToPendingTasksToInitWithStateUpdaterEnabled() {
final StreamTask activeTaskToRecycle = statefulTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
final StandbyTask standbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.CREATED).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId01Partitions))
.thenReturn(standbyTask);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
Mockito.verify(activeTaskToRecycle).prepareCommit();
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
Mockito.verify(tasks).addPendingTasksToInit(mkSet(standbyTask));
Mockito.verify(tasks).removeTask(activeTaskToRecycle);
Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
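Together with the test below, this pins down the two recycle paths: with the state updater, the standby created from the active task is parked in the pending-tasks-to-init set and initialized by the updater later, while on the legacy path the registry swaps it in immediately. A condensed sketch of that control flow (names follow the mocks above; this is not the actual TaskManager code):

    void recycleActiveToStandby(final StreamTask active,
                                final Set<TopicPartition> inputPartitions,
                                final boolean stateUpdaterEnabled) {
        active.prepareCommit();
        final StandbyTask standby = standbyTaskCreator.createStandbyTaskFromActive(active, inputPartitions);
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(active.id());
        if (stateUpdaterEnabled) {
            tasks.removeTask(active);                    // registry forgets the active task
            tasks.addPendingTasksToInit(mkSet(standby)); // the state updater initializes it later
        } else {
            tasks.replaceActiveWithStandby(standby);     // in-place swap on the stream thread
        }
    }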
@Test
public void shouldAddRecycledStandbyTaskFromActiveToTaskRegistryWithStateUpdaterDisabled() {
final StreamTask activeTaskToRecycle = statefulTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
final StandbyTask standbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.CREATED).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId01Partitions))
.thenReturn(standbyTask);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
Mockito.verify(activeTaskToRecycle).prepareCommit();
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
Mockito.verify(tasks).replaceActiveWithStandby(standbyTask);
Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
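Both tests assert interactions on mocks rather than task state. For completeness, a minimal, self-contained example of the Mockito verify pattern they rely on (unrelated to the Streams codebase):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    import java.util.List;

    public class VerifyDemo {
        public static void main(final String[] args) {
            @SuppressWarnings("unchecked")
            final List<String> registry = mock(List.class);
            registry.add("standby-task");
            verify(registry).add("standby-task"); // passes only if the call above happened
        }
    }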
