@ -50,9 +50,9 @@ object TestOffsetManager {
@@ -50,9 +50,9 @@ object TestOffsetManager {
extends ShutdownableThread ( "commit-thread" )
with KafkaMetricsGroup {
private val group = "group-" + id
private val groupId = "group-" + id
private val metadata = "Metadata from commit thread " + id
private var offsetsChannel = ClientUtils . channelToOffsetManager ( group , zkClient , SocketTimeoutMs )
private var offsetsChannel = ClientUtils . channelToOffsetManager ( groupId , zkClient , SocketTimeoutMs )
private var offset = 0L
val numErrors = new AtomicInteger ( 0 )
val numCommits = new AtomicInteger ( 0 )
@ -62,11 +62,11 @@ object TestOffsetManager {
@@ -62,11 +62,11 @@ object TestOffsetManager {
private def ensureConnected ( ) {
if ( ! offsetsChannel . isConnected )
offsetsChannel = ClientUtils . channelToOffsetManager ( group , zkClient , SocketTimeoutMs )
offsetsChannel = ClientUtils . channelToOffsetManager ( groupId , zkClient , SocketTimeoutMs )
}
override def doWork ( ) {
val commitRequest = OffsetCommitRequest ( group , immutable . Map ( ( 1 to partitionCount ) . map ( TopicAndPartition ( "topic-" + id , _ ) -> OffsetAndMetadata ( offset , metadata ) ) : _ * ) )
val commitRequest = OffsetCommitRequest ( groupId , immutable . Map ( ( 1 to partitionCount ) . map ( TopicAndPartition ( "topic-" + id , _ ) -> OffsetAndMetadata ( offset , metadata ) ) : _ * ) )
try {
ensureConnected ( )
offsetsChannel . send ( commitRequest )
@ -81,7 +81,7 @@ object TestOffsetManager {
@@ -81,7 +81,7 @@ object TestOffsetManager {
case e1 : ClosedByInterruptException =>
offsetsChannel . disconnect ( )
case e2 : IOException =>
println ( "Commit thread %d: Error while committing offsets to %s:%d for group %s due to %s." . format ( id , offsetsChannel . host , offsetsChannel . port , group , e2 ) )
println ( "Commit thread %d: Error while committing offsets to %s:%d for group %s due to %s." . format ( id , offsetsChannel . host , offsetsChannel . port , groupId , e2 ) )
offsetsChannel . disconnect ( )
}
finally {