Browse Source

KAFKA-6782: Fix restoration of aborted messages for GlobalStateStore and GlobalKTable (#4900)

Reviewer: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/5232/head
Gitomain 6 years ago committed by Matthias J. Sax
parent
commit
9bf277bc1a
  1. 1
      .gitignore
  2. 1
      kafka
  3. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
  4. 382
      streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
  5. 60
      streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
  6. 35
      streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
  7. 10
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java

1
.gitignore vendored

@ -1,5 +1,6 @@ @@ -1,5 +1,6 @@
dist
*classes
*.class
target/
build/
build_eclipse/

1
kafka

@ -0,0 +1 @@ @@ -0,0 +1 @@
Subproject commit cc43e77bbbfad71883011186de55603c936cbcd1

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

@ -176,8 +176,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager { @@ -176,8 +176,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
if (record.key() != null) {
stateRestoreCallback.restore(record.key(), record.value());
}
offset = consumer.position(topicPartition);
}
offset = consumer.position(topicPartition);
}
checkpointableOffsets.put(topicPartition, offset);
}

382
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java

@ -0,0 +1,382 @@ @@ -0,0 +1,382 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
/**
 * Integration tests for a {@link GlobalKTable} when the Streams application runs with
 * {@code processing.guarantee=exactly_once}.
 *
 * <p>The tests cover stream/global-table joins as well as the restoration of the global
 * store from a topic that contains committed and aborted transactional records.
 */
@Category({IntegrationTest.class})
public class GlobalKTableEOSIntegrationTest {
    private static final int NUM_BROKERS = 1;
    /** Upper bound, in milliseconds, for every {@code waitForCondition} call in this class. */
    private static final long MAX_WAIT_MS = 30000L;
    private static final Properties BROKER_CONFIG;
    static {
        // A single-broker cluster cannot satisfy the default replication settings
        // of the transaction state log, so both are lowered to 1.
        BROKER_CONFIG = new Properties();
        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
        BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
    }

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER =
        new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);

    private static volatile int testNo = 0;

    private final MockTime mockTime = CLUSTER.time;
    private final KeyValueMapper<String, Long, Long> keyMapper = new KeyValueMapper<String, Long, Long>() {
        @Override
        public Long apply(final String key, final Long value) {
            return value;
        }
    };
    private final ValueJoiner<Long, String, String> joiner = new ValueJoiner<Long, String, String>() {
        @Override
        public String apply(final Long value1, final String value2) {
            return value1 + "+" + value2;
        }
    };
    private final String globalStore = "globalStore";
    // Written by the foreach action on a stream thread and compared on the test thread,
    // so the map must be synchronized: iterating a plain HashMap while it is being
    // modified may throw ConcurrentModificationException or observe stale data.
    private final Map<String, String> results = Collections.synchronizedMap(new HashMap<String, String>());

    private KStreamBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String globalTableTopic;
    private String streamTopic;
    private GlobalKTable<Long, String> globalTable;
    private KStream<String, Long> stream;
    private ForeachAction<String, String> foreachAction;

    /**
     * Creates fresh topics and a fresh topology/configuration for every test; the test
     * number is part of the topic names and of the application id.
     */
    @Before
    public void before() throws InterruptedException {
        testNo++;
        builder = new KStreamBuilder();
        createTopics();
        streamsConfiguration = new Properties();
        final String applicationId = "globalTableTopic-table-eos-test-" + testNo;
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
        globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalTableTopic, globalStore);
        stream = builder.stream(Serdes.String(), Serdes.Long(), streamTopic);
        foreachAction = new ForeachAction<String, String>() {
            @Override
            public void apply(final String key, final String value) {
                results.put(key, value);
            }
        };
    }

    /** Stops the Streams instance (if one was started) and purges its local state. */
    @After
    public void whenShuttingDown() throws IOException {
        if (kafkaStreams != null) {
            kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Left join: stream records without a matching table entry are joined with {@code null};
     * after the table is updated, re-sent stream records pick up the new table values.
     */
    @Test
    public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
        final KStream<String, String> streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner);
        streamTableJoin.foreach(foreachAction);
        produceInitialGlobalTableValues();
        startStreams();
        produceTopicValues(streamTopic);

        final Map<String, String> expected = new HashMap<>();
        expected.put("a", "1+A");
        expected.put("b", "2+B");
        expected.put("c", "3+C");
        expected.put("d", "4+D");
        expected.put("e", "5+null");
        waitForJoinResults(expected, "waiting for initial values");

        produceGlobalTableValues();
        waitForReplicatedStoreUpdate();
        produceTopicValues(streamTopic);

        putUpdatedJoinResults(expected);
        waitForJoinResults(expected, "waiting for final values");
    }

    /**
     * Inner join: only stream records with a matching table entry produce output;
     * after the table is updated, re-sent stream records pick up the new table values.
     */
    @Test
    public void shouldKStreamGlobalKTableJoin() throws Exception {
        final KStream<String, String> streamTableJoin = stream.join(globalTable, keyMapper, joiner);
        streamTableJoin.foreach(foreachAction);
        produceInitialGlobalTableValues();
        startStreams();
        produceTopicValues(streamTopic);

        final Map<String, String> expected = new HashMap<>();
        expected.put("a", "1+A");
        expected.put("b", "2+B");
        expected.put("c", "3+C");
        expected.put("d", "4+D");
        waitForJoinResults(expected, "waiting for initial values");

        produceGlobalTableValues();
        waitForReplicatedStoreUpdate();
        produceTopicValues(streamTopic);

        putUpdatedJoinResults(expected);
        waitForJoinResults(expected, "waiting for final values");
    }

    /** Records produced with transactions enabled must show up in the global store. */
    @Test
    public void shouldRestoreTransactionalMessages() throws Exception {
        produceInitialGlobalTableValues();
        startStreams();
        waitForGlobalStoreContents(initialGlobalTableContents(), "waiting for initial values");
    }

    /**
     * Aborted records written before and after the regular records must not change
     * the contents of the global store.
     */
    @Test
    public void shouldNotRestoreAbortedMessages() throws Exception {
        produceAbortedMessages();
        produceInitialGlobalTableValues();
        produceAbortedMessages();
        startStreams();
        waitForGlobalStoreContents(initialGlobalTableContents(), "waiting for initial values");
    }

    /** Returns the key/value pairs written by {@link #produceInitialGlobalTableValues()}. */
    private static Map<Long, String> initialGlobalTableContents() {
        final Map<Long, String> contents = new HashMap<>();
        contents.put(1L, "A");
        contents.put(2L, "B");
        contents.put(3L, "C");
        contents.put(4L, "D");
        return contents;
    }

    /** Overwrites {@code expected} with the join results that follow {@link #produceGlobalTableValues()}. */
    private static void putUpdatedJoinResults(final Map<String, String> expected) {
        expected.put("a", "1+F");
        expected.put("b", "2+G");
        expected.put("c", "3+H");
        expected.put("d", "4+I");
        expected.put("e", "5+J");
    }

    /** Copies all remaining entries of {@code entries} into a new map. */
    private static Map<Long, String> toMap(final Iterator<KeyValue<Long, String>> entries) {
        final Map<Long, String> contents = new HashMap<>();
        while (entries.hasNext()) {
            final KeyValue<Long, String> entry = entries.next();
            contents.put(entry.key, entry.value);
        }
        return contents;
    }

    /** Blocks until the collected join output equals {@code expected}. */
    private void waitForJoinResults(final Map<String, String> expected,
                                    final String conditionDetails) throws InterruptedException {
        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                return results.equals(expected);
            }
        }, MAX_WAIT_MS, conditionDetails);
    }

    /** Blocks until the last record of {@link #produceGlobalTableValues()} is visible in the global store. */
    private void waitForReplicatedStoreUpdate() throws InterruptedException {
        final ReadOnlyKeyValueStore<Long, String> replicatedStore =
            kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                return "J".equals(replicatedStore.get(5L));
            }
        }, MAX_WAIT_MS, "waiting for data in replicated store");
    }

    /**
     * Blocks until the global store is queryable and holds exactly {@code expected}.
     * The store iterator is closed after every poll so repeated polling does not leak it.
     */
    private void waitForGlobalStoreContents(final Map<Long, String> expected,
                                            final String conditionDetails) throws InterruptedException {
        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                final ReadOnlyKeyValueStore<Long, String> store;
                try {
                    store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
                } catch (final InvalidStateStoreException storeNotReadyYet) {
                    // the store cannot be queried yet; poll again
                    return false;
                }
                try (final KeyValueIterator<Long, String> entries = store.all()) {
                    return toMap(entries).equals(expected);
                }
            }
        }, MAX_WAIT_MS, conditionDetails);
    }

    private void createTopics() throws InterruptedException {
        streamTopic = "stream-" + testNo;
        globalTableTopic = "globalTable-" + testNo;
        CLUSTER.createTopics(streamTopic);
        CLUSTER.createTopic(globalTableTopic, 2, 1);
    }

    private void startStreams() {
        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
        kafkaStreams.start();
    }

    /** Writes the records a..e (values 1..5) to {@code topic}. */
    private void produceTopicValues(final String topic) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            topic,
            Arrays.asList(
                new KeyValue<>("a", 1L),
                new KeyValue<>("b", 2L),
                new KeyValue<>("c", 3L),
                new KeyValue<>("d", 4L),
                new KeyValue<>("e", 5L)),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                StringSerializer.class,
                LongSerializer.class,
                new Properties()),
            mockTime);
    }

    /** Writes the initial table records to the global table topic via the aborting producer helper. */
    private void produceAbortedMessages() throws Exception {
        final Properties properties = new Properties();
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
        IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp(
            globalTableTopic,
            Arrays.asList(
                new KeyValue<>(1L, "A"),
                new KeyValue<>(2L, "B"),
                new KeyValue<>(3L, "C"),
                new KeyValue<>(4L, "D")
            ),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                LongSerializer.class,
                StringSerializer.class,
                properties),
            mockTime.milliseconds());
    }

    /** Writes the initial table records with transactions enabled. */
    private void produceInitialGlobalTableValues() throws Exception {
        produceInitialGlobalTableValues(true);
    }

    /**
     * Writes the records 1..4 (values A..D) to the global table topic.
     *
     * @param enableTransactions whether the producer is configured with a transactional id
     *                           and the flag is forwarded to the produce helper
     */
    private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
        final Properties properties = new Properties();
        if (enableTransactions) {
            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
            properties.put(ProducerConfig.RETRIES_CONFIG, 1);
        }
        IntegrationTestUtils.produceKeyValuesSynchronously(
            globalTableTopic,
            Arrays.asList(
                new KeyValue<>(1L, "A"),
                new KeyValue<>(2L, "B"),
                new KeyValue<>(3L, "C"),
                new KeyValue<>(4L, "D")
            ),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                LongSerializer.class,
                StringSerializer.class,
                properties),
            mockTime,
            enableTransactions);
    }

    /** Writes the updated table records 1..5 (values F..J) to the global table topic. */
    private void produceGlobalTableValues() throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            globalTableTopic,
            Arrays.asList(
                new KeyValue<>(1L, "F"),
                new KeyValue<>(2L, "G"),
                new KeyValue<>(3L, "H"),
                new KeyValue<>(4L, "I"),
                new KeyValue<>(5L, "J")),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                LongSerializer.class,
                StringSerializer.class,
                new Properties()),
            mockTime);
    }
}

60
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java

@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringSerializer; @@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
@ -49,23 +48,16 @@ import org.junit.experimental.categories.Category; @@ -49,23 +48,16 @@ import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final Properties BROKER_CONFIG;
static {
BROKER_CONFIG = new Properties();
BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
}
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
new EmbeddedKafkaCluster(NUM_BROKERS);
private static volatile int testNo = 0;
private final MockTime mockTime = CLUSTER.time;
@ -225,39 +217,7 @@ public class GlobalKTableIntegrationTest { @@ -225,39 +217,7 @@ public class GlobalKTableIntegrationTest {
}
}, 30000L, "waiting for final values");
}
/**
 * Produces the initial global table records with transactions enabled, starts the
 * Streams instance and waits until the global store holds exactly those records.
 */
@Test
public void shouldRestoreTransactionalMessages() throws Exception {
    produceInitialGlobalTableValues(true);
    startStreams();
    final Map<Long, String> expected = new HashMap<>();
    expected.put(1L, "A");
    expected.put(2L, "B");
    expected.put(3L, "C");
    expected.put(4L, "D");
    TestUtils.waitForCondition(new TestCondition() {
        @Override
        public boolean conditionMet() {
            final ReadOnlyKeyValueStore<Long, String> store;
            try {
                store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
            } catch (final InvalidStateStoreException ex) {
                // the store cannot be queried yet; poll again
                return false;
            }
            final Map<Long, String> result = new HashMap<>();
            // NOTE(review): the iterator returned by all() is never closed here — confirm
            // whether it should be wrapped in try-with-resources once the type is imported.
            final Iterator<KeyValue<Long, String>> it = store.all();
            while (it.hasNext()) {
                final KeyValue<Long, String> kv = it.next();
                result.put(kv.key, kv.value);
            }
            return result.equals(expected);
        }
    }, 30000L, "waiting for initial values");
}
private void createTopics() throws InterruptedException {
inputStream = "input-stream-" + testNo;
inputTable = "input-table-" + testNo;
@ -265,7 +225,7 @@ public class GlobalKTableIntegrationTest { @@ -265,7 +225,7 @@ public class GlobalKTableIntegrationTest {
CLUSTER.createTopics(inputStream, inputTable);
CLUSTER.createTopic(globalOne, 2, 1);
}
private void startStreams() {
kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
kafkaStreams.start();
@ -288,11 +248,11 @@ public class GlobalKTableIntegrationTest { @@ -288,11 +248,11 @@ public class GlobalKTableIntegrationTest {
mockTime);
}
private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
private void produceInitialGlobalTableValues() throws Exception {
produceInitialGlobalTableValues(false);
}
private void produceInitialGlobalTableValues(final boolean enableTransactions) throws java.util.concurrent.ExecutionException, InterruptedException {
private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
Properties properties = new Properties();
if (enableTransactions) {
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
@ -304,14 +264,14 @@ public class GlobalKTableIntegrationTest { @@ -304,14 +264,14 @@ public class GlobalKTableIntegrationTest {
new KeyValue<>(1L, "A"),
new KeyValue<>(2L, "B"),
new KeyValue<>(3L, "C"),
new KeyValue<>(4L, "D")),
new KeyValue<>(4L, "D")
),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
StringSerializer.class,
properties),
mockTime,
enableTransactions);
StringSerializer.class
),
mockTime);
}
private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {

35
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java

@ -140,16 +140,38 @@ public class IntegrationTestUtils { @@ -140,16 +140,38 @@ public class IntegrationTestUtils {
producer.flush();
}
}
/**
 * Sends each of the given records in a transaction of its own and aborts that
 * transaction once the send has been acknowledged.
 *
 * @param topic          topic the records are sent to
 * @param records        key/value pairs to send
 * @param producerConfig configuration used to create the producer
 * @param timestamp      timestamp assigned to every record
 * @throws ExecutionException   if waiting for a send result fails
 * @throws InterruptedException if the thread is interrupted while waiting for a send result
 */
public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                            final Collection<KeyValue<K, V>> records,
                                                                            final Properties producerConfig,
                                                                            final Long timestamp)
    throws ExecutionException, InterruptedException {
    try (final Producer<K, V> abortingProducer = new KafkaProducer<>(producerConfig)) {
        abortingProducer.initTransactions();
        for (final KeyValue<K, V> keyValue : records) {
            final ProducerRecord<K, V> message =
                new ProducerRecord<>(topic, null, timestamp, keyValue.key, keyValue.value);
            abortingProducer.beginTransaction();
            final Future<RecordMetadata> pendingSend = abortingProducer.send(message);
            // wait for the acknowledgement before aborting the transaction
            pendingSend.get();
            abortingProducer.abortTransaction();
        }
    }
}
public static <V> void produceValuesSynchronously(
final String topic, final Collection<V> records, final Properties producerConfig, final Time time)
public static <V> void produceValuesSynchronously(final String topic,
final Collection<V> records,
final Properties producerConfig,
final Time time)
throws ExecutionException, InterruptedException {
IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false);
}
public static <V> void produceValuesSynchronously(
final String topic, final Collection<V> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
throws ExecutionException, InterruptedException {
public static <V> void produceValuesSynchronously(final String topic,
final Collection<V> records,
final Properties producerConfig,
final Time time,
final boolean enableTransactions)
throws ExecutionException, InterruptedException {
final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
for (final V value : records) {
final KeyValue<Object, V> kv = new KeyValue<>(null, value);
@ -161,10 +183,9 @@ public class IntegrationTestUtils { @@ -161,10 +183,9 @@ public class IntegrationTestUtils {
/**
 * Convenience overload that delegates to the four-argument variant with
 * {@code DEFAULT_TIMEOUT} as the wait time.
 */
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                              final String topic,
                                                                              final int expectedNumRecords) throws InterruptedException {
    final List<KeyValue<K, V>> receivedRecords =
        waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
    return receivedRecords;
}
/**
* Wait until enough data (key-value records) has been consumed.
*

10
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java

@ -635,6 +635,16 @@ public class StreamTaskTest { @@ -635,6 +635,16 @@ public class StreamTaskTest {
assertTrue(producer.transactionInFlight());
}
/**
 * Creates a task with the EOS configuration and closes it right away; the test
 * passes when {@code close} does not throw.
 */
@Test
public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() throws Exception {
    final MockProducer mockProducer = new MockProducer();
    task = new StreamTask(
        taskId00, applicationId, partitions, topology, consumer, changelogReader,
        eosConfig, streamsMetrics, stateDirectory, null, time, mockProducer);

    task.close(false, false);

    // presumably reset so the test fixture's cleanup does not touch the closed task again — TODO confirm
    task = null;
}
@Test
public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception {
final MockProducer producer = new MockProducer();

Loading…
Cancel
Save