Browse Source

KAFKA-5958; Global stores access state restore listener

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3973 from bbejeck/KAFKA-5958_global_stores_access_state_restore_listener
pull/3973/merge
Bill Bejeck 7 years ago committed by Damian Guy
parent
commit
e1543a5a8e
  1. 3
      streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
  2. 17
      streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
  3. 17
      streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
  4. 4
      streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
  5. 27
      streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
  6. 8
      streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
  7. 6
      streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java

3
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@@ -613,7 +613,8 @@ public class KafkaStreams {
stateDirectory,
metrics,
Time.SYSTEM,
globalThreadId);
globalThreadId,
delegatingStateRestoreListener);
globalThreadState = globalStreamThread.state();
}

17
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

@@ -27,6 +27,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
@@ -61,15 +62,18 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
private final OffsetCheckpoint checkpoint;
private final Set<String> globalStoreNames = new HashSet<>();
private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
private final StateRestoreListener stateRestoreListener;
public GlobalStateManagerImpl(final ProcessorTopology topology,
final Consumer<byte[], byte[]> consumer,
final StateDirectory stateDirectory) {
final StateDirectory stateDirectory,
final StateRestoreListener stateRestoreListener) {
this.topology = topology;
this.consumer = consumer;
this.stateDirectory = stateDirectory;
this.baseDir = stateDirectory.globalStateDir();
this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
this.stateRestoreListener = stateRestoreListener;
}
@Override
@@ -135,7 +139,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
final Map<TopicPartition, Long> highWatermarks = consumer.endOffsets(topicPartitions);
try {
restoreState(stateRestoreCallback, topicPartitions, highWatermarks);
restoreState(stateRestoreCallback, topicPartitions, highWatermarks, store.name());
stores.put(store.name(), store);
} finally {
consumer.assign(Collections.<TopicPartition>emptyList());
@@ -159,7 +163,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
private void restoreState(final StateRestoreCallback stateRestoreCallback,
final List<TopicPartition> topicPartitions,
final Map<TopicPartition, Long> highWatermarks) {
final Map<TopicPartition, Long> highWatermarks,
final String storeName) {
for (final TopicPartition topicPartition : topicPartitions) {
consumer.assign(Collections.singletonList(topicPartition));
final Long checkpoint = checkpointableOffsets.get(topicPartition);
@@ -178,6 +183,9 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
? stateRestoreCallback
: new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
long restoreCount = 0L;
while (offset < highWatermark) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
@@ -188,7 +196,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
}
}
stateRestoreAdapter.restoreAll(restoreRecords);
stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
restoreCount += restoreRecords.size();
}
stateRestoreListener.onRestoreEnd(topicPartition, storeName, restoreCount);
checkpointableOffsets.put(topicPartition, offset);
}
}

17
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java

@@ -27,15 +27,16 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
@@ -113,6 +114,7 @@ public class GlobalStreamThread extends Thread {
private final Object stateLock = new Object();
private StreamThread.StateListener stateListener = null;
private final String logPrefix;
private final StateRestoreListener stateRestoreListener;
/**
* Set the {@link StreamThread.StateListener} to be notified when state changes. Note this API is internal to
@@ -175,7 +177,8 @@ public class GlobalStreamThread extends Thread {
final StateDirectory stateDirectory,
final Metrics metrics,
final Time time,
final String threadClientId) {
final String threadClientId,
final StateRestoreListener stateRestoreListener) {
super(threadClientId);
this.time = time;
this.config = config;
@@ -189,6 +192,7 @@ public class GlobalStreamThread extends Thread {
this.logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());
this.cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
this.stateRestoreListener = stateRestoreListener;
}
@@ -294,7 +298,10 @@ public class GlobalStreamThread extends Thread {
private StateConsumer initialize() {
try {
final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology,
consumer,
stateDirectory,
stateRestoreListener);
final StateConsumer stateConsumer
= new StateConsumer(this.logContext,
consumer,

4
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

@@ -28,9 +28,9 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockStateRestoreListener;
@@ -54,9 +54,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotNull;
@Category({IntegrationTest.class})
public class KafkaStreamsTest {

27
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java

@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.NoOpProcessorContext;
import org.apache.kafka.test.NoOpReadOnlyStore;
import org.apache.kafka.test.TestUtils;
@@ -49,6 +50,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -61,6 +65,7 @@ public class GlobalStateManagerImplTest {
private final MockTime time = new MockTime();
private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
private final TopicPartition t1 = new TopicPartition("t1", 1);
private final TopicPartition t2 = new TopicPartition("t2", 1);
private GlobalStateManagerImpl stateManager;
@@ -95,7 +100,7 @@ public class GlobalStateManagerImplTest {
stateDirPath = TestUtils.tempDirectory().getPath();
stateDirectory = new StateDirectory("appId", stateDirPath, time);
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory, stateRestoreListener);
checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
}
@@ -203,6 +208,24 @@ public class GlobalStateManagerImplTest {
assertEquals(2, stateRestoreCallback.restored.size());
}
/**
 * Checks that registering a global store drives the {@code MockStateRestoreListener}
 * that was passed to the {@code GlobalStateManagerImpl} under test.
 *
 * <p>After {@code register} returns, the listener's {@code restoreStartOffset} must be 1,
 * its {@code restoreEndOffset} 5 and its {@code totalNumRestored} 5, and it must have
 * recorded {@code store1}'s name under each of the {@code RESTORE_START},
 * {@code RESTORE_BATCH} and {@code RESTORE_END} keys.
 */
@Test
public void shouldListenForRestoreEvents() {
// NOTE(review): presumably "5 records starting at offset 1 on partition t1" --
// initializeConsumer is not visible here; confirm against its definition.
initializeConsumer(5, 1, t1);
stateManager.initialize(context);
// NOTE(review): this local has the same name as the test class's stateRestoreCallback
// field and shadows it for the rest of the method.
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
// Registering the store is the call expected to trigger the listener callbacks asserted below.
stateManager.register(store1, false, stateRestoreCallback);
// Offsets and record count captured by the mock listener.
assertThat(stateRestoreListener.restoreStartOffset, equalTo(1L));
assertThat(stateRestoreListener.restoreEndOffset, equalTo(5L));
assertThat(stateRestoreListener.totalNumRestored, equalTo(5L));
// Store name recorded for each of the three listener callback kinds.
assertThat(stateRestoreListener.storeNameCalledStates.get(RESTORE_START), equalTo(store1.name()));
assertThat(stateRestoreListener.storeNameCalledStates.get(RESTORE_BATCH), equalTo(store1.name()));
assertThat(stateRestoreListener.storeNameCalledStates.get(RESTORE_END), equalTo(store1.name()));
}
@Test
public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws IOException {
initializeConsumer(5, 6, t1);
@@ -452,7 +475,7 @@ public class GlobalStateManagerImplTest {
public boolean lockGlobalState(final int retry) throws IOException {
throw new IOException("KABOOM!");
}
});
}, stateRestoreListener);
try {
stateManager.initialize(context);

8
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java

@@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
@@ -49,6 +50,7 @@ public class GlobalStreamThreadTest {
private final KStreamBuilder builder = new KStreamBuilder();
private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final MockTime time = new MockTime();
private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
private GlobalStreamThread globalStreamThread;
private StreamsConfig config;
@@ -65,7 +67,8 @@ public class GlobalStreamThreadTest {
new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
new Metrics(),
new MockTime(),
"clientId");
"clientId",
stateRestoreListener);
}
@Test
@@ -96,7 +99,8 @@ public class GlobalStreamThreadTest {
new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
new Metrics(),
new MockTime(),
"clientId");
"clientId",
stateRestoreListener);
try {
globalStreamThread.start();

6
streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java

@@ -211,6 +211,7 @@ public class ProcessorTopologyTestDriver {
if (globalTopology != null) {
final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
for (final String topicName : globalTopology.sourceTopics()) {
final List<PartitionInfo> partitionInfos = new ArrayList<>();
partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
@@ -220,7 +221,10 @@ public class ProcessorTopologyTestDriver {
globalPartitionsByTopic.put(topicName, partition);
offsetsByTopicPartition.put(partition, new AtomicLong());
}
final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology,
globalConsumer,
stateDirectory,
stateRestoreListener);
globalStateTask = new GlobalStateUpdateTask(globalTopology,
new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
stateManager, new LogAndContinueExceptionHandler()

Loading…
Cancel
Save