Browse Source

Improve the command line tools in the bin directory to use the compression feature correctly; KAFKA-112; patched by nehanarkhede; reviewed by junrao

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1160529 13f79535-47bb-0310-9956-ffa450edef68
0.7.0
Neha Narkhede 13 years ago
parent
commit
2e05fbfa22
  1. 4
      core/src/main/scala/kafka/tools/DumpLogSegments.scala
  2. 2
      core/src/main/scala/kafka/tools/ProducerPerformance.scala
  3. 45
      core/src/main/scala/kafka/tools/SimpleConsumerShell.scala

4
core/src/main/scala/kafka/utils/DumpLogSegments.scala → core/src/main/scala/kafka/tools/DumpLogSegments.scala

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
* limitations under the License.
*/
package kafka.utils
package kafka.tools
import java.io._
import kafka.message._
@ -44,7 +44,7 @@ object DumpLogSegments { @@ -44,7 +44,7 @@ object DumpLogSegments {
println("offset:\t %d \t invalid".format(offset))
if (!isNoPrint)
println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
offset += messageAndOffset.offset
offset = messageAndOffset.offset
}
}
}

2
core/src/main/scala/kafka/tools/ProducerPerformance.scala

@ -219,7 +219,7 @@ object ProducerPerformance { @@ -219,7 +219,7 @@ object ProducerPerformance {
props.put("zk.connect", brokerInfoList(1))
else
props.put("broker.list", brokerInfoList(1))
props.put("compression.codec", config.compressionCodec.toString)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)

45
core/src/main/scala/kafka/tools/SimpleConsumerShell.scala

@ -23,6 +23,7 @@ import kafka.api.FetchRequest @@ -23,6 +23,7 @@ import kafka.api.FetchRequest
import kafka.utils._
import kafka.consumer._
import kafka.server._
import org.apache.log4j.Logger
/**
* Command line program to dump out messages to standard out using the simple consumer
@ -30,7 +31,9 @@ import kafka.server._ @@ -30,7 +31,9 @@ import kafka.server._
object SimpleConsumerShell {
def main(args: Array[String]): Unit = {
val logger = Logger.getLogger(getClass)
val parser = new OptionParser
val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
.withRequiredArg
@ -55,12 +58,22 @@ object SimpleConsumerShell { @@ -55,12 +58,22 @@ object SimpleConsumerShell {
.describedAs("fetchsize")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000000)
val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator")
.withOptionalArg
.describedAs("print offsets")
.ofType(classOf[java.lang.Boolean])
.defaultsTo(false)
val printMessageOpt = parser.accepts("print-messages", "Print the messages returned by the iterator")
.withOptionalArg
.describedAs("print messages")
.ofType(classOf[java.lang.Boolean])
.defaultsTo(false)
val options = parser.parse(args : _*)
for(arg <- List(urlOpt, topicOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
logger.error("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
@ -71,31 +84,35 @@ object SimpleConsumerShell { @@ -71,31 +84,35 @@ object SimpleConsumerShell {
val partition = options.valueOf(partitionOpt).intValue
val startingOffset = options.valueOf(offsetOpt).longValue
val fetchsize = options.valueOf(fetchsizeOpt).intValue
val printOffsets = if(options.has(printOffsetOpt)) true else false
val printMessages = if(options.has(printMessageOpt)) true else false
println("Starting consumer...")
logger.info("Starting consumer...")
val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024)
val thread = Utils.newThread("kafka-consumer", new Runnable() {
def run() {
var offset = startingOffset
while(true) {
val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
val messageSets = consumer.multifetch(fetchRequest)
for (messages <- messageSets) {
println("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
val messageSets = consumer.multifetch(fetchRequest)
for (messages <- messageSets) {
if(logger.isDebugEnabled)
logger.debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
var consumed = 0
for(messageAndOffset <- messages) {
println("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
if(printMessages)
logger.info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
offset = messageAndOffset.offset
if(printOffsets)
logger.info("next offset = " + offset)
consumed += 1
}
if(consumed > 0)
offset += messages.validBytes
}
}
Thread.sleep(10000)
}
}
}
}, false);
thread.start()
thread.join()
}
}

Loading…
Cancel
Save