Compare commits

...

10 Commits
trunk ... 0.8

  1. 15
      DISCLAIMER
  2. 2
      bin/kafka-add-partitions.sh
  3. 2
      bin/kafka-console-consumer.sh
  4. 2
      bin/kafka-console-producer.sh
  5. 2
      bin/kafka-consumer-perf-test.sh
  6. 2
      bin/kafka-create-topic.sh
  7. 2
      bin/kafka-list-topic.sh
  8. 2
      bin/kafka-preferred-replica-election.sh
  9. 2
      bin/kafka-producer-perf-test.sh
  10. 2
      bin/kafka-reassign-partitions.sh
  11. 2
      bin/kafka-replay-log-producer.sh
  12. 18
      bin/kafka-run-class.sh
  13. 2
      bin/kafka-server-start.sh
  14. 2
      bin/kafka-simple-consumer-perf-test.sh
  15. 2
      bin/kafka-simple-consumer-shell.sh
  16. 2
      bin/zookeeper-server-start.sh
  17. 2
      bin/zookeeper-shell.sh
  18. 13
      core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
  19. 6
      core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
  20. 108
      core/src/main/scala/kafka/log/OffsetIndex.scala
  21. 11
      core/src/main/scala/kafka/server/AbstractFetcherThread.scala
  22. 70
      core/src/main/scala/kafka/tools/GetOffsetShell.scala
  23. 23
      core/src/main/scala/kafka/utils/Os.scala
  24. 13
      core/src/main/scala/kafka/utils/Utils.scala
  25. 12
      core/src/main/scala/kafka/utils/ZkUtils.scala
  26. 6
      core/src/test/scala/unit/kafka/utils/TestUtils.scala
  27. 14
      core/src/test/scala/unit/kafka/utils/UtilsTest.scala
  28. 9
      kafka-patch-review.py
  29. 9
      project/Build.scala
  30. 8
      system_test/migration_tool_testsuite/0.7/bin/kafka-run-class.sh
  31. 68
      system_test/migration_tool_testsuite/0.7/config/test-log4j.properties
  32. 1
      system_test/utils/kafka_system_test_utils.py

15
DISCLAIMER

@@ -1,15 +0,0 @@
Apache Kafka is an effort undergoing incubation at the Apache Software
Foundation (ASF), sponsored by the Apache Incubator PMC.
Incubation is required of all newly accepted projects until a further review
indicates that the infrastructure, communications, and decision making process
have stabilized in a manner consistent with other successful ASF projects.
While incubation status is not necessarily a reflection of the completeness
or stability of the code, it does indicate that the project has yet to be
fully endorsed by the ASF.
For more information about the incubation status of the Kafka project you
can go to the following page:
http://incubator.apache.org/kafka/

2
bin/kafka-add-partitions.sh

@@ -15,4 +15,4 @@
# limitations under the License.
base_dir=$(dirname $0)
$base_dir/kafka-run-class.sh kafka.admin.AddPartitionsCommand $@
exec $base_dir/kafka-run-class.sh kafka.admin.AddPartitionsCommand $@
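(Design note, not stated in the patch itself: the same one-line change — prefixing the launch command with exec — recurs across the bin/ scripts below. exec replaces the wrapper shell with the program it launches, so the Kafka JVM becomes the foreground process, signals such as SIGTERM or Ctrl-C reach it directly, and no idle shell process is left behind; that is the usual motivation for this pattern.)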

2
bin/kafka-console-consumer.sh

@@ -15,4 +15,4 @@
# limitations under the License.
export KAFKA_HEAP_OPTS="-Xmx512M"
$(dirname $0)/kafka-run-class.sh kafka.consumer.ConsoleConsumer $@
exec $(dirname $0)/kafka-run-class.sh kafka.consumer.ConsoleConsumer $@

2
bin/kafka-console-producer.sh

@@ -15,4 +15,4 @@
# limitations under the License.
export KAFKA_HEAP_OPTS="-Xmx512M"
$(dirname $0)/kafka-run-class.sh kafka.producer.ConsoleProducer $@
exec $(dirname $0)/kafka-run-class.sh kafka.producer.ConsoleProducer $@

2
bin/kafka-consumer-perf-test.sh

@@ -15,4 +15,4 @@
# limitations under the License.
export KAFKA_HEAP_OPTS="-Xmx512M"
$(dirname $0)/kafka-run-class.sh kafka.perf.ConsumerPerformance $@
exec $(dirname $0)/kafka-run-class.sh kafka.perf.ConsumerPerformance $@

2
bin/kafka-create-topic.sh

@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
$(dirname $0)/kafka-run-class.sh kafka.admin.CreateTopicCommand $@
exec $(dirname $0)/kafka-run-class.sh kafka.admin.CreateTopicCommand $@

2
bin/kafka-list-topic.sh

@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
$(dirname $0)/kafka-run-class.sh kafka.admin.ListTopicCommand $@
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ListTopicCommand $@

2
bin/kafka-preferred-replica-election.sh

@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
$(dirname $0)/kafka-run-class.sh kafka.admin.PreferredReplicaLeaderElectionCommand $@
exec $(dirname $0)/kafka-run-class.sh kafka.admin.PreferredReplicaLeaderElectionCommand $@

2
bin/kafka-producer-perf-test.sh

@@ -15,4 +15,4 @@
# limitations under the License.
export KAFKA_HEAP_OPTS="-Xmx512M"
$(dirname $0)/kafka-run-class.sh kafka.perf.ProducerPerformance $@
exec $(dirname $0)/kafka-run-class.sh kafka.perf.ProducerPerformance $@

2
bin/kafka-reassign-partitions.sh

@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
$(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand $@
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand $@

2
bin/kafka-replay-log-producer.sh

@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
$(dirname $0)/kafka-run-class.sh kafka.tools.ReplayLogProducer $@
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplayLogProducer $@

18
bin/kafka-run-class.sh

@@ -88,7 +88,7 @@ fi
# JVM performance options
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC"
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
# GC options
@@ -101,20 +101,6 @@ if [ "$1" = "daemon" ] && [ -z "$KAFKA_GC_LOG_OPTS"] ; then
KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps "
fi
$JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
exitval=$?
if [ $exitval -eq "1" ] ; then
$JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" >& exception.txt
exception=`cat exception.txt`
noBuildMessage='Please build the project using sbt. Documentation is available at http://kafka.apache.org/'
pattern="(Could not find or load main class)|(java\.lang\.NoClassDefFoundError)"
match=`echo $exception | grep -E "$pattern"`
if [[ -n "$match" ]]; then
echo $noBuildMessage
fi
rm exception.txt
fi
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
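(Reading of the two hunks above, not spelled out in the patch: -Djava.awt.headless=true is appended to the default JVM performance options, which keeps the JVM from trying to initialize AWT/display resources on a server; and the old fallback that re-ran the JVM with output captured in exception.txt just to grep for "Could not find or load main class" and print a build hint is dropped, so the script now execs the JVM once and lets any startup error surface on its own.)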

2
bin/kafka-server-start.sh

@@ -22,4 +22,4 @@ fi
base_dir=$(dirname $0)
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
$base_dir/kafka-run-class.sh daemon kafkaServer kafka.Kafka $@
exec $base_dir/kafka-run-class.sh daemon kafkaServer kafka.Kafka $@

2
bin/kafka-simple-consumer-perf-test.sh

@@ -15,4 +15,4 @@
# limitations under the License.
export KAFKA_HEAP_OPTS="-Xmx512M"
$(dirname $0)/kafka-run-class.sh kafka.perf.SimpleConsumerPerformance $@
exec $(dirname $0)/kafka-run-class.sh kafka.perf.SimpleConsumerPerformance $@

2
bin/kafka-simple-consumer-shell.sh

@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
$(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerShell $@
exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerShell $@

2
bin/zookeeper-server-start.sh

@@ -22,5 +22,5 @@ fi
base_dir=$(dirname $0)
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
$base_dir/kafka-run-class.sh daemon zookeeper org.apache.zookeeper.server.quorum.QuorumPeerMain $@
exec $base_dir/kafka-run-class.sh daemon zookeeper org.apache.zookeeper.server.quorum.QuorumPeerMain $@

2
bin/zookeeper-shell.sh

@@ -20,4 +20,4 @@ then
exit 1
fi
$(dirname $0)/kafka-run-class.sh org.apache.zookeeper.ZooKeeperMain -server $1
exec $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.ZooKeeperMain -server $1

13
core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala

@@ -24,6 +24,7 @@ import scala.collection.immutable
import collection.mutable.HashMap
import scala.collection.mutable
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.Utils.inLock
import kafka.utils.ZkUtils._
import kafka.utils.{ShutdownableThread, SystemTime}
import kafka.common.TopicAndPartition
@@ -54,7 +55,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
lock.lock()
try {
if (noLeaderPartitionSet.isEmpty) {
while (noLeaderPartitionSet.isEmpty) {
trace("No partition for leader election.")
cond.await()
}
@@ -123,14 +124,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
leaderFinderThread.start()
lock.lock()
try {
inLock(lock) {
partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
this.cluster = cluster
noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
cond.signalAll()
} finally {
lock.unlock()
}
}
@@ -158,14 +156,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
debug("adding partitions with error %s".format(partitionList))
lock.lock()
try {
inLock(lock) {
if (partitionMap != null) {
noLeaderPartitionSet ++= partitionList
cond.signalAll()
}
} finally {
lock.unlock()
}
}
}
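(Two patterns worth noting in this file: the explicit lock()/try/finally blocks are replaced by the Utils.inLock helper introduced later in this patch, and the guard around cond.await() changes from if to while so the thread re-checks the predicate after waking, which protects against spurious wakeups and against another thread having already drained the set. A minimal, self-contained sketch of that wait loop — names are illustrative, not from the patch:)

    import java.util.concurrent.locks.ReentrantLock

    object WaitLoopSketch {
      private val lock = new ReentrantLock()
      private val cond = lock.newCondition()
      private var ready = false

      // Block until `ready` is true. The predicate is re-checked in a loop because
      // Condition.await() may return spuriously, or another thread may already have
      // consumed whatever the signal announced.
      def awaitReady(): Unit = {
        lock.lock()
        try {
          while (!ready)
            cond.await()
        } finally {
          lock.unlock()
        }
      }

      def markReady(): Unit = {
        lock.lock()
        try {
          ready = true
          cond.signalAll()
        } finally {
          lock.unlock()
        }
      }
    }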

6
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

@@ -30,6 +30,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
import java.util.UUID
import kafka.serializer._
import kafka.utils.ZkUtils._
import kafka.utils.Utils.inLock
import kafka.common._
import com.yammer.metrics.core.Gauge
import kafka.metrics._
@@ -363,12 +364,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
lock.lock()
try {
inLock(lock) {
isWatcherTriggered = true
cond.signalAll()
} finally {
lock.unlock()
}
}

108
core/src/main/scala/kafka/log/OffsetIndex.scala

@@ -21,8 +21,10 @@ import scala.math._
import java.io._
import java.nio._
import java.nio.channels._
import java.util.concurrent.locks._
import java.util.concurrent.atomic._
import kafka.utils._
import kafka.utils.Utils.inLock
import kafka.common.InvalidOffsetException
/**
@@ -52,6 +54,8 @@ import kafka.common.InvalidOffsetException
*/
class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
private val lock = new ReentrantLock
/* the memory mapping */
private var mmap: MappedByteBuffer =
{
@@ -88,25 +92,30 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
/* the number of entries in the index */
private var size = new AtomicInteger(mmap.position / 8)
/**
* The maximum number of eight-byte entries this index can hold
*/
@volatile
var maxEntries = mmap.limit / 8
/* the last offset in the index */
var lastOffset = readLastOffset()
debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
.format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
/* the maximum number of entries this index can hold */
def maxEntries = mmap.limit / 8
/**
* The last offset written to the index
*/
private def readLastOffset(): Long = {
val offset =
size.get match {
case 0 => 0
case s => relativeOffset(this.mmap, s-1)
}
baseOffset + offset
inLock(lock) {
val offset =
size.get match {
case 0 => 0
case s => relativeOffset(this.mmap, s-1)
}
baseOffset + offset
}
}
/**
@@ -116,12 +125,14 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
* the pair (baseOffset, 0) is returned.
*/
def lookup(targetOffset: Long): OffsetPosition = {
val idx = mmap.duplicate
val slot = indexSlotFor(idx, targetOffset)
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
maybeLock(lock) {
val idx = mmap.duplicate
val slot = indexSlotFor(idx, targetOffset)
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
}
}
/**
@@ -167,17 +178,19 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
* Get the nth offset mapping from the index
*/
def entry(n: Int): OffsetPosition = {
if(n >= entries)
throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
val idx = mmap.duplicate
OffsetPosition(relativeOffset(idx, n), physical(idx, n))
maybeLock(lock) {
if(n >= entries)
throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
val idx = mmap.duplicate
OffsetPosition(relativeOffset(idx, n), physical(idx, n))
}
}
/**
* Append entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
*/
def append(offset: Long, position: Int) {
this synchronized {
inLock(lock) {
require(!isFull, "Attempt to append to a full index (size = " + size + ").")
if (size.get == 0 || offset > lastOffset) {
debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
@@ -186,8 +199,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
this.size.incrementAndGet()
this.lastOffset = offset
require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
}
else {
} else {
throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
.format(offset, entries, lastOffset, file.getName))
}
@@ -209,7 +221,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
* Truncating to an offset larger than the largest in the index has no effect.
*/
def truncateTo(offset: Long) {
this synchronized {
inLock(lock) {
val idx = mmap.duplicate
val slot = indexSlotFor(idx, offset)
@@ -233,9 +245,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
* Truncates index to a known number of entries.
*/
private def truncateToEntries(entries: Int) {
this.size.set(entries)
mmap.position(this.size.get * 8)
this.lastOffset = readLastOffset
inLock(lock) {
this.size.set(entries)
mmap.position(this.size.get * 8)
this.lastOffset = readLastOffset
}
}
/**
@@ -243,7 +257,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
* the file.
*/
def trimToValidSize() {
this synchronized {
inLock(lock) {
resize(entries * 8)
}
}
@@ -255,14 +269,18 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
* we want to reset the index size to maximum index size to avoid rolling new segment.
*/
def resize(newSize: Int) {
this synchronized {
flush()
inLock(lock) {
val raf = new RandomAccessFile(file, "rws")
val roundedNewSize = roundToExactMultiple(newSize, 8)
val position = this.mmap.position
/* Windows won't let us modify the file length while the file is mmapped :-( */
if(Os.isWindows)
forceUnmap(this.mmap)
try {
raf.setLength(roundedNewSize)
val position = this.mmap.position
this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
this.maxEntries = this.mmap.limit / 8
this.mmap.position(position)
} finally {
Utils.swallow(raf.close())
@@ -270,11 +288,23 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
}
}
/**
* Forcefully free the buffer's mmap. We do this only on windows.
*/
private def forceUnmap(m: MappedByteBuffer) {
try {
if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
(m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
} catch {
case t: Throwable => warn("Error when freeing index buffer", t)
}
}
/**
* Flush the data in the index to disk
*/
def flush() {
this synchronized {
inLock(lock) {
mmap.force()
}
}
@@ -300,4 +330,20 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
* E.g. roundToExactMultiple(67, 8) == 64
*/
private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
/**
* Execute the given function in a lock only if we are running on windows. We do this
* because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
* and this requires synchronizing reads.
*/
private def maybeLock[T](lock: Lock)(fun: => T): T = {
if(Os.isWindows)
lock.lock()
try {
return fun
} finally {
if(Os.isWindows)
lock.unlock()
}
}
}
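(Design note, inferred from the comments above rather than stated outright: writes and resizes now go through the ReentrantLock via inLock, while reads — lookup and entry — use maybeLock and therefore only synchronize on Windows. On other platforms the mmap is never forcibly unmapped, so readers can keep working on buffer duplicates lock-free; on Windows resize() must forceUnmap the buffer before changing the file length, and a read racing with that unmap would touch freed memory, hence the extra locking there.)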

11
core/src/main/scala/kafka/server/AbstractFetcherThread.scala

@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
import kafka.utils.Utils.inLock
/**
@@ -70,8 +71,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
override def doWork() {
partitionMapLock.lock()
try {
inLock(partitionMapLock) {
if (partitionMap.isEmpty)
partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
partitionMap.foreach {
@@ -79,8 +79,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
offset, fetchSize)
}
} finally {
partitionMapLock.unlock()
}
val fetchRequest = fetchRequestBuilder.build()
@@ -107,8 +105,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
if (response != null) {
// process fetched data
partitionMapLock.lock()
try {
inLock(partitionMapLock) {
response.data.foreach {
case(topicAndPartition, partitionData) =>
val (topic, partitionId) = topicAndPartition.asTuple
@@ -160,8 +157,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
}
}
} finally {
partitionMapLock.unlock()
}
}

70
core/src/main/scala/kafka/tools/GetOffsetShell.scala

@@ -23,25 +23,27 @@ import joptsimple._
import java.net.URI
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
import kafka.common.TopicAndPartition
import kafka.client.ClientUtils
import kafka.utils.CommandLineUtils
object GetOffsetShell {
def main(args: Array[String]): Unit = {
val parser = new OptionParser
val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
.withRequiredArg
.describedAs("kafka://hostname:port")
.describedAs("hostname:port,...,hostname:port")
.ofType(classOf[String])
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val partitionOpt = parser.accepts("partition", "partition id")
val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
.withRequiredArg
.describedAs("partition id")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
.describedAs("partition ids")
.ofType(classOf[String])
.defaultsTo("")
val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
@@ -51,28 +53,52 @@ object GetOffsetShell {
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)
val options = parser.parse(args : _*)
for(arg <- List(urlOpt, topicOpt, timeOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
}
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)
val url = new URI(options.valueOf(urlOpt))
val clientId = "GetOffsetShell"
val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
val topic = options.valueOf(topicOpt)
val partition = options.valueOf(partitionOpt).intValue
var partitionList = options.valueOf(partitionOpt)
var time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue
val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell")
val topicAndPartition = TopicAndPartition(topic, partition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
println("get " + offsets.length + " results")
for (offset <- offsets)
println(offset)
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
"kafka-list-topic.sh to verify")
System.exit(1)
}
val partitions =
if(partitionList == "") {
topicsMetadata.head.partitionsMetadata.map(_.partitionId)
} else {
partitionList.split(",").map(_.toInt).toSeq
}
partitions.foreach { partitionId =>
val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
partitionMetadataOpt match {
case Some(metadata) =>
metadata.leader match {
case Some(leader) =>
val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
}
case None => System.err.println("Error: partition %d does not exist".format(partitionId))
}
}
}
}
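(A hedged invocation sketch for the reworked tool — host, port, topic and partition ids below are placeholders: bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 --partitions 0,1. Per the code above, it fetches topic metadata from the given brokers, queries each partition's leader with an OffsetRequest, and prints one line per partition in the form topic:partitionId:offset1,offset2,...; omitting --partitions covers every partition of the topic, and partitions without a leader or not in the metadata are reported on stderr and skipped.)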

23
core/src/main/scala/kafka/utils/Os.scala

@@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
object Os {
val name = System.getProperty("os.name").toLowerCase
val isWindows = name.startsWith("windows")
}

13
core/src/main/scala/kafka/utils/Utils.scala

@@ -21,6 +21,7 @@ import java.io._
import java.nio._
import charset.Charset
import java.nio.channels._
import java.util.concurrent.locks.Lock
import java.lang.management._
import java.util.zip.CRC32
import javax.management._
@@ -554,4 +555,16 @@ object Utils extends Logging {
* This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
*/
def abs(n: Int) = n & 0x7fffffff
/**
* Execute the given function inside the lock
*/
def inLock[T](lock: Lock)(fun: => T): T = {
lock.lock()
try {
return fun
} finally {
lock.unlock()
}
}
}
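(inLock is a small loan pattern around java.util.concurrent.locks.Lock; the testInLock case added to UtilsTest further down exercises it. A minimal usage sketch — the object and counter are illustrative, not from the patch:)

    import java.util.concurrent.locks.ReentrantLock
    import kafka.utils.Utils.inLock

    object InLockSketch {
      private val lock = new ReentrantLock()
      private var counter = 0

      // The body runs with the lock held; inLock releases it in a finally
      // block, so the lock is freed even if the body throws.
      def increment(): Int = inLock(lock) {
        counter += 1
        counter
      }
    }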

12
core/src/main/scala/kafka/utils/ZkUtils.scala

@@ -35,6 +35,7 @@ import kafka.controller.KafkaController
import scala.Some
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.common.TopicAndPartition
import kafka.utils.Utils.inLock
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
@@ -774,8 +775,7 @@ class LeaderExistsOrChangedListener(topic: String,
def handleDataChange(dataPath: String, data: Object) {
val t = dataPath.split("/").takeRight(3).head
val p = dataPath.split("/").takeRight(2).head.toInt
leaderLock.lock()
try {
inLock(leaderLock) {
if(t == topic && p == partition){
if(oldLeaderOpt == None){
trace("In leader existence listener on partition [%s, %d], leader has been created".format(topic, partition))
@@ -790,18 +790,12 @@ class LeaderExistsOrChangedListener(topic: String,
}
}
}
finally {
leaderLock.unlock()
}
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
leaderLock.lock()
try {
inLock(leaderLock) {
leaderExistsOrChanged.signal()
}finally {
leaderLock.unlock()
}
}
}

6
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@@ -37,6 +37,7 @@ import kafka.api._
import collection.mutable.Map
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition
import kafka.utils.Utils.inLock
import junit.framework.Assert
@@ -425,8 +426,7 @@ object TestUtils extends Logging {
else
info("Waiting for leader for partition [%s,%d] to be changed from old leader %d".format(topic, partition, oldLeaderOpt.get))
leaderLock.lock()
try {
inLock(leaderLock) {
zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), new LeaderExistsOrChangedListener(topic, partition, leaderLock, leaderExistsOrChanged, oldLeaderOpt, zkClient))
leaderExistsOrChanged.await(timeoutMs, TimeUnit.MILLISECONDS)
// check if leader is elected
@@ -441,8 +441,6 @@ object TestUtils extends Logging {
.format(timeoutMs, topic, partition))
}
leader
} finally {
leaderLock.unlock()
}
}

14
core/src/test/scala/unit/kafka/utils/UtilsTest.scala

@@ -19,10 +19,12 @@ package kafka.utils
import java.util.Arrays
import java.nio.ByteBuffer
import java.util.concurrent.locks._
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Assert._
import kafka.common.KafkaException
import kafka.utils.Utils.inLock
import org.junit.Test
@@ -74,4 +76,16 @@ class UtilsTest extends JUnitSuite {
assertTrue(emptyStringList.equals(emptyListFromNullString))
assertTrue(emptyStringList.equals(emptyList))
}
@Test
def testInLock() {
val lock = new ReentrantLock()
val result = inLock(lock) {
assertTrue("Should be in lock", lock.isHeldByCurrentThread)
1 + 1
}
assertEquals(2, result)
assertFalse("Should be unlocked", lock.isLocked)
}
}

9
kafka-patch-review.py

@@ -36,7 +36,12 @@ def main():
ts = time.time()
st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d_%H:%M:%S')
patch_file=tempfile.gettempdir() + "/" + opt.jira + '_' + st + '.patch'
git_configure_reviewboard="git config reviewboard.url https://reviews.apache.org"
print "Configuring reviewboard url to https://reviews.apache.org"
p=os.popen(git_configure_reviewboard)
p.close()
git_remote_update="git remote update"
print "Updating your remote branches to pull the latest changes"
p=os.popen(git_remote_update)
@@ -90,7 +95,7 @@ def main():
comment="Created reviewboard "
if not opt.reviewboard:
print 'Created a new reviewboard ',rb_url
print 'Created a new reviewboard ',rb_url,' against branch: ',opt.branch
else:
print 'Updated reviewboard',opt.reviewboard
comment="Updated reviewboard "

9
project/Build.scala

@@ -44,7 +44,7 @@ object KafkaBuild extends Build {
crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"),
excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"),
scalaVersion := "2.8.0",
version := "0.8.0-beta1",
version := "0.8.0",
publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"),
credentials += Credentials(Path.userHome / ".m2" / ".credentials"),
buildNumber := System.getProperty("build.number", ""),
@@ -53,6 +53,7 @@ object KafkaBuild extends Build {
javacOptions in compile ++= Seq("-Xlint:unchecked", "-source", "1.5"),
javacOptions in doc ++= Seq("-source", "1.5"),
parallelExecution in Test := false, // Prevent tests from overrunning each other
publishArtifact in Test := true,
libraryDependencies ++= Seq(
"log4j" % "log4j" % "1.2.15" exclude("javax.jms", "jms"),
"net.sf.jopt-simple" % "jopt-simple" % "3.2",
@@ -70,7 +71,9 @@ object KafkaBuild extends Build {
<exclude org="log4j" module="log4j"/>
<exclude org="jline" module="jline"/>
</dependency>
</dependencies>
</dependencies>,
mappings in packageBin in Compile += file("LICENSE") -> "LICENSE",
mappings in packageBin in Compile += file("NOTICE") -> "NOTICE"
)
val hadoopSettings = Seq(
@@ -112,6 +115,8 @@ object KafkaBuild extends Build {
val jarFiles = deps.files.filter(f => !products.files.contains(f) && f.getName.endsWith(".jar"))
val destination = target / "RELEASE" / releaseName
IO.copyFile(packageBin, destination / packageBin.getName)
IO.copyFile(file("LICENSE"), destination / "LICENSE")
IO.copyFile(file("NOTICE"), destination / "NOTICE")
IO.copy(jarFiles.map { f => (f, destination / "libs" / f.getName) })
IO.copyDirectory(file("config"), destination / "config")
IO.copyDirectory(file("bin"), destination / "bin")

8
system_test/migration_tool_testsuite/0.7/bin/kafka-run-class.sh

@@ -66,8 +66,14 @@ done
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
# Log4j settings
if [ -z "$KAFKA_LOG4J_OPTS" ]; then
KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/config/log4j.properties"
fi
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS="-Xmx512M -server -Dlog4j.configuration=file:$base_dir/config/log4j.properties"
KAFKA_OPTS="-Xmx512M -server $KAFKA_LOG4J_OPTS"
fi
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "

68
system_test/migration_tool_testsuite/0.7/config/test-log4j.properties

@@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=logs/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=logs/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=logs/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=logs/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
# Turn on all our debugging info
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
log4j.logger.kafka.perf=DEBUG, kafkaAppender
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
log4j.logger.kafka=INFO, kafkaAppender
log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false
#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false
log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

1
system_test/utils/kafka_system_test_utils.py

@@ -1046,6 +1046,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
cmdList = ["ssh " + host,
"'JAVA_HOME=" + javaHome,
"JMX_PORT=" + jmxPort,
"KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome,
kafkaRunClassBin + " kafka.perf.ProducerPerformance",
"--brokerinfo " + brokerInfoStr,
"--initial-message-id " + str(initMsgId),
