Browse Source

Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)" (#11873)

This reverts commit 2ccc834faa.

This reverts commit 2ccc834. We were seeing a serious regression in our state-heavy benchmarks, which run with rolling bounces across 10 nodes.

We regularly saw this exception: java.lang.OutOfMemoryError: Java heap space

I ran a git bisect and it pointed to this commit. We verified that the commit immediately before it did not have the same issues. I then reverted the problematic commit, ran the benchmarks again on the result, and did not see any more issues. We are still looking into the root cause, but since this isn't a critical improvement, we can remove it temporarily for now.

Reviewers: Bruno Cadonna <cadonna@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, David Jacot <djacot@confluent.io>, Ismael Juma <ismael@confluent.io>
pull/11879/head
Walker Carlson 3 years ago committed by GitHub
parent
commit
4d5a28973f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  2. 8
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

6
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -585,7 +585,7 @@ public class StreamThread extends Thread { @@ -585,7 +585,7 @@ public class StreamThread extends Thread {
runOnce();
if (nextProbingRebalanceMs.get() < time.milliseconds()) {
log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
mainConsumer.enforceRebalance("Scheduled probing rebalance.");
mainConsumer.enforceRebalance();
nextProbingRebalanceMs.set(Long.MAX_VALUE);
}
} catch (final TaskCorruptedException e) {
@ -597,7 +597,7 @@ public class StreamThread extends Thread { @@ -597,7 +597,7 @@ public class StreamThread extends Thread {
final boolean enforceRebalance = taskManager.handleCorruption(e.corruptedTasks());
if (enforceRebalance && eosEnabled) {
log.info("Active task(s) got corrupted. Triggering a rebalance.");
mainConsumer.enforceRebalance("Active tasks corrupted.");
mainConsumer.enforceRebalance();
}
} catch (final TaskMigratedException taskMigrated) {
handleTaskMigrated(taskMigrated);
@ -639,7 +639,7 @@ public class StreamThread extends Thread { @@ -639,7 +639,7 @@ public class StreamThread extends Thread {
if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
log.warn("Detected that shutdown was requested. " +
"All clients in this app will now begin to shutdown");
mainConsumer.enforceRebalance("Shutdown requested.");
mainConsumer.enforceRebalance();
}
}

8
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

@ -481,13 +481,10 @@ public class StreamThreadTest { @@ -481,13 +481,10 @@ public class StreamThreadTest {
expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
EasyMock.replay(consumerGroupMetadata);
final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(mockConsumer);
mockClientSupplier.setCluster(createCluster());
mockConsumer.enforceRebalance("Scheduled probing rebalance.");
mockClientSupplier.setCluster(createCluster());
EasyMock.replay(mockConsumer);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
final StreamThread thread = StreamThread.create(
@ -508,6 +505,7 @@ public class StreamThreadTest { @@ -508,6 +505,7 @@ public class StreamThreadTest {
null
);
mockConsumer.enforceRebalance();
mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L);
@ -2359,7 +2357,7 @@ public class StreamThreadTest { @@ -2359,7 +2357,7 @@ public class StreamThreadTest {
expect(task2.id()).andReturn(taskId2).anyTimes();
expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
consumer.enforceRebalance("Active tasks corrupted.");
consumer.enforceRebalance();
expectLastCall();
EasyMock.replay(task1, task2, taskManager, consumer);

Loading…
Cancel
Save