Browse Source

KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest (#14153)

Reviewers: Yash Mayya <yash.mayya@gmail.com>, Divij Vaidya <diviv@amazon.com>
pull/14194/head
bachmanity1 1 year ago committed by GitHub
parent
commit
f137da04fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      build.gradle
  2. 4
      connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
  3. 233
      connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java

2
build.gradle

@ -416,7 +416,7 @@ subprojects { @@ -416,7 +416,7 @@ subprojects {
testsToExclude.addAll([
// connect tests
"**/KafkaConfigBackingStoreTest.*",
"**/KafkaBasedLogTest.*", "**/StandaloneHerderTest.*",
"**/StandaloneHerderTest.*",
"**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*"
])
}

4
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java

@ -102,8 +102,8 @@ public class KafkaBasedLog<K, V> { @@ -102,8 +102,8 @@ public class KafkaBasedLog<K, V> {
private Consumer<K, V> consumer;
private Optional<Producer<K, V>> producer;
private TopicAdmin admin;
private Thread thread;
// Visible for testing
Thread thread;
private boolean stopRequested;
private final Queue<Callback<Void>> readLogEndOffsetCallbacks;
private final java.util.function.Consumer<TopicAdmin> initializer;

233
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java

@ -36,21 +36,15 @@ import org.apache.kafka.common.errors.WakeupException; @@ -36,21 +36,15 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.MockTime;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.apache.kafka.common.utils.Time;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.nio.ByteBuffer;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -64,6 +58,7 @@ import java.util.concurrent.TimeUnit; @@ -64,6 +58,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
@ -72,10 +67,14 @@ import static org.junit.Assert.assertNotNull; @@ -72,10 +67,14 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@PrepareForTest(KafkaBasedLog.class)
@PowerMockIgnore("javax.management.*")
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class KafkaBasedLogTest {
private static final String TOPIC = "connect-log";
@ -114,29 +113,37 @@ public class KafkaBasedLogTest { @@ -114,29 +113,37 @@ public class KafkaBasedLogTest {
private static final String TP0_VALUE_NEW = "VAL0_NEW";
private static final String TP1_VALUE_NEW = "VAL1_NEW";
private Time time = new MockTime();
private final Time time = new MockTime();
private KafkaBasedLog<String, String> store;
@Mock
private Runnable initializer;
private Consumer<TopicAdmin> initializer;
@Mock
private KafkaProducer<String, String> producer;
private MockConsumer<String, String> consumer;
@Mock
private TopicAdmin admin;
private final Supplier<TopicAdmin> topicAdminSupplier = () -> admin;
private MockConsumer<String, String> consumer;
private Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
private Callback<ConsumerRecord<String, String>> consumedCallback = (error, record) -> {
private final Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
private final Callback<ConsumerRecord<String, String>> consumedCallback = (error, record) -> {
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
List<ConsumerRecord<String, String>> records = consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>());
records.add(record);
};
@SuppressWarnings("unchecked")
@Before
public void setUp() {
store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);
store = new KafkaBasedLog<String, String>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer) {
@Override
protected KafkaProducer<String, String> createProducer() {
return producer;
}
@Override
protected MockConsumer<String, String> createConsumer() {
return consumer;
}
};
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
@ -146,12 +153,7 @@ public class KafkaBasedLogTest { @@ -146,12 +153,7 @@ public class KafkaBasedLogTest {
}
@Test
public void testStartStop() throws Exception {
expectStart();
expectStop();
PowerMock.replayAll();
public void testStartStop() {
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
endOffsets.put(TP1, 0L);
@ -160,19 +162,11 @@ public class KafkaBasedLogTest { @@ -160,19 +162,11 @@ public class KafkaBasedLogTest {
assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
store.stop();
assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
assertTrue(consumer.closed());
PowerMock.verifyAll();
verifyStartAndStop();
}
@Test
public void testReloadOnStart() throws Exception {
expectStart();
expectStop();
PowerMock.replayAll();
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 1L);
endOffsets.put(TP1, 1L);
@ -206,19 +200,11 @@ public class KafkaBasedLogTest { @@ -206,19 +200,11 @@ public class KafkaBasedLogTest {
assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value());
store.stop();
assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
assertTrue(consumer.closed());
PowerMock.verifyAll();
verifyStartAndStop();
}
@Test
public void testReloadOnStartWithNoNewRecordsPresent() throws Exception {
expectStart();
expectStop();
PowerMock.replayAll();
public void testReloadOnStartWithNoNewRecordsPresent() {
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 7L);
endOffsets.put(TP1, 7L);
@ -241,30 +227,19 @@ public class KafkaBasedLogTest { @@ -241,30 +227,19 @@ public class KafkaBasedLogTest {
store.stop();
assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
assertTrue(consumer.closed());
PowerMock.verifyAll();
verifyStartAndStop();
}
@Test
public void testSendAndReadToEnd() throws Exception {
expectStart();
TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
when(producer.send(eq(tp0Record), callback0.capture())).thenReturn(tp0Future);
TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE);
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future);
// Producer flushes when read to log end is called
producer.flush();
PowerMock.expectLastCall();
expectStop();
PowerMock.replayAll();
ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback1 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
when(producer.send(eq(tp1Record), callback1.capture())).thenReturn(tp1Future);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
@ -335,18 +310,13 @@ public class KafkaBasedLogTest { @@ -335,18 +310,13 @@ public class KafkaBasedLogTest {
// Cleanup
store.stop();
assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
assertTrue(consumer.closed());
PowerMock.verifyAll();
// Producer flushes when read to log end is called
verify(producer).flush();
verifyStartAndStop();
}
@Test
public void testPollConsumerError() throws Exception {
expectStart();
expectStop();
PowerMock.replayAll();
final CountDownLatch finishedLatch = new CountDownLatch(1);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 1L);
@ -376,22 +346,11 @@ public class KafkaBasedLogTest { @@ -376,22 +346,11 @@ public class KafkaBasedLogTest {
store.stop();
assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
assertTrue(consumer.closed());
PowerMock.verifyAll();
verifyStartAndStop();
}
@Test
public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
expectStart();
// Producer flushes when read to log end is called
producer.flush();
PowerMock.expectLastCall();
expectStop();
PowerMock.replayAll();
final CountDownLatch finishedLatch = new CountDownLatch(1);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
@ -433,22 +392,17 @@ public class KafkaBasedLogTest { @@ -433,22 +392,17 @@ public class KafkaBasedLogTest {
store.stop();
assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
assertTrue(consumer.closed());
PowerMock.verifyAll();
// Producer flushes when read to log end is called
verify(producer).flush();
verifyStartAndStop();
}
@Test
public void testProducerError() throws Exception {
expectStart();
public void testProducerError() {
TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
expectStop();
PowerMock.replayAll();
ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
when(producer.send(eq(tp0Record), callback0.capture())).thenReturn(tp0Future);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
@ -471,42 +425,31 @@ public class KafkaBasedLogTest { @@ -471,42 +425,31 @@ public class KafkaBasedLogTest {
store.stop();
assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
assertTrue(consumer.closed());
PowerMock.verifyAll();
verifyStartAndStop();
}
@Test
public void testReadEndOffsetsUsingAdmin() throws Exception {
// Create a log that uses the admin supplier
setupWithAdmin();
expectProducerAndConsumerCreate();
public void testReadEndOffsetsUsingAdmin() {
Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
endOffsets.put(TP1, 0L);
admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
PowerMock.expectLastCall().andReturn(endOffsets).times(1);
admin.endOffsets(EasyMock.eq(tps));
PowerMock.expectLastCall().andReturn(endOffsets).times(1);
PowerMock.replayAll();
admin = mock(TopicAdmin.class);
when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets);
when(admin.endOffsets(eq(tps))).thenReturn(endOffsets);
store.start();
assertEquals(endOffsets, store.readEndOffsets(tps, false));
verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
verify(admin).endOffsets(eq(tps));
}
@Test
public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws Exception {
// Create a log that uses the admin supplier
setupWithAdmin();
expectProducerAndConsumerCreate();
public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() {
Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
admin = mock(TopicAdmin.class);
// Getting end offsets using the admin client should fail with unsupported version
admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
PowerMock.expectLastCall().andThrow(new UnsupportedVersionException("too old"));
when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenThrow(new UnsupportedVersionException("too old"));
// Falls back to the consumer
Map<TopicPartition, Long> endOffsets = new HashMap<>();
@ -514,65 +457,33 @@ public class KafkaBasedLogTest { @@ -514,65 +457,33 @@ public class KafkaBasedLogTest {
endOffsets.put(TP1, 0L);
consumer.updateEndOffsets(endOffsets);
PowerMock.replayAll();
store.start();
assertEquals(endOffsets, store.readEndOffsets(tps, false));
verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
}
@Test
public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception {
// Create a log that uses the admin supplier
setupWithAdmin();
expectProducerAndConsumerCreate();
public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() {
Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
endOffsets.put(TP1, 0L);
admin = mock(TopicAdmin.class);
// Getting end offsets upon startup should work fine
admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
PowerMock.expectLastCall().andReturn(endOffsets).times(1);
when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets);
// Getting end offsets using the admin client should fail with leader not available
admin.endOffsets(EasyMock.eq(tps));
PowerMock.expectLastCall().andThrow(new LeaderNotAvailableException("retry"));
PowerMock.replayAll();
when(admin.endOffsets(eq(tps))).thenThrow(new LeaderNotAvailableException("retry"));
store.start();
assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false));
verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
verify(admin).endOffsets(eq(tps));
}
@SuppressWarnings("unchecked")
private void setupWithAdmin() {
Supplier<TopicAdmin> adminSupplier = () -> admin;
java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
}
private void expectProducerAndConsumerCreate() throws Exception {
PowerMock.expectPrivate(store, "createProducer")
.andReturn(producer);
PowerMock.expectPrivate(store, "createConsumer")
.andReturn(consumer);
}
private void expectStart() throws Exception {
initializer.run();
EasyMock.expectLastCall().times(1);
expectProducerAndConsumerCreate();
}
private void expectStop() {
producer.close();
PowerMock.expectLastCall();
// MockConsumer close is checked after test.
}
private static ByteBuffer buffer(String v) {
return ByteBuffer.wrap(v.getBytes());
private void verifyStartAndStop() {
verify(initializer).accept(admin);
verify(producer).close();
assertTrue(consumer.closed());
assertFalse(store.thread.isAlive());
}
}

Loading…
Cancel
Save