git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1344526 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Joe Stein
13 years ago
107 changed files with 2460 additions and 1219 deletions
@@ -0,0 +1,76 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.consumer

import kafka.utils.Logging
import java.util.regex.{PatternSyntaxException, Pattern}

sealed abstract class TopicFilter(rawRegex: String) extends Logging {

  val regex = rawRegex
    .trim
    .replace(',', '|')
    .replace(" ", "")
    .replaceAll("""^["']+""", "")
    .replaceAll("""["']+$""", "") // property files may bring quotes

  try {
    Pattern.compile(regex)
  }
  catch {
    case e: PatternSyntaxException =>
      // keep the pattern compiler's diagnostic as the cause
      throw new RuntimeException(regex + " is an invalid regex.", e)
  }

  override def toString = regex

  def requiresTopicEventWatcher: Boolean

  def isTopicAllowed(topic: String): Boolean
}

case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
  // a static (alphanumeric/pipe) topic list needs no topic-event watcher
  override def requiresTopicEventWatcher = !regex.matches("""[\p{Alnum}-|]+""")

  override def isTopicAllowed(topic: String) = {
    val allowed = topic.matches(regex)

    debug("%s %s".format(
      topic, if (allowed) "allowed" else "filtered"))

    allowed
  }
}

case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
  override def requiresTopicEventWatcher = true

  override def isTopicAllowed(topic: String) = {
    val allowed = !topic.matches(regex)

    debug("%s %s".format(
      topic, if (allowed) "allowed" else "filtered"))

    allowed
  }
}
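For reference, a minimal REPL-style sketch of the rewrite above (the object name and inputs are hypothetical; only the classes in this file are assumed): a comma-separated, possibly quoted topic list collapses to a single alternation regex, and a purely alphanumeric list never needs the topic-event watcher.

import kafka.consumer.{Whitelist, Blacklist}

object TopicFilterDemo extends App {
  // quotes and spaces, as a property file might deliver them
  val wl = new Whitelist("\"white1, white2\"")
  println(wl)                                // white1|white2
  println(wl.isTopicAllowed("white2"))       // true
  println(wl.requiresTopicEventWatcher)      // false: static topic list

  val bl = new Blacklist("black.*")
  println(bl.isTopicAllowed("whitetopic01")) // true: not blacklisted
}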
@@ -0,0 +1,21 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.message

case class MessageAndMetadata[T](message: T, topic: String = "")
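A quick usage sketch (hypothetical object name): the case class simply pairs a payload with the topic it was consumed from, which is what lets MirrorMaker below republish each message to the same topic on the target cluster.

import kafka.message.MessageAndMetadata

object MessageAndMetadataDemo extends App {
  val mam = MessageAndMetadata(message = "payload", topic = "white1")
  println(mam.topic)   // white1
  println(mam.message) // payload
}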
@@ -0,0 +1,123 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.tools

import java.io.FileWriter
import joptsimple._
import kafka.utils.{Logging, ZkUtils, ZKStringSerializer, ZKGroupTopicDirs}
import org.I0Itec.zkclient.ZkClient


/**
 * A utility that retrieves the offsets of broker partitions in ZK and
 * prints them to an output file in the following format:
 *
 * /consumers/group1/offsets/topic1/1-0:286894308
 * /consumers/group1/offsets/topic1/2-0:284803985
 *
 * This utility expects 3 arguments:
 * 1. Zk host:port string
 * 2. group name (all groups implied if omitted)
 * 3. output filename
 *
 * To print debug messages, add the following line to log4j.properties:
 * log4j.logger.kafka.tools.ExportZkOffsets$=DEBUG
 * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
 */
object ExportZkOffsets extends Logging {

  def main(args: Array[String]) {
    val parser = new OptionParser

    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
      .withRequiredArg()
      .defaultsTo("localhost:2181")
      .ofType(classOf[String])
    val groupOpt = parser.accepts("group", "Consumer group.")
      .withRequiredArg()
      .ofType(classOf[String])
    val outFileOpt = parser.accepts("output-file", "Output file")
      .withRequiredArg()
      .ofType(classOf[String])
    parser.accepts("help", "Print this message.")

    val options = parser.parse(args : _*)

    if (options.has("help")) {
      parser.printHelpOn(System.out)
      System.exit(0)
    }

    for (opt <- List(zkConnectOpt, outFileOpt)) {
      if (!options.has(opt)) {
        System.err.println("Missing required argument: %s".format(opt))
        parser.printHelpOn(System.err)
        System.exit(1)
      }
    }

    val zkConnect = options.valueOf(zkConnectOpt)
    val groups = options.valuesOf(groupOpt)
    val outfile = options.valueOf(outFileOpt)

    var zkClient: ZkClient = null
    val fileWriter: FileWriter = new FileWriter(outfile)

    try {
      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)

      var consumerGroups: Seq[String] = null

      if (groups.size == 0) {
        // no group given: export every group registered under /consumers
        consumerGroups = ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).toList
      }
      else {
        import scala.collection.JavaConversions._
        consumerGroups = groups
      }

      for (consumerGrp <- consumerGroups) {
        val topicsList = getTopicsList(zkClient, consumerGrp)

        for (topic <- topicsList) {
          val bidPidList = getBrokeridPartition(zkClient, consumerGrp, topic)

          for (bidPid <- bidPidList) {
            val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp, topic)
            val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
            val offsetVal = ZkUtils.readDataMaybeNull(zkClient, offsetPath)
            fileWriter.write(offsetPath + ":" + offsetVal + "\n")
            debug(offsetPath + " => " + offsetVal)
          }
        }
      }
    }
    finally {
      fileWriter.flush()
      fileWriter.close()
      if (zkClient != null)
        zkClient.close()
    }
  }

  private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = {
    ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
  }

  private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = {
    ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList
  }
}
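The ZK path written per output line is assembled from the same helper the loop above uses; a minimal sketch (hypothetical object name, values taken from the scaladoc example):

import kafka.utils.ZKGroupTopicDirs

object OffsetPathDemo extends App {
  val dirs = new ZKGroupTopicDirs("group1", "topic1")
  // "1-0" is a brokerid-partition child node, as read by getBrokeridPartition
  println(dirs.consumerOffsetDir + "/" + "1-0") // /consumers/group1/offsets/topic1/1-0
}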
@@ -0,0 +1,112 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.tools

import java.io.BufferedReader
import java.io.FileReader
import joptsimple._
import kafka.utils.{Logging, ZkUtils, ZKStringSerializer}
import org.I0Itec.zkclient.ZkClient


/**
 * A utility that updates the offsets of broker partitions in ZK.
 *
 * This utility expects 2 input files as arguments:
 * 1. consumer properties file
 * 2. a file containing partition offset data such as:
 * (This output data file can be obtained by running kafka.tools.ExportZkOffsets)
 *
 * /consumers/group1/offsets/topic1/3-0:285038193
 * /consumers/group1/offsets/topic1/1-0:286894308
 *
 * To print debug messages, add the following line to log4j.properties:
 * log4j.logger.kafka.tools.ImportZkOffsets$=DEBUG
 * (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
 */
object ImportZkOffsets extends Logging {

  def main(args: Array[String]) {
    val parser = new OptionParser

    val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
      .withRequiredArg()
      .defaultsTo("localhost:2181")
      .ofType(classOf[String])
    val inFileOpt = parser.accepts("input-file", "Input file")
      .withRequiredArg()
      .ofType(classOf[String])
    parser.accepts("help", "Print this message.")

    val options = parser.parse(args : _*)

    if (options.has("help")) {
      parser.printHelpOn(System.out)
      System.exit(0)
    }

    for (opt <- List(inFileOpt)) {
      if (!options.has(opt)) {
        System.err.println("Missing required argument: %s".format(opt))
        parser.printHelpOn(System.err)
        System.exit(1)
      }
    }

    val zkConnect = options.valueOf(zkConnectOpt)
    val partitionOffsetFile = options.valueOf(inFileOpt)

    val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
    val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile)

    updateZkOffsets(zkClient, partitionOffsets)
  }

  private def getPartitionOffsetsFromFile(filename: String): Map[String,String] = {
    val fr = new FileReader(filename)
    val br = new BufferedReader(fr)
    var partOffsetsMap: Map[String,String] = Map()

    var s: String = br.readLine()
    while (s != null && s.length() >= 1) {
      // each line is <zk path>:<offset>
      val tokens = s.split(":")

      partOffsetsMap += tokens(0) -> tokens(1)
      debug("adding node path [" + s + "]")

      s = br.readLine()
    }
    br.close()

    partOffsetsMap
  }

  private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
    for ((partition, offset) <- partitionOffsets) {
      debug("updating [" + partition + "] with offset [" + offset + "]")

      try {
        ZkUtils.updatePersistentPath(zkClient, partition, offset)
      } catch {
        case e: Throwable => e.printStackTrace()
      }
    }
  }
}
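The input format is exactly what ExportZkOffsets emits, so a round-trip needs no transformation; a one-line sketch of the parse step above (hypothetical object name and values):

object OffsetLineDemo extends App {
  val Array(path, offset) = "/consumers/group1/offsets/topic1/3-0:285038193".split(":")
  println(path)   // /consumers/group1/offsets/topic1/3-0
  println(offset) // 285038193
}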
@@ -0,0 +1,171 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.tools

import kafka.message.Message
import joptsimple.OptionParser
import kafka.utils.{Utils, Logging}
import kafka.producer.{ProducerData, ProducerConfig, Producer}
import scala.collection.JavaConversions._
import java.util.concurrent.CountDownLatch
import kafka.consumer._


object MirrorMaker extends Logging {

  def main(args: Array[String]) {

    info("Starting mirror maker")
    val parser = new OptionParser

    val consumerConfigOpt = parser.accepts("consumer.config",
      "Consumer config to consume from a source cluster. " +
      "You may specify multiple of these.")
      .withRequiredArg()
      .describedAs("config file")
      .ofType(classOf[String])

    val producerConfigOpt = parser.accepts("producer.config",
      "Embedded producer config.")
      .withRequiredArg()
      .describedAs("config file")
      .ofType(classOf[String])

    val numProducersOpt = parser.accepts("num.producers",
      "Number of producer instances")
      .withRequiredArg()
      .describedAs("Number of producers")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1)

    val numStreamsOpt = parser.accepts("num.streams",
      "Number of consumption streams.")
      .withRequiredArg()
      .describedAs("Number of threads")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1)

    val whitelistOpt = parser.accepts("whitelist",
      "Whitelist of topics to mirror.")
      .withRequiredArg()
      .describedAs("Java regex (String)")
      .ofType(classOf[String])

    val blacklistOpt = parser.accepts("blacklist",
      "Blacklist of topics to mirror.")
      .withRequiredArg()
      .describedAs("Java regex (String)")
      .ofType(classOf[String])

    val helpOpt = parser.accepts("help", "Print this message.")

    val options = parser.parse(args : _*)

    if (options.has(helpOpt)) {
      parser.printHelpOn(System.out)
      System.exit(0)
    }

    Utils.checkRequiredArgs(
      parser, options, consumerConfigOpt, producerConfigOpt)
    if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
      println("Exactly one of whitelist or blacklist is required.")
      System.exit(1)
    }

    val numStreams = options.valueOf(numStreamsOpt)

    val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
      val config = new ProducerConfig(
        Utils.loadProps(options.valueOf(producerConfigOpt)))
      new Producer[Null, Message](config)
    })

    val threads = {
      val connectors = options.valuesOf(consumerConfigOpt).toList
        .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
        .map(new ZookeeperConsumerConnector(_))

      Runtime.getRuntime.addShutdownHook(new Thread() {
        override def run() {
          connectors.foreach(_.shutdown())
          producers.foreach(_.close())
        }
      })

      val filterSpec = if (options.has(whitelistOpt))
        new Whitelist(options.valueOf(whitelistOpt))
      else
        new Blacklist(options.valueOf(blacklistOpt))

      val streams =
        connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))

      streams.flatten.zipWithIndex.map(streamAndIndex => {
        new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)
      })
    }

    threads.foreach(_.start())

    threads.foreach(_.awaitShutdown())
  }

  class MirrorMakerThread(stream: KafkaStream[Message],
                          producers: Seq[Producer[Null, Message]],
                          threadId: Int)
          extends Thread with Logging {

    private val shutdownLatch = new CountDownLatch(1)
    private val threadName = "mirrormaker-" + threadId
    private val producerSelector = Utils.circularIterator(producers)

    this.setName(threadName)

    override def run() {
      try {
        for (msgAndMetadata <- stream) {
          // round-robin over the producer pool, preserving the source topic
          val producer = producerSelector.next()
          val pd = new ProducerData[Null, Message](
            msgAndMetadata.topic, msgAndMetadata.message)
          producer.send(pd)
        }
      }
      catch {
        case e: Throwable =>
          fatal("Stream unexpectedly exited.", e)
      }
      finally {
        shutdownLatch.countDown()
        info("Stopped thread %s.".format(threadName))
      }
    }

    def awaitShutdown() {
      try {
        shutdownLatch.await()
      }
      catch {
        case e: InterruptedException => fatal(
          "Shutdown of thread %s interrupted. This might leak data!"
          .format(threadName))
      }
    }
  }
}
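Utils.circularIterator is what gives each MirrorMakerThread round-robin access to the shared producer pool; a stand-in sketch in plain Scala (hypothetical object name, not the Kafka helper itself):

object RoundRobinDemo extends App {
  val producers = List("producer-0", "producer-1", "producer-2")
  val selector = Iterator.continually(producers).flatMap(_.iterator) // cycles forever
  println(selector.take(5).toList) // List(producer-0, producer-1, producer-2, producer-0, producer-1)
}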
@@ -0,0 +1,51 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.consumer

import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test


class TopicFilterTest extends JUnitSuite {

  @Test
  def testWhitelists() {

    val topicFilter1 = new Whitelist("white1,white2")
    assertFalse(topicFilter1.requiresTopicEventWatcher)
    assertTrue(topicFilter1.isTopicAllowed("white2"))
    assertFalse(topicFilter1.isTopicAllowed("black1"))

    val topicFilter2 = new Whitelist(".+")
    assertTrue(topicFilter2.requiresTopicEventWatcher)
    assertTrue(topicFilter2.isTopicAllowed("alltopics"))

    val topicFilter3 = new Whitelist("white_listed-topic.+")
    assertTrue(topicFilter3.requiresTopicEventWatcher)
    assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1"))
    assertFalse(topicFilter3.isTopicAllowed("black1"))
  }

  @Test
  def testBlacklists() {
    val topicFilter1 = new Blacklist("black1")
    assertTrue(topicFilter1.requiresTopicEventWatcher)
  }
}
@@ -1,27 +0,0 @@
This test replicates messages from 3 kafka brokers to 2 other kafka brokers
using the embedded consumer. At the end, the messages produced at the source
brokers should match those at the target brokers.

To run this test, do
bin/run-test.sh

The expected output is given in bin/expected.out. There is only one thing
that's important:
1. The output should have a line "test passed".

In the event of failure, by default the brokers and zookeepers remain running
to make it easier to debug the issue - hit Ctrl-C to shut them down. You can
change this behavior by setting the action_on_fail flag in the script to "exit"
or "proceed", in which case a snapshot of all the logs and directories is
placed in the test's base directory.

If you are making any changes that may affect the embedded consumer, it is a
good idea to run the test in a loop. E.g.:

:>/tmp/embeddedconsumer_test.log
for i in {1..10}; do echo "run $i"; ./bin/run-test.sh >> /tmp/embeddedconsumer_test.log 2>&1; done
tail -F /tmp/embeddedconsumer_test.log

grep -ic passed /tmp/embeddedconsumer_test.log
grep -ic failed /tmp/embeddedconsumer_test.log
@@ -1,328 +0,0 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

readonly num_messages=400000
readonly message_size=400
readonly action_on_fail="proceed"

readonly test_start_time="$(date +%s)"

readonly base_dir=$(dirname $0)/..

info() {
    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
}

kill_child_processes() {
    isTopmost=$1
    curPid=$2
    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
    for childPid in $childPids
    do
        kill_child_processes 0 $childPid
    done
    if [ $isTopmost -eq 0 ]; then
        kill -15 $curPid 2> /dev/null
    fi
}

cleanup() {
    info "cleaning up"

    pid_zk_source=
    pid_zk_target=
    pid_kafka_source1=
    pid_kafka_source2=
    pid_kafka_source3=
    pid_kafka_target1=
    pid_kafka_target2=
    pid_producer=

    rm -rf /tmp/zookeeper_source
    rm -rf /tmp/zookeeper_target

    rm -rf /tmp/kafka-source{1..3}-logs
    # mkdir -p /tmp/kafka-source{1..3}-logs/test0{1..3}-0
    # touch /tmp/kafka-source{1..3}-logs/test0{1..3}-0/00000000000000000000.kafka

    rm -rf /tmp/kafka-target{1..2}-logs
}

begin_timer() {
    t_begin=$(date +%s)
}

end_timer() {
    t_end=$(date +%s)
}

start_zk() {
    info "starting zookeepers"
    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties > $base_dir/zookeeper_source.log 2>&1 &
    pid_zk_source=$!
    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties > $base_dir/zookeeper_target.log 2>&1 &
    pid_zk_target=$!
}

start_source_servers() {
    info "starting source cluster"
    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties > $base_dir/kafka_source1.log 2>&1 &
    pid_kafka_source1=$!
    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties > $base_dir/kafka_source2.log 2>&1 &
    pid_kafka_source2=$!
    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties > $base_dir/kafka_source3.log 2>&1 &
    pid_kafka_source3=$!
}

start_target_servers_for_whitelist_test() {
    echo "starting mirror cluster"
    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties > $base_dir/kafka_target1.log 2>&1 &
    pid_kafka_target1=$!
    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties > $base_dir/kafka_target2.log 2>&1 &
    pid_kafka_target2=$!
}

start_target_servers_for_blacklist_test() {
    echo "starting mirror cluster"
    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties > $base_dir/kafka_target1.log 2>&1 &
    pid_kafka_target1=$!
    $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties > $base_dir/kafka_target2.log 2>&1 &
    pid_kafka_target2=$!
}

shutdown_servers() {
    info "stopping producer"
    if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi

    info "shutting down target servers"
    if [ "x${pid_kafka_target1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target1}; fi
    if [ "x${pid_kafka_target2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target2}; fi
    sleep 2

    info "shutting down source servers"
    if [ "x${pid_kafka_source1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source1}; fi
    if [ "x${pid_kafka_source2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source2}; fi
    if [ "x${pid_kafka_source3}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source3}; fi

    info "shutting down zookeeper servers"
    if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
    if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi
}

start_producer() {
    topic=$1
    info "start producing messages for topic $topic ..."
    $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async > $base_dir/producer_performance.log 2>&1 &
    pid_producer=$!
}

# In case the consumer does not consume, the test may exit prematurely (i.e.,
# shut down the kafka brokers), and ProducerPerformance will start throwing ugly
# exceptions. So, wait for the producer to finish before shutting down. If it
# takes too long, the user can just hit Ctrl-c, which is trapped to kill child
# processes.
# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
wait_partition_done() {
    n_tuples=$(($# / 3))

    i=1
    while (($#)); do
        kafka_server[i]=$1
        topic[i]=$2
        partitionid[i]=$3
        prev_offset[i]=0
        info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
        i=$((i+1))
        shift 3
    done

    all_done=0

    # set -x
    while [[ $all_done != 1 ]]; do
        sleep 4
        all_done=1
        for ((i=1; i <= $n_tuples; i++)); do
            cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
            if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
                all_done=0
                prev_offset[i]=$cur_size
            fi
        done
    done
}

cmp_logs() {
    topic=$1
    info "comparing source and target logs for topic $topic"
    source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
    if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
    expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size))
    actual_size=$(($target_part0_size + $target_part1_size))
    if [ "x$expected_size" != "x$actual_size" ]
    then
        info "source size: $expected_size target size: $actual_size"
        return 1
    else
        return 0
    fi
}

take_fail_snapshot() {
    snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
    mkdir $snapshot_dir
    for dir in /tmp/zookeeper_source /tmp/zookeeper_target /tmp/kafka-source{1..3}-logs /tmp/kafka-target{1..2}-logs; do
        if [ -d $dir ]; then
            cp -r $dir $snapshot_dir
        fi
    done
}

# Usage: process_test_result <result> <action_on_fail>
# result: last test result
# action_on_fail: (exit|wait|proceed)
# ("wait" is useful if you want to troubleshoot using zookeeper)
process_test_result() {
    result=$1
    if [ $1 -eq 0 ]; then
        info "test passed"
    else
        info "test failed"
        case "$2" in
            "wait") info "waiting: hit Ctrl-c to quit"
                wait
                ;;
            "exit") shutdown_servers
                take_fail_snapshot
                exit $result
                ;;
            *) shutdown_servers
                take_fail_snapshot
                info "proceeding"
                ;;
        esac
    fi
}

test_whitelists() {
    info "### Testing whitelists"
    snapshot_prefix="whitelist-test"

    cleanup
    start_zk
    start_source_servers
    start_target_servers_for_whitelist_test
    sleep 4

    begin_timer

    start_producer test01
    info "waiting for producer to finish producing ..."
    wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0

    info "waiting for consumer to finish consuming ..."
    wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0

    end_timer
    info "embedded consumer took $((t_end - t_begin)) seconds"

    sleep 2

    cmp_logs test01
    result=$?

    return $result
}

test_blacklists() {
    info "### Testing blacklists"
    snapshot_prefix="blacklist-test"
    cleanup
    start_zk
    start_source_servers
    start_target_servers_for_blacklist_test
    sleep 4

    start_producer test02
    info "waiting for producer to finish producing test02 ..."
    wait_partition_done kafka://localhost:9090 test02 0 kafka://localhost:9091 test02 0 kafka://localhost:9092 test02 0

    # start_producer test03
    # info "waiting for producer to finish producing test03 ..."
    # wait_partition_done kafka://localhost:9090 test03 0 kafka://localhost:9091 test03 0 kafka://localhost:9092 test03 0

    begin_timer

    start_producer test01
    info "waiting for producer to finish producing ..."
    wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0

    info "waiting for consumer to finish consuming ..."
    wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0

    end_timer

    info "embedded consumer took $((t_end - t_begin)) seconds"

    sleep 2

    cmp_logs test02
    result1=$?
    # cmp_logs test03
    # result2=$?
    # if [[ "x$result1" == "x0" || "x$result2" == "x0" ]]; then
    if [[ "x$result1" == "x0" ]]; then
        # test02 is blacklisted, so its messages must NOT have been mirrored
        result=1
    else
        cmp_logs test01
        result=$?
    fi

    return $result
}

# main test begins

echo "Test-$test_start_time"

# Ctrl-c trap. Catches INT signal
trap "shutdown_servers; exit 0" INT

test_whitelists
result=$?

process_test_result $result $action_on_fail

shutdown_servers

sleep 2

test_blacklists
result=$?

process_test_result $result $action_on_fail

shutdown_servers

exit $result
@@ -1,11 +0,0 @@
start the servers ...
start producing messages ...
Total Num Messages: 10000000 bytes: 1994374785 in 106.076 secs
Messages/sec: 94272.0314
MB/sec: 17.9304
[2011-05-02 11:50:29,022] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
wait for consumer to finish consuming ...
test passed
bin/../../../bin/kafka-server-start.sh: line 11: 359 Terminated $(dirname $0)/kafka-run-class.sh kafka.Kafka $@
bin/../../../bin/zookeeper-server-start.sh: line 9: 357 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
bin/../../../bin/zookeeper-server-start.sh: line 9: 358 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
@@ -0,0 +1,22 @@
This test replicates messages from two source kafka clusters into one target
kafka cluster using the mirror-maker tool. At the end, the messages produced
at the source brokers should match those at the target brokers.

To run this test, do
bin/run-test.sh

In the event of failure, by default the brokers and zookeepers remain running
to make it easier to debug the issue - hit Ctrl-C to shut them down. You can
change this behavior by setting the action_on_fail flag in the script to "exit"
or "proceed", in which case a snapshot of all the logs and directories is
placed in the test's base directory.

It is a good idea to run the test in a loop. E.g.:

:>/tmp/mirrormaker_test.log
for i in {1..10}; do echo "run $i"; ./bin/run-test.sh >> /tmp/mirrormaker_test.log 2>&1; done
tail -F /tmp/mirrormaker_test.log

grep -ic passed /tmp/mirrormaker_test.log
grep -ic failed /tmp/mirrormaker_test.log
@@ -0,0 +1,357 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

readonly num_messages=10000
readonly message_size=100
readonly action_on_fail="proceed"
# readonly action_on_fail="wait"

readonly test_start_time="$(date +%s)"

readonly base_dir=$(dirname $0)/..

info() {
    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
}

kill_child_processes() {
    isTopmost=$1
    curPid=$2
    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
    for childPid in $childPids
    do
        kill_child_processes 0 $childPid
    done
    if [ $isTopmost -eq 0 ]; then
        kill -15 $curPid 2> /dev/null
    fi
}

cleanup() {
    info "cleaning up"

    pid_zk_source1=
    pid_zk_source2=
    pid_zk_target=
    pid_kafka_source_1_1=
    pid_kafka_source_1_2=
    pid_kafka_source_2_1=
    pid_kafka_source_2_2=
    pid_kafka_target_1_1=
    pid_kafka_target_1_2=
    pid_producer=
    pid_mirrormaker_1=
    pid_mirrormaker_2=

    rm -rf /tmp/zookeeper*

    rm -rf /tmp/kafka*
}

begin_timer() {
    t_begin=$(date +%s)
}

end_timer() {
    t_end=$(date +%s)
}

start_zk() {
    info "starting zookeepers"
    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_1.properties > $base_dir/zookeeper_source-1.log 2>&1 &
    pid_zk_source1=$!
    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_2.properties > $base_dir/zookeeper_source-2.log 2>&1 &
    pid_zk_source2=$!
    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties > $base_dir/zookeeper_target.log 2>&1 &
    pid_zk_target=$!
}

start_source_servers() {
    info "starting source cluster"

    JMX_PORT=1111 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_1.properties > $base_dir/kafka_source-1-1.log 2>&1 &
    pid_kafka_source_1_1=$!
    JMX_PORT=2222 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_2.properties > $base_dir/kafka_source-1-2.log 2>&1 &
    pid_kafka_source_1_2=$!
    JMX_PORT=3333 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_1.properties > $base_dir/kafka_source-2-1.log 2>&1 &
    pid_kafka_source_2_1=$!
    JMX_PORT=4444 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_2.properties > $base_dir/kafka_source-2-2.log 2>&1 &
    pid_kafka_source_2_2=$!
}

start_target_servers() {
    info "starting mirror cluster"
    JMX_PORT=5555 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_1.properties > $base_dir/kafka_target-1-1.log 2>&1 &
    pid_kafka_target_1_1=$!
    JMX_PORT=6666 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_2.properties > $base_dir/kafka_target-1-2.log 2>&1 &
    pid_kafka_target_1_2=$!
}

shutdown_servers() {
    info "stopping mirror-maker"
    if [ "x${pid_mirrormaker_1}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_1}; fi
    # sleep to avoid rebalancing during shutdown
    sleep 2
    if [ "x${pid_mirrormaker_2}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_2}; fi

    info "stopping producer"
    if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi

    info "shutting down target servers"
    if [ "x${pid_kafka_target_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_1}; fi
    if [ "x${pid_kafka_target_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_2}; fi
    sleep 2

    info "shutting down source servers"
    if [ "x${pid_kafka_source_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_1}; fi
    if [ "x${pid_kafka_source_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_2}; fi
    if [ "x${pid_kafka_source_2_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_1}; fi
    if [ "x${pid_kafka_source_2_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_2}; fi

    info "shutting down zookeeper servers"
    if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
    if [ "x${pid_zk_source1}" != "x" ]; then kill_child_processes 0 ${pid_zk_source1}; fi
    if [ "x${pid_zk_source2}" != "x" ]; then kill_child_processes 0 ${pid_zk_source2}; fi
}

start_producer() {
    topic=$1
    zk=$2
    info "start producing messages for topic $topic to zookeeper $zk ..."
    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=$zk --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async > $base_dir/producer_performance.log 2>&1 &
    pid_producer=$!
}

# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
wait_partition_done() {
    n_tuples=$(($# / 3))

    i=1
    while (($#)); do
        kafka_server[i]=$1
        topic[i]=$2
        partitionid[i]=$3
        prev_offset[i]=0
        info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
        i=$((i+1))
        shift 3
    done

    all_done=0

    # set -x
    while [[ $all_done != 1 ]]; do
        sleep 4
        all_done=1
        for ((i=1; i <= $n_tuples; i++)); do
            cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
            if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
                all_done=0
                prev_offset[i]=$cur_size
            fi
        done
    done
}

cmp_logs() {
    topic=$1
    info "comparing source and target logs for topic $topic"
    source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    source_part3_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9095 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
    if [ "x$source_part0_size" == "x" ]; then source_part0_size=0; fi
    if [ "x$source_part1_size" == "x" ]; then source_part1_size=0; fi
    if [ "x$source_part2_size" == "x" ]; then source_part2_size=0; fi
    if [ "x$source_part3_size" == "x" ]; then source_part3_size=0; fi
    if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
    if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
    expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size + $source_part3_size))
    actual_size=$(($target_part0_size + $target_part1_size))
    if [ "x$expected_size" != "x$actual_size" ]
    then
        info "source size: $expected_size target size: $actual_size"
        return 1
    else
        return 0
    fi
}

take_fail_snapshot() {
    snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
    mkdir $snapshot_dir
    for dir in /tmp/zookeeper_source{1..2} /tmp/zookeeper_target /tmp/kafka-source-{1..2}-{1..2}-logs /tmp/kafka-target{1..2}-logs; do
        if [ -d $dir ]; then
            cp -r $dir $snapshot_dir
        fi
    done
}

# Usage: process_test_result <result> <action_on_fail>
# result: last test result
# action_on_fail: (exit|wait|proceed)
# ("wait" is useful if you want to troubleshoot using zookeeper)
process_test_result() {
    result=$1
    if [ $1 -eq 0 ]; then
        info "test passed"
    else
        info "test failed"
        case "$2" in
            "wait") info "waiting: hit Ctrl-c to quit"
                wait
                ;;
            "exit") shutdown_servers
                take_fail_snapshot
                exit $result
                ;;
            *) shutdown_servers
                take_fail_snapshot
                info "proceeding"
                ;;
        esac
    fi
}

test_whitelists() {
    info "### Testing whitelists"
    snapshot_prefix="whitelist-test"

    cleanup
    start_zk
    start_source_servers
    start_target_servers
    sleep 4

    info "starting mirror makers"
    JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 > $base_dir/kafka_mirrormaker_1.log 2>&1 &
    pid_mirrormaker_1=$!
    JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 > $base_dir/kafka_mirrormaker_2.log 2>&1 &
    pid_mirrormaker_2=$!

    begin_timer

    start_producer whitetopic01 localhost:2181
    start_producer whitetopic01 localhost:2182
    info "waiting for whitetopic01 producers to finish producing ..."
    wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0 kafka://localhost:9092 whitetopic01 0 kafka://localhost:9093 whitetopic01 0

    start_producer whitetopic02 localhost:2181
    start_producer whitetopic03 localhost:2181
    start_producer whitetopic04 localhost:2182
    info "waiting for whitetopic02,whitetopic03,whitetopic04 producers to finish producing ..."
    wait_partition_done kafka://localhost:9090 whitetopic02 0 kafka://localhost:9091 whitetopic02 0 kafka://localhost:9090 whitetopic03 0 kafka://localhost:9091 whitetopic03 0 kafka://localhost:9092 whitetopic04 0 kafka://localhost:9093 whitetopic04 0

    start_producer blacktopic01 localhost:2182
    info "waiting for blacktopic01 producer to finish producing ..."
    wait_partition_done kafka://localhost:9092 blacktopic01 0 kafka://localhost:9093 blacktopic01 0

    info "waiting for consumer to finish consuming ..."

    wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0 kafka://localhost:9094 whitetopic02 0 kafka://localhost:9095 whitetopic02 0 kafka://localhost:9094 whitetopic03 0 kafka://localhost:9095 whitetopic03 0 kafka://localhost:9094 whitetopic04 0 kafka://localhost:9095 whitetopic04 0

    end_timer
    info "embedded consumer took $((t_end - t_begin)) seconds"

    sleep 2

    # if [[ -d /tmp/kafka-target-1-1-logs/blacktopic01 || /tmp/kafka-target-1-2-logs/blacktopic01 ]]; then
    #     echo "blacktopic01 found on target cluster"
    #     result=1
    # else
    #     cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
    #     result=$?
    # fi

    cmp_logs blacktopic01

    cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
    result=$?

    return $result
}

test_blacklists() {
    info "### Testing blacklists"
    snapshot_prefix="blacklist-test"
    cleanup
    start_zk
    start_source_servers
    start_target_servers
    sleep 4

    info "starting mirror maker"
    $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/blacklisttest.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num.streams 2 > $base_dir/kafka_mirrormaker_1.log 2>&1 &
    pid_mirrormaker_1=$!

    start_producer blacktopic01 localhost:2181
    start_producer blacktopic02 localhost:2181
    info "waiting for producer to finish producing blacktopic01,blacktopic02 ..."
    wait_partition_done kafka://localhost:9090 blacktopic01 0 kafka://localhost:9091 blacktopic01 0 kafka://localhost:9090 blacktopic02 0 kafka://localhost:9091 blacktopic02 0

    begin_timer

    start_producer whitetopic01 localhost:2181
    info "waiting for producer to finish producing whitetopic01 ..."
    wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0

    info "waiting for consumer to finish consuming ..."
    wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0

    end_timer

    info "embedded consumer took $((t_end - t_begin)) seconds"

    sleep 2

    # blacklisted topics must NOT have been mirrored
    cmp_logs blacktopic01 || cmp_logs blacktopic02
    if [ $? -eq 0 ]; then
        return 1
    fi

    cmp_logs whitetopic01
    return $?
}

# main test begins

echo "Test-$test_start_time"

# Ctrl-c trap. Catches INT signal
trap "shutdown_servers; exit 0" INT

test_whitelists
result=$?

process_test_result $result $action_on_fail

shutdown_servers

sleep 2

test_blacklists
result=$?

process_test_result $result $action_on_fail

shutdown_servers

exit $result
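The pass/fail criterion in cmp_logs above reduces to a single invariant - the summed partition sizes on the source brokers must equal those on the mirror - shown here as a Scala sketch with made-up sizes (hypothetical object name):

object CmpLogsDemo extends App {
  val sourceSizes = Seq(4096L, 4096L, 0L, 0L) // hypothetical GetOffsetShell outputs, brokers 9090-9093
  val targetSizes = Seq(8192L, 0L)            // brokers 9094-9095
  println(if (sourceSizes.sum == targetSizes.sum) "test passed" else "test failed")
}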
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2183
# broker.list=1:localhost:9094,2:localhost:9095

# timeout in ms for connecting to zookeeper
# zk.connectiontimeout.ms=1000000

producer.type=async

# to avoid dropping events if the queue is full, wait indefinitely
queue.enqueueTimeout.ms=-1

num.producers.per.broker=2
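A sketch of how MirrorMaker consumes a file like this one (the same ProducerConfig/Producer calls as in MirrorMaker.main above; the object name, file path, message, and topic are illustrative, and a running target cluster is assumed):

import kafka.message.Message
import kafka.producer.{Producer, ProducerConfig, ProducerData}
import kafka.utils.Utils

object ProducerFromPropsDemo extends App {
  // hypothetical path; any properties file with the keys above works
  val config = new ProducerConfig(Utils.loadProps("config/mirror_producer.properties"))
  val producer = new Producer[Null, Message](config)
  producer.send(new ProducerData[Null, Message]("white1", new Message("hello".getBytes)))
  producer.close()
}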
@@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

# the id of the broker
brokerid=2

# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=

# number of logical partitions on this broker
num.partitions=1

# the port the socket server runs on
port=9093

# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8

# the directory in which to store log files
log.dir=/tmp/kafka-source-2-2-logs

# the send buffer used by the socket server
socket.send.buffer=1048576

# the receive buffer used by the socket server
socket.receive.buffer=1048576

# the maximum size of a log segment
log.file.size=536870912

# the interval between running cleanup on the logs
log.cleanup.interval.mins=1

# the minimum age of a log file to be eligible for deletion
log.retention.hours=168

# the number of messages to accept without flushing the log to disk
log.flush.interval=600

# set the following properties to use zookeeper

# enable connecting to zookeeper
enable.zookeeper=true

# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2182

# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000

# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000

# default time based flush interval in ms
log.default.flush.interval.ms=1000

# time based topic flusher rate in ms
log.default.flush.scheduler.interval.ms=1000