
KAFKA-9051: Prematurely complete source offset read requests for stopped tasks (#7532)

Prematurely complete source offset read requests for stopped tasks, and add unit tests.
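
For illustration, a minimal self-contained sketch of the pattern this change applies: in-flight offset read futures are tracked and cancelled when the reader is closed, so a task thread blocked on a read can shut down promptly. The names below (CancellableReaderSketch, Reader, offsets) are illustrative stand-ins, not Connect APIs.

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancellableReaderSketch {

    // Simplified stand-in for OffsetStorageReaderImpl: reads block on a Future that close() can cancel.
    static class Reader implements AutoCloseable {
        private final ExecutorService backingStore = Executors.newSingleThreadExecutor();
        private final Set<Future<Map<String, Long>>> inFlight = new HashSet<>();
        private final AtomicBoolean closed = new AtomicBoolean(false);

        Map<String, Long> offsets() throws Exception {
            Future<Map<String, Long>> future;
            synchronized (inFlight) {
                if (closed.get())
                    throw new IllegalStateException("Reader is closed");
                // Simulate a read that never completes (e.g. the offsets topic is unreachable).
                future = backingStore.submit(() -> {
                    Thread.sleep(Long.MAX_VALUE);
                    return Collections.<String, Long>emptyMap();
                });
                inFlight.add(future);
            }
            try {
                return future.get();           // The task thread blocks here...
            } finally {
                synchronized (inFlight) {
                    inFlight.remove(future);
                }
            }
        }

        @Override
        public void close() {
            if (!closed.getAndSet(true)) {
                synchronized (inFlight) {
                    for (Future<Map<String, Long>> future : inFlight)
                        future.cancel(true);   // ...until close() cancels the outstanding read.
                    inFlight.clear();
                }
            }
            backingStore.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        Reader reader = new Reader();
        Thread task = new Thread(() -> {
            try {
                reader.offsets();
            } catch (CancellationException e) {
                System.out.println("Offset read cancelled; task thread can exit");
            } catch (Exception e) {
                System.out.println("Offset read failed: " + e);
            }
        });
        task.start();
        Thread.sleep(500);   // Give the task time to block on the read.
        reader.close();      // Prematurely completes (cancels) the outstanding read.
        task.join();
    }
}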

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Arjun Satish <arjun@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Jinxin Liu <liukrimhim@gmail.com>, Randall Hauch <rhauch@gmail.com>
Branch: pull/4237/merge
Authored by Chris Egerton 5 years ago; committed by Randall Hauch
Commit: da4337271e
Changed files (13):

  1. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (4 lines changed)
  2. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java (12 lines changed)
  3. connect/runtime/src/main/java/org/apache/kafka/connect/storage/CloseableOffsetStorageReader.java (33 lines changed)
  4. connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java (7 lines changed)
  5. connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java (6 lines changed)
  6. connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java (8 lines changed)
  7. connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java (51 lines changed)
  8. connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java (58 lines changed)
  9. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java (4 lines changed)
  10. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java (18 lines changed)
  11. connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java (14 lines changed)
  12. connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java (54 lines changed)
  13. connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java (242 lines changed)

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (4 lines changed)

@@ -50,10 +50,10 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -512,7 +512,7 @@ public class Worker {
retryWithToleranceOperator.reporters(sourceTaskReporters(id, connConfig, errorHandlingMetrics));
TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java (12 lines changed)

@@ -39,9 +39,9 @@ import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -75,7 +75,7 @@ class WorkerSourceTask extends WorkerTask {
private final HeaderConverter headerConverter;
private final TransformationChain<SourceRecord> transformationChain;
private KafkaProducer<byte[], byte[]> producer;
private final OffsetStorageReader offsetReader;
private final CloseableOffsetStorageReader offsetReader;
private final OffsetStorageWriter offsetWriter;
private final Time time;
private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
@@ -105,7 +105,7 @@ class WorkerSourceTask extends WorkerTask {
HeaderConverter headerConverter,
TransformationChain<SourceRecord> transformationChain,
KafkaProducer<byte[], byte[]> producer,
OffsetStorageReader offsetReader,
CloseableOffsetStorageReader offsetReader,
OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig,
ClusterConfigState configState,
@@ -172,6 +172,12 @@ class WorkerSourceTask extends WorkerTask {
sourceTaskMetricsGroup.close();
}
@Override
public void cancel() {
super.cancel();
offsetReader.close();
}
@Override
public void stop() {
super.stop();

connect/runtime/src/main/java/org/apache/kafka/connect/storage/CloseableOffsetStorageReader.java (33 lines changed)

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import java.io.Closeable;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Future;
public interface CloseableOffsetStorageReader extends Closeable, OffsetStorageReader {
/**
* {@link Future#cancel(boolean) Cancel} all outstanding offset read requests, and throw an
* exception in all current and future calls to {@link #offsets(Collection)} and
* {@link #offset(Map)}. This is useful for unblocking task threads which need to shut down but
* are blocked on offset reads.
*/
void close();
}
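
For context, a hedged sketch of how a source task typically consumes the reader behind this interface; the "filename" and "position" keys are made-up examples, not names defined by Connect.

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.storage.OffsetStorageReader;

public class OffsetLookupSketch {
    // Returns the last committed position for a hypothetical file-based source, or 0 if none.
    public static long lastPosition(OffsetStorageReader reader, String file) {
        Map<String, String> partition = Collections.singletonMap("filename", file);
        Map<String, Object> offset = reader.offset(partition);
        // Before this change, a call like this could block indefinitely during shutdown;
        // once a CloseableOffsetStorageReader is closed, such calls fail fast instead.
        return offset == null ? 0L : (Long) offset.get("position");
    }
}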

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java (7 lines changed)

@@ -118,9 +118,8 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
}
@Override
public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys,
final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>(callback) {
public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>() {
@Override
public Map<ByteBuffer, ByteBuffer> convert(Void result) {
Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
@@ -230,6 +229,4 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
return null;
}
}
}

connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java (6 lines changed)

@@ -75,9 +75,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
}
@Override
public Future<Map<ByteBuffer, ByteBuffer>> get(
final Collection<ByteBuffer> keys,
final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
@Override
public Map<ByteBuffer, ByteBuffer> call() throws Exception {
@@ -85,8 +83,6 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
for (ByteBuffer key : keys) {
result.put(key, data.get(key));
}
if (callback != null)
callback.onCompletion(null, result);
return result;
}
});

connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java (8 lines changed)

@@ -53,12 +53,9 @@ public interface OffsetBackingStore {
/**
* Get the values for the specified keys
* @param keys list of keys to look up
* @param callback callback to invoke on completion
* @return future for the resulting map from key to value
*/
Future<Map<ByteBuffer, ByteBuffer>> get(
Collection<ByteBuffer> keys,
Callback<Map<ByteBuffer, ByteBuffer>> callback);
Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys);
/**
* Set the specified keys and values.
@@ -66,8 +63,7 @@ public interface OffsetBackingStore {
* @param callback callback to invoke on completion
* @return void future for the operation
*/
Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
Callback<Void> callback);
Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback);
/**
* Configure class with the given key-value pairs

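With the callback parameter gone from get(), completion handling moves to the returned Future; a brief sketch of a caller under the revised contract (the ten-second timeout is an arbitrary illustrative choice).

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.storage.OffsetBackingStore;

public class BackingStoreGetSketch {
    public static Map<ByteBuffer, ByteBuffer> read(OffsetBackingStore store,
                                                   Collection<ByteBuffer> keys) throws Exception {
        Future<Map<ByteBuffer, ByteBuffer>> values = store.get(keys);
        // Callers that previously passed a callback now wait on the future, and can
        // cancel it (as OffsetStorageReaderImpl does on close) to abandon the read.
        return values.get(10, TimeUnit.SECONDS);
    }
}
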
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java (51 lines changed)

@@ -26,20 +26,27 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Implementation of OffsetStorageReader. Unlike OffsetStorageWriter which is implemented
* directly, the interface is only separate from this implementation because it needs to be
* included in the public API package.
*/
public class OffsetStorageReaderImpl implements OffsetStorageReader {
public class OffsetStorageReaderImpl implements CloseableOffsetStorageReader {
private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
private final OffsetBackingStore backingStore;
private final String namespace;
private final Converter keyConverter;
private final Converter valueConverter;
private final AtomicBoolean closed;
private final Set<Future<Map<ByteBuffer, ByteBuffer>>> offsetReadFutures;
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
Converter keyConverter, Converter valueConverter) {
@@ -47,6 +54,8 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
this.namespace = namespace;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.closed = new AtomicBoolean(false);
this.offsetReadFutures = new HashSet<>();
}
@Override
@@ -76,7 +85,30 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
// Get serialized key -> serialized value from backing store
Map<ByteBuffer, ByteBuffer> raw;
try {
raw = backingStore.get(serializedToOriginal.keySet(), null).get();
Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture;
synchronized (offsetReadFutures) {
if (closed.get()) {
throw new ConnectException(
"Offset reader is closed. This is likely because the task has already been "
+ "scheduled to stop but has taken longer than the graceful shutdown "
+ "period to do so.");
}
offsetReadFuture = backingStore.get(serializedToOriginal.keySet());
offsetReadFutures.add(offsetReadFuture);
}
try {
raw = offsetReadFuture.get();
} catch (CancellationException e) {
throw new ConnectException(
"Offset reader closed while attempting to read offsets. This is likely because "
+ "the task was been scheduled to stop but has taken longer than the "
+ "graceful shutdown period to do so.");
} finally {
synchronized (offsetReadFutures) {
offsetReadFutures.remove(offsetReadFuture);
}
}
} catch (Exception e) {
log.error("Failed to fetch offsets from namespace {}: ", namespace, e);
throw new ConnectException("Failed to fetch offsets.", e);
@@ -108,4 +140,19 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
return result;
}
public void close() {
if (!closed.getAndSet(true)) {
synchronized (offsetReadFutures) {
for (Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture : offsetReadFutures) {
try {
offsetReadFuture.cancel(true);
} catch (Throwable t) {
log.error("Failed to cancel offset read future", t);
}
}
offsetReadFutures.clear();
}
}
}
}

connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java (58 lines changed)

@@ -16,6 +16,9 @@
*/
package org.apache.kafka.connect.util;
import org.apache.kafka.connect.errors.ConnectException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -24,10 +27,15 @@ import java.util.concurrent.TimeoutException;
public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> {
private Callback<T> underlying;
private CountDownLatch finishedLatch;
private T result = null;
private Throwable exception = null;
private final Callback<T> underlying;
private final CountDownLatch finishedLatch;
private volatile T result = null;
private volatile Throwable exception = null;
private volatile boolean cancelled = false;
public ConvertingFutureCallback() {
this(null);
}
public ConvertingFutureCallback(Callback<T> underlying) {
this.underlying = underlying;
@@ -38,21 +46,46 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> {
@Override
public void onCompletion(Throwable error, U result) {
this.exception = error;
this.result = convert(result);
if (underlying != null)
underlying.onCompletion(error, this.result);
finishedLatch.countDown();
synchronized (this) {
if (isDone()) {
return;
}
if (error != null) {
this.exception = error;
} else {
this.result = convert(result);
}
if (underlying != null)
underlying.onCompletion(error, this.result);
finishedLatch.countDown();
}
}
@Override
public boolean cancel(boolean b) {
public boolean cancel(boolean mayInterruptIfRunning) {
synchronized (this) {
if (isDone()) {
return false;
}
if (mayInterruptIfRunning) {
this.cancelled = true;
finishedLatch.countDown();
return true;
}
}
try {
finishedLatch.await();
} catch (InterruptedException e) {
throw new ConnectException("Interrupted while waiting for task to complete", e);
}
return false;
}
@Override
public boolean isCancelled() {
return false;
return cancelled;
}
@Override
@@ -75,6 +108,9 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> {
}
private T result() throws ExecutionException {
if (cancelled) {
throw new CancellationException();
}
if (exception != null) {
throw new ExecutionException(exception);
}

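A short sketch of the cancellation semantics introduced here: cancel(true) succeeds if nothing has completed yet, later completions are ignored, and get() then reports cancellation. The anonymous subclass is illustrative only.

import java.util.concurrent.CancellationException;
import org.apache.kafka.connect.util.ConvertingFutureCallback;

public class CancelSemanticsSketch {
    public static void main(String[] args) {
        ConvertingFutureCallback<String, Integer> lengthFuture =
                new ConvertingFutureCallback<String, Integer>() {
                    @Override
                    public Integer convert(String result) {
                        return result == null ? null : result.length();
                    }
                };
        System.out.println(lengthFuture.cancel(true));    // true: nothing has completed yet
        lengthFuture.onCompletion(null, "too late");      // ignored: the future is already done
        try {
            lengthFuture.get();
        } catch (CancellationException e) {
            System.out.println("get() reports cancellation, as expected");
        } catch (Exception e) {
            System.out.println("Unexpected: " + e);
        }
    }
}
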
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java (4 lines changed)

@@ -46,7 +46,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -127,7 +127,7 @@ public class ErrorHandlingTaskTest {
private KafkaProducer<byte[], byte[]> producer;
@Mock
OffsetStorageReader offsetReader;
OffsetStorageReaderImpl offsetReader;
@Mock
OffsetStorageWriter offsetWriter;

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java (18 lines changed)

@@ -41,9 +41,9 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.Callback;
@@ -114,7 +114,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Mock private HeaderConverter headerConverter;
@Mock private TransformationChain<SourceRecord> transformationChain;
@Mock private KafkaProducer<byte[], byte[]> producer;
@Mock private OffsetStorageReader offsetReader;
@Mock private CloseableOffsetStorageReader offsetReader;
@Mock private OffsetStorageWriter offsetWriter;
@Mock private ClusterConfigState clusterConfigState;
private WorkerSourceTask workerTask;
@@ -693,6 +693,20 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.verifyAll();
}
@Test
public void testCancel() {
createWorkerTask();
offsetReader.close();
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.cancel();
PowerMock.verifyAll();
}
@Test
public void testMetricsGroup() {
SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, metrics);

connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java (14 lines changed)

@@ -71,12 +71,11 @@ public class FileOffsetBackingStoreTest {
@Test
public void testGetSet() throws Exception {
Callback<Void> setCallback = expectSuccessfulSetCallback();
Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback();
PowerMock.replayAll();
store.set(firstSet, setCallback).get();
Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad"))).get();
assertEquals(buffer("value"), values.get(buffer("key")));
assertEquals(null, values.get(buffer("bad")));
@@ -86,7 +85,6 @@ public class FileOffsetBackingStoreTest {
@Test
public void testSaveRestore() throws Exception {
Callback<Void> setCallback = expectSuccessfulSetCallback();
Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback();
PowerMock.replayAll();
store.set(firstSet, setCallback).get();
@@ -96,7 +94,7 @@ public class FileOffsetBackingStoreTest {
FileOffsetBackingStore restore = new FileOffsetBackingStore();
restore.configure(config);
restore.start();
Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key")), getCallback).get();
Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key"))).get();
assertEquals(buffer("value"), values.get(buffer("key")));
PowerMock.verifyAll();
@@ -113,12 +111,4 @@ public class FileOffsetBackingStoreTest {
PowerMock.expectLastCall();
return setCallback;
}
@SuppressWarnings("unchecked")
private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
Callback<Map<ByteBuffer, ByteBuffer>> getCallback = PowerMock.createMock(Callback.class);
getCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
PowerMock.expectLastCall();
return getCallback;
}
}

connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java (54 lines changed)

@@ -217,17 +217,10 @@ public class KafkaOffsetBackingStoreTest {
store.start();
// Getting from empty store should return nulls
final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
@Override
public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
// Since we didn't read them yet, these will be null
assertEquals(null, result.get(TP0_KEY));
assertEquals(null, result.get(TP1_KEY));
getInvokedAndPassed.set(true);
}
}).get(10000, TimeUnit.MILLISECONDS);
assertTrue(getInvokedAndPassed.get());
Map<ByteBuffer, ByteBuffer> offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
// Since we didn't read them yet, these will be null
assertNull(offsets.get(TP0_KEY));
assertNull(offsets.get(TP1_KEY));
// Set some offsets
Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
@@ -250,28 +243,14 @@ public class KafkaOffsetBackingStoreTest {
assertTrue(invoked.get());
// Getting data should read to end of our published data and return it
final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
@Override
public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
assertEquals(TP0_VALUE, result.get(TP0_KEY));
assertEquals(TP1_VALUE, result.get(TP1_KEY));
secondGetInvokedAndPassed.set(true);
}
}).get(10000, TimeUnit.MILLISECONDS);
assertTrue(secondGetInvokedAndPassed.get());
offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
assertEquals(TP0_VALUE, offsets.get(TP0_KEY));
assertEquals(TP1_VALUE, offsets.get(TP1_KEY));
// Getting data should read to end of our published data and return it
final AtomicBoolean thirdGetInvokedAndPassed = new AtomicBoolean(false);
store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
@Override
public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY));
assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY));
thirdGetInvokedAndPassed.set(true);
}
}).get(10000, TimeUnit.MILLISECONDS);
assertTrue(thirdGetInvokedAndPassed.get());
offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
assertEquals(TP0_VALUE_NEW, offsets.get(TP0_KEY));
assertEquals(TP1_VALUE_NEW, offsets.get(TP1_KEY));
store.stop();
@@ -329,16 +308,9 @@ public class KafkaOffsetBackingStoreTest {
assertTrue(invoked.get());
// Getting data should read to end of our published data and return it
final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
store.get(Arrays.asList(null, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
@Override
public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
assertEquals(TP0_VALUE, result.get(null));
assertNull(result.get(TP1_KEY));
secondGetInvokedAndPassed.set(true);
}
}).get(10000, TimeUnit.MILLISECONDS);
assertTrue(secondGetInvokedAndPassed.get());
Map<ByteBuffer, ByteBuffer> offsets = store.get(Arrays.asList(null, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
assertEquals(TP0_VALUE, offsets.get(null));
assertNull(offsets.get(TP1_KEY));
store.stop();

connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java (242 lines changed)

@@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ConvertingFutureCallbackTest {
private ExecutorService executor;
@Before
public void setup() {
executor = Executors.newSingleThreadExecutor();
}
@Test
public void shouldConvertBeforeGetOnSuccessfulCompletion() throws Exception {
final Object expectedConversion = new Object();
TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
testCallback.onCompletion(null, expectedConversion);
assertEquals(1, testCallback.numberOfConversions());
assertEquals(expectedConversion, testCallback.get());
}
@Test
public void shouldConvertOnlyOnceBeforeGetOnSuccessfulCompletion() throws Exception {
final Object expectedConversion = new Object();
TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
testCallback.onCompletion(null, expectedConversion);
testCallback.onCompletion(null, 69);
testCallback.cancel(true);
testCallback.onCompletion(new RuntimeException(), null);
assertEquals(1, testCallback.numberOfConversions());
assertEquals(expectedConversion, testCallback.get());
}
@Test
public void shouldNotConvertBeforeGetOnFailedCompletion() throws Exception {
final Throwable expectedError = new Throwable();
TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
testCallback.onCompletion(expectedError, null);
assertEquals(0, testCallback.numberOfConversions());
try {
testCallback.get();
fail("Expected ExecutionException");
} catch (ExecutionException e) {
assertEquals(expectedError, e.getCause());
}
}
@Test
public void shouldRecordOnlyFirstErrorBeforeGetOnFailedCompletion() throws Exception {
final Throwable expectedError = new Throwable();
TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
testCallback.onCompletion(expectedError, null);
testCallback.onCompletion(new RuntimeException(), null);
testCallback.cancel(true);
testCallback.onCompletion(null, "420");
assertEquals(0, testCallback.numberOfConversions());
try {
testCallback.get();
fail("Expected ExecutionException");
} catch (ExecutionException e) {
assertEquals(expectedError, e.getCause());
}
}
@Test(expected = CancellationException.class)
public void shouldCancelBeforeGetIfMayCancelWhileRunning() throws Exception {
TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
assertTrue(testCallback.cancel(true));
testCallback.get();
}
@Test
public void shouldBlockUntilSuccessfulCompletion() throws Exception {
AtomicReference<Exception> testThreadException = new AtomicReference<>();
TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
final Object expectedConversion = new Object();
executor.submit(() -> {
try {
testCallback.waitForGet();
testCallback.onCompletion(null, expectedConversion);
} catch (Exception e) {
testThreadException.compareAndSet(null, e);
}
});
assertFalse(testCallback.isDone());
assertEquals(expectedConversion, testCallback.get());
assertEquals(1, testCallback.numberOfConversions());
assertTrue(testCallback.isDone());
if (testThreadException.get() != null) {
throw testThreadException.get();
}
}
@Test
public void shouldBlockUntilFailedCompletion() throws Exception {
AtomicReference<Exception> testThreadException = new AtomicReference<>();
TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
final Throwable expectedError = new Throwable();
executor.submit(() -> {
try {
testCallback.waitForGet();
testCallback.onCompletion(expectedError, null);
} catch (Exception e) {
testThreadException.compareAndSet(null, e);
}
});
assertFalse(testCallback.isDone());
try {
testCallback.get();
fail("Expected ExecutionException");
} catch (ExecutionException e) {
assertEquals(expectedError, e.getCause());
}
assertEquals(0, testCallback.numberOfConversions());
assertTrue(testCallback.isDone());
if (testThreadException.get() != null) {
throw testThreadException.get();
}
}
@Test(expected = CancellationException.class)
public void shouldBlockUntilCancellation() throws Exception {
AtomicReference<Exception> testThreadException = new AtomicReference<>();
TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
executor.submit(() -> {
try {
testCallback.waitForGet();
testCallback.cancel(true);
} catch (Exception e) {
testThreadException.compareAndSet(null, e);
}
});
assertFalse(testCallback.isDone());
testCallback.get();
if (testThreadException.get() != null) {
throw testThreadException.get();
}
}
@Test
public void shouldNotCancelIfMayNotCancelWhileRunning() throws Exception {
AtomicReference<Exception> testThreadException = new AtomicReference<>();
TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
final Object expectedConversion = new Object();
executor.submit(() -> {
try {
testCallback.waitForCancel();
testCallback.onCompletion(null, expectedConversion);
} catch (Exception e) {
testThreadException.compareAndSet(null, e);
}
});
assertFalse(testCallback.isCancelled());
assertFalse(testCallback.isDone());
testCallback.cancel(false);
assertFalse(testCallback.isCancelled());
assertTrue(testCallback.isDone());
assertEquals(expectedConversion, testCallback.get());
assertEquals(1, testCallback.numberOfConversions());
if (testThreadException.get() != null) {
throw testThreadException.get();
}
}
protected static class TestConvertingFutureCallback extends ConvertingFutureCallback<Object, Object> {
private AtomicInteger numberOfConversions = new AtomicInteger();
private CountDownLatch getInvoked = new CountDownLatch(1);
private CountDownLatch cancelInvoked = new CountDownLatch(1);
public int numberOfConversions() {
return numberOfConversions.get();
}
public void waitForGet() throws InterruptedException {
getInvoked.await();
}
public void waitForCancel() throws InterruptedException {
cancelInvoked.await();
}
@Override
public Object convert(Object result) {
numberOfConversions.incrementAndGet();
return result;
}
@Override
public Object get() throws InterruptedException, ExecutionException {
getInvoked.countDown();
return super.get();
}
@Override
public Object get(
long duration,
TimeUnit unit
) throws InterruptedException, ExecutionException, TimeoutException {
getInvoked.countDown();
return super.get(duration, unit);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
cancelInvoked.countDown();
return super.cancel(mayInterruptIfRunning);
}
}
}