Browse Source

KAFKA-544 Trivial fix--migration tool is using message when it should be using a byte array. Checked in w/o review.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1410588 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Edward Jay Kreps 12 years ago
parent
commit
8c884ee6a6
  1. 36
      core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
  2. 5
      core/src/main/scala/kafka/tools/KafkaMigrationTool.java

36
core/src/main/scala/kafka/javaapi/producer/ProducerData.scala

@ -1,36 +0,0 @@ @@ -1,36 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.javaapi.producer
import scala.collection.JavaConversions._
case class ProducerData[K, V](topic: String,
key: K,
data: java.util.List[V]) {
def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d)))
def this(t: String, k: K, d: V) = this(topic = t, key = k, data = asList(List(d)))
def getTopic: String = topic
def getKey: K = key
def getData: java.util.List[V] = data
}

5
core/src/main/scala/kafka/tools/KafkaMigrationTool.java

@ -280,9 +280,8 @@ public class KafkaMigrationTool @@ -280,9 +280,8 @@ public class KafkaMigrationTool
int size = ((ByteBuffer)payload_07).remaining();
byte[] bytes = new byte[size];
((ByteBuffer)payload_07).get(bytes);
Message message_08 = new Message(bytes);
logger.debug(String.format("Send kafka 08 message of size %d to topic %s", message_08.size(), topic));
KeyedMessage<String, Message> producerData = new KeyedMessage((String)topic, null, message_08);
logger.debug(String.format("Send kafka 08 message of size %d to topic %s", bytes.length, topic));
KeyedMessage<String, byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
Producer nextProducer = producerCircularIterator.next();
nextProducer.send(producerData);
}

Loading…
Cancel
Save