Browse Source

MINOR: Catch Throwable in commitSourceTask()

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1402 from Ishiihara/source-task-commit-record
pull/1402/merge
Liquan Pei 9 years ago committed by Ewen Cheslack-Postava
parent
commit
9ff54cb5dd
  1. 6
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
  2. 79
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java

@ -243,6 +243,8 @@ class WorkerSourceTask extends WorkerTask { @@ -243,6 +243,8 @@ class WorkerSourceTask extends WorkerTask {
task.commitRecord(record);
} catch (InterruptedException e) {
log.error("Exception thrown", e);
} catch (Throwable t) {
log.error("Exception thrown while calling task.commitRecord()", t);
}
}
@ -366,8 +368,8 @@ class WorkerSourceTask extends WorkerTask { @@ -366,8 +368,8 @@ class WorkerSourceTask extends WorkerTask {
this.task.commit();
} catch (InterruptedException ex) {
log.warn("Commit interrupted", ex);
} catch (Throwable ex) {
log.error("Exception thrown while calling task.commit()", ex);
} catch (Throwable t) {
log.error("Exception thrown while calling task.commit()", t);
}
}

79
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java

@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
@ -52,6 +53,7 @@ import java.util.Collections; @@ -52,6 +53,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -66,6 +68,7 @@ import static org.junit.Assert.assertTrue; @@ -66,6 +68,7 @@ import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
private final Random random = new Random();
private static final String TOPIC = "topic";
private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
@ -197,7 +200,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { @@ -197,7 +200,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerTask.initialize(TASK_CONFIG);
executor.submit(workerTask);
awaitPolls(pollLatch);
awaitLatch(pollLatch);
workerTask.transitionTo(TargetState.PAUSED);
@ -238,7 +241,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { @@ -238,7 +241,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerTask.initialize(TASK_CONFIG);
executor.submit(workerTask);
awaitPolls(pollLatch);
awaitLatch(pollLatch);
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
@ -271,7 +274,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { @@ -271,7 +274,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerTask.initialize(TASK_CONFIG);
executor.submit(workerTask);
awaitPolls(pollLatch);
awaitLatch(pollLatch);
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
@ -306,7 +309,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { @@ -306,7 +309,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerTask.initialize(TASK_CONFIG);
executor.submit(workerTask);
awaitPolls(pollLatch);
awaitLatch(pollLatch);
assertTrue(workerTask.commitOffsets());
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
@ -341,7 +344,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { @@ -341,7 +344,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerTask.initialize(TASK_CONFIG);
executor.submit(workerTask);
awaitPolls(pollLatch);
awaitLatch(pollLatch);
assertTrue(workerTask.commitOffsets());
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
@ -403,6 +406,30 @@ public class WorkerSourceTaskTest extends ThreadedTest { @@ -403,6 +406,30 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.verifyAll();
}
@Test
public void testSendRecordsTaskCommitRecordFail() throws Exception {
createWorkerTask();
// Differentiate only by Kafka partition so we can reuse conversion expectations
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
// Source task commit record failure will not cause the task to abort
expectSendRecordOnce(false);
expectSendRecordTaskCommitRecordFail(false, false);
expectSendRecordOnce(false);
PowerMock.replayAll();
Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
Whitebox.invokeMethod(workerTask, "sendRecords");
assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
assertNull(Whitebox.getInternalState(workerTask, "toSend"));
PowerMock.verifyAll();
}
@Test
public void testSlowTaskStart() throws Exception {
final CountDownLatch startupLatch = new CountDownLatch(1);
@ -435,7 +462,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { @@ -435,7 +462,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
// Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
// exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
// cannot be invoked immediately in the thread trying to stop the task.
startupLatch.await(1000, TimeUnit.MILLISECONDS);
awaitLatch(startupLatch);
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
@ -479,14 +506,22 @@ public class WorkerSourceTaskTest extends ThreadedTest { @@ -479,14 +506,22 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException {
return expectSendRecord(true, false);
return expectSendRecordTaskCommitRecordSucceed(true, false);
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce(boolean isRetry) throws InterruptedException {
return expectSendRecord(false, isRetry);
return expectSendRecordTaskCommitRecordSucceed(false, isRetry);
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException {
return expectSendRecord(anyTimes, isRetry, true);
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry) throws InterruptedException {
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException {
return expectSendRecord(anyTimes, isRetry, false);
}
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException {
expectConvertKeyValue(anyTimes);
Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
@ -523,11 +558,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { @@ -523,11 +558,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
expect.andAnswer(expectResponse);
// 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
if (anyTimes)
EasyMock.expectLastCall().anyTimes();
else
EasyMock.expectLastCall();
expectTaskCommitRecord(anyTimes, succeed);
return sent;
}
@ -545,8 +576,24 @@ public class WorkerSourceTaskTest extends ThreadedTest { @@ -545,8 +576,24 @@ public class WorkerSourceTaskTest extends ThreadedTest {
convertValueExpect.andReturn(SERIALIZED_RECORD);
}
private boolean awaitPolls(CountDownLatch latch) throws InterruptedException {
return latch.await(1000, TimeUnit.MILLISECONDS);
private void expectTaskCommitRecord(boolean anyTimes, boolean succeed) throws InterruptedException {
sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
IExpectationSetters<Void> expect = EasyMock.expectLastCall();
if (!succeed) {
expect = expect.andThrow(new InterruptException("Error committing record in source task"));
}
if (anyTimes) {
expect.anyTimes();
}
}
private boolean awaitLatch(CountDownLatch latch) {
try {
return latch.await(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}
return false;
}
@SuppressWarnings("unchecked")

Loading…
Cancel
Save