@ -408,7 +408,7 @@ object ConsumerGroupCommand extends Logging {
@@ -408,7 +408,7 @@ object ConsumerGroupCommand extends Logging {
getLag ( offset , logEndOffsetOpt ) , consumerIdOpt , hostOpt , clientIdOpt , logEndOffsetOpt )
}
getLogEndOffsets ( group , topicPartitions ) . map {
getLogEndOffsets ( topicPartitions ) . map {
logEndOffsetResult =>
logEndOffsetResult . _2 match {
case LogOffsetResult . LogOffset ( logEndOffset ) => getDescribePartitionResult ( logEndOffsetResult . _1 , Some ( logEndOffset ) )
@ -629,7 +629,7 @@ object ConsumerGroupCommand extends Logging {
@@ -629,7 +629,7 @@ object ConsumerGroupCommand extends Logging {
} ) . toMap
}
private def getLogEndOffsets ( groupId : String , topicPartitions : Seq [ TopicPartition ] ) : Map [ TopicPartition , LogOffsetResult ] = {
private def getLogEndOffsets ( topicPartitions : Seq [ TopicPartition ] ) : Map [ TopicPartition , LogOffsetResult ] = {
val endOffsets = topicPartitions . map { topicPartition =>
topicPartition -> OffsetSpec . latest
} . toMap
@ -645,7 +645,7 @@ object ConsumerGroupCommand extends Logging {
@@ -645,7 +645,7 @@ object ConsumerGroupCommand extends Logging {
} . toMap
}
private def getLogStartOffsets ( groupId : String , topicPartitions : Seq [ TopicPartition ] ) : Map [ TopicPartition , LogOffsetResult ] = {
private def getLogStartOffsets ( topicPartitions : Seq [ TopicPartition ] ) : Map [ TopicPartition , LogOffsetResult ] = {
val startOffsets = topicPartitions . map { topicPartition =>
topicPartition -> OffsetSpec . earliest
} . toMap
@ -661,7 +661,7 @@ object ConsumerGroupCommand extends Logging {
@@ -661,7 +661,7 @@ object ConsumerGroupCommand extends Logging {
} . toMap
}
private def getLogTimestampOffsets ( groupId : String , topicPartitions : Seq [ TopicPartition ] , timestamp : java . lang . Long ) : Map [ TopicPartition , LogOffsetResult ] = {
private def getLogTimestampOffsets ( topicPartitions : Seq [ TopicPartition ] , timestamp : java . lang . Long ) : Map [ TopicPartition , LogOffsetResult ] = {
val timestampOffsets = topicPartitions . map { topicPartition =>
topicPartition -> OffsetSpec . forTimestamp ( timestamp )
} . toMap
@ -681,7 +681,7 @@ object ConsumerGroupCommand extends Logging {
@@ -681,7 +681,7 @@ object ConsumerGroupCommand extends Logging {
" is empty. Falling back to latest known offset." )
}
successfulLogTimestampOffsets ++ getLogEndOffsets ( groupId , unsuccessfulOffsetsForTimes . keySet . toSeq )
successfulLogTimestampOffsets ++ getLogEndOffsets ( unsuccessfulOffsetsForTimes . keySet . toSeq )
}
def close ( ) : Unit = {
@ -793,11 +793,11 @@ object ConsumerGroupCommand extends Logging {
@@ -793,11 +793,11 @@ object ConsumerGroupCommand extends Logging {
partitionsToReset : Seq [ TopicPartition ] ) : Map [ TopicPartition , OffsetAndMetadata ] = {
if ( opts . options . has ( opts . resetToOffsetOpt ) ) {
val offset = opts . options . valueOf ( opts . resetToOffsetOpt )
checkOffsetsRange ( groupId , partitionsToReset . map ( ( _ , offset ) ) . toMap ) . map {
checkOffsetsRange ( partitionsToReset . map ( ( _ , offset ) ) . toMap ) . map {
case ( topicPartition , newOffset ) => ( topicPartition , new OffsetAndMetadata ( newOffset ) )
}
} else if ( opts . options . has ( opts . resetToEarliestOpt ) ) {
val logStartOffsets = getLogStartOffsets ( groupId , partitionsToReset )
val logStartOffsets = getLogStartOffsets ( partitionsToReset )
partitionsToReset . map { topicPartition =>
logStartOffsets . get ( topicPartition ) match {
case Some ( LogOffsetResult . LogOffset ( offset ) ) => ( topicPartition , new OffsetAndMetadata ( offset ) )
@ -805,7 +805,7 @@ object ConsumerGroupCommand extends Logging {
@@ -805,7 +805,7 @@ object ConsumerGroupCommand extends Logging {
}
} . toMap
} else if ( opts . options . has ( opts . resetToLatestOpt ) ) {
val logEndOffsets = getLogEndOffsets ( groupId , partitionsToReset )
val logEndOffsets = getLogEndOffsets ( partitionsToReset )
partitionsToReset . map { topicPartition =>
logEndOffsets . get ( topicPartition ) match {
case Some ( LogOffsetResult . LogOffset ( offset ) ) => ( topicPartition , new OffsetAndMetadata ( offset ) )
@ -820,12 +820,12 @@ object ConsumerGroupCommand extends Logging {
@@ -820,12 +820,12 @@ object ConsumerGroupCommand extends Logging {
throw new IllegalArgumentException ( s" Cannot shift offset for partition $topicPartition since there is no current committed offset " ) ) . offset
( topicPartition , currentOffset + shiftBy )
} . toMap
checkOffsetsRange ( groupId , requestedOffsets ) . map {
checkOffsetsRange ( requestedOffsets ) . map {
case ( topicPartition , newOffset ) => ( topicPartition , new OffsetAndMetadata ( newOffset ) )
}
} else if ( opts . options . has ( opts . resetToDatetimeOpt ) ) {
val timestamp = Utils . getDateTime ( opts . options . valueOf ( opts . resetToDatetimeOpt ) )
val logTimestampOffsets = getLogTimestampOffsets ( groupId , partitionsToReset , timestamp )
val logTimestampOffsets = getLogTimestampOffsets ( partitionsToReset , timestamp )
partitionsToReset . map { topicPartition =>
val logTimestampOffset = logTimestampOffsets . get ( topicPartition )
logTimestampOffset match {
@ -839,7 +839,7 @@ object ConsumerGroupCommand extends Logging {
@@ -839,7 +839,7 @@ object ConsumerGroupCommand extends Logging {
val now = Instant . now ( )
durationParsed . negated ( ) . addTo ( now )
val timestamp = now . minus ( durationParsed ) . toEpochMilli
val logTimestampOffsets = getLogTimestampOffsets ( groupId , partitionsToReset , timestamp )
val logTimestampOffsets = getLogTimestampOffsets ( partitionsToReset , timestamp )
partitionsToReset . map { topicPartition =>
val logTimestampOffset = logTimestampOffsets . get ( topicPartition )
logTimestampOffset match {
@ -852,7 +852,7 @@ object ConsumerGroupCommand extends Logging {
@@ -852,7 +852,7 @@ object ConsumerGroupCommand extends Logging {
val requestedOffsets = resetPlanForGroup . keySet . map { topicPartition =>
topicPartition -> resetPlanForGroup ( topicPartition ) . offset
} . toMap
checkOffsetsRange ( groupId , requestedOffsets ) . map {
checkOffsetsRange ( requestedOffsets ) . map {
case ( topicPartition , newOffset ) => ( topicPartition , new OffsetAndMetadata ( newOffset ) )
}
} match {
@ -873,7 +873,7 @@ object ConsumerGroupCommand extends Logging {
@@ -873,7 +873,7 @@ object ConsumerGroupCommand extends Logging {
} ) )
} . toMap
val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets ( groupId , partitionsToResetWithoutCommittedOffset ) . map {
val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets ( partitionsToResetWithoutCommittedOffset ) . map {
case ( topicPartition , LogOffsetResult . LogOffset ( offset ) ) => ( topicPartition , new OffsetAndMetadata ( offset ) )
case ( topicPartition , _ ) => ToolsUtils . printUsageAndExit ( opts . parser , s" Error getting ending offset of topic partition: $topicPartition " )
}
@ -884,9 +884,9 @@ object ConsumerGroupCommand extends Logging {
@@ -884,9 +884,9 @@ object ConsumerGroupCommand extends Logging {
}
}
private def checkOffsetsRange ( groupId : String , requestedOffsets : Map [ TopicPartition , Long ] ) = {
val logStartOffsets = getLogStartOffsets ( groupId , requestedOffsets . keySet . toSeq )
val logEndOffsets = getLogEndOffsets ( groupId , requestedOffsets . keySet . toSeq )
private def checkOffsetsRange ( requestedOffsets : Map [ TopicPartition , Long ] ) = {
val logStartOffsets = getLogStartOffsets ( requestedOffsets . keySet . toSeq )
val logEndOffsets = getLogEndOffsets ( requestedOffsets . keySet . toSeq )
requestedOffsets . map { case ( topicPartition , offset ) => ( topicPartition ,
logEndOffsets . get ( topicPartition ) match {
case Some ( LogOffsetResult . LogOffset ( endOffset ) ) if offset > endOffset =>