From 3deb6cd4208351fa6831331da5b60e8e2ffc97c6 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Wed, 24 Oct 2012 14:19:18 +0000 Subject: [PATCH] SimpleConsumer throws UnsupportedOperationException: empty.head; patched by Yang Ye; reviewed by Jun Rao; kafka-576 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1401704 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/scala/kafka/consumer/SimpleConsumer.scala | 11 +++++++---- .../main/scala/kafka/tools/SimpleConsumerShell.scala | 12 ++++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index b8319db4939..d642a670f24 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -30,7 +30,8 @@ import kafka.cluster.Broker object SimpleConsumer extends Logging { - def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean): Long = { + def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long, + isFromOrdinaryConsumer: Boolean): Long = { var simpleConsumer: SimpleConsumer = null var producedOffset: Long = -1L try { @@ -38,9 +39,10 @@ object SimpleConsumer extends Logging { ConsumerConfig.SocketBufferSize) val topicAndPartition = TopicAndPartition(topic, partitionId) val request = if(isFromOrdinaryConsumer) - OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))) + new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))) else - OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), Request.DebuggingConsumerId.toShort) + new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), + Request.DebuggingConsumerId) producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head } catch { case e => @@ -53,7 +55,8 @@ object SimpleConsumer extends Logging { producedOffset } - def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = { + def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int, + earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = { val cluster = getCluster(zkClient) val broker = cluster.getBroker(brokerId) match { case Some(b) => b diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 8ac81984194..09e40e68eea 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -25,7 +25,6 @@ import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} import kafka.cluster.Broker import scala.collection.JavaConversions._ - /** * Command line program to dump out messages to standard out using the simple consumer */ @@ -90,6 +89,8 @@ object SimpleConsumerShell extends Logging { .defaultsTo(1000) val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + "skip it instead of halt.") + val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend", + "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages") val options = parser.parse(args : _*) for(arg <- List(brokerListOpt, topicOpt, partitionIdOpt)) { @@ -110,6 +111,7 @@ object SimpleConsumerShell extends Logging { val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false val printOffsets = if(options.has(printOffsetOpt)) true else false + val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt) val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) @@ -182,11 +184,14 @@ object SimpleConsumerShell extends Logging { .build() val fetchResponse = simpleConsumer.fetch(fetchRequest) val messageSet = fetchResponse.messageSet(topic, partitionId) + if (messageSet.validBytes <= 0 && noWaitAtEndOfLog) { + println("Terminating. Reached the end of partition (%s, %d) at offset %d".format(topic, partitionId, offset)) + return + } debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset) - var consumed = 0 for(messageAndOffset <- messageSet) { try { - offset = messageAndOffset.offset + offset = messageAndOffset.nextOffset if(printOffsets) System.out.println("next offset = " + offset) formatter.writeTo(messageAndOffset.message, System.out) @@ -204,7 +209,6 @@ object SimpleConsumerShell extends Logging { simpleConsumer.close() System.exit(1) } - consumed += 1 } } } catch {