Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Derrick Or <derrickor@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes #3214 from guozhangwang/KMinor-doc-java-brush
Configurations pertinent to topics have both a server default as well as an optional per-topic override. If no per-topic configuration is given, the server default is used. The override can be set at topic creation time by giving one or more <code>--config</code> options. This example creates a topic named <i>my-topic</i> with a custom max message size and flush rate:
The following are the topic-level configurations. The server's default configuration for this property is given under the Server Default Property heading. A given server default config value only applies to a topic if it does not have an explicit topic config override.
In standalone mode all work is performed in a single process. This configuration is simpler to set up and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:
Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data. Execution is very similar to standalone mode:
<p>Throughout the example we'll use the schemaless JSON data format. To use the schemaless format, we changed the following two lines in <code>connect-standalone.properties</code> from true to false:</p>
<pre>
<pre class="brush: text;">
key.converter.schemas.enable
value.converter.schemas.enable
</pre>
@@ -131,7 +131,7 @@
After adding the transformations, the <code>connect-file-source.properties</code> file looks as follows:
<pre>
<pre class="brush: text;">
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
@@ -149,7 +149,7 @@
When we ran the file source connector on the sample file without the transformations, and then read the messages using <code>kafka-console-consumer.sh</code>, the results were:
<pre>
<pre class="brush: text;">
"foo"
"bar"
"hello world"
@@ -157,7 +157,7 @@
We then create a new file connector, after adding the transformations to the configuration file. This time, the results will be:
We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Start by creating the class that inherits from <code>SourceConnector</code> and add a couple of fields that will store parsed configuration information (the filename to read from and the topic to send data to):
<pre>
<pre class="brush: java;">
public class FileStreamSourceConnector extends SourceConnector {
private String filename;
private String topic;
@@ -255,7 +255,7 @@
The easiest method to fill in is <code>getTaskClass()</code>, which defines the class that should be instantiated in worker processes to actually read the data:
<pre>
<pre class="brush: java;">
@Override
public Class<? extends Task> getTaskClass() {
return FileStreamSourceTask.class;
@@ -264,7 +264,7 @@
We will define the <code>FileStreamSourceTask</code> class below. Next, we add some standard lifecycle methods, <code>start()</code> and <code>stop()</code>:
<pre>
<pre class="brush: java;">
@Override
public void start(Map<String, String> props) {
// The complete version includes error handling as well.
@@ -282,7 +282,7 @@
handling a single file, so even though we may be permitted to generate more tasks as per the
<code>maxTasks</code> argument, we return a list with only one entry:
<pre>
<pre class="brush: java;">
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
@@ -310,7 +310,7 @@
Just as with the connector, we need to create a class inheriting from the appropriate base <code>Task</code> class. It also has some standard lifecycle methods:
<pre>
<pre class="brush: java;">
public class FileStreamSourceTask extends SourceTask {
String filename;
InputStream stream;
@@ -333,7 +333,7 @@
Next, we implement the main functionality of the task, the <code>poll()</code> method which gets events from the input system and returns a <code>List<SourceRecord></code>:
<pre>
<pre class="brush: java;">
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
@@ -365,7 +365,7 @@
The previous section described how to implement a simple <code>SourceTask</code>. Unlike <code>SourceConnector</code> and <code>SinkConnector</code>, <code>SourceTask</code> and <code>SinkTask</code> have very different interfaces because <code>SourceTask</code> uses a pull interface and <code>SinkTask</code> uses a push interface. Both share the common lifecycle methods, but the <code>SinkTask</code> interface is quite different:
<pre>
<pre class="brush: java;">
public abstract class SinkTask implements Task {
public void initialize(SinkTaskContext context) {
this.context = context;
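}

// Abbreviated sketch, not the complete class: the framework pushes records to
// the task via put(), and flush() is invoked when offsets should be committed.
public abstract void put(Collection<SinkRecord> records);

public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
}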
@@ -388,7 +388,7 @@
To correctly resume upon startup, the task can use the <code>SourceContext</code> passed into its <code>initialize()</code> method to access the offset data. In <code>initialize()</code>, we would add a bit more code to read the offset (if it exists) and seek to that position:
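A minimal sketch of what that might look like (the <code>FILENAME_FIELD</code> and <code>POSITION_FIELD</code> key names and the <code>seekToOffset()</code> helper are illustrative, not framework APIs):
<pre class="brush: java;">
stream = new FileInputStream(filename);
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
    // The offset map was written by this task in a previous run; resume from it.
    Long lastRecordedOffset = (Long) offset.get(POSITION_FIELD);
    if (lastRecordedOffset != null)
        seekToOffset(stream, lastRecordedOffset);
}
</pre>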
Source connectors need to monitor the source system for changes, e.g. table additions/deletions in a database. When they pick up changes, they should notify the framework via the <code>ConnectorContext</code> object that reconfiguration is necessary. For example, in a <code>SourceConnector</code>:
<pre>
<pre class="brush: java;">
if (inputsChanged())
this.context.requestTaskReconfiguration();
</pre>
@@ -423,7 +423,7 @@
The following code in <code>FileStreamSourceConnector</code> defines the configuration and exposes it to the framework.
<pre>
<pre class="brush: java;">
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
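// Sketch (not part of the original snippet): the connector exposes the
// definition above to the framework by overriding config().
@Override
public ConfigDef config() {
    return CONFIG_DEF;
}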
@@ -445,7 +445,7 @@
The API documentation provides a complete reference, but here is a simple example creating a <code>Schema</code> and <code>Struct</code>:
<pre>
<pre class="brush: java;">
Schema schema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT_SCHEMA)
@@ -473,7 +473,7 @@
When a connector is first submitted to the cluster, the workers rebalance the full set of connectors in the cluster and their tasks so that each worker has approximately the same amount of work. This same rebalancing procedure is also used when connectors increase or decrease the number of tasks they require, or when a connector's configuration is changed. You can use the REST API to view the current status of a connector and its tasks, including the id of the worker to which each was assigned. For example, querying the status of a file source (using <code>GET /connectors/file-source/status</code>) might produce output like the following:
Let's discuss a concrete example of such a stream. Say we have a topic containing user email addresses; every time a user updates their email address we send a message to this topic using their user id as the
primary key. Now say we send the following messages over some time period for a user with id 123, each message corresponding to a change in email address (messages for other ids are omitted):
<pre>
<pre class="brush: text;">
123 => bill@microsoft.com
.
.
@@ -510,11 +510,11 @@
The log cleaner is enabled by default. This will start the pool of cleaner threads.
To enable log cleaning on a particular topic you can add the log-specific property
This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently
being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.
/* Send fetch request to a broker and get back a set of messages. */
@@ -101,7 +101,7 @@
The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers which have particular requirements around maintaining state.
* 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
* 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
@@ -185,7 +185,7 @@
<p>
The exact binary format for messages is versioned and maintained as a standard interface so message sets can be transferred between producer, broker, and client without recopying or conversion when desirable. This format is as follows:
</p>
<pre>
<pre class="brush: java;">
On-disk format of a message
offset : 8 bytes
@@ -220,7 +220,7 @@
<p> The following is the format of the results sent to the consumer.
Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shut down or dies (thus notifying consumers it is no longer available).
<h4><aid="impl_zkconsumerid"href="#impl_zkconsumerid">Consumer Id Registry</a></h4>
<p>
In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes. Consumer ids are registered in the following directory.
Each of the consumers in the group registers under its group and creates a znode with its consumer_id. The value of the znode contains a map of <topic, #streams>. This id is simply used to identify each of the consumers which is currently active within a group. This is an ephemeral node so it will disappear if the consumer process dies.
@@ -337,7 +337,7 @@
<p>
Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory if <code>offsets.storage=zookeeper</code>.
Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming.
You have the option of either adding topics manually or having them be created automatically when data is first published to a non-existent topic. If topics are auto-created then you may want to tune the default <a href="#topic-config">topic configurations</a> used for auto-created topics.
<p>
Topics are added and modified using the topic tool:
Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn't change the partitioning of existing data, so this may disturb consumers if they rely on that partitioning. That is, if data is partitioned by <code>hash(key) % number_of_partitions</code>, then this partitioning will potentially be shuffled by adding partitions, but Kafka will not attempt to automatically redistribute data in any way.
Kafka does not currently support reducing the number of partitions for a topic.
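To make the effect of the key-based mapping concrete, here is an illustrative sketch (the hash shown is generic; it is not necessarily the hash used by your producer's partitioner):
<pre class="brush: java;">
// Illustrative only: maps a key to a partition for a given partition count.
int partitionFor(String key, int numberOfPartitions) {
    return (key.hashCode() & 0x7fffffff) % numberOfPartitions;
}

// partitionFor("user-123", 8) and partitionFor("user-123", 12) will generally
// differ, so after adding partitions new records for an existing key can land
// in a different partition than that key's earlier records.
</pre>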
<p>
@@ -80,7 +80,7 @@
</ol>
Syncing the logs will happen automatically whenever the server is stopped other than by a hard kill, but the controlled leadership migration requires using a special setting:
<pre>
<pre class="brush: text;">
controlled.shutdown.enable=true
</pre>
Note that controlled shutdown will only succeed if <i>all</i> the partitions hosted on the broker have replicas (i.e. the replication factor is greater than 1 <i>and</i> at least one of these replicas is alive). This is generally what you want since shutting down the last replica would make that topic partition unavailable.
@@ -90,12 +90,12 @@
Whenever a broker stops or crashes, leadership for that broker's partitions transfers to other replicas. This means that by default when the broker is restarted it will only be a follower for all its partitions, meaning it will not be used for client reads and writes.
<p>
To avoid this imbalance, Kafka has a notion of preferred replicas. If the list of replicas for a partition is 1,5,9 then node 1 is preferred as the leader to either node 5 or 9 because it is earlier in the replica list. You can have the Kafka cluster try to restore leadership to the restored replicas by running the command:
Since running this command can be tedious you can also configure Kafka to do this automatically by setting the following configuration:
<pre>
<pre class="brush: text;">
auto.leader.rebalance.enable=true
</pre>
@@ -103,7 +103,7 @@
The rack awareness feature spreads replicas of the same partition across different racks. This extends the guarantees Kafka provides for broker-failure to cover rack-failure, limiting the risk of data loss should all the brokers on a rack fail at once. The feature can also be applied to other broker groupings such as availability zones in EC2.
<p></p>
You can specify that a broker belongs to a particular rack by adding a property to the broker config:
When a topic is <a href="#basic_ops_add_topic">created</a>, <a href="#basic_ops_modify_topic">modified</a> or replicas are <a href="#basic_ops_cluster_expansion">redistributed</a>, the rack constraint will be honoured, ensuring replicas span as many racks as they can (a partition will span min(#racks, replication-factor) different racks).
<p></p>
The algorithm used to assign replicas to brokers ensures that the number of leaders per broker will be constant, regardless of how brokers are distributed across racks. This ensures balanced throughput.
@@ -123,7 +123,7 @@
The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same. For this reason the mirror cluster is not really intended as a fault-tolerance mechanism (as the consumer position will be different); for that we recommend using normal in-cluster replication. The mirror maker process will, however, retain and use the message key for partitioning so order is preserved on a per-key basis.
<p>
Here is an example showing how to mirror a single topic (named <i>my-topic</i>) from an input cluster:
Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. Running this tool on a consumer group named <i>my-group</i> consuming a topic named <i>my-topic</i> would look like this:
<pre>
<pre class="brush: bash;">
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
For instance, the following example will move all partitions for topics foo1,foo2 to the new set of brokers 5,6. At the end of this move, all partitions for topics foo1 and foo2 will <i>only</i> exist on brokers 5,6.
<p>
Since the tool accepts the input list of topics as a json file, you first need to identify the topics you want to move and create the json file as follows:
<pre>
<pre class="brush: bash;">
> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
@@ -207,7 +207,7 @@
}
</pre>
Once the json file is ready, use the partition reassignment tool to generate a candidate assignment:
The tool generates a candidate assignment that will move all partitions from topics foo1,foo2 to brokers 5,6. Note, however, that at this point the partition movement has not started; the tool merely tells you the current assignment and the proposed new assignment. The current assignment should be saved in case you want to roll back to it. The new assignment should be saved in a json file (e.g. expand-cluster-reassignment.json) to be input to the tool with the --execute option as follows:
Finally, the --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option:
The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
@@ -315,13 +315,13 @@
For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
<p>
The first step is to hand craft the custom reassignment plan in a json file:
The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option:
There are two interfaces that can be used to engage a throttle. The simplest, and safest, is to apply a throttle when invoking kafka-reassign-partitions.sh, but kafka-configs.sh can also be used to view and alter the throttle values directly.
<p></p>
So for example, if you were to execute a rebalance with the below command, it would move partitions at no more than 50MB/s.
When you execute this script you will see the throttle engage:
<pre>
<pre class="brush: bash;">
The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.</pre>
<p>Should you wish to alter the throttle during a rebalance, say to increase the throughput so that it completes more quickly, you can do this by re-running the execute command, passing the same reassignment-json-file:</p>
It is possible to set default quotas for each (user, client-id), user or client-id group by specifying the <i>--entity-default</i> option instead of <i>--entity-name</i>.
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
</pre>
<p>
It is possible to set default quotas that apply to all client-ids by setting these configs on the brokers. These properties are applied only if quota overrides or defaults are not configured in Zookeeper. By default, each client-id receives an unlimited quota. The following sets the default quota per producer and consumer client-id to 10MB/sec.
<pre>
<pre class="brush: text;">
quota.producer.default=10485760
quota.consumer.default=10485760
</pre>
@@ -556,7 +559,7 @@
<p>
<h4><aid="prodconfig"href="#prodconfig">A Production Server Config</a></h4>
Here is an example production server configuration:
<pre>
<pre class="brush: text;">
# ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]
@@ -582,7 +585,7 @@
LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector. If you decide to use the G1 collector (the current default) and you are still on JDK 1.7, make sure you are on u51 or newer. LinkedIn tried out u21 in testing, but they had a number of problems with the GC implementation in that version.
When Pdflush cannot keep up with the rate of data being written, it will eventually cause the writing process to block, incurring latency in the writes to slow down the accumulation of data.
<p>
You can see the current state of OS memory usage by doing
@@ -27,9 +27,9 @@ Since Kafka console scripts are different for Unix-based and Windows platforms,
<ahref="https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"title="Kafka downloads">Download</a> the 0.10.2.0 release and un-tar it.
<pre>
><b>tar -xzf kafka_2.11-0.10.2.0.tgz</b>
><b>cd kafka_2.11-0.10.2.0</b>
<pre class="brush: bash;">
> tar -xzf kafka_2.11-0.10.2.0.tgz
> cd kafka_2.11-0.10.2.0
</pre>
<h4><aid="quickstart_startserver"href="#quickstart_startserver">Step 2: Start the server</a></h4>
@@ -38,15 +38,15 @@ Since Kafka console scripts are different for Unix-based and Windows platforms,
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
@@ -55,13 +55,13 @@ Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't
<h4><aid="quickstart_createtopic"href="#quickstart_createtopic">Step 3: Create a topic</a></h4>
<p>Let's create a topic named "test" with a single partition and only one replica:</p>
<p>Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.</p>
@@ -72,18 +72,18 @@ test
<p>
Run the producer and then type a few messages into the console to send to the server.</p>
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
</pre>
<h4><aid="quickstart_consume"href="#quickstart_consume">Step 5: Start a consumer</a></h4>
<p>Kafka also has a command line consumer that will dump out messages to standard output.</p>
<pre>
><b>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning</b>
<pre class="brush: bash;">
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
</pre>
@@ -100,15 +100,15 @@ All of the command line tools have additional options; running the command with
<p>
First we make a config file for each of the brokers (on Windows use the <code>copy</code> command instead):
@@ -221,8 +221,8 @@ Kafka topic to a file.</p>
<p>First, we'll start by creating some seed data to test with:</p>
<pre>
><b>echo -e "foo\nbar" > test.txt</b>
<pre class="brush: bash;">
> echo -e "foo\nbar" > test.txt
</pre>
<p>Next, we'll start two connectors running in <i>standalone</i> mode, which means they run in a single, local, dedicated
@@ -231,8 +231,8 @@ process, containing common configuration such as the Kafka brokers to connect to
The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector
class to instantiate, and any other configuration required by the connector.</p>
@@ -250,8 +250,8 @@ by examining the contents of the output file:
</p>
<pre>
><b>cat test.sink.txt</b>
<pre class="brush: bash;">
> cat test.sink.txt
foo
bar
</pre>
@@ -262,8 +262,8 @@ data in the topic (or use custom consumer code to process it):
@@ -271,8 +271,8 @@ data in the topic (or use custom consumer code to process it):
<p>The connectors continue to process data, so we can add data to the file and see it move through the pipeline:</p>
<pre>
><b>echo "Another line" >> test.txt</b>
<pre class="brush: bash;">
> echo "Another line" >> test.txt
</pre>
<p>You should see the line appear in the console consumer output and in the sink file.</p>
@@ -284,7 +284,7 @@ Kafka Streams is a client library of Kafka for real-time stream processing and a
This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist
of the <code><a href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java">WordCountDemo</a></code> example code (converted to use Java 8 lambda expressions for easy reading).
</p>
<pre>
<pre class="brush: bash;">
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
@@ -333,14 +333,14 @@ As the first step, we will prepare input data to a Kafka topic, which will subse
-->
<pre>
><b>echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt</b>
<pre class="brush: bash;">
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
</pre>
Or on Windows:
<pre>
><b>echo all streams lead to kafka> file-input.txt</b>
@@ -349,25 +349,25 @@ which reads the data from STDIN line-by-line, and publishes each line as a separ
stream data will likely be flowing continuously into Kafka where the application will be up and running):
@@ -380,22 +380,22 @@ The demo will run for a few seconds and then, unlike typical stream processing a
We can now inspect the output of the WordCount demo application by reading from its output topic:
<li><h4><aid="security_ssl_key"href="#security_ssl_key">Generate SSL key and certificate for each Kafka broker</a></h4>
The first step of deploying HTTPS is to generate the key and the certificate for each machine in the cluster. You can use Java's keytool utility to accomplish this task.
We will generate the key into a temporary keystore initially so that we can export and sign it later with the CA.
You need to specify two parameters in the above command:
@@ -53,7 +53,7 @@
<br>
Note: By default the property <code>ssl.endpoint.identification.algorithm</code> is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:
Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:
<ol>
@@ -62,43 +62,43 @@
</ol>
<br>
Both fields are valid; RFC-2818 recommends the use of SAN, however. SAN is also more flexible, allowing for multiple DNS entries to be declared. Another advantage is that the CN can be set to a more meaningful value for authorization purposes. To add a SAN field, append the following argument <code> -ext SAN=DNS:{FQDN} </code> to the keytool command:
The following command can be run afterwards to verify the contents of the generated certificate:
<pre>
<pre class="brush: bash;">
keytool -list -v -keystore server.keystore.jks
</pre>
</li>
<li><h4><aid="security_ssl_ca"href="#security_ssl_ca">Creating your own CA</a></h4>
After the first step, each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.<p>
Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. A CA works like a government that issues passports: the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.
<b>Note:</b> If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" on the <a href="#config_broker">Kafka brokers config</a> then you must provide a truststore for the Kafka brokers as well and it should have all the CA certificates that clients' keys were signed by.
In contrast to the keystore in step 1 that stores each machine's own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one's truststore also means trusting all certificates that are signed by that certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.</li>
<li><h4><aid="security_ssl_signing"href="#security_ssl_signing">Signing the certificate</a></h4>
The next step is to sign all certificates generated by step 1 with the CA generated in step 2. First, you need to export the certificate from the keystore:
SSL is supported only for the new Kafka Producer and Consumer; the older API is not supported. The configs for SSL will be the same for both producer and consumer.<br>
If client authentication is not required in the broker, then the following is a minimal configuration example:
Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.
If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools).<br>
If you have installed your own Kerberos, you will need to create these principals yourself using the following commands:
<li><b>Make sure all hosts are reachable using hostnames</b> - it is a Kerberos requirement that all your hosts can be resolved with their FQDNs.</li>
<li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example (note that each broker should have its own keytab):
<li>Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting the Kafka broker.</li>
<li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>.</pre> For example:
<li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
<pre> listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
@@ -422,7 +422,6 @@
as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
<tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</li>
<li>Make sure the keytabs configured in the JAAS configuration are readable by the operating system user who is starting the Kafka client.</li>
</li>
<li>Optionally pass the krb5 file locations as JVM parameters to each client JVM (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details):
<li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
<p>JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
<tt>KafkaClient</tt>. This option allows only one user for all client connections from a JVM.</p></li>
</li>
<li>Configure the following properties in producer.properties or consumer.properties:
<pre>
security.protocol=SASL_SSL
@@ -791,25 +789,25 @@
<ul>
<li><b>Adding Acls</b><br>
Suppose you want to add an acl "Principals User:Bob and User:Alice are allowed to perform Operation Read and Write on Topic Test-Topic from IP 198.51.100.0 and IP 198.51.100.1". You can do that by executing the CLI with the following options:
By default, all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal, we will have to use the --deny-principal and --deny-host options. For example, if we want to allow all users to Read from Test-topic but deny only User:BadBob from IP 198.51.100.3, we can do so using the following commands:
Note that ``--allow-host`` and ``--deny-host`` only support IP addresses (hostnames are not supported).
The above examples add acls to a topic by specifying --topic [topic-name] as the resource option. Similarly, users can add acls to a cluster by specifying --cluster and to a consumer group by specifying --group [group-name].</li>
<li><b>Removing Acls</b><br>
Removing acls is pretty much the same. The only difference is that instead of the --add option, users will have to specify the --remove option. To remove the acls added by the first example above, we can execute the CLI with the following options:
We can list acls for any resource by specifying the --list option with the resource. To list all acls for Test-topic, we can execute the CLI with the following options:
<li><b>Adding or removing a principal as producer or consumer</b><br>
The most common use case for acl management is adding/removing a principal as a producer or consumer, so we added convenience options to handle these cases. In order to add User:Bob as a producer of Test-topic, we can execute the following command:
Note that for the consumer option we must also specify the consumer group.
In order to remove a principal from a producer or consumer role, we just need to pass the --remove option. </li>
</ul>
@@ -835,50 +833,58 @@
<p></p>
As an example, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental bounce, an SSL port is opened on each node:
In the final bounce we secure the cluster by closing the PLAINTEXT port:
<pre>
listeners=SSL://broker1:9092
security.inter.broker.protocol=SSL</pre>
security.inter.broker.protocol=SSL
</pre>
Alternatively we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but we'd also like to add SASL authentication to the broker-client connection. We would achieve this by opening two additional ports during the first bounce:
set the necessary parameters, and construct a <code>StreamsConfig</code> instance from the <code>Properties</code> instance.
</p>
<pre>
<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
@@ -702,7 +702,7 @@ StreamsConfig config = new StreamsConfig(settings);
If you want to set different values for consumer and producer for such a parameter, you can prefix the parameter name with <code>consumer.</code> or <code>producer.</code>:
</p>
<pre>
<pre class="brush: java;">
Properties settings = new Properties();
// Example of a "normal" setting for Kafka Streams
@@ -778,7 +778,7 @@ KafkaStreams streams = new KafkaStreams(builder, config);
At this point, internal structures have been initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the <code>start()</code> method:
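For example (a minimal sketch, assuming <code>streams</code> is the <code>KafkaStreams</code> instance constructed above):
<pre class="brush: java;">
// Start the Kafka Streams threads; record processing begins after this call.
streams.start();
</pre>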
To catch any unexpected exceptions, you may set a <code>java.lang.Thread.UncaughtExceptionHandler</code> before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:
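A minimal sketch of registering such a handler (the logging statement is illustrative):
<pre class="brush: java;">
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
    // Illustrative: examine or log the failure here before reacting to it.
    System.err.println("Stream thread " + thread.getName() + " died: " + throwable);
});
</pre>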
Now it's time to execute your application that uses the Kafka Streams library, which can be run just like any other Java application – there is no special magic or requirement on the side of Kafka Streams.
For example, you can package your Java application as a fat jar file and then start the application via:
<pre>
<preclass="brush: bash;">
# Start the application in class `com.example.MyStreamsApp`
# from the fat jar named `path-to-app-fatjar.jar`.