Browse Source

MINOR: Retry setting aligned time until set (#4893)

In the AbstractResetIntegrationTest we can have a transient error when setting the time for the test where the new time is less than the original time, for those cases we should catch the exception and re-try setting the time once versus letting the test fail.

For testing, ran the entire streams test suite.

Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
pull/4903/head
Bill Bejeck 7 years ago committed by Guozhang Wang
parent
commit
93fd5707fa
  1. 31
      streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java

31
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java

@ -16,15 +16,13 @@ @@ -16,15 +16,13 @@
*/
package org.apache.kafka.streams.integration;
import kafka.admin.AdminClient;
import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
@ -65,6 +63,9 @@ import java.util.Map; @@ -65,6 +63,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminClient;
import kafka.tools.StreamsResetter;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@ -106,11 +107,25 @@ public abstract class AbstractResetIntegrationTest { @@ -106,11 +107,25 @@ public abstract class AbstractResetIntegrationTest {
kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
}
// we align time to seconds to get clean window boundaries and thus ensure the same result for each run
// otherwise, input records could fall into different windows for different runs depending on the initial mock time
final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000;
mockTime = cluster.time;
mockTime.setCurrentTimeMs(alignedTime);
boolean timeSet = false;
while (!timeSet) {
timeSet = setCurrentTime();
}
}
private boolean setCurrentTime() {
boolean currentTimeSet = false;
try {
mockTime = cluster.time;
// we align time to seconds to get clean window boundaries and thus ensure the same result for each run
// otherwise, input records could fall into different windows for different runs depending on the initial mock time
final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000;
mockTime.setCurrentTimeMs(alignedTime);
currentTimeSet = true;
} catch (final IllegalArgumentException e) {
// don't care will retry until set
}
return currentTimeSet;
}
private void prepareConfigs() {

Loading…
Cancel
Save