@@ -44,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
/**
 * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
@@ -179,7 +181,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
LOADING {
    @Override
    boolean canTransitionFrom(CoordinatorState state) {
        return state == INITIAL;
        return state == INITIAL || state == FAILED;
    }
},
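Allowing LOADING to be entered from FAILED means a failed coordinator can be reloaded in place rather than requiring a brand-new context (see the load scheduling hunk further down). As a standalone illustration of the transition-guard pattern, here is a minimal Java sketch; the enum and its allowed transitions are invented for the example and do not mirror the real state machine exactly.

    // Minimal sketch of an enum state machine with per-state transition guards.
    // The FAILED -> LOADING edge is what permits retrying a failed load.
    enum SketchState {
        INITIAL, LOADING, ACTIVE, FAILED, CLOSED;

        // Returns true if this state may be entered from the given state.
        boolean canTransitionFrom(SketchState from) {
            switch (this) {
                case LOADING:
                    return from == INITIAL || from == FAILED;   // retry path after a failed load
                case ACTIVE:
                    return from == LOADING;
                case FAILED:
                    return from == LOADING || from == ACTIVE;
                case CLOSED:
                    return true;
                default:
                    return false;
            }
        }
    }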
@@ -347,6 +349,12 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
 * CoordinatorContext holds all the metadata around a coordinator state machine.
 */
class CoordinatorContext {
    /**
     * The lock which protects all data in the context. Note that the context
     * is never accessed concurrently, but it is accessed by multiple threads.
     */
    final ReentrantLock lock;
    /**
     * The topic partition backing the coordinator.
     */
@@ -357,11 +365,6 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
 */
final LogContext logContext;
/**
 * The snapshot registry backing the coordinator.
 */
final SnapshotRegistry snapshotRegistry;
/**
 * The deferred event queue used to park events waiting
 * on records to be committed.
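The new lock field is what makes the withContextOrThrow and withActiveContextOrThrow helpers below possible: every access to the context happens between lock() and unlock(), so the fields it guards no longer need to be volatile. A self-contained sketch of that execute-around idiom follows; the class and field names are invented for illustration and are not part of the patch.

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Consumer;

    // Illustrative sketch: all reads and writes of the context's fields go through
    // withLock, so the lock provides both mutual exclusion and visibility.
    final class LockedContextSketch {
        final ReentrantLock lock = new ReentrantLock();
        long lastWrittenOffset = 0L;   // plain field; only touched while the lock is held

        void withLock(Consumer<LockedContextSketch> func) {
            lock.lock();
            try {
                func.accept(this);
            } finally {
                lock.unlock();
            }
        }
    }

Usage would look like ctx.withLock(c -> c.lastWrittenOffset = offset), which is the shape the runtime adopts in the hunks below.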
@@ -376,29 +379,34 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
/**
 * The current state.
 */
volatile CoordinatorState state;
CoordinatorState state;
/**
 * The actual state machine.
 * The current epoch of the coordinator. This represents
 * the epoch of the partition leader.
 */
volatile S coordinator;
int epoch;
/**
 * The current epoch of the coordinator. This represents
 * the epoch of the partition leader.
 * The snapshot registry backing the coordinator.
 */
SnapshotRegistry snapshotRegistry;
/**
 * The actual state machine.
 */
volatile int epoch;
S coordinator;
/**
 * The last offset written to the partition.
 */
volatile long lastWrittenOffset;
long lastWrittenOffset;
/**
 * The last offset committed. This represents the high
 * watermark of the partition.
 */
volatile long lastCommittedOffset;
long lastCommittedOffset;
/**
 * Constructor.
@@ -408,19 +416,17 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
private CoordinatorContext(
    TopicPartition tp
) {
    this.lock = new ReentrantLock();
    this.tp = tp;
    this.logContext = new LogContext(String.format("[%s topic=%s partition=%d] ",
        logPrefix,
        tp.topic(),
        tp.partition()
    ));
    this.snapshotRegistry = new SnapshotRegistry(logContext);
    this.deferredEventQueue = new DeferredEventQueue(logContext);
    this.timer = new EventBasedCoordinatorTimer(tp, logContext);
    this.state = CoordinatorState.INITIAL;
    this.epoch = -1;
    this.lastWrittenOffset = 0L;
    this.lastCommittedOffset = 0L;
    this.deferredEventQueue = new DeferredEventQueue(logContext);
    this.timer = new EventBasedCoordinatorTimer(tp, logContext);
}
/**
@@ -499,6 +505,9 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
switch (newState) {
    case LOADING:
        state = CoordinatorState.LOADING;
        snapshotRegistry = new SnapshotRegistry(logContext);
        lastWrittenOffset = 0L;
        lastCommittedOffset = 0L;
        coordinator = coordinatorBuilderSupplier
            .get()
            .withLogContext(logContext)
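Recreating the snapshot registry (and resetting the offsets) when entering LOADING means every load starts from empty timeline data structures instead of inheriting state from a previous, possibly failed, load. Below is a hedged sketch of why the registry and the structures built on it go together, assuming the usual timeline collection constructors; the class is invented for the example.

    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.timeline.SnapshotRegistry;
    import org.apache.kafka.timeline.TimelineHashMap;

    // Sketch: timeline collections are bound to the registry they were created
    // with, so a fresh SnapshotRegistry implies rebuilding the collections too,
    // which is why the coordinator state machine is rebuilt right after this.
    final class FreshLoadSketch {
        SnapshotRegistry snapshotRegistry;
        TimelineHashMap<String, Long> committedOffsets;

        void onLoading(LogContext logContext) {
            snapshotRegistry = new SnapshotRegistry(logContext);
            committedOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
        }
    }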
@@ -540,8 +549,9 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
    coordinator.onUnloaded();
    coordinator = null;
}
coordinator = null;
snapshotRegistry = null;
}
}
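The unload path above follows a simple sequence: fail every parked response with NOT_COORDINATOR, give the state machine its onUnloaded callback, then drop the references so the loaded state can be collected. A minimal standalone sketch of that sequence, with types invented for the example:

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    final class UnloadSketch {
        interface StateMachine { void onUnloaded(); }

        StateMachine coordinator;
        Object snapshotRegistry;
        Map<Long, CompletableFuture<Void>> deferredEvents;

        void unload(Exception notCoordinatorError) {
            // Fail all responses still waiting on records to be committed.
            deferredEvents.values().forEach(f -> f.completeExceptionally(notCoordinatorError));
            deferredEvents.clear();
            if (coordinator != null) {
                coordinator.onUnloaded();
            }
            // Drop references so the loaded state can be garbage collected.
            coordinator = null;
            snapshotRegistry = null;
        }
    }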
@@ -633,48 +643,49 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
public void run() {
    try {
        // Get the context of the coordinator or fail if the coordinator is not in active state.
        CoordinatorContext context = activeContextOrThrow(tp);
        long prevLastWrittenOffset = context.lastWrittenOffset;
        // Execute the operation.
        result = op.generateRecordsAndResult(context.coordinator);
        if (result.records().isEmpty()) {
            // If the records are empty, it was a read operation after all. In this case,
            // the response can be returned directly iff there are no pending write operations;
            // otherwise, the read needs to wait on the last write operation to be completed.
            OptionalLong pendingOffset = context.deferredEventQueue.highestPendingOffset();
            if (pendingOffset.isPresent()) {
                context.deferredEventQueue.add(pendingOffset.getAsLong(), this);
            } else {
                complete(null);
            }
        } else {
            // If the records are not empty, first, they are applied to the state machine,
            // second, they are written to the partition/log, and finally, the response
            // is put into the deferred event queue.
            try {
                // Apply the records to the state machine.
                if (result.replayRecords()) {
                    result.records().forEach(context.coordinator::replay);
                }
                // Write the records to the log and update the last written
                // offset.
                long offset = partitionWriter.append(tp, result.records());
                context.updateLastWrittenOffset(offset);
                // Add the response to the deferred queue.
                if (!future.isDone()) {
                    context.deferredEventQueue.add(offset, this);
        withActiveContextOrThrow(tp, context -> {
            long prevLastWrittenOffset = context.lastWrittenOffset;
            // Execute the operation.
            result = op.generateRecordsAndResult(context.coordinator);
            if (result.records().isEmpty()) {
                // If the records are empty, it was a read operation after all. In this case,
                // the response can be returned directly iff there are no pending write operations;
                // otherwise, the read needs to wait on the last write operation to be completed.
                OptionalLong pendingOffset = context.deferredEventQueue.highestPendingOffset();
                if (pendingOffset.isPresent()) {
                    context.deferredEventQueue.add(pendingOffset.getAsLong(), this);
                } else {
                    complete(null);
                }
            } catch (Throwable t) {
                context.revertLastWrittenOffset(prevLastWrittenOffset);
                complete(t);
            } else {
                // If the records are not empty, first, they are applied to the state machine,
                // second, they are written to the partition/log, and finally, the response
                // is put into the deferred event queue.
                try {
                    // Apply the records to the state machine.
                    if (result.replayRecords()) {
                        result.records().forEach(context.coordinator::replay);
                    }
                    // Write the records to the log and update the last written
                    // offset.
                    long offset = partitionWriter.append(tp, result.records());
                    context.updateLastWrittenOffset(offset);
                    // Add the response to the deferred queue.
                    if (!future.isDone()) {
                        context.deferredEventQueue.add(offset, this);
                    } else {
                        complete(null);
                    }
                } catch (Throwable t) {
                    context.revertLastWrittenOffset(prevLastWrittenOffset);
                    complete(t);
                }
            }
        });
    } catch (Throwable t) {
        complete(t);
    }
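Regardless of the old or new context-access style shown above, the write event boils down to the same steps once the operation has produced its records: replay the records into the in-memory state machine, append them to the partition, and park the response until the high watermark covers the appended offset. A simplified, hypothetical sketch of that flow follows; the interfaces are invented for the example and are not the actual CoordinatorWriteEvent.

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    final class WritePathSketch<T> {
        interface StateMachine { void replay(Object record); }
        interface PartitionLog { long append(List<Object> records); }

        void write(
            StateMachine coordinator,
            PartitionLog log,
            List<Object> records,
            T result,
            CompletableFuture<T> response,
            Map<Long, CompletableFuture<T>> deferredByOffset
        ) {
            if (records.isEmpty()) {
                response.complete(result);              // pure read: answer immediately
                return;
            }
            records.forEach(coordinator::replay);       // 1) apply to the state machine first
            long offset = log.append(records);          // 2) then write to the partition
            deferredByOffset.put(offset, response);     // 3) complete only once committed
        }
    }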
@@ -792,16 +803,16 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
public void run() {
    try {
        // Get the context of the coordinator or fail if the coordinator is not in active state.
        CoordinatorContext context = activeContextOrThrow(tp);
        // Execute the read operation.
        response = op.generateResponse(
            context.coordinator,
            context.lastCommittedOffset
        );
        // The response can be completed immediately.
        complete(null);
        withActiveContextOrThrow(tp, context -> {
            // Execute the read operation.
            response = op.generateResponse(
                context.coordinator,
                context.lastCommittedOffset
            );
            // The response can be completed immediately.
            complete(null);
        });
    } catch (Throwable t) {
        complete(t);
    }
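The read path answers from the state as of lastCommittedOffset, so a response never exposes records that were appended but not yet covered by the high watermark. Assuming the usual timeline-collection behavior, here is a small sketch of what reading at an earlier offset looks like; the expected output is my reading of the API, not something asserted by this patch.

    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.timeline.SnapshotRegistry;
    import org.apache.kafka.timeline.TimelineHashMap;

    final class ReadAtCommittedOffsetSketch {
        public static void main(String[] args) {
            SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
            TimelineHashMap<String, String> groups = new TimelineHashMap<>(registry, 0);

            registry.getOrCreateSnapshot(0L);           // state as of committed offset 0
            groups.put("group-1", "written-but-not-committed");

            // A read evaluated at offset 0 should not see the uncommitted write.
            System.out.println(groups.get("group-1", 0L));   // expected: null
        }
    }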
@@ -893,7 +904,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
@Override
public void complete(Throwable exception) {
    if (exception != null) {
        log.error("Execution of {} failed due to {}.", name, exception.getMessage());
        log.error("Execution of {} failed due to {}.", name, exception.getMessage(), exception);
    }
}
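Appending the exception as the final argument, after the placeholders are satisfied, makes SLF4J print the full stack trace rather than just the message. A tiny self-contained example of the idiom:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class LoggingSketch {
        private static final Logger log = LoggerFactory.getLogger(LoggingSketch.class);

        static void report(String name, Throwable exception) {
            // The trailing Throwable is not consumed by a placeholder; SLF4J logs its stack trace.
            log.error("Execution of {} failed due to {}.", name, exception.getMessage(), exception);
        }
    }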
@@ -921,7 +932,9 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
) {
    log.debug("High watermark of {} incremented to {}.", tp, offset);
    scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
        contextOrThrow(tp).updateLastCommittedOffset(offset);
        withActiveContextOrThrow(tp, context -> {
            context.updateLastCommittedOffset(offset);
        });
    });
}
}
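The high watermark listener feeds updateLastCommittedOffset, which is what eventually releases the responses parked in the deferred event queue. A hedged sketch of such a queue, keyed by the offset each response is waiting on (an illustration, not the actual DeferredEventQueue):

    import java.util.NavigableMap;
    import java.util.TreeMap;
    import java.util.concurrent.CompletableFuture;

    final class DeferredQueueSketch {
        private final TreeMap<Long, CompletableFuture<Void>> pending = new TreeMap<>();

        void add(long offset, CompletableFuture<Void> response) {
            pending.put(offset, response);
        }

        void onHighWatermarkUpdated(long committedOffset) {
            // Release every response whose offset is now covered by the high watermark.
            NavigableMap<Long, CompletableFuture<Void>> ready = pending.headMap(committedOffset, true);
            ready.values().forEach(f -> f.complete(null));
            ready.clear();
        }
    }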
@@ -1053,50 +1066,80 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
}
/**
 * @return The coordinator context if the coordinator is active or an exception otherwise.
 * Creates the context if it does not exist.
 *
 * @param tp The topic partition.
 */
private void maybeCreateContext(TopicPartition tp) {
    coordinators.computeIfAbsent(tp, CoordinatorContext::new);
}
/**
 * @return The coordinator context or throws an exception if it does
 * not exist.
 * @throws NotCoordinatorException
 * @throws CoordinatorLoadInProgressException
 * Package private for testing.
 */
private CoordinatorContext activeContextOrThrow(TopicPartition tp) {
CoordinatorContext contextOrThrow(TopicPartition tp) throws NotCoordinatorException {
    CoordinatorContext context = coordinators.get(tp);
    if (context == null) {
        throw Errors.NOT_COORDINATOR.exception();
    } else {
        switch (context.state) {
            case INITIAL:
            case FAILED:
            case CLOSED:
                throw Errors.NOT_COORDINATOR.exception();
            case LOADING:
                throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception();
        }
        return context;
    }
    return context;
}
/**
 * @return The coordinator context. It is created if it does not exist.
 * Calls the provided function with the context; throws an exception otherwise.
 * This method ensures that the context lock is acquired before calling the
 * function and released afterwards.
 *
 * @param tp The topic partition.
 * @param func The function that will receive the context.
 * @throws NotCoordinatorException
 */
private CoordinatorContext getOrCreateContext(TopicPartition tp) {
    return coordinators.computeIfAbsent(tp, CoordinatorContext::new);
private void withContextOrThrow(
    TopicPartition tp,
    Consumer<CoordinatorContext> func
) throws NotCoordinatorException {
    CoordinatorContext context = contextOrThrow(tp);
    try {
        context.lock.lock();
        func.accept(context);
    } finally {
        context.lock.unlock();
    }
}
/**
 * @return The coordinator context or throws an exception if it does
 * not exist.
 * Calls the provided function with the context iff the context is active; throws
 * an exception otherwise. This method ensures that the context lock is acquired
 * before calling the function and released afterwards.
 *
 * @param tp The topic partition.
 * @param func The function that will receive the context.
 * @throws NotCoordinatorException
 * Package private for testing.
 * @throws CoordinatorLoadInProgressException
 */
CoordinatorContext contextOrThrow(TopicPartition tp) {
    CoordinatorContext context = coordinators.get(tp);
private void withActiveContextOrThrow(
    TopicPartition tp,
    Consumer<CoordinatorContext> func
) throws NotCoordinatorException, CoordinatorLoadInProgressException {
    CoordinatorContext context = contextOrThrow(tp);
    if (context == null) {
        throw Errors.NOT_COORDINATOR.exception();
    } else {
        return context;
    try {
        context.lock.lock();
        if (context.state == CoordinatorState.ACTIVE) {
            func.accept(context);
        } else if (context.state == CoordinatorState.LOADING) {
            throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception();
        } else {
            throw Errors.NOT_COORDINATOR.exception();
        }
    } finally {
        context.lock.unlock();
    }
}
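From a caller's perspective, the two exceptions surfaced by these helpers have different retry semantics: COORDINATOR_LOAD_IN_PROGRESS is retriable against the same coordinator once loading finishes, while NOT_COORDINATOR means the coordinator has moved and must be rediscovered. A hedged sketch of caller-side handling, with the future standing in for the one returned by a scheduled read or write operation:

    import java.util.concurrent.CompletableFuture;
    import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
    import org.apache.kafka.common.errors.NotCoordinatorException;

    final class ErrorHandlingSketch {
        static void handle(CompletableFuture<String> responseFuture) {
            responseFuture.whenComplete((response, error) -> {
                if (error instanceof CoordinatorLoadInProgressException) {
                    // Retriable: the coordinator is still replaying its partition; retry after a backoff.
                    System.out.println("coordinator still loading, retry later");
                } else if (error instanceof NotCoordinatorException) {
                    // The coordinator moved or was unloaded; rediscover it before resending.
                    System.out.println("not the coordinator anymore, rediscover");
                }
            });
        }
    }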
@@ -1187,63 +1230,62 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
) {
    throwIfNotRunning();
    log.info("Scheduling loading of metadata from {} with epoch {}", tp, partitionEpoch);
    // Touch the state to make the runtime immediately aware of the new coordinator.
    getOrCreateContext(tp);
    maybeCreateContext(tp);
    scheduleInternalOperation("Load(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
        CoordinatorContext context = getOrCreateContext(tp);
        if (context.state == CoordinatorState.FAILED) {
            // When the coordinator has failed, we create a new context instead of
            // recycling the previous one because it is better to start from an
            // empty state for timeline data structures.
            coordinators.remove(tp);
            context = getOrCreateContext(tp);
        }
        if (context.epoch < partitionEpoch) {
            context.epoch = partitionEpoch;
            switch (context.state) {
                case INITIAL:
                    context.transitionTo(CoordinatorState.LOADING);
                    loader.load(tp, context.coordinator).whenComplete((state, exception) -> {
                        scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
                            CoordinatorContext ctx = contextOrThrow(tp);
                            if (ctx.state != CoordinatorState.LOADING) {
                                log.info("Ignoring load completion from {} because context is in {} state.",
                                    ctx.tp, ctx.state);
                                return;
                            }
                            try {
                                if (exception != null) throw exception;
                                ctx.transitionTo(CoordinatorState.ACTIVE);
                                log.info("Finished loading of metadata from {} with epoch {}.",
                                    tp, partitionEpoch);
                            } catch (Throwable ex) {
                                log.error("Failed to load metadata from {} with epoch {} due to {}.",
                                    tp, partitionEpoch, ex.toString());
                                ctx.transitionTo(CoordinatorState.FAILED);
                            }
        withContextOrThrow(tp, context -> {
            if (context.epoch < partitionEpoch) {
                context.epoch = partitionEpoch;
                switch (context.state) {
                    case FAILED:
                    case INITIAL:
                        context.transitionTo(CoordinatorState.LOADING);
                        loader.load(tp, context.coordinator).whenComplete((state, exception) -> {
                            scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
                                withContextOrThrow(tp, ctx -> {
                                    if (ctx.state != CoordinatorState.LOADING) {
                                        log.info("Ignoring load completion from {} because context is in {} state.",
                                            ctx.tp, ctx.state
                                        );
                                        return;
                                    }
                                    try {
                                        if (exception != null) throw exception;
                                        ctx.transitionTo(CoordinatorState.ACTIVE);
                                        log.info("Finished loading of metadata from {} with epoch {}.",
                                            tp, partitionEpoch
                                        );
                                    } catch (Throwable ex) {
                                        log.error("Failed to load metadata from {} with epoch {} due to {}.",
                                            tp, partitionEpoch, ex.toString()
                                        );
                                        ctx.transitionTo(CoordinatorState.FAILED);
                                    }
                                });
                            });
                        });
                        });
                        break;
                        break;
                    case LOADING:
                        log.info("The coordinator {} is already loading metadata.", tp);
                        break;
                    case LOADING:
                        log.info("The coordinator {} is already loading metadata.", tp);
                        break;
                    case ACTIVE:
                        log.info("The coordinator {} is already active.", tp);
                        break;
                    case ACTIVE:
                        log.info("The coordinator {} is already active.", tp);
                        break;
                    default:
                        log.error("Cannot load coordinator {} in state {}.", tp, context.state);
                    default:
                        log.error("Cannot load coordinator {} in state {}.", tp, context.state);
                }
            } else {
                log.info("Ignoring loading metadata from {} since current epoch {} is larger than or equals to {}.",
                    context.tp, context.epoch, partitionEpoch
                );
            }
        } else {
            log.info("Ignoring loading metadata from {} since current epoch {} is larger than or equals to {}.",
                context.tp, context.epoch, partitionEpoch);
        }
        });
    });
}
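One detail worth calling out in the load path: loader.load(...) completes its future on an arbitrary thread, so the completion is re-enqueued as a "CompleteLoad" internal operation before any context state is touched, keeping all mutations on the runtime's event threads. A standalone sketch of that hand-off pattern, with the executor and callbacks invented for the example:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Consumer;

    final class CompleteLoadSketch {
        private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

        void load(CompletableFuture<Void> loaderFuture, Runnable onLoaded, Consumer<Throwable> onFailed) {
            loaderFuture.whenComplete((ignored, exception) ->
                // Do not mutate coordinator state here: hop back onto the event loop first.
                eventLoop.execute(() -> {
                    if (exception != null) {
                        onFailed.accept(exception);
                    } else {
                        onLoaded.run();
                    }
                })
            );
        }
    }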
@@ -1262,16 +1304,18 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
log.info("Scheduling unloading of metadata for {} with epoch {}", tp, partitionEpoch);
scheduleInternalOperation("UnloadCoordinator(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
    CoordinatorContext context = contextOrThrow(tp);
    if (context.epoch < partitionEpoch) {
        log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
        context.transitionTo(CoordinatorState.CLOSED);
        coordinators.remove(tp);
        log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
    } else {
        log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.",
            tp, partitionEpoch, context.epoch);
    }
    withContextOrThrow(tp, context -> {
        if (context.epoch < partitionEpoch) {
            log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
            context.transitionTo(CoordinatorState.CLOSED);
            coordinators.remove(tp, context);
            log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
        } else {
            log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.",
                tp, partitionEpoch, context.epoch
            );
        }
    });
});
}
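Switching from remove(tp) to remove(tp, context) uses ConcurrentHashMap's conditional removal: the mapping is only dropped if it still points at the context being unloaded, which protects a newer context created for the same partition from being discarded by a stale unload. A minimal example:

    import java.util.concurrent.ConcurrentHashMap;

    final class RemoveIfSameSketch {
        static <K, V> boolean unload(ConcurrentHashMap<K, V> coordinators, K key, V expected) {
            // Removes the entry only if it is still mapped to the expected value.
            return coordinators.remove(key, expected);
        }
    }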
@@ -1294,14 +1338,15 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
// Push an event for each coordinator.
coordinators.keySet().forEach(tp -> {
    scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" + newImage.offset() + ")", tp, () -> {
        CoordinatorContext context = contextOrThrow(tp);
        if (context.state == CoordinatorState.ACTIVE) {
            log.debug("Applying new metadata image with offset {} to {}.", newImage.offset(), tp);
            context.coordinator.onNewMetadataImage(newImage, delta);
        } else {
            log.debug("Ignoring new metadata image with offset {} for {} because the coordinator is not active.",
                newImage.offset(), tp);
        }
        withContextOrThrow(tp, context -> {
            if (context.state == CoordinatorState.ACTIVE) {
                log.debug("Applying new metadata image with offset {} to {}.", newImage.offset(), tp);
                context.coordinator.onNewMetadataImage(newImage, delta);
            } else {
                log.debug("Ignoring new metadata image with offset {} for {} because the coordinator is not active.",
                    newImage.offset(), tp);
            }
        });
    });
});
}