Browse Source

HOTFIX: Fix reset integration test hangs on busy wait (#4491)

* do not use static properties
* use new object to take appID
* capture timeout exception inside condition

Reviewers: Matthias J. Sax <matthias@confluent.io>
pull/4568/head
Guozhang Wang 7 years ago committed by GitHub
parent
commit
f3a3253e24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
  2. 10
      core/src/main/scala/kafka/admin/AdminClient.scala
  3. 208
      streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
  4. 8
      streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
  5. 10
      streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
  6. 2
      streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
  7. 2
      streams/src/test/resources/log4j.properties

10
clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java

@ -36,16 +36,14 @@ import javax.net.ssl.SSLPeerUnverifiedException; @@ -36,16 +36,14 @@ import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* Transport layer for SSL communication
*/
public class SslTransportLayer implements TransportLayer {
private static final Logger log = LoggerFactory.getLogger(SslTransportLayer.class);
private enum State {
HANDSHAKE,
HANDSHAKE_FAILED,
@ -57,6 +55,7 @@ public class SslTransportLayer implements TransportLayer { @@ -57,6 +55,7 @@ public class SslTransportLayer implements TransportLayer {
private final SSLEngine sslEngine;
private final SelectionKey key;
private final SocketChannel socketChannel;
private final Logger log;
private HandshakeStatus handshakeStatus;
private SSLEngineResult handshakeResult;
@ -79,6 +78,9 @@ public class SslTransportLayer implements TransportLayer { @@ -79,6 +78,9 @@ public class SslTransportLayer implements TransportLayer {
this.key = key;
this.socketChannel = (SocketChannel) key.channel();
this.sslEngine = sslEngine;
final LogContext logContext = new LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId, key));
this.log = logContext.logger(getClass());
}
// Visible for testing
@ -172,7 +174,7 @@ public class SslTransportLayer implements TransportLayer { @@ -172,7 +174,7 @@ public class SslTransportLayer implements TransportLayer {
flush(netWriteBuffer);
}
} catch (IOException ie) {
log.warn("Failed to send SSL Close message ", ie);
log.warn("Failed to send SSL Close message", ie);
} finally {
socketChannel.socket().close();
socketChannel.close();

10
core/src/main/scala/kafka/admin/AdminClient.scala

@ -528,18 +528,20 @@ object AdminClient { @@ -528,18 +528,20 @@ object AdminClient {
val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
metadata.update(bootstrapCluster, Collections.emptySet(), 0)
val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
val selector = new Selector(
DefaultConnectionMaxIdleMs,
metrics,
time,
"admin",
channelBuilder,
new LogContext())
new LogContext(String.format("[Producer clientId=%s] ", clientId)))
val networkClient = new NetworkClient(
selector,
metadata,
"admin-" + AdminClientIdSequence.getAndIncrement(),
clientId,
DefaultMaxInFlightRequestsPerConnection,
DefaultReconnectBackoffMs,
DefaultReconnectBackoffMax,
@ -549,10 +551,10 @@ object AdminClient { @@ -549,10 +551,10 @@ object AdminClient {
time,
true,
new ApiVersions,
new LogContext())
new LogContext(String.format("[NetworkClient clientId=%s] ", clientId)))
val highLevelClient = new ConsumerNetworkClient(
new LogContext(),
new LogContext(String.format("[ConsumerNetworkClient clientId=%s] ", clientId)),
networkClient,
metadata,
time,

208
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java

@ -22,6 +22,7 @@ import org.apache.kafka.clients.CommonClientConfigs; @@ -22,6 +22,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.serialization.LongDeserializer;
@ -47,7 +48,6 @@ import org.apache.kafka.test.TestCondition; @@ -47,7 +48,6 @@ import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
@ -68,17 +68,18 @@ import java.util.concurrent.TimeUnit; @@ -68,17 +68,18 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@Ignore
@Category({IntegrationTest.class})
public abstract class AbstractResetIntegrationTest {
static String testId;
static EmbeddedKafkaCluster cluster;
static Map<String, Object> sslConfig = null;
private static KafkaStreams streams;
private static MockTime mockTime;
private static KafkaStreams streams;
private static AdminClient adminClient = null;
private static KafkaAdminClient kafkaAdminClient = null;
abstract Map<String, Object> getClientSslConfig();
@AfterClass
public static void afterClassCleanup() {
if (adminClient != null) {
@ -91,15 +92,18 @@ public abstract class AbstractResetIntegrationTest { @@ -91,15 +92,18 @@ public abstract class AbstractResetIntegrationTest {
}
}
private String appID;
private String appID = "abstract-reset-integration-test";
private Properties commonClientConfig;
private Properties streamsConfig;
private Properties producerConfig;
private Properties resultConsumerConfig;
private void prepareEnvironment() {
if (adminClient == null) {
adminClient = AdminClient.create(commonClientConfig);
}
if (kafkaAdminClient == null) {
kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
}
// we align time to seconds to get clean window boundaries and thus ensure the same result for each run
@ -113,34 +117,38 @@ public abstract class AbstractResetIntegrationTest { @@ -113,34 +117,38 @@ public abstract class AbstractResetIntegrationTest {
commonClientConfig = new Properties();
commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
Map<String, Object> sslConfig = getClientSslConfig();
if (sslConfig != null) {
commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
commonClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
}
PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
PRODUCER_CONFIG.putAll(commonClientConfig);
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer");
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
RESULT_CONSUMER_CONFIG.putAll(commonClientConfig);
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
STREAMS_CONFIG.putAll(commonClientConfig);
producerConfig = new Properties();
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.putAll(commonClientConfig);
resultConsumerConfig = new Properties();
resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer");
resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
resultConsumerConfig.putAll(commonClientConfig);
streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamsConfig.putAll(commonClientConfig);
}
@Rule
@ -157,24 +165,24 @@ public abstract class AbstractResetIntegrationTest { @@ -157,24 +165,24 @@ public abstract class AbstractResetIntegrationTest {
private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
private static final int TIMEOUT_MULTIPLIER = 5;
private final TestCondition consumerGroupInactiveCondition = new TestCondition() {
private class ConsumerGroupInactiveCondition implements TestCondition {
@Override
public boolean conditionMet() {
return adminClient.describeConsumerGroup(testId + "-result-consumer", 0).consumers().get().isEmpty();
try {
return adminClient.describeConsumerGroup(appID, 0).consumers().get().isEmpty();
} catch (final TimeoutException e) {
return false;
}
}
};
private static final Properties STREAMS_CONFIG = new Properties();
private final static Properties PRODUCER_CONFIG = new Properties();
private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
}
void prepareTest() throws Exception {
prepareConfigs();
prepareEnvironment();
// busy wait until cluster (ie, ConsumerGroupCoordinator) is available
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Test consumer group " + appID + " still active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
@ -185,7 +193,7 @@ public abstract class AbstractResetIntegrationTest { @@ -185,7 +193,7 @@ public abstract class AbstractResetIntegrationTest {
if (streams != null) {
streams.close(30, TimeUnit.SECONDS);
}
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
}
private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException {
@ -202,7 +210,7 @@ public abstract class AbstractResetIntegrationTest { @@ -202,7 +210,7 @@ public abstract class AbstractResetIntegrationTest {
for (KeyValue<Long, String> record : records) {
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), PRODUCER_CONFIG, mockTime.milliseconds());
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), producerConfig, mockTime.milliseconds());
}
}
@ -216,10 +224,10 @@ public abstract class AbstractResetIntegrationTest { @@ -216,10 +224,10 @@ public abstract class AbstractResetIntegrationTest {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
@ -258,35 +266,35 @@ public abstract class AbstractResetIntegrationTest { @@ -258,35 +266,35 @@ public abstract class AbstractResetIntegrationTest {
void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
appID = testId + "-from-scratch";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
cleanGlobal(false, null, null);
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
// RE-RUN
streams.start();
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(false, null, null);
}
@ -294,19 +302,19 @@ public abstract class AbstractResetIntegrationTest { @@ -294,19 +302,19 @@ public abstract class AbstractResetIntegrationTest {
cluster.createTopic(INTERMEDIATE_USER_TOPIC);
appID = testId + "-from-scratch-with-intermediate-topic";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfig);
streams.start();
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
// receive only first values to make sure intermediate user topic is not consumed completely
// => required to test "seekToEnd" for intermediate topics
final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2, 40);
final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2, 40);
streams.close();
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// insert bad record to make sure intermediate user topic gets seekToEnd()
mockTime.sleep(1);
@ -314,22 +322,22 @@ public abstract class AbstractResetIntegrationTest { @@ -314,22 +322,22 @@ public abstract class AbstractResetIntegrationTest {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
INTERMEDIATE_USER_TOPIC,
Collections.singleton(badMessage),
PRODUCER_CONFIG,
producerConfig,
mockTime.milliseconds());
// RESET
streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfig);
streams.cleanUp();
cleanGlobal(true, null, null);
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
// RE-RUN
streams.start();
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2_RERUN, 40);
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
streams.close();
assertThat(resultRerun, equalTo(result));
@ -343,8 +351,8 @@ public abstract class AbstractResetIntegrationTest { @@ -343,8 +351,8 @@ public abstract class AbstractResetIntegrationTest {
}
assertThat(resultIntermediate.get(10), equalTo(badMessage));
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(true, null, null);
cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
@ -352,16 +360,16 @@ public abstract class AbstractResetIntegrationTest { @@ -352,16 +360,16 @@ public abstract class AbstractResetIntegrationTest {
void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
appID = testId + "-from-file";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
@ -370,12 +378,12 @@ public abstract class AbstractResetIntegrationTest { @@ -370,12 +378,12 @@ public abstract class AbstractResetIntegrationTest {
writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
cleanGlobal(false, "--from-file", resetFile.getAbsolutePath());
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
@ -383,29 +391,29 @@ public abstract class AbstractResetIntegrationTest { @@ -383,29 +391,29 @@ public abstract class AbstractResetIntegrationTest {
// RE-RUN
streams.start();
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 5);
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 5);
streams.close();
result.remove(0);
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(false, null, null);
}
void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
appID = testId + "-from-datetime";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
@ -414,7 +422,7 @@ public abstract class AbstractResetIntegrationTest { @@ -414,7 +422,7 @@ public abstract class AbstractResetIntegrationTest {
writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
@ -423,8 +431,8 @@ public abstract class AbstractResetIntegrationTest { @@ -423,8 +431,8 @@ public abstract class AbstractResetIntegrationTest {
calendar.add(Calendar.DATE, -1);
cleanGlobal(false, "--to-datetime", format.format(calendar.getTime()));
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
@ -432,28 +440,28 @@ public abstract class AbstractResetIntegrationTest { @@ -432,28 +440,28 @@ public abstract class AbstractResetIntegrationTest {
// RE-RUN
streams.start();
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(false, null, null);
}
void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
appID = testId + "-from-duration";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
@ -462,12 +470,12 @@ public abstract class AbstractResetIntegrationTest { @@ -462,12 +470,12 @@ public abstract class AbstractResetIntegrationTest {
writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
cleanGlobal(false, "--by-duration", "PT1M");
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted(null);
@ -475,13 +483,13 @@ public abstract class AbstractResetIntegrationTest { @@ -475,13 +483,13 @@ public abstract class AbstractResetIntegrationTest {
// RE-RUN
streams.start();
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(false, null, null);
}
@ -546,6 +554,8 @@ public abstract class AbstractResetIntegrationTest { @@ -546,6 +554,8 @@ public abstract class AbstractResetIntegrationTest {
parameterList.add("--intermediate-topics");
parameterList.add(INTERMEDIATE_USER_TOPIC);
}
final Map<String, Object> sslConfig = getClientSslConfig();
if (sslConfig != null) {
final File configFile = TestUtils.tempFile();
final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));

8
streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java

@ -23,17 +23,16 @@ import org.apache.kafka.test.IntegrationTest; @@ -23,17 +23,16 @@ import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Map;
import java.util.Properties;
/**
* Tests local state store and global application cleanup.
*/
@Ignore
@Category({IntegrationTest.class})
public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@ -51,6 +50,11 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @@ -51,6 +50,11 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
}
@Override
Map<String, Object> getClientSslConfig() {
return null;
}
@Before
public void before() throws Exception {
testId = TEST_ID;

10
streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java

@ -25,16 +25,15 @@ import org.apache.kafka.test.TestUtils; @@ -25,16 +25,15 @@ import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Map;
import java.util.Properties;
/**
* Tests command line SSL setup for reset tool.
*/
@Ignore
@Category({IntegrationTest.class})
public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
@ -43,6 +42,8 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest { @@ -43,6 +42,8 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
private static final String TEST_ID = "reset-with-ssl-integration-test";
private static Map<String, Object> sslConfig;
static {
final Properties brokerProps = new Properties();
// we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
@ -63,6 +64,11 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest { @@ -63,6 +64,11 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
}
@Override
Map<String, Object> getClientSslConfig() {
return sslConfig;
}
@Before
public void before() throws Exception {
testId = TEST_ID;

2
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java

@ -104,7 +104,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { @@ -104,7 +104,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
for (int i = 0; i < brokers.length; i++) {
brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
log.debug("Starting a Kafka instance on port {} ...", brokerConfig.get(KafkaConfig$.MODULE$.PortProp()));
brokers[i] = new KafkaEmbedded(brokerConfig, time);
log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",

2
streams/src/test/resources/log4j.properties

@ -18,4 +18,4 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender @@ -18,4 +18,4 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.kafka=INFO

Loading…
Cancel
Save