diff --git a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java index 0d569af30a5..4f035840bd2 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java @@ -27,6 +27,20 @@ import java.util.Optional; public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseable { private final List events = new LinkedList<>(); + private final List logLevelChanges = new LinkedList<>(); + + public static class LogLevelChange { + + public LogLevelChange(final Level originalLevel, final Class clazz) { + this.originalLevel = originalLevel; + this.clazz = clazz; + } + + private final Level originalLevel; + + private final Class clazz; + + } @SuppressWarnings("OptionalUsedAsFieldOrParameterType") public static class Event { @@ -65,11 +79,13 @@ public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseabl return logCaptureAppender; } - public static void setClassLoggerToDebug(final Class clazz) { + public void setClassLoggerToDebug(final Class clazz) { + logLevelChanges.add(new LogLevelChange(Logger.getLogger(clazz).getLevel(), clazz)); Logger.getLogger(clazz).setLevel(Level.DEBUG); } - public static void setClassLoggerToTrace(final Class clazz) { + public void setClassLoggerToTrace(final Class clazz) { + logLevelChanges.add(new LogLevelChange(Logger.getLogger(clazz).getLevel(), clazz)); Logger.getLogger(clazz).setLevel(Level.TRACE); } @@ -120,6 +136,10 @@ public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseabl @Override public void close() { + for (final LogLevelChange logLevelChange : logLevelChanges) { + Logger.getLogger(logLevelChange.clazz).setLevel(logLevelChange.originalLevel); + } + logLevelChanges.clear(); unregister(this); } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index bc56d3ee8ea..04cd08000f9 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -197,7 +197,7 @@ public class MirrorSourceConnectorTest { when(sourceAdmin.describeAcls(any())).thenReturn(describeAclsResult); try (LogCaptureAppender connectorLogs = LogCaptureAppender.createAndRegister(MirrorSourceConnector.class)) { - LogCaptureAppender.setClassLoggerToTrace(MirrorSourceConnector.class); + connectorLogs.setClassLoggerToTrace(MirrorSourceConnector.class); connector.syncTopicAcls(); long aclSyncDisableMessages = connectorLogs.getMessages().stream() .filter(m -> m.contains("Consider disabling topic ACL syncing")) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java index 6c3a85531cd..ef962cf5b44 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java @@ -142,7 +142,7 @@ public class SourceTaskOffsetCommitterTest { committers.put(taskId, taskFuture); try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) { - LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class); + logCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class); committer.remove(taskId); assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("TRACE"))); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 719dc10d5b1..dd347faca89 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -990,10 +990,10 @@ public class WorkerSourceTaskTest { private void assertShouldSkipCommit() { assertFalse(workerTask.shouldCommitOffsets()); - LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class); - LogCaptureAppender.setClassLoggerToTrace(WorkerSourceTask.class); try (LogCaptureAppender committerAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class); LogCaptureAppender taskAppender = LogCaptureAppender.createAndRegister(WorkerSourceTask.class)) { + committerAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class); + taskAppender.setClassLoggerToTrace(WorkerSourceTask.class); SourceTaskOffsetCommitter.commit(workerTask); assertEquals(Collections.emptyList(), taskAppender.getMessages()); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 4223294c68a..35d7b3e797b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -1065,8 +1065,8 @@ public class StreamsConfigTest { public void shouldLogWarningWhenEosAlphaIsUsed() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLoggerToDebug(StreamsConfig.class); new StreamsConfig(props); assertThat( @@ -1085,8 +1085,8 @@ public class StreamsConfigTest { public void shouldLogWarningWhenEosBetaIsUsed() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); - LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLoggerToDebug(StreamsConfig.class); new StreamsConfig(props); assertThat( @@ -1103,8 +1103,8 @@ public class StreamsConfigTest { public void shouldLogWarningWhenRetriesIsUsed() { props.put(StreamsConfig.RETRIES_CONFIG, 0); - LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLoggerToDebug(StreamsConfig.class); new StreamsConfig(props); assertThat( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index a81977d620a..22ab4971a4a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -880,8 +880,8 @@ public class InternalTopicManagerTest { topicConfigMap.put(topic1, internalTopicConfig); topicConfigMap.put("internal-topic", internalTopicConfigII); - LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(InternalTopicManager.class)) { + appender.setClassLoggerToDebug(InternalTopicManager.class); internalTopicManager.makeReady(topicConfigMap); assertThat( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 599068c0f3c..87ac1b762f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -564,7 +564,7 @@ public class PartitionGroupTest { assertThat(group.allPartitionsBufferedLocally(), is(false)); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); + appender.setClassLoggerToTrace(PartitionGroup.class); assertThat(group.readyToProcess(0L), is(true)); assertThat( appender.getEvents(), @@ -607,7 +607,7 @@ public class PartitionGroupTest { assertThat(group.allPartitionsBufferedLocally(), is(true)); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); + appender.setClassLoggerToTrace(PartitionGroup.class); assertThat(group.readyToProcess(0L), is(true)); assertThat( appender.getEvents(), @@ -641,7 +641,7 @@ public class PartitionGroupTest { assertThat(group.allPartitionsBufferedLocally(), is(false)); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); + appender.setClassLoggerToTrace(PartitionGroup.class); assertThat(group.readyToProcess(0L), is(false)); assertThat( appender.getEvents(), @@ -680,7 +680,7 @@ public class PartitionGroupTest { assertThat(group.allPartitionsBufferedLocally(), is(false)); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); + appender.setClassLoggerToTrace(PartitionGroup.class); assertThat(group.readyToProcess(0L), is(false)); assertThat( appender.getEvents(), @@ -714,7 +714,7 @@ public class PartitionGroupTest { assertThat(group.allPartitionsBufferedLocally(), is(false)); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); + appender.setClassLoggerToTrace(PartitionGroup.class); assertThat(group.readyToProcess(0L), is(false)); assertThat( appender.getEvents(), @@ -726,7 +726,7 @@ public class PartitionGroupTest { } try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); + appender.setClassLoggerToTrace(PartitionGroup.class); assertThat(group.readyToProcess(1L), is(true)); assertThat( appender.getEvents(), @@ -745,7 +745,7 @@ public class PartitionGroupTest { } try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { - LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); + appender.setClassLoggerToTrace(PartitionGroup.class); assertThat(group.readyToProcess(2L), is(true)); assertThat( appender.getEvents(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index c764d6ede2a..e544f485f46 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -1303,8 +1303,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldNotThrowOnUnknownRevokedPartition() { - LogCaptureAppender.setClassLoggerToDebug(StoreChangelogReader.class); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StoreChangelogReader.class)) { + appender.setClassLoggerToDebug(StoreChangelogReader.class); changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0))); assertThat( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 235e3c50c8f..5ac9198ac84 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -4155,7 +4155,7 @@ public class TaskManagerTest { replay(consumer); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TaskManager.class)) { - LogCaptureAppender.setClassLoggerToDebug(TaskManager.class); + appender.setClassLoggerToDebug(TaskManager.class); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); assertThat(task00.state(), is(Task.State.RUNNING));