
KAFKA-5474: Streams StandbyTask should not checkpoint on commit if EOS is enabled

 - actual fix for `StandbyTask#commit()`

Additionally (for debugging):
 - fix the EOS test, which did not report the "expected" value correctly
 - add `IntegerDecoder` (to be used with `kafka.tools.DumpLogSegments`)
 - add a test that `StreamTask` does not checkpoint on commit if EOS is enabled (see the configuration sketch after the commit message)

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3375 from mjsax/kafka-5474-eos-standby-task
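
For context, a minimal sketch of enabling the exactly-once guarantee that this fix targets. Only `PROCESSING_GUARANTEE_CONFIG` and `EXACTLY_ONCE` appear in this patch; the application id and bootstrap server below are hypothetical placeholders.

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class EosConfigSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-sketch-app");    // hypothetical
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
            // Under EXACTLY_ONCE, offsets and state updates are committed inside a
            // producer transaction, so writing a local checkpoint file on commit
            // could mark uncommitted state as consistent.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            final StreamsConfig config = new StreamsConfig(props);
            System.out.println(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
        }
    }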
Commit: 914e42a282 (branch pull/3375/merge), authored by Matthias J. Sax, committed by Guozhang Wang
Changed files:
  1. core/src/main/scala/kafka/serializer/Decoder.scala (18 lines changed)
  2. core/src/main/scala/kafka/serializer/Encoder.scala (24 lines changed)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java (7 lines changed)
  4. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java (66 lines changed)
  5. streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java (16 lines changed)

core/src/main/scala/kafka/serializer/Decoder.scala (18 lines changed)

@@ -54,17 +54,19 @@ class StringDecoder(props: VerifiableProperties = null) extends Decoder[String]
 }
 
 /**
-  * The string decoder translates bytes into strings. It uses UTF8 by default but takes
-  * an optional property serializer.encoding to control this.
+  * The long decoder translates bytes into longs.
   */
 class LongDecoder(props: VerifiableProperties = null) extends Decoder[Long] {
-  val encoding =
-    if(props == null)
-      "UTF8"
-    else
-      props.getString("serializer.encoding", "UTF8")
-
   def fromBytes(bytes: Array[Byte]): Long = {
     ByteBuffer.wrap(bytes).getLong
   }
 }
+
+/**
+  * The integer decoder translates bytes into integers.
+  */
+class IntegerDecoder(props: VerifiableProperties = null) extends Decoder[Integer] {
+  def fromBytes(bytes: Array[Byte]): Integer = {
+    ByteBuffer.wrap(bytes).getInt()
+  }
+}

core/src/main/scala/kafka/serializer/Encoder.scala (24 lines changed)

@@ -17,6 +17,8 @@
 package kafka.serializer
 
+import java.nio.ByteBuffer
+
 import kafka.utils.VerifiableProperties
 
 /**
@@ -56,3 +58,25 @@ class StringEncoder(props: VerifiableProperties = null) extends Encoder[String]
     else
       s.getBytes(encoding)
 }
+
+/**
+  * The long encoder translates longs into bytes.
+  */
+class LongEncoder(props: VerifiableProperties = null) extends Encoder[Long] {
+  override def toBytes(l: Long): Array[Byte] =
+    if(l == null)
+      null
+    else
+      ByteBuffer.allocate(8).putLong(l).array()
+}
+
+/**
+  * The integer encoder translates integers into bytes.
+  */
+class IntegerEncoder(props: VerifiableProperties = null) extends Encoder[Integer] {
+  override def toBytes(i: Integer): Array[Byte] =
+    if(i == null)
+      null
+    else
+      ByteBuffer.allocate(4).putInt(i).array()
+}
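
As a quick sanity check, the new codec pair round-trips from Java; a minimal sketch, where the class name `IntegerCodecRoundTrip` is mine and the explicit `null` stands in for the Scala default `VerifiableProperties` argument. This same `IntegerDecoder` is what `kafka.tools.DumpLogSegments` can load through its `--value-decoder-class` option to print integer payloads.

    import kafka.serializer.IntegerDecoder;
    import kafka.serializer.IntegerEncoder;

    public class IntegerCodecRoundTrip {
        public static void main(final String[] args) {
            // Scala default arguments are not visible from Java call sites,
            // so pass the VerifiableProperties argument explicitly.
            final IntegerEncoder encoder = new IntegerEncoder(null);
            final IntegerDecoder decoder = new IntegerDecoder(null);

            final byte[] bytes = encoder.toBytes(42);  // 4-byte big-endian encoding
            final int decoded = decoder.fromBytes(bytes);

            if (decoded != 42) {
                throw new AssertionError("round trip failed: " + decoded);
            }
            System.out.println("42 -> " + bytes.length + " bytes -> " + decoded);
        }
    }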

streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java (7 lines changed)

@@ -91,8 +91,7 @@ public class StandbyTask extends AbstractTask {
     @Override
     public void commit() {
         log.trace("{} Committing", logPrefix);
-        stateMgr.flush();
-        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
+        flushAndCheckpointState();
         // reinitialize offset limits
         updateOffsetLimits();
     }
@@ -106,6 +105,10 @@ public class StandbyTask extends AbstractTask {
     @Override
     public void suspend() {
         log.debug("{} Suspending", logPrefix);
+        flushAndCheckpointState();
+    }
+
+    private void flushAndCheckpointState() {
         stateMgr.flush();
         stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
     }
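
For comparison, the `StreamTask` side of the fix (exercised by the new test below) boils down to flushing but skipping the checkpoint write when EOS is enabled. A sketch of that rule, assuming a hypothetical helper name; `flush()` and `checkpoint(...)` are the same `ProcessorStateManager` calls used above, but this is not the literal patch code:

    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.internals.ProcessorStateManager;

    final class CheckpointPolicy {
        private CheckpointPolicy() { }

        // Hypothetical helper: always flush, but only write the checkpoint file
        // when EOS is disabled. Under EOS the local state may contain writes from
        // an uncommitted transaction, so a checkpoint could falsely mark it clean.
        static void flushAndMaybeCheckpoint(final ProcessorStateManager stateMgr,
                                            final Map<TopicPartition, Long> offsets,
                                            final boolean eosEnabled) {
            stateMgr.flush();
            if (!eosEnabled) {
                stateMgr.checkpoint(offsets);
            }
        }
    }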

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java (66 lines changed)

@@ -499,6 +499,72 @@ public class StreamTaskTest {
         assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, offset + 1)));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() throws Exception {
+        final Map<String, Object> properties = config.originals();
+        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+        final StreamsConfig testConfig = new StreamsConfig(properties);
+
+        final String storeName = "test";
+        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName);
+        final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
+            @Override
+            public void init(final ProcessorContext context, final StateStore root) {
+                context.register(root, true, null);
+            }
+
+            @Override
+            public boolean persistent() {
+                return true;
+            }
+        };
+
+        Map<String, SourceNode> sourceByTopics = new HashMap() {
+            {
+                put(partition1.topic(), source1);
+                put(partition2.topic(), source2);
+            }
+        };
+        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
+                                                                 sourceByTopics,
+                                                                 Collections.<String, SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>singletonList(inMemoryStore),
+                                                                 Collections.singletonMap(storeName, changelogTopic),
+                                                                 Collections.<StateStore>emptyList());
+
+        final TopicPartition partition = new TopicPartition(changelogTopic, 0);
+        restoreStateConsumer.updatePartitions(changelogTopic,
+                                              Collections.singletonList(
+                                                  new PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0])));
+        restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
+        restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
+
+        final long offset = 543L;
+
+        final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
+            changelogReader, testConfig, streamsMetrics, stateDirectory, null, time, producer) {
+
+            @Override
+            RecordCollector createRecordCollector() {
+                return new NoOpRecordCollector() {
+                    @Override
+                    public Map<TopicPartition, Long> offsets() {
+                        return Collections.singletonMap(partition, offset);
+                    }
+                };
+            }
+        };
+
+        time.sleep(testConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+        streamTask.commit();
+
+        final File checkpointFile = new File(stateDirectory.directoryForTask(taskId00),
+                                             ProcessorStateManager.CHECKPOINT_FILE_NAME);
+
+        assertFalse(checkpointFile.exists());
+    }
+
     @Test
     public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() throws Exception {
         ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processor);
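
Note what the new test asserts: not the checkpoint's contents, but that no checkpoint file exists at all after `commit()` under EXACTLY_ONCE; a checkpoint file found on restart would be treated as evidence of clean local state, which EOS cannot guarantee at commit time.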

streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java (16 lines changed)

@@ -334,18 +334,19 @@ public class EosTestDriver extends SmokeTestUtil {
             final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
             final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
 
             final String key = stringDeserializer.deserialize(input.topic(), input.key());
-            final Integer value = integerDeserializer.deserialize(input.topic(), input.value());
+            final int value = integerDeserializer.deserialize(input.topic(), input.value());
             Integer min = currentMinPerKey.get(key);
             if (min == null) {
                 min = value;
+            } else {
+                min = Math.min(min, value);
             }
-            min = Math.min(min, value);
             currentMinPerKey.put(key, min);
 
             if (!receivedKey.equals(key) || receivedValue != min) {
-                throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + value + "> but was <" + receivedKey + "," + receivedValue + ">");
+                throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + min + "> but was <" + receivedKey + "," + receivedValue + ">");
             }
         }
     } catch (final NullPointerException e) {
@@ -374,17 +375,18 @@ public class EosTestDriver extends SmokeTestUtil {
             final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
             final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
 
             final String key = stringDeserializer.deserialize(input.topic(), input.key());
-            final Integer value = integerDeserializer.deserialize(input.topic(), input.value());
+            final int value = integerDeserializer.deserialize(input.topic(), input.value());
             Long sum = currentSumPerKey.get(key);
             if (sum == null) {
-                sum = 0L;
+                sum = (long) value;
+            } else {
+                sum += value;
             }
-            sum += value;
             currentSumPerKey.put(key, sum);
 
             if (!receivedKey.equals(key) || receivedValue != sum) {
-                throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + value + "> but was <" + receivedKey + "," + receivedValue + ">");
+                throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + sum + "> but was <" + receivedKey + "," + receivedValue + ">");
             }
         }
     } catch (final NullPointerException e) {
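
A worked example of why the message fix matters: for a key with input values 3, 1, 2, the running min is 3, 1, 1 and the running sum is 3, 4, 6. On a mismatch at the third record, the old message reported the raw input (2) as the "expected" value; the fixed message reports the actual expected aggregate (1 for the min topic, 6 for the sum topic).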
