Browse Source

SimpleConsumer throws UnsupportedOperationException: empty.head; patched by Yang Ye; reviewed by Jun Rao; kafka-576

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1401704 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
3deb6cd420
  1. 11
      core/src/main/scala/kafka/consumer/SimpleConsumer.scala
  2. 12
      core/src/main/scala/kafka/tools/SimpleConsumerShell.scala

11
core/src/main/scala/kafka/consumer/SimpleConsumer.scala

@ -30,7 +30,8 @@ import kafka.cluster.Broker @@ -30,7 +30,8 @@ import kafka.cluster.Broker
object SimpleConsumer extends Logging {
def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean): Long = {
def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long,
isFromOrdinaryConsumer: Boolean): Long = {
var simpleConsumer: SimpleConsumer = null
var producedOffset: Long = -1L
try {
@ -38,9 +39,10 @@ object SimpleConsumer extends Logging { @@ -38,9 +39,10 @@ object SimpleConsumer extends Logging {
ConsumerConfig.SocketBufferSize)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = if(isFromOrdinaryConsumer)
OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
else
OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), Request.DebuggingConsumerId.toShort)
new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
Request.DebuggingConsumerId)
producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
} catch {
case e =>
@ -53,7 +55,8 @@ object SimpleConsumer extends Logging { @@ -53,7 +55,8 @@ object SimpleConsumer extends Logging {
producedOffset
}
def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = {
def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int,
earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = {
val cluster = getCluster(zkClient)
val broker = cluster.getBroker(brokerId) match {
case Some(b) => b

12
core/src/main/scala/kafka/tools/SimpleConsumerShell.scala

@ -25,7 +25,6 @@ import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} @@ -25,7 +25,6 @@ import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
import kafka.cluster.Broker
import scala.collection.JavaConversions._
/**
* Command line program to dump out messages to standard out using the simple consumer
*/
@ -90,6 +89,8 @@ object SimpleConsumerShell extends Logging { @@ -90,6 +89,8 @@ object SimpleConsumerShell extends Logging {
.defaultsTo(1000)
val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
"skip it instead of halt.")
val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
"If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
val options = parser.parse(args : _*)
for(arg <- List(brokerListOpt, topicOpt, partitionIdOpt)) {
@ -110,6 +111,7 @@ object SimpleConsumerShell extends Logging { @@ -110,6 +111,7 @@ object SimpleConsumerShell extends Logging {
val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
val printOffsets = if(options.has(printOffsetOpt)) true else false
val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt)
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
@ -182,11 +184,14 @@ object SimpleConsumerShell extends Logging { @@ -182,11 +184,14 @@ object SimpleConsumerShell extends Logging {
.build()
val fetchResponse = simpleConsumer.fetch(fetchRequest)
val messageSet = fetchResponse.messageSet(topic, partitionId)
if (messageSet.validBytes <= 0 && noWaitAtEndOfLog) {
println("Terminating. Reached the end of partition (%s, %d) at offset %d".format(topic, partitionId, offset))
return
}
debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
var consumed = 0
for(messageAndOffset <- messageSet) {
try {
offset = messageAndOffset.offset
offset = messageAndOffset.nextOffset
if(printOffsets)
System.out.println("next offset = " + offset)
formatter.writeTo(messageAndOffset.message, System.out)
@ -204,7 +209,6 @@ object SimpleConsumerShell extends Logging { @@ -204,7 +209,6 @@ object SimpleConsumerShell extends Logging {
simpleConsumer.close()
System.exit(1)
}
consumed += 1
}
}
} catch {

Loading…
Cancel
Save