diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index 8a82bf9fe5e..249e2c3cffd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -16,15 +16,13 @@ */ package org.apache.kafka.streams.integration; -import kafka.admin.AdminClient; -import kafka.tools.StreamsResetter; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; @@ -65,6 +63,9 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; +import kafka.admin.AdminClient; +import kafka.tools.StreamsResetter; + import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -106,11 +107,25 @@ public abstract class AbstractResetIntegrationTest { kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig); } - // we align time to seconds to get clean window boundaries and thus ensure the same result for each run - // otherwise, input records could fall into different windows for different runs depending on the initial mock time - final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000; - mockTime = cluster.time; - mockTime.setCurrentTimeMs(alignedTime); + boolean timeSet = false; + while (!timeSet) { + timeSet = setCurrentTime(); + } + } + + private boolean setCurrentTime() { + boolean currentTimeSet = false; + try { + mockTime = cluster.time; + // we align time to seconds to get clean window boundaries and thus ensure the same result for each run + // otherwise, input records could fall into different windows for different runs depending on the initial mock time + final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000; + mockTime.setCurrentTimeMs(alignedTime); + currentTimeSet = true; + } catch (final IllegalArgumentException e) { + // don't care will retry until set + } + return currentTimeSet; } private void prepareConfigs() {