Browse Source

KAFKA-3698; Update the message format section.

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1375 from becketqin/KAFKA-3698
pull/1362/merge
Jiangjie Qin 9 years ago committed by Ismael Juma
parent
commit
61f4c8b092
  1. 58
      docs/implementation.html

58
docs/implementation.html

@ -144,32 +144,36 @@ The network layer is a fairly straight-forward NIO server, and will not be descr @@ -144,32 +144,36 @@ The network layer is a fairly straight-forward NIO server, and will not be descr
</p>
<h3><a id="messages" href="#messages">5.3 Messages</a></h3>
<p>
Messages consist of a fixed-size header and variable length opaque byte array payload. The header contains a format version and a CRC32 checksum to detect corruption or truncation. Leaving the payload opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The <code>MessageSet</code> interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO <code>Channel</code>.
Messages consist of a fixed-size header, a variable length opaque key byte array and a variable length opaque value byte array. The header contains the following fields:
<ul>
<li> A CRC32 checksum to detect corruption or truncation. <li/>
<li> A format version. </li>
<li> An attributes identifier </li>
<li> A timestamp </li>
</ul>
Leaving the key and value opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The <code>MessageSet</code> interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO <code>Channel</code>.
<h3><a id="messageformat" href="#messageformat">5.4 Message Format</a></h3>
<pre>
/**
* A message. The format of an N byte message is the following:
*
* If magic byte is 0
*
* 1. 1 byte "magic" identifier to allow format changes
*
* 2. 4 byte CRC32 of the payload
*
* 3. N - 5 byte payload
*
* If magic byte is 1
*
* 1. 1 byte "magic" identifier to allow format changes
*
* 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
*
* 3. 4 byte CRC32 of the payload
*
* 4. N - 6 byte payload
*
* 1. 4 byte CRC32 of the message
* 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
* 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
* bit 0 ~ 2 : Compression codec.
* 0 : no compression
* 1 : gzip
* 2 : snappy
* 3 : lz4
* bit 3 : Timestamp type
* 0 : create time
* 1 : log append time
* bit 4 ~ 7 : reserved
* 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
* 5. 4 byte key length, containing length K
* 6. K byte key
* 7. 4 byte payload length, containing length V
* 8. V byte payload
*/
</pre>
</p>
@ -183,10 +187,16 @@ The exact binary format for messages is versioned and maintained as a standard i @@ -183,10 +187,16 @@ The exact binary format for messages is versioned and maintained as a standard i
<pre>
On-disk format of a message
message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte
offset : 8 bytes
message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
crc : 4 bytes
payload : n bytes
magic value : 1 byte
attributes : 1 byte
timestamp : 8 bytes (Only exists when magic value is greater than zero)
key length : 4 bytes
key : K bytes
value length : 4 bytes
value : V bytes
</pre>
<p>
The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore the complexity of maintaining the mapping from a random id to an offset requires a heavy weight index structure which must be synchronized with disk, essentially requiring a full persistent random-access data structure. Thus to simplify the lookup structure we decided to use a simple per-partition atomic counter which could be coupled with the partition id and node id to uniquely identify a message; this makes the lookup structure simpler, though multiple seeks per consumer request are still likely. However once we settled on a counter, the jump to directly using the offset seemed natural&mdash;both after all are monotonically increasing integers unique to a partition. Since the offset is hidden from the consumer API this decision is ultimately an implementation detail and we went with the more efficient approach.

Loading…
Cancel
Save