From 61f4c8b0923baa17bfc6160082fc2c8ca5a2c44d Mon Sep 17 00:00:00 2001
From: Jiangjie Qin
-Messages consist of a fixed-size header and variable length opaque byte array payload. The header contains a format version and a CRC32 checksum to detect corruption or truncation. Leaving the payload opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The MessageSet
interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO Channel
.
+Messages consist of a fixed-size header, a variable length opaque key byte array and a variable length opaque value byte array. The header contains the following fields:
+
MessageSet
interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO Channel
.
- /** - * A message. The format of an N byte message is the following: - * - * If magic byte is 0 - * - * 1. 1 byte "magic" identifier to allow format changes - * - * 2. 4 byte CRC32 of the payload - * - * 3. N - 5 byte payload - * - * If magic byte is 1 - * - * 1. 1 byte "magic" identifier to allow format changes - * - * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) - * - * 3. 4 byte CRC32 of the payload - * - * 4. N - 6 byte payload - * - */ + /** + * 1. 4 byte CRC32 of the message + * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1 + * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version + * bit 0 ~ 2 : Compression codec. + * 0 : no compression + * 1 : gzip + * 2 : snappy + * 3 : lz4 + * bit 3 : Timestamp type + * 0 : create time + * 1 : log append time + * bit 4 ~ 7 : reserved + * 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0 + * 5. 4 byte key length, containing length K + * 6. K byte key + * 7. 4 byte payload length, containing length V + * 8. V byte payload + */
On-disk format of a message -message length : 4 bytes (value: 1+4+n) -"magic" value : 1 byte +offset : 8 bytes +message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V) crc : 4 bytes -payload : n bytes +magic value : 1 byte +attributes : 1 byte +timestamp : 8 bytes (Only exists when magic value is greater than zero) +key length : 4 bytes +key : K bytes +value length : 4 bytes +value : V bytes
The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore the complexity of maintaining the mapping from a random id to an offset requires a heavy weight index structure which must be synchronized with disk, essentially requiring a full persistent random-access data structure. Thus to simplify the lookup structure we decided to use a simple per-partition atomic counter which could be coupled with the partition id and node id to uniquely identify a message; this makes the lookup structure simpler, though multiple seeks per consumer request are still likely. However once we settled on a counter, the jump to directly using the offset seemed natural—both after all are monotonically increasing integers unique to a partition. Since the offset is hidden from the consumer API this decision is ultimately an implementation detail and we went with the more efficient approach.