diff --git a/build.gradle b/build.gradle
index 7ac8f982d75..c4287726aea 100644
--- a/build.gradle
+++ b/build.gradle
@@ -416,7 +416,7 @@ subprojects {
       testsToExclude.addAll([
         // connect tests
         "**/KafkaConfigBackingStoreTest.*",
-        "**/KafkaBasedLogTest.*", "**/StandaloneHerderTest.*",
+        "**/StandaloneHerderTest.*",
         "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*"
       ])
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 046d84923ae..3a9014eda65 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -102,8 +102,8 @@ public class KafkaBasedLog<K, V> {
     private Consumer<K, V> consumer;
     private Optional<Producer<K, V>> producer;
     private TopicAdmin admin;
-
-    private Thread thread;
+    // Visible for testing
+    Thread thread;
     private boolean stopRequested;
     private final Queue<Callback<Void>> readLogEndOffsetCallbacks;
     private final java.util.function.Consumer<TopicAdmin> initializer;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index e1c7e6dd5db..40ad5ba5c62 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -36,21 +36,15 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.MockTime;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
+import org.apache.kafka.common.utils.Time;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.nio.ByteBuffer;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -64,6 +58,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
@@ -72,10 +67,14 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaBasedLog.class)
-@PowerMockIgnore("javax.management.*")
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class KafkaBasedLogTest {
 
     private static final String TOPIC = "connect-log";
@@ -114,29 +113,37 @@ public class KafkaBasedLogTest {
     private static final String TP0_VALUE_NEW = "VAL0_NEW";
     private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
-    private Time time = new MockTime();
+    private final Time time = new MockTime();
     private KafkaBasedLog<String, String> store;
 
     @Mock
-    private Runnable initializer;
+    private Consumer<TopicAdmin> initializer;
     @Mock
     private KafkaProducer<String, String> producer;
-    private MockConsumer<String, String> consumer;
-    @Mock
     private TopicAdmin admin;
+    private final Supplier<TopicAdmin> topicAdminSupplier = () -> admin;
+    private MockConsumer<String, String> consumer;
 
-    private Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
-    private Callback<ConsumerRecord<String, String>> consumedCallback = (error, record) -> {
+    private final Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
+    private final Callback<ConsumerRecord<String, String>> consumedCallback = (error, record) -> {
         TopicPartition partition = new TopicPartition(record.topic(), record.partition());
         List<ConsumerRecord<String, String>> records = consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>());
         records.add(record);
     };
 
-    @SuppressWarnings("unchecked")
     @Before
     public void setUp() {
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);
+        store = new KafkaBasedLog<String, String>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer) {
+            @Override
+            protected KafkaProducer<String, String> createProducer() {
+                return producer;
+            }
+
+            @Override
+            protected MockConsumer<String, String> createConsumer() {
+                return consumer;
+            }
+        };
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
         Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
@@ -146,12 +153,7 @@ public class KafkaBasedLogTest {
     }
 
     @Test
-    public void testStartStop() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
+    public void testStartStop() {
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
@@ -160,19 +162,11 @@ public class KafkaBasedLogTest {
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
 
         store.stop();
-
-        assertFalse(Whitebox.getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
     public void testReloadOnStart() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 1L);
         endOffsets.put(TP1, 1L);
@@ -206,19 +200,11 @@ public class KafkaBasedLogTest {
         assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value());
 
         store.stop();
-
-        assertFalse(Whitebox.getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
-    public void testReloadOnStartWithNoNewRecordsPresent() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
+    public void testReloadOnStartWithNoNewRecordsPresent() {
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 7L);
         endOffsets.put(TP1, 7L);
@@ -241,30 +227,19 @@ public class KafkaBasedLogTest {
 
         store.stop();
 
-        assertFalse(Whitebox.getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
     public void testSendAndReadToEnd() throws Exception {
-        expectStart();
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
         ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
+        ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+        when(producer.send(eq(tp0Record), callback0.capture())).thenReturn(tp0Future);
         TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
         ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future);
-
-        // Producer flushes when read to log end is called
-        producer.flush();
-        PowerMock.expectLastCall();
-
-        expectStop();
-
-        PowerMock.replayAll();
+        ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback1 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+        when(producer.send(eq(tp1Record), callback1.capture())).thenReturn(tp1Future);
 
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
@@ -335,18 +310,13 @@ public class KafkaBasedLogTest {
 
         // Cleanup
         store.stop();
-        assertFalse(Whitebox.getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        // Producer flushes when read to log end is called
+        verify(producer).flush();
+        verifyStartAndStop();
     }
 
     @Test
     public void testPollConsumerError() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
         final CountDownLatch finishedLatch = new CountDownLatch(1);
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 1L);
@@ -376,22 +346,11 @@ public class KafkaBasedLogTest {
 
         store.stop();
 
-        assertFalse(Whitebox.getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
     public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
-        expectStart();
-
-        // Producer flushes when read to log end is called
-        producer.flush();
-        PowerMock.expectLastCall();
-
-        expectStop();
-
-        PowerMock.replayAll();
         final CountDownLatch finishedLatch = new CountDownLatch(1);
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
@@ -433,22 +392,17 @@ public class KafkaBasedLogTest {
 
         store.stop();
 
-        assertFalse(Whitebox.getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        // Producer flushes when read to log end is called
+        verify(producer).flush();
+        verifyStartAndStop();
     }
 
     @Test
-    public void testProducerError() throws Exception {
-        expectStart();
+    public void testProducerError() {
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
         ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
-
-        expectStop();
-
-        PowerMock.replayAll();
+        ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+        when(producer.send(eq(tp0Record), callback0.capture())).thenReturn(tp0Future);
 
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
@@ -471,42 +425,31 @@ public class KafkaBasedLogTest {
 
         store.stop();
 
-        assertFalse(Whitebox.getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
-    public void testReadEndOffsetsUsingAdmin() throws Exception {
-        // Create a log that uses the admin supplier
-        setupWithAdmin();
-        expectProducerAndConsumerCreate();
-
+    public void testReadEndOffsetsUsingAdmin() {
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
-        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
-        admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
-
-        PowerMock.replayAll();
+        admin = mock(TopicAdmin.class);
+        when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets);
+        when(admin.endOffsets(eq(tps))).thenReturn(endOffsets);
 
         store.start();
         assertEquals(endOffsets, store.readEndOffsets(tps, false));
+        verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
+        verify(admin).endOffsets(eq(tps));
     }
 
     @Test
-    public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws Exception {
-        // Create a log that uses the admin supplier
-        setupWithAdmin();
-        expectProducerAndConsumerCreate();
-
+    public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() {
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        admin = mock(TopicAdmin.class);
         // Getting end offsets using the admin client should fail with unsupported version
-        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
-        PowerMock.expectLastCall().andThrow(new UnsupportedVersionException("too old"));
+        when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenThrow(new UnsupportedVersionException("too old"));
 
         // Falls back to the consumer
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
@@ -514,65 +457,33 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP1, 0L);
         consumer.updateEndOffsets(endOffsets);
 
-        PowerMock.replayAll();
-
         store.start();
         assertEquals(endOffsets, store.readEndOffsets(tps, false));
+        verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
     }
 
     @Test
-    public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception {
-        // Create a log that uses the admin supplier
-        setupWithAdmin();
-        expectProducerAndConsumerCreate();
-
+    public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() {
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
+        admin = mock(TopicAdmin.class);
         // Getting end offsets upon startup should work fine
-        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
+        when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets);
         // Getting end offsets using the admin client should fail with leader not available
-        admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andThrow(new LeaderNotAvailableException("retry"));
-
-        PowerMock.replayAll();
+        when(admin.endOffsets(eq(tps))).thenThrow(new LeaderNotAvailableException("retry"));
 
         store.start();
         assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false));
+        verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
+        verify(admin).endOffsets(eq(tps));
     }
 
-    @SuppressWarnings("unchecked")
-    private void setupWithAdmin() {
-        Supplier<TopicAdmin> adminSupplier = () -> admin;
-        java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
-    }
-
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                .andReturn(consumer);
-    }
-
-    private void expectStart() throws Exception {
-        initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
-    }
-
-    private void expectStop() {
-        producer.close();
-        PowerMock.expectLastCall();
-        // MockConsumer close is checked after test.
-    }
-
-    private static ByteBuffer buffer(String v) {
-        return ByteBuffer.wrap(v.getBytes());
+    private void verifyStartAndStop() {
+        verify(initializer).accept(admin);
+        verify(producer).close();
+        assertTrue(consumer.closed());
+        assertFalse(store.thread.isAlive());
     }
-
 }