# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import socket import struct import binascii import sys PRODUCE_REQUEST_ID = 0 def encode_message(message): # return struct.pack('>B', 0) + \ struct.pack('>i', binascii.crc32(message)) + \ message def encode_produce_request(topic, partition, messages): # encode messages as encoded = [encode_message(message) for message in messages] message_set = ''.join([struct.pack('>i', len(m)) + m for m in encoded]) # create the request as data = struct.pack('>H', PRODUCE_REQUEST_ID) + \ struct.pack('>H', len(topic)) + topic + \ struct.pack('>i', partition) + \ struct.pack('>i', len(message_set)) + message_set return struct.pack('>i', len(data)) + data class KafkaProducer: def __init__(self, host, port): self.REQUEST_KEY = 0 self.connection = socket.socket() self.connection.connect((host, port)) def close(self): self.connection.close() def send(self, messages, topic, partition = 0): self.connection.sendall(encode_produce_request(topic, partition, messages)) if __name__ == '__main__': if len(sys.argv) < 4: print >> sys.stderr, 'USAGE: python', sys.argv[0], 'host port topic' sys.exit(1) host = sys.argv[1] port = int(sys.argv[2]) topic = sys.argv[3] producer = KafkaProducer(host, port) while True: print 'Enter comma seperated messages: ', line = sys.stdin.readline() messages = line.split(',') producer.send(messages, topic) print 'Sent', len(messages), 'messages successfully'