<p>Messages (aka Records) are always written in batches. The technical term for a batch of messages is a record batch, and a record batch contains one or more records. In the degenerate case, we could have a record batch containing a single record.
Record batches and records have their own headers. The format of each is described below for Kafka version 0.11.0 and later (message format version v2, or magic=2). <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets">Click here</a> for details about message formats 0 and 1.</p>
<h4><aid="recordbatch"href="#recordbatch">5.3.1 Record Batch</a></h4>
<p> The following is the on-disk format of a RecordBatch. </p>
<p><pre class="brush: java;">
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
</pre></p>
<p>Note that when compression is enabled, the compressed record data is serialized directly following the count of the number of records.</p>
<p>The CRC covers the data from the attributes to the end of the batch (i.e. all the bytes that follow the CRC). The partition leader epoch field is not included in the CRC computation, to avoid the need to recompute the CRC when this field is assigned by the broker. The CRC-32C (Castagnoli) polynomial is used for the computation.</p>
<h5><a id="controlbatch" href="#controlbatch">5.3.1.1 Control Batches</a></h5>
<p>A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.</p>
<p>The key of a control record conforms to the following schema:</p>
<p><pre class="brush: java;">
version: int16 (current version is 0)
type: int16 (0 indicates an abort marker, 1 indicates a commit)
</pre></p>
<p>The schema for the value of a control record is dependent on the type. The value is opaque to clients.</p>
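<p>To make the batch layout above concrete, the following is a minimal sketch of reading the fixed-length header fields from a <code>ByteBuffer</code> positioned at the start of a batch. The class and method names are illustrative, not Kafka's internal API; the protocol is big-endian, which is the default byte order of <code>ByteBuffer</code>.</p>
<p><pre class="brush: java;">
import java.nio.ByteBuffer;

// Illustrative reader for the fixed-length portion of a v2 batch header.
final class BatchHeaderReader {
    static void read(ByteBuffer buf) {
        long baseOffset = buf.getLong();
        int batchLength = buf.getInt();
        int partitionLeaderEpoch = buf.getInt();
        byte magic = buf.get();              // must be 2 for this layout
        int crc = buf.getInt();              // CRC-32C over all bytes that follow
        short attributes = buf.getShort();
        boolean isTransactional = (attributes & 0x10) != 0;  // bit 4
        boolean isControlBatch = (attributes & 0x20) != 0;   // bit 5
        int lastOffsetDelta = buf.getInt();
        long firstTimestamp = buf.getLong();
        long maxTimestamp = buf.getLong();
        long producerId = buf.getLong();
        short producerEpoch = buf.getShort();
        int baseSequence = buf.getInt();
        int recordCount = buf.getInt();      // the [Record] array is count-prefixed
        System.out.printf("batch@%d: %d record(s), control=%b, transactional=%b%n",
                baseOffset, recordCount, isControlBatch, isTransactional);
    }
}
</pre></p>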
<h4><a id="record" href="#record">5.3.2 Record</a></h4>
<p>Record level headers were introduced in Kafka 0.11.0. The on-disk format of a record with Headers is delineated below. </p>
<p><pre class="brush: java;">
length: varint
attributes: int8
    bit 0~7: unused
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]
</pre></p>
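<p>A minimal sketch of decoding a single record in this layout is shown below, with the varint helpers inlined (zigzag base-128, as described in the varint notes further down). Names are illustrative, not Kafka's internal API.</p>
<p><pre class="brush: java;">
import java.nio.ByteBuffer;

// Illustrative decoder for one v2 record.
final class RecordReader {
    // Reads the base-128 bytes: 7 payload bits per byte, high bit means "more".
    private static long readUnsignedVarlong(ByteBuffer buf) {
        long raw = 0;
        for (int shift = 0; ; shift += 7) {
            byte b = buf.get();
            raw |= (long) (b & 0x7f) &lt;&lt; shift;
            if ((b & 0x80) == 0) return raw;
        }
    }

    static int readVarint(ByteBuffer buf) {
        long raw = readUnsignedVarlong(buf);
        return (int) ((raw >>> 1) ^ -(raw & 1));  // undo the zigzag transform
    }

    static long readVarlong(ByteBuffer buf) {
        long raw = readUnsignedVarlong(buf);
        return (raw >>> 1) ^ -(raw & 1);
    }

    static void read(ByteBuffer buf) {
        int length = readVarint(buf);            // size of the rest of the record
        byte attributes = buf.get();             // bits 0-7 currently unused
        long timestampDelta = readVarlong(buf);  // relative to the batch firstTimestamp
        int offsetDelta = readVarint(buf);       // relative to the batch baseOffset
        int keyLength = readVarint(buf);         // -1 encodes a null key
        byte[] key = null;
        if (keyLength >= 0) { key = new byte[keyLength]; buf.get(key); }
        int valueLen = readVarint(buf);          // -1 encodes a null value
        byte[] value = null;
        if (valueLen >= 0) { value = new byte[valueLen]; buf.get(value); }
        int headerCount = readVarint(buf);       // header count is itself a varint
        // each header then follows the Record Header schema below
    }
}
</pre></p>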
<h5><aid="recordheader"href="#recordheader">5.4.2.1 Record Header</a></h5>
<h5><aid="recordheader"href="#recordheader">5.3.2.1 Record Header</a></h5>
<p><pre class="brush: java;">
headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]
</pre></p>
<p>We use the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
is also encoded as a varint.</p>
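<p>As a minimal encode-side sketch of this scheme (the decode direction appears in the record-reading sketch above; names here are illustrative):</p>
<p><pre class="brush: java;">
import java.io.ByteArrayOutputStream;

// Illustrative zigzag varint encoder: fold the sign bit into the low bit,
// then emit 7 bits per byte, least significant group first, using the high
// bit of each byte as a continuation flag.
final class VarintWriter {
    static void writeVarint(int value, ByteArrayOutputStream out) {
        int v = (value &lt;&lt; 1) ^ (value >> 31);  // zigzag: -1 -> 1, 1 -> 2, ...
        while ((v & 0xffffff80) != 0) {
            out.write((v & 0x7f) | 0x80);            // more bytes follow
            v >>>= 7;
        }
        out.write(v);                                 // final byte
    }
}
</pre></p>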
<h4><aid="messageset"href="#messageset">5.3.3 Old Message Format</a></h4>
<p>
Prior to Kafka 0.11, messages were transferred and stored in <i>message sets</i>. In a message set, each message has its own metadata. Note that although message sets are represented as an array,
they are not preceded by an int32 array size like other array elements in the protocol.
In versions prior to Kafka 0.10, the only supported message format version (which is indicated in the magic value) was 0. Message format version 1 was introduced with timestamp support in version 0.10; a sketch of the on-disk layout follows the list below.
<ul>
<li>Similarly to version 2 above, the lowest bits of attributes represent the compression type.</li>
<li>In version 1, the producer should always set the timestamp type bit to 0. If the topic is configured to use log append time
(through either the broker-level config log.message.timestamp.type = LogAppendTime or the topic-level config message.timestamp.type = LogAppendTime),
the broker will overwrite the timestamp type and the timestamp in the message set.</li>
<li>The highest bits of attributes must be set to 0.</li>
</ul>
</p>
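<p>For reference, the on-disk layout being described is roughly the following (version 1 shown; version 0 is identical except that it has no timestamp field):</p>
<p><pre class="brush: java;">
offset: int64
message_size: int32
crc: int32
magic: int8 (0 or 1)
attributes: int8
timestamp: int64 (version 1 only)
keyLength: int32 (-1 for a null key)
key: byte[]
valueLength: int32 (-1 for a null value)
value: byte[]
</pre></p>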
<p>In message format versions 0 and 1, Kafka supports recursive messages to enable compression. In this case the message's attributes must be set
to indicate one of the compression types and the value field will contain a message set compressed with that type. We often refer
to the nested messages as "inner messages" and the wrapping message as the "outer message." Note that the key should be null
for the outer message and its offset will be the offset of the last inner message.
</p>
<p>When receiving recursive version 0 messages, the broker decompresses them and each inner message is assigned an offset individually.
In version 1, to avoid server side re-compression, only the wrapper message will be assigned an offset. The inner messages
will have relative offsets. The absolute offset can be computed using the offset from the outer message, which corresponds
to the offset assigned to the last inner message.
</p>
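<p>To make the offset arithmetic concrete: since the wrapper's offset equals the absolute offset of the last inner message, and the inner messages carry relative offsets 0 through n-1, the absolute offset of any inner message can be recovered as sketched below (illustrative names, not Kafka's internal API).</p>
<p><pre class="brush: java;">
// Illustrative version 1 relative-offset computation.
final class RelativeOffsets {
    // wrapperOffset: absolute offset assigned to the outer (wrapper) message,
    //                which equals the absolute offset of the LAST inner message.
    // lastRelative:  relative offset stored in the last inner message (n - 1).
    // relative:      relative offset stored in the inner message of interest.
    static long absoluteOffset(long wrapperOffset, long lastRelative, long relative) {
        return wrapperOffset - (lastRelative - relative);
    }

    public static void main(String[] args) {
        // Three inner messages (relative offsets 0, 1, 2) whose wrapper was
        // assigned offset 102 have absolute offsets 100, 101 and 102.
        System.out.println(absoluteOffset(102, 2, 0));  // 100
        System.out.println(absoluteOffset(102, 2, 1));  // 101
        System.out.println(absoluteOffset(102, 2, 2));  // 102
    }
}
</pre></p>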
<p>The crc field contains the CRC32 (and not CRC-32C) of the subsequent message bytes (i.e. from the magic byte to the value).</p>
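<p>For illustration, the old-format checksum can be computed with the standard <code>java.util.zip.CRC32</code> (note: plain CRC32, not CRC-32C), assuming a byte array holding the message bytes from the magic byte through the value:</p>
<p><pre class="brush: java;">
import java.util.zip.CRC32;

// Illustrative computation of the old-format crc field.
final class MessageChecksum {
    static long crcOf(byte[] bytesFromMagicToValue) {
        CRC32 crc = new CRC32();           // plain CRC32, not CRC-32C
        crc.update(bytesFromMagicToValue);
        return crc.getValue();             // low 32 bits hold the checksum
    }
}
</pre></p>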
<h3><aid="log"href="#log">5.4 Log</a></h3>
<p>
A log for a topic named "my_topic" with two partitions consists of two directories (namely <code>my_topic_0</code> and <code>my_topic_1</code>) populated with data files containing the messages for that topic. The format of the log files is a sequence of "log entries"; each log entry is a 4 byte integer <i>N</i> storing the message length which is followed by the <i>N</i> message bytes. Each message is uniquely identified by a 64-bit integer <i>offset</i> giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. The on-disk format of each message is given above. Each log file is named with the offset of the first message it contains. So the first file created will be 00000000000.kafka, and each additional file will have an integer name roughly <i>S</i> bytes from the previous file where <i>S</i> is the max log file size given in the configuration.
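<p>As a rough sketch of this framing (illustrative only, not Kafka's log reader), a scanner over a raw segment file could iterate entries like this:</p>
<p><pre class="brush: java;">
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;

// Illustrative scanner over the length-prefixed entries of a raw log file.
final class SegmentScanner {
    static void scan(String path) throws IOException {
        try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
            while (true) {
                int length;
                try {
                    length = in.readInt();   // the 4-byte size prefix N
                } catch (EOFException end) {
                    break;                    // clean end of file
                }
                byte[] message = new byte[length];
                in.readFully(message);        // the N message bytes
                // ... parse the message bytes according to the formats above ...
            }
        }
    }
}
</pre></p>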
<p>The message_size field gives the size of the subsequent request or response message in bytes. The client can read requests by first reading this 4 byte size as an integer N, and then reading and parsing the subsequent N bytes of the request.</p>
<p>A description of the message set format can be found <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets">here</a>. (KAFKA-3368)</p>
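<p>From the sending side, the same framing is a minimal sketch away (illustrative only, not Kafka's client implementation): write the 4-byte big-endian size, then exactly that many payload bytes.</p>
<p><pre class="brush: java;">
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Illustrative sender-side framing: message_size, then the message itself.
final class FrameWriter {
    static void writeMessage(OutputStream raw, byte[] payload) throws IOException {
        DataOutputStream out = new DataOutputStream(raw);
        out.writeInt(payload.length);   // message_size (4-byte big-endian)
        out.write(payload);             // the N message bytes
        out.flush();
    }
}
</pre></p>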