@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
@@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.producer.Producer ;
import org.apache.kafka.common.KafkaException ;
import org.apache.kafka.common.TopicPartition ;
import org.apache.kafka.common.config.ConfigDef ;
import org.apache.kafka.common.config.ConfigDef.Type ;
import org.apache.kafka.common.metrics.Metrics ;
import org.apache.kafka.common.metrics.Sensor ;
import org.apache.kafka.common.metrics.stats.Avg ;
@ -197,6 +199,7 @@ public class StreamThread extends Thread {
@@ -197,6 +199,7 @@ public class StreamThread extends Thread {
private final Map < TaskId , StreamTask > suspendedTasks ;
private final Map < TaskId , StandbyTask > suspendedStandbyTasks ;
private final Time time ;
private final int rebalanceTimeoutMs ;
private final long pollTimeMs ;
private final long cleanTimeMs ;
private final long commitTimeMs ;
@ -290,6 +293,8 @@ public class StreamThread extends Thread {
@@ -290,6 +293,8 @@ public class StreamThread extends Thread {
this . standbyRecords = new HashMap < > ( ) ;
this . stateDirectory = new StateDirectory ( applicationId , config . getString ( StreamsConfig . STATE_DIR_CONFIG ) , time ) ;
final Object maxPollInterval = consumerConfigs . get ( ConsumerConfig . MAX_POLL_INTERVAL_MS_CONFIG ) ;
this . rebalanceTimeoutMs = ( Integer ) ConfigDef . parseType ( ConsumerConfig . MAX_POLL_INTERVAL_MS_CONFIG , maxPollInterval , Type . INT ) ;
this . pollTimeMs = config . getLong ( StreamsConfig . POLL_MS_CONFIG ) ;
this . commitTimeMs = config . getLong ( StreamsConfig . COMMIT_INTERVAL_MS_CONFIG ) ;
this . cleanTimeMs = config . getLong ( StreamsConfig . STATE_CLEANUP_DELAY_MS_CONFIG ) ;
@ -855,7 +860,7 @@ public class StreamThread extends Thread {
@@ -855,7 +860,7 @@ public class StreamThread extends Thread {
}
}
private void addStreamTasks ( Collection < TopicPartition > assignment ) {
private void addStreamTasks ( Collection < TopicPartition > assignment , final long start ) {
if ( partitionAssignor = = null )
throw new IllegalStateException ( logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen." ) ;
@ -893,7 +898,7 @@ public class StreamThread extends Thread {
@@ -893,7 +898,7 @@ public class StreamThread extends Thread {
// create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedTasks(); eventually
taskCreator . retryWithBackoff ( newTasks ) ;
taskCreator . retryWithBackoff ( newTasks , start ) ;
}
StandbyTask createStandbyTask ( TaskId id , Collection < TopicPartition > partitions ) {
@ -910,7 +915,7 @@ public class StreamThread extends Thread {
@@ -910,7 +915,7 @@ public class StreamThread extends Thread {
}
}
private void addStandbyTasks ( ) {
private void addStandbyTasks ( final long start ) {
if ( partitionAssignor = = null )
throw new IllegalStateException ( logPrefix + " Partition assignor has not been initialized while adding standby tasks: this should not happen." ) ;
@ -937,7 +942,7 @@ public class StreamThread extends Thread {
@@ -937,7 +942,7 @@ public class StreamThread extends Thread {
// create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedStandbyTasks(); eventually
new StandbyTaskCreator ( checkpointedOffsets ) . retryWithBackoff ( newStandbyTasks ) ;
new StandbyTaskCreator ( checkpointedOffsets ) . retryWithBackoff ( newStandbyTasks , start ) ;
restoreConsumer . assign ( new ArrayList < > ( checkpointedOffsets . keySet ( ) ) ) ;
@ -1126,7 +1131,7 @@ public class StreamThread extends Thread {
@@ -1126,7 +1131,7 @@ public class StreamThread extends Thread {
}
abstract class AbstractTaskCreator {
void retryWithBackoff ( final Map < TaskId , Set < TopicPartition > > tasksToBeCreated ) {
void retryWithBackoff ( final Map < TaskId , Set < TopicPartition > > tasksToBeCreated , final long start ) {
long backoffTimeMs = 50L ;
while ( true ) {
final Iterator < Map . Entry < TaskId , Set < TopicPartition > > > it = tasksToBeCreated . entrySet ( ) . iterator ( ) ;
@ -1138,13 +1143,14 @@ public class StreamThread extends Thread {
@@ -1138,13 +1143,14 @@ public class StreamThread extends Thread {
try {
createTask ( taskId , partitions ) ;
it . remove ( ) ;
backoffTimeMs = 50L ;
} catch ( final LockException e ) {
// ignore and retry
log . warn ( "Could not create task {}. Will retry." , taskId , e ) ;
}
}
if ( tasksToBeCreated . isEmpty ( ) ) {
if ( tasksToBeCreated . isEmpty ( ) | | time . milliseconds ( ) - start > rebalanceTimeoutMs ) {
break ;
}
@ -1207,9 +1213,9 @@ public class StreamThread extends Thread {
@@ -1207,9 +1213,9 @@ public class StreamThread extends Thread {
// will become active or vice versa
closeNonAssignedSuspendedStandbyTasks ( ) ;
closeNonAssignedSuspendedTasks ( ) ;
addStreamTasks ( assignment ) ;
addStreamTasks ( assignment , start ) ;
storeChangelogReader . restore ( ) ;
addStandbyTasks ( ) ;
addStandbyTasks ( start ) ;
streamsMetadataState . onChange ( partitionAssignor . getPartitionsByHostState ( ) , partitionAssignor . clusterMetadata ( ) ) ;
lastCleanMs = time . milliseconds ( ) ; // start the cleaning cycle
setStateWhenNotInPendingShutdown ( State . RUNNING ) ;