Browse Source

reverting previous commit for KAFKA-296 because patch didn't apply cleanly

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1300801 13f79535-47bb-0310-9956-ffa450edef68
pull/1/head
Jun Rao 13 years ago
parent
commit
5e0e0b17fb
  1. 21
      clients/go/src/consumer.go
  2. 20
      clients/go/src/kafka.go
  3. 4
      clients/go/src/message.go
  4. 4
      clients/go/src/payload_codec.go
  5. 8
      clients/go/src/publisher.go
  6. 2
      clients/go/src/request.go
  7. 6
      clients/go/src/timing.go
  8. 23
      clients/go/tools/consumer/consumer.go
  9. 6
      clients/go/tools/offsets/offsets.go
  10. 4
      clients/go/tools/publisher/publisher.go

21
clients/go/src/consumer.go

@ -23,12 +23,11 @@ @@ -23,12 +23,11 @@
package kafka
import (
"encoding/binary"
"errors"
"io"
"log"
"os"
"net"
"time"
"encoding/binary"
)
type BrokerConsumer struct {
@ -67,11 +66,11 @@ func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *Brok @@ -67,11 +66,11 @@ func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *Brok
func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) {
// merge to the default map, so one 'could' override the default codecs..
for k, v := range codecsMap(payloadCodecs) {
consumer.codecs[k] = v
consumer.codecs[k] = v, true
}
}
func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error) {
func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) {
conn, err := consumer.broker.connect()
if err != nil {
return -1, err
@ -87,14 +86,14 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime @@ -87,14 +86,14 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime
})
if err != nil {
if err != io.EOF {
if err != os.EOF {
log.Println("Fatal Error: ", err)
panic(err)
}
quit <- true // force quit
break
}
time.Sleep(time.Millisecond * time.Duration(pollTimeoutMs))
time.Sleep(pollTimeoutMs * 1000000)
}
done <- true
}()
@ -108,7 +107,7 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime @@ -108,7 +107,7 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime
type MessageHandlerFunc func(msg *Message)
func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error) {
func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error) {
conn, err := consumer.broker.connect()
if err != nil {
return -1, err
@ -124,7 +123,7 @@ func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, er @@ -124,7 +123,7 @@ func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, er
return num, err
}
func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) {
func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) {
_, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize))
if err != nil {
return -1, err
@ -143,7 +142,7 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M @@ -143,7 +142,7 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M
for currentOffset <= uint64(length-4) {
totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs)
if msgs == nil {
return num, errors.New("Error Decoding Message")
return num, os.NewError("Error Decoding Message")
}
msgOffset := consumer.offset + currentOffset
for _, msg := range msgs {
@ -165,7 +164,7 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M @@ -165,7 +164,7 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M
// Get a list of valid offsets (up to maxNumOffsets) before the given time, where
// time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available)
// The result is a list of offsets, in descending order.
func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error) {
func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error) {
offsets := make([]uint64, 0)
conn, err := consumer.broker.connect()

20
clients/go/src/kafka.go

@ -23,13 +23,13 @@ @@ -23,13 +23,13 @@
package kafka
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"fmt"
"encoding/binary"
"io"
"bufio"
)
const (
@ -48,7 +48,7 @@ func newBroker(hostname string, topic string, partition int) *Broker { @@ -48,7 +48,7 @@ func newBroker(hostname string, topic string, partition int) *Broker {
hostname: hostname}
}
func (b *Broker) connect() (conn *net.TCPConn, error error) {
func (b *Broker) connect() (conn *net.TCPConn, error os.Error) {
raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname)
if err != nil {
log.Println("Fatal Error: ", err)
@ -63,7 +63,7 @@ func (b *Broker) connect() (conn *net.TCPConn, error error) { @@ -63,7 +63,7 @@ func (b *Broker) connect() (conn *net.TCPConn, error error) {
}
// returns length of response & payload & err
func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) {
func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) {
reader := bufio.NewReader(conn)
length := make([]byte, 4)
lenRead, err := io.ReadFull(reader, length)
@ -71,7 +71,7 @@ func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) { @@ -71,7 +71,7 @@ func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) {
return 0, []byte{}, err
}
if lenRead != 4 || lenRead < 0 {
return 0, []byte{}, errors.New("invalid length of the packet length field")
return 0, []byte{}, os.NewError("invalid length of the packet length field")
}
expectedLength := binary.BigEndian.Uint32(length)
@ -82,13 +82,13 @@ func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) { @@ -82,13 +82,13 @@ func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) {
}
if uint32(lenRead) != expectedLength {
return 0, []byte{}, errors.New(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength))
return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength))
}
errorCode := binary.BigEndian.Uint16(messages[0:2])
if errorCode != 0 {
log.Println("errorCode: ", errorCode)
return 0, []byte{}, errors.New(
return 0, []byte{}, os.NewError(
fmt.Sprintf("Broker Response Error: %d", errorCode))
}
return expectedLength, messages[2:], nil

4
clients/go/src/message.go

@ -23,9 +23,9 @@ @@ -23,9 +23,9 @@
package kafka
import (
"bytes"
"encoding/binary"
"hash/crc32"
"encoding/binary"
"bytes"
"log"
)

4
clients/go/src/payload_codec.go

@ -57,7 +57,7 @@ var DefaultCodecsMap = codecsMap(DefaultCodecs) @@ -57,7 +57,7 @@ var DefaultCodecsMap = codecsMap(DefaultCodecs)
func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec {
payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs))
for _, c := range payloadCodecs {
payloadCodecsMap[c.Id()] = c
payloadCodecsMap[c.Id()] = c, true
}
return payloadCodecsMap
}
@ -65,6 +65,7 @@ func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec { @@ -65,6 +65,7 @@ func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec {
// No compression codec, noop
type NoCompressionPayloadCodec struct {
}
func (codec *NoCompressionPayloadCodec) Id() byte {
@ -82,6 +83,7 @@ func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte { @@ -82,6 +83,7 @@ func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte {
// Gzip Codec
type GzipPayloadCodec struct {
}
func (codec *GzipPayloadCodec) Id() byte {

8
clients/go/src/publisher.go

@ -22,6 +22,10 @@ @@ -22,6 +22,10 @@
package kafka
import (
"os"
)
type BrokerPublisher struct {
broker *Broker
}
@ -30,11 +34,11 @@ func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPub @@ -30,11 +34,11 @@ func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPub
return &BrokerPublisher{broker: newBroker(hostname, topic, partition)}
}
func (b *BrokerPublisher) Publish(message *Message) (int, error) {
func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) {
return b.BatchPublish(message)
}
func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) {
func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) {
conn, err := b.broker.connect()
if err != nil {
return -1, err

2
clients/go/src/request.go

@ -23,8 +23,8 @@ @@ -23,8 +23,8 @@
package kafka
import (
"bytes"
"encoding/binary"
"bytes"
)
type RequestType uint16

6
clients/go/src/timing.go

@ -34,16 +34,16 @@ type Timing struct { @@ -34,16 +34,16 @@ type Timing struct {
}
func StartTiming(label string) *Timing {
return &Timing{label: label, start: time.Now().UnixNano()}
return &Timing{label: label, start: time.Nanoseconds(), stop: 0}
}
func (t *Timing) Stop() {
t.stop = time.Now().UnixNano()
t.stop = time.Nanoseconds()
}
func (t *Timing) Print() {
if t.stop == 0 {
t.Stop()
}
log.Printf("%s took: %f ms\n", t.label, float64(t.stop-t.start)/1000000)
log.Printf("%s took: %f ms\n", t.label, float64((time.Nanoseconds()-t.start))/1000000)
}

23
clients/go/tools/consumer/consumer.go

@ -23,12 +23,12 @@ @@ -23,12 +23,12 @@
package main
import (
"kafka"
"flag"
"fmt"
"os"
"os/signal"
"strconv"
kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
"os/signal"
"syscall"
)
@ -46,7 +46,7 @@ func init() { @@ -46,7 +46,7 @@ func init() {
flag.StringVar(&topic, "topic", "test", "topic to publish to")
flag.IntVar(&partition, "partition", 0, "partition to publish to")
flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from")
flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes of message set to request")
flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from")
flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file")
flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming")
flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout")
@ -61,7 +61,7 @@ func main() { @@ -61,7 +61,7 @@ func main() {
var payloadFile *os.File = nil
if len(writePayloadsTo) > 0 {
var err error
var err os.Error
payloadFile, err = os.Create(writePayloadsTo)
if err != nil {
fmt.Println("Error opening file: ", err)
@ -74,7 +74,7 @@ func main() { @@ -74,7 +74,7 @@ func main() {
msg.Print()
}
if payloadFile != nil {
payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n"))
payloadFile.Write([]byte("Message at: " + strconv.Uitoa64(msg.Offset()) + "\n"))
payloadFile.Write(msg.Payload())
payloadFile.Write([]byte("\n-------------------------------\n"))
}
@ -83,17 +83,10 @@ func main() { @@ -83,17 +83,10 @@ func main() {
if consumerForever {
quit := make(chan bool, 1)
go func() {
sigIn := make(chan os.Signal)
signal.Notify(sigIn)
for {
select {
case sig := <-sigIn:
if sig.(os.Signal) == syscall.SIGINT {
quit <- true
} else {
fmt.Println(sig)
}
sig := <-signal.Incoming
if sig.(os.UnixSignal) == syscall.SIGINT {
quit <- true
}
}
}()

6
clients/go/tools/offsets/offsets.go

@ -20,12 +20,13 @@ @@ -20,12 +20,13 @@
* of their respective owners.
*/
package main
import (
"kafka"
"flag"
"fmt"
kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
)
var hostname string
@ -42,6 +43,7 @@ func init() { @@ -42,6 +43,7 @@ func init() {
flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that: time(ms)/-1(latest)/-2(earliest)")
}
func main() {
flag.Parse()
fmt.Println("Offsets :")
@ -54,7 +56,7 @@ func main() { @@ -54,7 +56,7 @@ func main() {
fmt.Println("Error: ", err)
}
fmt.Printf("Offsets found: %d\n", len(offsets))
for i := 0; i < len(offsets); i++ {
for i := 0 ; i < len(offsets); i++ {
fmt.Printf("Offset[%d] = %d\n", i, offsets[i])
}
}

4
clients/go/tools/publisher/publisher.go

@ -23,9 +23,9 @@ @@ -23,9 +23,9 @@
package main
import (
"kafka"
"flag"
"fmt"
kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
"os"
)
@ -63,7 +63,7 @@ func main() { @@ -63,7 +63,7 @@ func main() {
fmt.Println("Error: ", err)
return
}
payload := make([]byte, stat.Size())
payload := make([]byte, stat.Size)
file.Read(payload)
timing := kafka.StartTiming("Sending")

Loading…
Cancel
Save