@ -24,7 +24,6 @@ import org.apache.kafka.clients.StaleMetadataException;
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.StaleMetadataException;
import org.apache.kafka.clients.consumer.LogTruncationException ;
import org.apache.kafka.clients.consumer.OffsetAndMetadata ;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp ;
import org.apache.kafka.clients.consumer.OffsetResetStrategy ;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData ;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult ;
import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult ;
@ -34,12 +33,9 @@ import org.apache.kafka.common.Node;
@@ -34,12 +33,9 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition ;
import org.apache.kafka.common.errors.RetriableException ;
import org.apache.kafka.common.errors.TimeoutException ;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion ;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition ;
import org.apache.kafka.common.protocol.ApiKeys ;
import org.apache.kafka.common.requests.ListOffsetsRequest ;
import org.apache.kafka.common.requests.ListOffsetsResponse ;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest ;
import org.apache.kafka.common.utils.LogContext ;
import org.apache.kafka.common.utils.Time ;
import org.apache.kafka.common.utils.Timer ;
@ -54,10 +50,14 @@ import java.util.Map;
@@ -54,10 +50,14 @@ import java.util.Map;
import java.util.Optional ;
import java.util.Set ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.function.Function ;
import java.util.stream.Collectors ;
import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult ;
import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion ;
import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupFetchPositionsByLeader ;
import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.topicsForPartitions ;
/ * *
* { @link OffsetFetcher } is responsible for fetching the { @link OffsetAndTimestamp offsets } for
* a given set of { @link TopicPartition topic and partition pairs } and for validation and resetting of positions ,
@ -73,7 +73,6 @@ public class OffsetFetcher {
@@ -73,7 +73,6 @@ public class OffsetFetcher {
private final long retryBackoffMs ;
private final long requestTimeoutMs ;
private final IsolationLevel isolationLevel ;
private final AtomicReference < RuntimeException > cachedListOffsetsException = new AtomicReference < > ( ) ;
private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient ;
private final ApiVersions apiVersions ;
private final OffsetFetcherUtils offsetFetcherUtils ;
@ -98,16 +97,7 @@ public class OffsetFetcher {
@@ -98,16 +97,7 @@ public class OffsetFetcher {
this . apiVersions = apiVersions ;
this . offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient ( client , logContext ) ;
this . offsetFetcherUtils = new OffsetFetcherUtils ( logContext , metadata , subscriptions ,
time , apiVersions ) ;
}
private OffsetResetStrategy timestampToOffsetResetStrategy ( long timestamp ) {
if ( timestamp = = ListOffsetsRequest . EARLIEST_TIMESTAMP )
return OffsetResetStrategy . EARLIEST ;
else if ( timestamp = = ListOffsetsRequest . LATEST_TIMESTAMP )
return OffsetResetStrategy . LATEST ;
else
return null ;
time , retryBackoffMs , apiVersions ) ;
}
/ * *
@ -117,11 +107,6 @@ public class OffsetFetcher {
@@ -117,11 +107,6 @@ public class OffsetFetcher {
* and one or more partitions aren ' t awaiting a seekToBeginning ( ) or seekToEnd ( ) .
* /
public void resetPositionsIfNeeded ( ) {
// Raise exception from previous offset fetch if there is one
RuntimeException exception = cachedListOffsetsException . getAndSet ( null ) ;
if ( exception ! = null )
throw exception ;
Map < TopicPartition , Long > offsetResetTimestamps = offsetFetcherUtils . getOffsetResetTimestamp ( ) ;
if ( offsetResetTimestamps . isEmpty ( ) )
@ -142,13 +127,13 @@ public class OffsetFetcher {
@@ -142,13 +127,13 @@ public class OffsetFetcher {
public Map < TopicPartition , OffsetAndTimestamp > offsetsForTimes ( Map < TopicPartition , Long > timestampsToSearch ,
Timer timer ) {
metadata . addTransientTopics ( offsetFetcherUtils . topicsForPartitions ( timestampsToSearch . keySet ( ) ) ) ;
metadata . addTransientTopics ( topicsForPartitions ( timestampsToSearch . keySet ( ) ) ) ;
try {
Map < TopicPartition , ListOffsetData > fetchedOffsets = fetchOffsetsByTimes ( timestampsToSearch ,
timer , true ) . fetchedOffsets ;
return offsetFetcherUtils . buildOffsetsForTimesResult ( timestampsToSearch , fetchedOffsets ) ;
return buildOffsetsForTimesResult ( timestampsToSearch , fetchedOffsets ) ;
} finally {
metadata . clearTransientTopics ( ) ;
}
@ -215,7 +200,7 @@ public class OffsetFetcher {
@@ -215,7 +200,7 @@ public class OffsetFetcher {
private Map < TopicPartition , Long > beginningOrEndOffset ( Collection < TopicPartition > partitions ,
long timestamp ,
Timer timer ) {
metadata . addTransientTopics ( offsetFetcherUtils . topicsForPartitions ( partitions ) ) ;
metadata . addTransientTopics ( topicsForPartitions ( partitions ) ) ;
try {
Map < TopicPartition , Long > timestampsToSearch = partitions . stream ( )
. distinct ( )
@ -230,16 +215,6 @@ public class OffsetFetcher {
@@ -230,16 +215,6 @@ public class OffsetFetcher {
}
}
// Visible for testing
void resetPositionIfNeeded ( TopicPartition partition , OffsetResetStrategy requestedResetStrategy , ListOffsetData offsetData ) {
FetchPosition position = new FetchPosition (
offsetData . offset ,
Optional . empty ( ) , // This will ensure we skip validation
metadata . currentLeader ( partition ) ) ;
offsetData . leaderEpoch . ifPresent ( epoch - > metadata . updateLastSeenEpochIfNewer ( partition , epoch ) ) ;
subscriptions . maybeSeekUnvalidated ( partition , position , requestedResetStrategy ) ;
}
private void resetPositionsAsync ( Map < TopicPartition , Long > partitionResetTimestamps ) {
Map < Node , Map < TopicPartition , ListOffsetsPartition > > timestampsToSearchByNode =
groupListOffsetRequests ( partitionResetTimestamps , new HashSet < > ( ) ) ;
@ -252,39 +227,17 @@ public class OffsetFetcher {
@@ -252,39 +227,17 @@ public class OffsetFetcher {
future . addListener ( new RequestFutureListener < ListOffsetResult > ( ) {
@Override
public void onSuccess ( ListOffsetResult result ) {
if ( ! result . partitionsToRetry . isEmpty ( ) ) {
subscriptions . requestFailed ( result . partitionsToRetry , time . milliseconds ( ) + retryBackoffMs ) ;
metadata . requestUpdate ( ) ;
}
for ( Map . Entry < TopicPartition , ListOffsetData > fetchedOffset : result . fetchedOffsets . entrySet ( ) ) {
TopicPartition partition = fetchedOffset . getKey ( ) ;
ListOffsetData offsetData = fetchedOffset . getValue ( ) ;
ListOffsetsPartition requestedReset = resetTimestamps . get ( partition ) ;
resetPositionIfNeeded ( partition , timestampToOffsetResetStrategy ( requestedReset . timestamp ( ) ) , offsetData ) ;
}
offsetFetcherUtils . onSuccessfulRequestForResettingPositions ( resetTimestamps , result ) ;
}
@Override
public void onFailure ( RuntimeException e ) {
subscriptions . requestFailed ( resetTimestamps . keySet ( ) , time . milliseconds ( ) + retryBackoffMs ) ;
metadata . requestUpdate ( ) ;
if ( ! ( e instanceof RetriableException ) & & ! cachedListOffsetsException . compareAndSet ( null , e ) )
log . error ( "Discarding error in ListOffsetResponse because another error is pending" , e ) ;
offsetFetcherUtils . onFailedRequestForResettingPositions ( resetTimestamps , e ) ;
}
} ) ;
}
}
static boolean hasUsableOffsetForLeaderEpochVersion ( NodeApiVersions nodeApiVersions ) {
ApiVersion apiVersion = nodeApiVersions . apiVersion ( ApiKeys . OFFSET_FOR_LEADER_EPOCH ) ;
if ( apiVersion = = null )
return false ;
return OffsetsForLeaderEpochRequest . supportsTopicPermission ( apiVersion . maxVersion ( ) ) ;
}
/ * *
* For each partition which needs validation , make an asynchronous request to get the end - offsets for the partition
* with the epoch less than or equal to the epoch the partition last saw .
@ -294,8 +247,7 @@ public class OffsetFetcher {
@@ -294,8 +247,7 @@ public class OffsetFetcher {
* Requests are grouped by Node for efficiency .
* /
private void validatePositionsAsync ( Map < TopicPartition , FetchPosition > partitionsToValidate ) {
final Map < Node , Map < TopicPartition , FetchPosition > > regrouped =
regroupFetchPositionsByLeader ( partitionsToValidate ) ;
final Map < Node , Map < TopicPartition , FetchPosition > > regrouped = regroupFetchPositionsByLeader ( partitionsToValidate ) ;
long nextResetTimeMs = time . milliseconds ( ) + requestTimeoutMs ;
regrouped . forEach ( ( node , fetchPositions ) - > {
@ -518,7 +470,6 @@ public class OffsetFetcher {
@@ -518,7 +470,6 @@ public class OffsetFetcher {
}
}
/ * *
* If we have seen new metadata ( as tracked by { @link org . apache . kafka . clients . Metadata # updateVersion ( ) } ) , then
* we should check that all the assignments have a valid position .
@ -526,14 +477,4 @@ public class OffsetFetcher {
@@ -526,14 +477,4 @@ public class OffsetFetcher {
public void validatePositionsOnMetadataChange ( ) {
offsetFetcherUtils . validatePositionsOnMetadataChange ( ) ;
}
private Map < Node , Map < TopicPartition , FetchPosition > > regroupFetchPositionsByLeader (
Map < TopicPartition , FetchPosition > partitionMap ) {
return partitionMap . entrySet ( )
. stream ( )
. filter ( entry - > entry . getValue ( ) . currentLeader . leader . isPresent ( ) )
. collect ( Collectors . groupingBy ( entry - > entry . getValue ( ) . currentLeader . leader . get ( ) ,
Collectors . toMap ( Map . Entry : : getKey , Map . Entry : : getValue ) ) ) ;
}
}
}