
KAFKA-8984: Improve tagged fields documentation

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Vikas Singh <vikas@confluent.io>, Jason Gustafson <jason@confluent.io>

Closes #7477 from cmccabe/KAFKA-8984
Authored by Colin P. Mccabe; committed by Manikumar Reddy. Commit 67fd88050f.
  1. clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java (3)
  2. clients/src/main/resources/common/message/README.md (50)
  3. docs/protocol.html (19)
  4. tests/bin/flatten_html.sh (78)

clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java (3)

@@ -20,6 +20,7 @@ import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.ResponseHeaderData;
import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.TaggedFields;
import org.apache.kafka.common.protocol.types.Type;
import java.util.LinkedHashMap;
@@ -50,6 +51,8 @@ public class Protocol {
                if (!subTypes.containsKey(field.def.name)) {
                    subTypes.put(field.def.name, type.arrayElementType().get());
                }
            } else if (type instanceof TaggedFields) {
                b.append("TAG_BUFFER ");
            } else {
                b.append(field.def.name);
                b.append(" ");

clients/src/main/resources/common/message/README.md (50)

@@ -69,25 +69,28 @@ Field Types
-----------
There are several primitive field types available.
-* "boolean": either true or false. This takes up 1 byte on the wire.
+* "boolean": either true or false.
-* "int8": an 8-bit integer. This also takes up 1 byte on the wire.
+* "int8": an 8-bit integer.
-* "int16": a 16-bit integer. This takes up 2 bytes on the wire.
+* "int16": a 16-bit integer.
-* "int32": a 32-bit integer. This takes up 4 bytes on the wire.
+* "int32": a 32-bit integer.
-* "int64": a 64-bit integer. This takes up 8 bytes on the wire.
+* "int64": a 64-bit integer.
-* "string": a string. This must be less than 64kb in size when serialized as UTF-8. This takes up 2 bytes on the wire, plus the length of the string when serialized to UTF-8.
+* "string": a UTF-8 string.
-* "bytes": binary data. This takes up 4 bytes on the wire, plus the length of the bytes.
+* "bytes": binary data.
In addition to these primitive field types, there is also an array type. Array
types start with a "[]" and end with the name of the element type. For
example, []Foo declares an array of "Foo" objects. Array fields have their own
array of fields, which specifies what is in the contained objects.
For information about how fields are serialized, see the [Kafka Protocol
Guide](https://kafka.apache.org/protocol.html).
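As a sketch of that declaration style (the field and type names here are invented for illustration, not taken from any real Kafka message spec), an array field with its own contained fields might look like:

```json
{ "name": "Topics", "type": "[]ExampleTopic", "versions": "0+",
  "about": "An example array field; each element is an ExampleTopic struct.",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The topic name." },
    { "name": "PartitionCount", "type": "int32", "versions": "0+",
      "about": "The number of partitions." }
  ]
}
```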
Nullable Fields
---------------
Booleans and ints can never be null. However, fields that are strings, bytes,
@@ -104,6 +107,39 @@ If a field is declared as non-nullable, and it is present in the message
version you are using, you should set it to a non-null value before serializing
the message. Otherwise, you will get a runtime error.
Tagged Fields
-------------
Tagged fields are an extension to the Kafka protocol which allows optional data
to be attached to messages. Tagged fields can appear at the root level of
messages, or within any structure in the message.
Unlike mandatory fields, tagged fields can be added to message versions that
already exist. Older servers will ignore new tagged fields which they do not
understand.
In order to make a field tagged, set a "tag" for the field, and also set up
tagged versions for the field. The taggedVersions you specify should be
open-ended; that is, they should specify a start version, but not an end
version.
You can remove support for a tagged field from a specific version of a message,
but you can't reuse a tag once it has been used for something else. Reusing a
tag for a different purpose would break compatibility.
Note that tagged fields can only be added to "flexible" message versions.
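As a hedged sketch (the field name, tag number, and versions are invented for illustration; they assume the containing message's flexibleVersions already cover the versions given), a tagged field declaration might look like:

```json
{ "name": "ExampleOptionalField", "type": "string", "tag": 0,
  "taggedVersions": "3+", "versions": "3+",
  "about": "An invented optional field; note the open-ended taggedVersions." }
```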
Flexible Versions
-----------------
Kafka serialization has been improved over time to be more flexible and
efficient. Message versions that contain these improvements are referred to as
"flexible versions."
In flexible versions, variable-length fields such as strings, arrays, and bytes
fields are serialized in a more efficient way that saves space. The names of the
new serialization types start with "COMPACT". For example, COMPACT_STRING is a
more efficient form of STRING.
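To make the space savings concrete, here is a rough Python sketch (not Kafka code) of the difference between STRING, which uses a two-byte length prefix, and COMPACT_STRING, which prefixes the UTF-8 bytes with the length plus one encoded as an unsigned varint:

```python
import struct

def unsigned_varint(n: int) -> bytes:
    """Encode n as an unsigned varint: 7 bits per byte, least-significant
    group first, high bit set on every byte except the last."""
    out = bytearray()
    while True:
        byte = n & 0x7f
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def encode_string(s: str) -> bytes:
    """Classic STRING: 2-byte big-endian length, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_compact_string(s: str) -> bytes:
    """COMPACT_STRING: unsigned varint of (length + 1), then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return unsigned_varint(len(data) + 1) + data

# A short string saves a byte, and the varint prefix also scales past 64k.
print(len(encode_string("foo")))          # 5 bytes
print(len(encode_compact_string("foo")))  # 4 bytes
```

Short strings need only a single length byte under the compact encoding, which is where most of the savings come from in practice.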
Serializing Messages
--------------------
The Message#write method writes out a message to a buffer. The fields that are

docs/protocol.html (19)

@@ -32,6 +32,8 @@
<li><a href="#protocol_partitioning_strategies">Partitioning Strategies</a>
<li><a href="#protocol_batching">Batching</a>
<li><a href="#protocol_compatibility">Versioning and Compatibility</a>
<li><a href="#api_versions">Retrieving Supported API versions</a>
<li><a href="#sasl_handshake">SASL Authentication Sequence</a>
</ul>
</li>
<li><a href="#protocol_details">The Protocol</a>
@@ -42,6 +44,11 @@
<li><a href="#protocol_recordbatch">Record Batch</a>
</ul>
</li>
<li><a href="#protocol_evolution">Evolving the Protocol</a>
<ul>
<li><a href="#protocol_versioning">The Request Header</a>
<li><a href="#protocol_versioning">Versioning</a>
</ul>
<li><a href="#protocol_constants">Constants</a>
<ul>
<li><a href="#protocol_error_codes">Error Codes</a>
@@ -107,17 +114,19 @@
<p>The client implementer can choose to ignore this and send everything one at a time if they like.</p>
-<h5><a id="protocol_compatibility" href="#protocol_compatibility">Versioning and Compatibility</a></h5>
+<h5><a id="protocol_compatibility" href="#protocol_compatibility">Compatibility</a></h5>
<p>Kafka has a "bidirectional" client compatibility policy. In other words, new clients can talk to old servers, and old clients can talk to new servers. This allows users to upgrade either clients or servers without experiencing any downtime.
<p>The protocol is designed to enable incremental evolution in a backward compatible fashion. Our versioning is on a per API basis, each version consisting of a request and response pair. Each request contains an API key that identifies the API being invoked and a version number that indicates the format of the request and the expected format of the response.</p>
<p>Since the Kafka protocol has changed over time, clients and servers need to agree on the schema of the message that they are sending over the wire. This is done through API versioning.
<p>Before each request is sent, the client sends the API key and the API version. These two 16-bit numbers, when taken together, uniquely identify the schema of the message to follow.
<p>The intention is that clients will support a range of API versions. When communicating with a particular broker, a given client should use the highest API version supported by both and indicate this version in their requests.</p>
<p>The server will reject requests with a version it does not support, and will always respond to the client with exactly the protocol format it expects based on the version it included in its request. The intended upgrade path is that new features would first be rolled out on the server (with the older clients not making use of them) and then as newer clients are deployed these new features would gradually be taken advantage of.</p>
<p>Our goal is primarily to allow API evolution in an environment where downtime is not allowed and clients and servers cannot all be changed at once.</p>
<p>Currently all versions are baselined at 0, as we evolve these APIs we will indicate the format for each version individually.</p>
<p>Note that <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields">KIP-482 tagged fields</a> can be added to a request without incrementing the version number. This offers an additional way of evolving the message schema without breaking compatibility. Tagged fields do not take up any space when the field is not set. Therefore, if a field is rarely used, it is more efficient to make it a tagged field than to put it in the mandatory schema. However, tagged fields are ignored by recipients that don't know about them, which could pose a challenge if this is not the behavior that the sender wants. In such cases, a version bump may be more appropriate.
<h5><a id="api_versions" href="#api_versions">Retrieving Supported API versions</a></h5>
<p>In order to work against multiple broker versions, clients need to know what versions of various APIs a

tests/bin/flatten_html.sh (78)

@@ -0,0 +1,78 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
usage() {
    cat <<EOF
flatten_html.sh: This script "flattens" an HTML file by inlining all
files included via "#include virtual". This is useful when making
changes to the Kafka documentation files.

Typical usage:
    ./gradlew docsJar
    ./tests/bin/flatten_html.sh -f ./docs/protocol.html > /tmp/my-protocol.html
    firefox /tmp/my-protocol.html &

usage:
$0 [flags]

flags:
-f [filename]   The HTML file to process.
-h              Print this help message.
EOF
}

die() {
    echo "$@"
    exit 1
}

realpath() {
    [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}"
}

process_file() {
    local CUR_FILE="${1}"
    [[ -f "${CUR_FILE}" ]] || die "Unable to open input file ${CUR_FILE}"
    while IFS= read -r LINE; do
        if [[ $LINE =~ \#include\ virtual=\"(.*)\" ]]; then
            local INCLUDED_FILE="${BASH_REMATCH[1]}"
            if [[ $INCLUDED_FILE =~ ../includes/ ]]; then
                : # ignore ../includes
            else
                pushd "$(dirname "${CUR_FILE}")" &> /dev/null \
                    || die "failed to change directory to directory of ${CUR_FILE}"
                process_file "${INCLUDED_FILE}"
                popd &> /dev/null
            fi
        else
            echo "${LINE}"
        fi
    done < "${CUR_FILE}"
}

FILE=""
while getopts "f:h" arg; do
    case $arg in
        f) FILE=$OPTARG;;
        h) usage; exit 0;;
        *) echo "Error parsing command-line arguments."
           usage
           exit 1;;
    esac
done
[[ -z "${FILE}" ]] && die "You must specify which file to process. -h for help."
process_file "${FILE}"