@ -33,6 +33,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
@@ -33,6 +33,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection ;
import org.apache.kafka.common.message.JoinGroupRequestData ;
import org.apache.kafka.common.message.JoinGroupResponseData ;
import org.apache.kafka.common.message.SyncGroupRequestData ;
import org.apache.kafka.common.message.SyncGroupResponseData ;
import org.apache.kafka.common.protocol.Errors ;
import org.apache.kafka.common.requests.JoinGroupRequest ;
@ -101,6 +102,7 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscript
@@ -101,6 +102,7 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscript
import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord ;
import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord ;
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord ;
import static org.apache.kafka.coordinator.group.generic.GenericGroupMember.EMPTY_ASSIGNMENT ;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE ;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD ;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY ;
@ -1491,8 +1493,9 @@ public class GroupMetadataManager {
@@ -1491,8 +1493,9 @@ public class GroupMetadataManager {
/ * *
* Handle a JoinGroupRequest .
*
* @param context The request context .
* @param request The actual JoinGroup request .
* @param context The request context .
* @param request The actual JoinGroup request .
* @param responseFuture The join group response future .
*
* @return The result that contains records to append if the join group phase completes .
* /
@ -1619,7 +1622,7 @@ public class GroupMetadataManager {
@@ -1619,7 +1622,7 @@ public class GroupMetadataManager {
// finding the correct coordinator and rejoin.
responseFuture . complete ( new JoinGroupResponseData ( )
. setMemberId ( UNKNOWN_MEMBER_ID )
. setErrorCode ( Errors . COORDINATOR_NOT_AVAILABLE . code ( ) )
. setErrorCode ( COORDINATOR_NOT_AVAILABLE . code ( ) )
) ;
} else if ( ! group . supportsProtocols ( request . protocolType ( ) , request . protocols ( ) ) ) {
responseFuture . complete ( new JoinGroupResponseData ( )
@ -1678,7 +1681,7 @@ public class GroupMetadataManager {
@@ -1678,7 +1681,7 @@ public class GroupMetadataManager {
"group {} in {} state. Replacing previously mapped member {} with this groupInstanceId." ,
groupInstanceId , group . groupId ( ) , group . currentState ( ) , existingMemberId ) ;
return updateStaticMemberAndRebalance (
return updateStaticMemberThenRebalanceOrCompleteJoin (
context ,
request ,
group ,
@ -1691,7 +1694,7 @@ public class GroupMetadataManager {
@@ -1691,7 +1694,7 @@ public class GroupMetadataManager {
"group {} in {} state. Created a new member id {} for this member and added to the group." ,
groupInstanceId , group . groupId ( ) , group . currentState ( ) , newMemberId ) ;
return addMemberAndRebalance ( context , request , group , newMemberId , responseFuture ) ;
return addMemberThenRebalanceOrCompleteJoin ( context , request , group , newMemberId , responseFuture ) ;
}
}
@ -1744,7 +1747,7 @@ public class GroupMetadataManager {
@@ -1744,7 +1747,7 @@ public class GroupMetadataManager {
"Created a new member id {} and added the member to the group." ,
group . groupId ( ) , group . currentState ( ) , newMemberId ) ;
return addMemberAndRebalance ( context , request , group , newMemberId , responseFuture ) ;
return addMemberThenRebalanceOrCompleteJoin ( context , request , group , newMemberId , responseFuture ) ;
}
return EMPTY_RESULT ;
@ -1776,7 +1779,7 @@ public class GroupMetadataManager {
@@ -1776,7 +1779,7 @@ public class GroupMetadataManager {
// finding the correct coordinator and rejoin.
responseFuture . complete ( new JoinGroupResponseData ( )
. setMemberId ( memberId )
. setErrorCode ( Errors . COORDINATOR_NOT_AVAILABLE . code ( ) )
. setErrorCode ( COORDINATOR_NOT_AVAILABLE . code ( ) )
) ;
} else if ( ! group . supportsProtocols ( request . protocolType ( ) , request . protocols ( ) ) ) {
responseFuture . complete ( new JoinGroupResponseData ( )
@ -1793,7 +1796,7 @@ public class GroupMetadataManager {
@@ -1793,7 +1796,7 @@ public class GroupMetadataManager {
log . debug ( "Pending dynamic member with id {} joins group {} in {} state. Adding to the group now." ,
memberId , group . groupId ( ) , group . currentState ( ) ) ;
return addMemberAndRebalance (
return addMemberThenRebalanceOrCompleteJoin (
context ,
request ,
group ,
@ -1812,6 +1815,8 @@ public class GroupMetadataManager {
@@ -1812,6 +1815,8 @@ public class GroupMetadataManager {
responseFuture . complete ( new JoinGroupResponseData ( )
. setMemberId ( memberId )
. setErrorCode ( memberError . get ( ) . code ( ) )
. setProtocolType ( null )
. setProtocolName ( null )
) ;
} else {
GenericGroupMember member = group . member ( memberId ) ;
@ -1968,9 +1973,8 @@ public class GroupMetadataManager {
@@ -1968,9 +1973,8 @@ public class GroupMetadataManager {
}
} ) ;
List < Record > records = Collections . singletonList (
RecordHelpers . newGroupMetadataRecord ( group , metadataImage . features ( ) . metadataVersion ( ) )
) ;
List < Record > records = Collections . singletonList ( RecordHelpers . newGroupMetadataRecord (
group , Collections . emptyMap ( ) , metadataImage . features ( ) . metadataVersion ( ) ) ) ;
return new CoordinatorResult < > ( records , appendFuture ) ;
@ -2188,7 +2192,7 @@ public class GroupMetadataManager {
@@ -2188,7 +2192,7 @@ public class GroupMetadataManager {
}
/ * *
* Add a member and rebalance .
* Add a member then rebalance or complete join .
*
* @param context The request context .
* @param request The join group request .
@ -2198,7 +2202,7 @@ public class GroupMetadataManager {
@@ -2198,7 +2202,7 @@ public class GroupMetadataManager {
*
* @return The coordinator result that will be appended to the log .
* /
private CoordinatorResult < Void , Record > addMemberAndRebalance (
private CoordinatorResult < Void , Record > addMemberThenRebalanceOrCompleteJoin (
RequestContext context ,
JoinGroupRequestData request ,
GenericGroup group ,
@ -2376,10 +2380,28 @@ public class GroupMetadataManager {
@@ -2376,10 +2380,28 @@ public class GroupMetadataManager {
" state but is in " + group . currentState ( ) + "." ) ;
}
group . allMembers ( ) . forEach ( member - > member . setAssignment ( GenericGroupMember . EMPTY_ASSIGNMENT ) ) ;
group . allMembers ( ) . forEach ( member - > member . setAssignment ( EMPTY_ASSIGNMENT ) ) ;
propagateAssignment ( group , error ) ;
}
/ * *
* Sets assignment for group and propagate assignment and error to all members .
*
* @param group The group .
* @param assignment The assignment for all members .
* /
private void setAndPropagateAssignment ( GenericGroup group , Map < String , byte [ ] > assignment ) {
if ( ! group . isInState ( COMPLETING_REBALANCE ) ) {
throw new IllegalStateException ( "The group must be in CompletingRebalance state " +
"to set and propagate assignment." ) ;
}
group . allMembers ( ) . forEach ( member - >
member . setAssignment ( assignment . getOrDefault ( member . memberId ( ) , EMPTY_ASSIGNMENT ) ) ) ;
propagateAssignment ( group , Errors . NONE ) ;
}
/ * *
* Propagate assignment and error to all members .
*
@ -2536,7 +2558,7 @@ public class GroupMetadataManager {
@@ -2536,7 +2558,7 @@ public class GroupMetadataManager {
}
/ * *
* Update a static member and rebalance .
* Update a static member then rebalance or complete join .
*
* @param context The request context .
* @param request The join group request .
@ -2547,7 +2569,7 @@ public class GroupMetadataManager {
@@ -2547,7 +2569,7 @@ public class GroupMetadataManager {
*
* @return The coordinator result that will be appended to the log .
* /
private CoordinatorResult < Void , Record > updateStaticMemberAndRebalance (
private CoordinatorResult < Void , Record > updateStaticMemberThenRebalanceOrCompleteJoin (
RequestContext context ,
JoinGroupRequestData request ,
GenericGroup group ,
@ -2584,6 +2606,7 @@ public class GroupMetadataManager {
@@ -2584,6 +2606,7 @@ public class GroupMetadataManager {
log . info ( "Static member which joins during Stable stage and doesn't affect " +
"the selected protocol will not trigger a rebalance." ) ;
Map < String , byte [ ] > groupAssignment = group . groupAssignment ( ) ;
CompletableFuture < Void > appendFuture = new CompletableFuture < > ( ) ;
appendFuture . whenComplete ( ( __ , t ) - > {
if ( t ! = null ) {
@ -2633,7 +2656,7 @@ public class GroupMetadataManager {
@@ -2633,7 +2656,7 @@ public class GroupMetadataManager {
} ) ;
List < Record > records = Collections . singletonList (
RecordHelpers . newGroupMetadataRecord ( group , metadataImage . features ( ) . metadataVersion ( ) )
RecordHelpers . newGroupMetadataRecord ( group , groupAssignment , metadataImage . features ( ) . metadataVersion ( ) )
) ;
return new CoordinatorResult < > ( records , appendFuture ) ;
@ -2660,6 +2683,113 @@ public class GroupMetadataManager {
@@ -2660,6 +2683,113 @@ public class GroupMetadataManager {
group . stateAsString ( ) + " when the unknown static member " + request . groupInstanceId ( ) + " rejoins." ) ;
}
return maybeCompleteJoinPhase ( group ) ;
}
/ * *
* Handle a SyncGroupRequest .
*
* @param context The request context .
* @param request The actual SyncGroup request .
* @param responseFuture The sync group response future .
*
* @return The result that contains records to append if the group metadata manager received assignments .
* /
public CoordinatorResult < Void , Record > genericGroupSync (
RequestContext context ,
SyncGroupRequestData request ,
CompletableFuture < SyncGroupResponseData > responseFuture
) throws UnknownMemberIdException , GroupIdNotFoundException {
String groupId = request . groupId ( ) ;
String memberId = request . memberId ( ) ;
GenericGroup group ;
try {
group = getOrMaybeCreateGenericGroup ( groupId , false ) ;
} catch ( Throwable t ) {
responseFuture . complete ( new SyncGroupResponseData ( )
. setErrorCode ( Errors . forException ( t ) . code ( ) )
) ;
return EMPTY_RESULT ;
}
Optional < Errors > errorOpt = validateSyncGroup ( group , request ) ;
if ( errorOpt . isPresent ( ) ) {
responseFuture . complete ( new SyncGroupResponseData ( )
. setErrorCode ( errorOpt . get ( ) . code ( ) ) ) ;
} else if ( group . isInState ( EMPTY ) ) {
responseFuture . complete ( new SyncGroupResponseData ( )
. setErrorCode ( Errors . UNKNOWN_MEMBER_ID . code ( ) ) ) ;
} else if ( group . isInState ( PREPARING_REBALANCE ) ) {
responseFuture . complete ( new SyncGroupResponseData ( )
. setErrorCode ( Errors . REBALANCE_IN_PROGRESS . code ( ) ) ) ;
} else if ( group . isInState ( COMPLETING_REBALANCE ) ) {
group . member ( memberId ) . setAwaitingSyncFuture ( responseFuture ) ;
removePendingSyncMember ( group , request . memberId ( ) ) ;
// If this is the leader, then we can attempt to persist state and transition to stable
if ( group . isLeader ( memberId ) ) {
log . info ( "Assignment received from leader {} for group {} for generation {}. " +
"The group has {} members, {} of which are static." ,
memberId , groupId , group . generationId ( ) , group . size ( ) , group . allStaticMemberIds ( ) . size ( ) ) ;
// Fill all members with corresponding member assignment. If the member assignment
// does not exist, fill with an empty assignment.
Map < String , byte [ ] > assignment = new HashMap < > ( ) ;
request . assignments ( ) . forEach ( memberAssignment - >
assignment . put ( memberAssignment . memberId ( ) , memberAssignment . assignment ( ) )
) ;
Map < String , byte [ ] > membersWithMissingAssignment = new HashMap < > ( ) ;
group . allMembers ( ) . forEach ( member - > {
if ( ! assignment . containsKey ( member . memberId ( ) ) ) {
membersWithMissingAssignment . put ( member . memberId ( ) , EMPTY_ASSIGNMENT ) ;
}
} ) ;
assignment . putAll ( membersWithMissingAssignment ) ;
if ( ! membersWithMissingAssignment . isEmpty ( ) ) {
log . warn ( "Setting empty assignments for members {} of {} for generation {}." ,
membersWithMissingAssignment , groupId , group . generationId ( ) ) ;
}
CompletableFuture < Void > appendFuture = new CompletableFuture < > ( ) ;
appendFuture . whenComplete ( ( __ , t ) - > {
// Another member may have joined the group while we were awaiting this callback,
// so we must ensure we are still in the CompletingRebalance state and the same generation
// when it gets invoked. if we have transitioned to another state, then do nothing
if ( group . isInState ( COMPLETING_REBALANCE ) & & request . generationId ( ) = = group . generationId ( ) ) {
if ( t ! = null ) {
Errors error = Errors . forException ( t ) ;
resetAndPropagateAssignmentWithError ( group , error ) ;
maybePrepareRebalanceOrCompleteJoin ( group , "Error " + error + " when storing group assignment" +
"during SyncGroup (member: " + memberId + ")." ) ;
} else {
// Update group's assignment and propagate to all members.
setAndPropagateAssignment ( group , assignment ) ;
group . transitionTo ( STABLE ) ;
}
}
} ) ;
List < Record > records = Collections . singletonList (
RecordHelpers . newGroupMetadataRecord ( group , assignment , metadataImage . features ( ) . metadataVersion ( ) )
) ;
return new CoordinatorResult < > ( records , appendFuture ) ;
}
} else if ( group . isInState ( STABLE ) ) {
removePendingSyncMember ( group , memberId ) ;
// If the group is stable, we just return the current assignment
GenericGroupMember member = group . member ( memberId ) ;
responseFuture . complete ( new SyncGroupResponseData ( )
. setProtocolType ( group . protocolType ( ) . orElse ( null ) )
. setProtocolName ( group . protocolName ( ) . orElse ( null ) )
. setAssignment ( member . assignment ( ) )
. setErrorCode ( Errors . NONE . code ( ) ) ) ;
} else if ( group . isInState ( DEAD ) ) {
throw new IllegalStateException ( "Reached unexpected condition for Dead group " + groupId ) ;
}
return EMPTY_RESULT ;
}
@ -2684,8 +2814,81 @@ public class GroupMetadataManager {
@@ -2684,8 +2814,81 @@ public class GroupMetadataManager {
}
}
private Optional < Errors > validateSyncGroup (
GenericGroup group ,
SyncGroupRequestData request
) {
if ( group . isInState ( DEAD ) ) {
// If the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin.
return Optional . of ( COORDINATOR_NOT_AVAILABLE ) ;
} else {
Optional < Errors > memberError = validateExistingMember (
group ,
request . memberId ( ) ,
request . groupInstanceId ( ) ,
"sync-group"
) ;
if ( memberError . isPresent ( ) ) {
return memberError ;
} else {
if ( request . generationId ( ) ! = group . generationId ( ) ) {
return Optional . of ( Errors . ILLEGAL_GENERATION ) ;
} else if ( isProtocolInconsistent ( request . protocolType ( ) , group . protocolType ( ) . orElse ( null ) ) | |
isProtocolInconsistent ( request . protocolName ( ) , group . protocolName ( ) . orElse ( null ) ) ) {
return Optional . of ( Errors . INCONSISTENT_GROUP_PROTOCOL ) ;
} else {
return Optional . empty ( ) ;
}
}
}
}
private void removePendingSyncMember (
GenericGroup group ,
String memberId
) {
group . removePendingSyncMember ( memberId ) ;
String syncKey = genericGroupSyncKey ( group . groupId ( ) ) ;
switch ( group . currentState ( ) ) {
case DEAD :
case EMPTY :
case PREPARING_REBALANCE :
timer . cancel ( syncKey ) ;
break ;
case COMPLETING_REBALANCE :
case STABLE :
if ( group . hasReceivedSyncFromAllMembers ( ) ) {
timer . cancel ( syncKey ) ;
}
break ;
default :
throw new IllegalStateException ( "Unknown group state: " + group . stateAsString ( ) ) ;
}
}
/ * *
* Checks whether the given protocol type or name in the request is inconsistent with the group ' s .
*
* @param protocolTypeOrName The request ' s protocol type or name .
* @param groupProtocolTypeOrName The group ' s protoocl type or name .
*
* @return True if protocol is inconsistent , false otherwise .
* /
private boolean isProtocolInconsistent (
String protocolTypeOrName ,
String groupProtocolTypeOrName
) {
return protocolTypeOrName ! = null
& & groupProtocolTypeOrName ! = null
& & ! groupProtocolTypeOrName . equals ( protocolTypeOrName ) ;
}
/ * *
* Generate a heartbeat key for the timer .
* Generate a generic group heartbeat key for the timer .
*
* Package private for testing .
*
@ -2699,7 +2902,7 @@ public class GroupMetadataManager {
@@ -2699,7 +2902,7 @@ public class GroupMetadataManager {
}
/ * *
* Generate a join key for the timer .
* Generate a generic group join key for the timer .
*
* Package private for testing .
*
@ -2712,7 +2915,7 @@ public class GroupMetadataManager {
@@ -2712,7 +2915,7 @@ public class GroupMetadataManager {
}
/ * *
* Generate a sync key for the timer .
* Generate a generic group sync key for the timer .
*
* Package private for testing .
*