Browse Source

MINOR: Revert log level changes in LogCaptureAppender (#14436)

LogCaptureAppender sets the log level in various tests to check if a certain log message is produced. The log level is however never reverted, changing the log level across the board and introducing flakiness due to non-determinism since the log level depends on execution order. Some log messages change the timing inside tests significantly.

Reviewer: Bruno Cadonna <cadonna@apache.org>
pull/14454/merge
Lucas Brutschy 1 year ago committed by GitHub
parent
commit
9c2e5daf60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
  2. 2
      connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
  3. 2
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
  4. 4
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
  5. 6
      streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
  6. 2
      streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
  7. 14
      streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
  8. 2
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
  9. 2
      streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

24
clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java

@ -27,6 +27,20 @@ import java.util.Optional; @@ -27,6 +27,20 @@ import java.util.Optional;
public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseable {
private final List<LoggingEvent> events = new LinkedList<>();
private final List<LogLevelChange> logLevelChanges = new LinkedList<>();
public static class LogLevelChange {
public LogLevelChange(final Level originalLevel, final Class<?> clazz) {
this.originalLevel = originalLevel;
this.clazz = clazz;
}
private final Level originalLevel;
private final Class<?> clazz;
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public static class Event {
@ -65,11 +79,13 @@ public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseabl @@ -65,11 +79,13 @@ public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseabl
return logCaptureAppender;
}
public static void setClassLoggerToDebug(final Class<?> clazz) {
public void setClassLoggerToDebug(final Class<?> clazz) {
logLevelChanges.add(new LogLevelChange(Logger.getLogger(clazz).getLevel(), clazz));
Logger.getLogger(clazz).setLevel(Level.DEBUG);
}
public static void setClassLoggerToTrace(final Class<?> clazz) {
public void setClassLoggerToTrace(final Class<?> clazz) {
logLevelChanges.add(new LogLevelChange(Logger.getLogger(clazz).getLevel(), clazz));
Logger.getLogger(clazz).setLevel(Level.TRACE);
}
@ -120,6 +136,10 @@ public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseabl @@ -120,6 +136,10 @@ public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseabl
@Override
public void close() {
for (final LogLevelChange logLevelChange : logLevelChanges) {
Logger.getLogger(logLevelChange.clazz).setLevel(logLevelChange.originalLevel);
}
logLevelChanges.clear();
unregister(this);
}

2
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java

@ -197,7 +197,7 @@ public class MirrorSourceConnectorTest { @@ -197,7 +197,7 @@ public class MirrorSourceConnectorTest {
when(sourceAdmin.describeAcls(any())).thenReturn(describeAclsResult);
try (LogCaptureAppender connectorLogs = LogCaptureAppender.createAndRegister(MirrorSourceConnector.class)) {
LogCaptureAppender.setClassLoggerToTrace(MirrorSourceConnector.class);
connectorLogs.setClassLoggerToTrace(MirrorSourceConnector.class);
connector.syncTopicAcls();
long aclSyncDisableMessages = connectorLogs.getMessages().stream()
.filter(m -> m.contains("Consider disabling topic ACL syncing"))

2
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java

@ -142,7 +142,7 @@ public class SourceTaskOffsetCommitterTest { @@ -142,7 +142,7 @@ public class SourceTaskOffsetCommitterTest {
committers.put(taskId, taskFuture);
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) {
LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
logCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
committer.remove(taskId);
assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("TRACE")));
}

4
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java

@ -990,10 +990,10 @@ public class WorkerSourceTaskTest { @@ -990,10 +990,10 @@ public class WorkerSourceTaskTest {
private void assertShouldSkipCommit() {
assertFalse(workerTask.shouldCommitOffsets());
LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
LogCaptureAppender.setClassLoggerToTrace(WorkerSourceTask.class);
try (LogCaptureAppender committerAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class);
LogCaptureAppender taskAppender = LogCaptureAppender.createAndRegister(WorkerSourceTask.class)) {
committerAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
taskAppender.setClassLoggerToTrace(WorkerSourceTask.class);
SourceTaskOffsetCommitter.commit(workerTask);
assertEquals(Collections.emptyList(), taskAppender.getMessages());

6
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java

@ -1065,8 +1065,8 @@ public class StreamsConfigTest { @@ -1065,8 +1065,8 @@ public class StreamsConfigTest {
public void shouldLogWarningWhenEosAlphaIsUsed() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLoggerToDebug(StreamsConfig.class);
new StreamsConfig(props);
assertThat(
@ -1085,8 +1085,8 @@ public class StreamsConfigTest { @@ -1085,8 +1085,8 @@ public class StreamsConfigTest {
public void shouldLogWarningWhenEosBetaIsUsed() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLoggerToDebug(StreamsConfig.class);
new StreamsConfig(props);
assertThat(
@ -1103,8 +1103,8 @@ public class StreamsConfigTest { @@ -1103,8 +1103,8 @@ public class StreamsConfigTest {
public void shouldLogWarningWhenRetriesIsUsed() {
props.put(StreamsConfig.RETRIES_CONFIG, 0);
LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLoggerToDebug(StreamsConfig.class);
new StreamsConfig(props);
assertThat(

2
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java

@ -880,8 +880,8 @@ public class InternalTopicManagerTest { @@ -880,8 +880,8 @@ public class InternalTopicManagerTest {
topicConfigMap.put(topic1, internalTopicConfig);
topicConfigMap.put("internal-topic", internalTopicConfigII);
LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {
appender.setClassLoggerToDebug(InternalTopicManager.class);
internalTopicManager.makeReady(topicConfigMap);
assertThat(

14
streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java

@ -564,7 +564,7 @@ public class PartitionGroupTest { @@ -564,7 +564,7 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(0L), is(true));
assertThat(
appender.getEvents(),
@ -607,7 +607,7 @@ public class PartitionGroupTest { @@ -607,7 +607,7 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(true));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(0L), is(true));
assertThat(
appender.getEvents(),
@ -641,7 +641,7 @@ public class PartitionGroupTest { @@ -641,7 +641,7 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(0L), is(false));
assertThat(
appender.getEvents(),
@ -680,7 +680,7 @@ public class PartitionGroupTest { @@ -680,7 +680,7 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(0L), is(false));
assertThat(
appender.getEvents(),
@ -714,7 +714,7 @@ public class PartitionGroupTest { @@ -714,7 +714,7 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(0L), is(false));
assertThat(
appender.getEvents(),
@ -726,7 +726,7 @@ public class PartitionGroupTest { @@ -726,7 +726,7 @@ public class PartitionGroupTest {
}
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(1L), is(true));
assertThat(
appender.getEvents(),
@ -745,7 +745,7 @@ public class PartitionGroupTest { @@ -745,7 +745,7 @@ public class PartitionGroupTest {
}
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(2L), is(true));
assertThat(
appender.getEvents(),

2
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java

@ -1303,8 +1303,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @@ -1303,8 +1303,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldNotThrowOnUnknownRevokedPartition() {
LogCaptureAppender.setClassLoggerToDebug(StoreChangelogReader.class);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StoreChangelogReader.class)) {
appender.setClassLoggerToDebug(StoreChangelogReader.class);
changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0)));
assertThat(

2
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@ -4155,7 +4155,7 @@ public class TaskManagerTest { @@ -4155,7 +4155,7 @@ public class TaskManagerTest {
replay(consumer);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TaskManager.class)) {
LogCaptureAppender.setClassLoggerToDebug(TaskManager.class);
appender.setClassLoggerToDebug(TaskManager.class);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));

Loading…
Cancel
Save