From 57b0d0fe572399048e523d54beafa3520a708cf5 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
To use the producer, add the following Maven dependency:
-
To use the consumer, add the following Maven dependency:
-
To use Kafka Streams, add the following Maven dependency:
-
To use the AdminClient API, add the following Maven dependency:
- Throughout the example we'll use the schemaless JSON data format. To use the schemaless format, we changed the following two lines in
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
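For orientation, here is a minimal producer sketch; the broker address, topic name, and serializer choices are illustrative assumptions, not taken from this patch, and the Producer javadocs remain the authoritative reference:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MinimalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous and returns a Future; the producer is flushed on close().
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}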
@@ -51,7 +51,7 @@
javadocs.
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
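Similarly, a minimal consumer sketch; the group id, topic, and deserializers below are assumptions for illustration, and a real application would poll in a loop:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MinimalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "example-group");           // assumed consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // assumed topic
            // Poll once and print whatever is available.
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}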
@@ -70,7 +70,7 @@
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
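As a rough sketch of a Streams application, assuming the StreamsBuilder entry point from later client versions (the application id, broker address, and topic names are placeholders):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class MinimalStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // assumed application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");       // assumed input topic
        source.to("output-topic");                                            // copy records unchanged

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}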
@@ -92,7 +92,7 @@
The AdminClient API supports managing and inspecting topics, brokers, ACLs, and other Kafka objects.
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
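A minimal AdminClient sketch; the broker address and topic settings are assumptions for illustration:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class MinimalAdminClient {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // Create a single-partition, replication-factor-1 topic and wait for the result.
            admin.createTopics(Collections.singleton(new NewTopic("my-topic", 1, (short) 1))).all().get();
            // List the topics visible to this client.
            System.out.println(admin.listTopics().names().get());
        }
    }
}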
diff --git a/docs/configuration.html b/docs/configuration.html
index 2cad283c428..ea4a5bb231e 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -36,23 +36,24 @@
Topic-level configuration
Configurations pertinent to topics have both a server default as well as an optional per-topic override. If no per-topic configuration is given, the server default is used. The override can be set at topic creation time by giving one or more
--config
options. This example creates a topic named my-topic with a custom max message size and flush rate:
-
- > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
- --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
+
+ > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
+ --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
Overrides can also be changed or set later using the alter configs command. This example updates the max message size for my-topic:
-
- > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000
+
+ > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
+ --alter --add-config max.message.bytes=128000
To check overrides set on the topic you can do
-
- > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
+
+ > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
To remove an override you can do
-
- > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
+
+ > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
The following are the topic-level configurations. The server's default configuration for this property is given under the Server Default Property heading. A given server default config value only applies to a topic if it does not have an explicit topic config override.
diff --git a/docs/connect.html b/docs/connect.html
index 48c51395409..57505d6fa0b 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -40,7 +40,7 @@
In standalone mode all work is performed in a single process. This configuration is simpler to set up and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:
-
+
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
@@ -62,7 +62,7 @@
Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data. Execution is very similar to standalone mode:
-
+
> bin/connect-distributed.sh config/connect-distributed.properties
@@ -118,7 +118,7 @@
connect-standalone.properties
from true to false:
+
key.converter.schemas.enable
value.converter.schemas.enable
@@ -131,7 +131,7 @@
After adding the transformations, connect-file-source.properties
file looks as follows:
-
+
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
@@ -149,7 +149,7 @@
When we ran the file source connector on the sample file without the transformations and then read the messages using
kafka-console-consumer.sh
, the results were:
-
+
"foo"
"bar"
"hello world"
@@ -157,7 +157,7 @@
We then create a new file connector, this time with the transformations added to the configuration file. Now the results will be:
-
+
{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}
@@ -247,7 +247,7 @@
We'll cover the
SourceConnector
as a simple example. SinkConnector
implementations are very similar. Start by creating the class that inherits from SourceConnector
and add a couple of fields that will store parsed configuration information (the filename to read from and the topic to send data to):
-
+
public class FileStreamSourceConnector extends SourceConnector {
private String filename;
private String topic;
@@ -255,7 +255,7 @@
The easiest method to fill in is
getTaskClass()
, which defines the class that should be instantiated in worker processes to actually read the data:
-
+
@Override
public Class<? extends Task> getTaskClass() {
return FileStreamSourceTask.class;
@@ -264,7 +264,7 @@
We will define the
FileStreamSourceTask
class below. Next, we add some standard lifecycle methods, start()
and stop()
:
-
+
@Override
public void start(Map<String, String> props) {
// The complete version includes error handling as well.
@@ -282,7 +282,7 @@
handling a single file, so even though we may be permitted to generate more tasks as per the
maxTasks
argument, we return a list with only one entry:
-
+
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
@@ -310,7 +310,7 @@
Just as with the connector, we need to create a class inheriting from the appropriate base
Task
class. It also has some standard lifecycle methods:
-
+
public class FileStreamSourceTask extends SourceTask {
String filename;
InputStream stream;
@@ -333,7 +333,7 @@
Next, we implement the main functionality of the task, the
poll()
method which gets events from the input system and returns a List<SourceRecord>
:
-
+
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
@@ -365,7 +365,7 @@
The previous section described how to implement a simple
SourceTask
. Unlike SourceConnector
and SinkConnector
, SourceTask
and SinkTask
have very different interfaces because SourceTask
uses a pull interface and SinkTask
uses a push interface. Both share the common lifecycle methods, but the SinkTask
interface is quite different:
-
+
public abstract class SinkTask implements Task {
public void initialize(SinkTaskContext context) {
this.context = context;
@@ -388,7 +388,7 @@
To correctly resume upon startup, the task can use the
SourceContext
passed into its initialize()
method to access the offset data. In initialize()
, we would add a bit more code to read the offset (if it exists) and seek to that position:
-
+
stream = new FileInputStream(filename);
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
@@ -406,7 +406,7 @@
Source connectors need to monitor the source system for changes, e.g. table additions/deletions in a database. When they pick up changes, they should notify the framework via the
ConnectorContext
object that reconfiguration is necessary. For example, in a SourceConnector
:
-
+
if (inputsChanged())
this.context.requestTaskReconfiguration();
@@ -423,7 +423,7 @@
The following code in FileStreamSourceConnector
defines the configuration and exposes it to the framework.
-
+
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
@@ -445,7 +445,7 @@
The API documentation provides a complete reference, but here is a simple example creating a
Schema
and Struct
:
-
+
Schema schema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT_SCHEMA)
@@ -473,7 +473,7 @@
When a connector is first submitted to the cluster, the workers rebalance the full set of connectors in the cluster and their tasks so that each worker has approximately the same amount of work. This same rebalancing procedure is also used when connectors increase or decrease the number of tasks they require, or when a connector's configuration is changed. You can use the REST API to view the current status of a connector and its tasks, including the id of the worker to which each was assigned. For example, querying the status of a file source (using
GET /connectors/file-source/status
) might produce output like the following:
+{ "name": "file-source", "connector": { diff --git a/docs/design.html b/docs/design.html index c061cf453ae..0373a2d197c 100644 --- a/docs/design.html +++ b/docs/design.html @@ -417,7 +417,7 @@Let's discuss a concrete example of such a stream. Say we have a topic containing user email addresses; every time a user updates their email address we send a message to this topic using their user id as the primary key. Now say we send the following messages over some time period for a user with id 123, each message corresponding to a change in email address (messages for other ids are omitted): -
+123 => bill@microsoft.com . . @@ -510,11 +510,11 @@ The log cleaner is enabled by default. This will start the pool of cleaner threads. To enable log cleaning on a particular topic you can add the log-specific property -log.cleanup.policy=compact+log.cleanup.policy=compactThis can be done either at topic creation time or using the alter topic command.The log cleaner can be configured to retain a minimum amount of the uncompacted "head" of the log. This is enabled by setting the compaction time lag. -
log.cleaner.min.compaction.lag.ms+log.cleaner.min.compaction.lag.msThis can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag. diff --git a/docs/implementation.html b/docs/implementation.html index 48602f26301..5ff75ec5053 100644 --- a/docs/implementation.html +++ b/docs/implementation.html @@ -22,8 +22,8 @@The Producer API that wraps the 2 low-level producers -
kafka.producer.SyncProducer
andkafka.producer.async.AsyncProducer
. -- class ProducerFor example: +{ + + class Producer<T> { /* Sends the data, partitioned by key to the topic using either the */ /* synchronous or the asynchronous producer */ @@ -48,7 +48,7 @@handles the serialization of data through a user-specified Encoder
: -+interface Encoder<T> { public Message toMessage(T data); } @@ -58,7 +58,7 @@provides software load balancing through an optionally user-specified Partitioner
:The routing decision is influenced by the
kafka.producer.Partitioner
. -+interface Partitioner<T> { int partition(T key, int numPartitions); } @@ -78,7 +78,7 @@Low-level API
-+class SimpleConsumer { /* Send fetch request to a broker and get back a set of messages. */ @@ -101,7 +101,7 @@ The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers which have particular requirements around maintaining state.High-level API
-+/* create a connection to the cluster */ ConsumerConnector connector = Consumer.create(consumerConfig); @@ -156,8 +156,8 @@5.4 Message Format
-- /** ++ /** * 1. 4 byte CRC32 of the message * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1 * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version @@ -185,7 +185,7 @@The exact binary format for messages is versioned and maintained as a standard interface so message sets can be transferred between producer, broker, and client without recopying or conversion when desirable. This format is as follows:
-+On-disk format of a message offset : 8 bytes @@ -220,7 +220,7 @@The following is the format of the results sent to the consumer. -
+MessageSetSend (fetch result) total length : 4 bytes @@ -230,7 +230,7 @@ message n : x bytes-+MultiMessageSetSend (multiFetch result) total length : 4 bytes @@ -292,7 +292,7 @@Broker Node Registry
-+/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)@@ -302,7 +302,7 @@ Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).
Broker Topic Registry
-+/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)@@ -327,7 +327,7 @@Consumer Id Registry
In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes. Consumer ids are registered in the following directory. -
+/consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} (ephemeral node)Each of the consumers in the group registers under its group and creates a znode with its consumer_id. The value of the znode contains a map of <topic, #streams>. This id is simply used to identify each of the consumers which is currently active within a group. This is an ephemeral node so it will disappear if the consumer process dies. @@ -337,7 +337,7 @@Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory if
-offsets.storage=zookeeper
.+/consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)@@ -347,7 +347,7 @@ Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming. -+/consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)@@ -389,7 +389,7 @@Each consumer does the following during rebalancing:
-+1. For each topic T that Ci subscribes to 2. let PT be all partitions producing topic T 3. let CG be all consumers in the same group as Ci that consume topic T diff --git a/docs/ops.html b/docs/ops.html index 46edb194c0b..94b6c2dc453 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -27,7 +27,7 @@ You have the option of either adding topics manually or having them be created automatically when data is first published to a non-existent topic. If topics are auto-created then you may want to tune the default topic configurations used for auto-created topics.Topics are added and modified using the topic tool: -
+> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y@@ -44,26 +44,26 @@ You can change the configuration or partitioning of a topic using the same topic tool.To add partitions you can do -
+
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40
Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn't change the partitioning of existing data, so this may disturb consumers if they rely on that partition. That is, if data is partitioned by
hash(key) % number_of_partitions
then this partitioning will potentially be shuffled by adding partitions, but Kafka will not attempt to automatically redistribute data in any way.
To add configs: -
+
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
To remove a config: -
+
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --delete-config x
And finally deleting a topic: -
+
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
The topic deletion option is disabled by default. To enable it, set the server config
-delete.topic.enable=true
+delete.topic.enable=true
Kafka does not currently support reducing the number of partitions for a topic.
@@ -80,7 +80,7 @@ Syncing the logs will happen automatically whenever the server is stopped other than by a hard kill, but the controlled leadership migration requires using a special setting: -
+controlled.shutdown.enable=trueNote that controlled shutdown will only succeed if all the partitions hosted on the broker have replicas (i.e. the replication factor is greater than 1 and at least one of these replicas is alive). This is generally what you want since shutting down the last replica would make that topic partition unavailable. @@ -90,12 +90,12 @@ Whenever a broker stops or crashes leadership for that broker's partitions transfers to other replicas. This means that by default when the broker is restarted it will only be a follower for all its partitions, meaning it will not be used for client reads and writes.To avoid this imbalance, Kafka has a notion of preferred replicas. If the list of replicas for a partition is 1,5,9 then node 1 is preferred as the leader to either node 5 or 9 because it is earlier in the replica list. You can have the Kafka cluster try to restore leadership to the restored replicas by running the command: -
+> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chrootSince running this command can be tedious you can also configure Kafka to do this automatically by setting the following configuration: -+auto.leader.rebalance.enable=true@@ -103,7 +103,7 @@ The rack awareness feature spreads replicas of the same partition across different racks. This extends the guarantees Kafka provides for broker-failure to cover rack-failure, limiting the risk of data loss should all the brokers on a rack fail at once. The feature can also be applied to other broker groupings such as availability zones in EC2. You can specify that a broker belongs to a particular rack by adding a property to the broker config: -broker.rack=my-rack-id+broker.rack=my-rack-idWhen a topic is created, modified or replicas are redistributed, the rack constraint will be honoured, ensuring replicas span as many racks as they can (a partition will span min(#racks, replication-factor) different racks). The algorithm used to assign replicas to brokers ensures that the number of leaders per broker will be constant, regardless of how brokers are distributed across racks. This ensures balanced throughput. @@ -123,7 +123,7 @@ The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same. For this reason the mirror cluster is not really intended as a fault-tolerance mechanism (as the consumer position will be different); for that we recommend using normal in-cluster replication. The mirror maker process will, however, retain and use the message key for partitioning so order is preserved on a per-key basis.Here is an example showing how to mirror a single topic (named my-topic) from an input cluster: -
+> bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist my-topic @@ -139,7 +139,7 @@Checking consumer position
Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named my-group consuming a topic named my-topic would look like this: -+> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test Group Topic Pid Offset logSize Lag Owner my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0 @@ -157,7 +157,7 @@ For example, to list all consumer groups across all topics: -+> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list test-consumer-group @@ -165,7 +165,7 @@ To view offsets as in the previous example with the ConsumerOffsetChecker, we "describe" the consumer group like this: -+> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID @@ -175,7 +175,7 @@ If you are using the old high-level consumer and storing the group metadata in ZooKeeper (i.e.offsets.storage=zookeeper
), pass--zookeeper
instead ofbootstrap-server
: -+> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list@@ -199,7 +199,7 @@ For instance, the following example will move all partitions for topics foo1,foo2 to the new set of brokers 5,6. At the end of this move, all partitions for topics foo1 and foo2 will only exist on brokers 5,6.Since the tool accepts the input list of topics as a json file, you first need to identify the topics you want to move and create the json file as follows: -
+> cat topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], @@ -207,7 +207,7 @@ }Once the json file is ready, use the partition reassignment tool to generate a candidate assignment: -+> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate Current partition replica assignment @@ -233,7 +233,7 @@The tool generates a candidate assignment that will move all partitions from topics foo1,foo2 to brokers 5,6. Note, however, that at this point, the partition movement has not started, it merely tells you the current assignment and the proposed new assignment. The current assignment should be saved in case you want to rollback to it. The new assignment should be saved in a json file (e.g. expand-cluster-reassignment.json) to be input to the tool with the --execute option as follows: -
+> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute Current partition replica assignment @@ -259,7 +259,7 @@Finally, the --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option: -
+> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully @@ -276,12 +276,12 @@ For instance, the following example moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3:The first step is to hand craft the custom reassignment plan in a json file: -
+> cat custom-reassignment.json {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}Then, use the json file with the --execute option to start the reassignment process: -+> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment @@ -299,8 +299,8 @@The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option: -
- bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify ++ > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo2,1] completed successfully @@ -315,13 +315,13 @@ For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.The first step is to hand craft the custom reassignment plan in a json file: -
+> cat increase-replication-factor.json {"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}Then, use the json file with the --execute option to start the reassignment process: -+> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute Current partition replica assignment @@ -335,13 +335,13 @@The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option: -
- bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
+
+ > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully
You can also verify the increase in replication factor with the kafka-topics tool: -
+
> bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
@@ -353,13 +353,13 @@
There are two interfaces that can be used to engage a throttle. The simplest, and safest, is to apply a throttle when invoking kafka-reassign-partitions.sh, but kafka-configs.sh can also be used to view and alter the throttle values directly. So, for example, if you were to execute a rebalance with the command below, it would move partitions at no more than 50MB/s. -
$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000
+
$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000
When you execute this script you will see the throttle engage: -
+
The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.
Should you wish to alter the throttle during a rebalance, say to increase the throughput so it completes more quickly, you can do this by re-running the execute command, passing the same reassignment-json-file:
-$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000 +$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000 There is an existing assignment running. The throttle limit was set to 700000000 B/s@@ -369,7 +369,8 @@ the --verify option. Failure to do so could cause regular replication traffic to be throttled.When the --verify option is executed, and the reassignment has completed, the script will confirm that the throttle was removed:
-$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file bigger-cluster.json ++ > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file bigger-cluster.json Status of partition reassignment: Reassignment of partition [my-topic,1] completed successfully Reassignment of partition [mytopic,0] completed successfully @@ -379,19 +380,20 @@ configuration used to manage the throttling process. The throttle value itself. This is configured, at a broker level, using the dynamic properties: -leader.replication.throttled.rate +leader.replication.throttled.rate follower.replication.throttled.rateThere is also an enumerated set of throttled replicas:
-leader.replication.throttled.replicas +leader.replication.throttled.replicas follower.replication.throttled.replicasWhich are configured per topic. All four config values are automatically assigned by kafka-reassign-partitions.sh (discussed below).
To view the throttle limit configuration:
-$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers ++ > bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000 Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000@@ -400,7 +402,8 @@To view the list of throttled replicas:
-$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics ++ > bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101, follower.replication.throttled.replicas=1:101,0:102@@ -448,19 +451,19 @@ It is possible to set custom quotas for each (user, client-id), user or client-id group.Configure custom quota for (user=user1, client-id=clientA): -
+> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA Updated config for entity: user-principal 'user1', client-id 'clientA'.Configure custom quota for user=user1: -+> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 Updated config for entity: user-principal 'user1'.Configure custom quota for client-id=clientA: -+> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA Updated config for entity: client-id 'clientA'.@@ -468,53 +471,53 @@ It is possible to set default quotas for each (user, client-id), user or client-id group by specifying --entity-default option instead of --entity-name.Configure default client-id quota for user=userA: -
+> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default Updated config for entity: user-principal 'user1', default client-id.Configure default quota for user: -+> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default Updated config for entity: default user-principal.Configure default quota for client-id: -+> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default Updated config for entity: default client-id.Here's how to describe the quota for a given (user, client-id): -+> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200Describe quota for a given user: -+> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200Describe quota for a given client-id: -+> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200If entity name is not specified, all entities of the specified type are described. For example, describe all users: -+> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200 Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200Similarly for (user, client): -+> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-type clients Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200 Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200It is possible to set default quotas that apply to all client-ids by setting these configs on the brokers. These properties are applied only if quota overrides or defaults are not configured in Zookeeper. By default, each client-id receives an unlimited quota. The following sets the default quota per producer and consumer client-id to 10MB/sec. -
+quota.producer.default=10485760 quota.consumer.default=10485760@@ -556,7 +559,7 @@
A Production Server Config
Here is an example production server configuration: -+# ZooKeeper zookeeper.connect=[list of ZooKeeper servers] @@ -582,7 +585,7 @@ LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector. If you decide to use the G1 collector (the current default) and you are still on JDK 1.7, make sure you are on u51 or newer. LinkedIn tried out u21 in testing, but they had a number of problems with the GC implementation in that version. LinkedIn's tuning looks like this: -+-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 @@ -648,9 +651,7 @@ When Pdflush cannot keep up with the rate of data being written it will eventually cause the writing process to block incurring latency in the writes to slow down the accumulation of data.You can see the current state of OS memory usage by doing -
- > cat /proc/meminfo
-
+> cat /proc/meminfo
The meanings of these values are described in the link above.
Using pagecache has several advantages over an in-process cache for storing data that will be written out to disk:
diff --git a/docs/quickstart.html b/docs/quickstart.html
index 461e18474dc..b8722a6b68c 100644
--- a/docs/quickstart.html
+++ b/docs/quickstart.html
@@ -27,9 +27,9 @@ Since Kafka console scripts are different for Unix-based and Windows platforms,
Download the 0.10.2.0 release and un-tar it. -
-> tar -xzf kafka_2.11-0.10.2.0.tgz -> cd kafka_2.11-0.10.2.0 ++> tar -xzf kafka_2.11-0.10.2.0.tgz +> cd kafka_2.11-0.10.2.0Step 2: Start the server
@@ -38,15 +38,15 @@ Since Kafka console scripts are different for Unix-based and Windows platforms, Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance. --> bin/zookeeper-server-start.sh config/zookeeper.properties ++> bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...Now start the Kafka server:
--> bin/kafka-server-start.sh config/server.properties ++> bin/kafka-server-start.sh config/server.properties [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ... @@ -55,13 +55,13 @@ Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don'tStep 3: Create a topic
Let's create a topic named "test" with a single partition and only one replica:
--> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test ++> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testWe can now see that topic if we run the list topic command:
--> bin/kafka-topics.sh --list --zookeeper localhost:2181 ++> bin/kafka-topics.sh --list --zookeeper localhost:2181 testAlternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.
@@ -72,18 +72,18 @@ testRun the producer and then type a few messages into the console to send to the server.
--> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test -This is a message -This is another message ++> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test +This is a message +This is another messageStep 5: Start a consumer
Kafka also has a command line consumer that will dump out messages to standard output.
--> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ++> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message@@ -100,15 +100,15 @@ All of the command line tools have additional options; running the command withFirst we make a config file for each of the brokers (on Windows use the
-copy
command instead):-> cp config/server.properties config/server-1.properties -> cp config/server.properties config/server-2.properties ++> cp config/server.properties config/server-1.properties +> cp config/server.properties config/server-2.propertiesNow edit these new files and set the following properties:
-+config/server-1.properties: broker.id=1 @@ -124,21 +124,21 @@ config/server-2.properties:We already have Zookeeper and our single node started, so we just need to start the two new nodes:
--> bin/kafka-server-start.sh config/server-1.properties & ++> bin/kafka-server-start.sh config/server-1.properties & ... -> bin/kafka-server-start.sh config/server-2.properties & +> bin/kafka-server-start.sh config/server-2.properties & ...Now create a new topic with a replication factor of three:
--> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic ++> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topicOkay but now that we have a cluster how can we know which broker is doing what? To see that run the "describe topics" command:
--> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic ++> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0@@ -152,8 +152,8 @@ Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:We can run the same command on the original topic we created to see where it is:
--> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test ++> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0@@ -161,50 +161,50 @@ Topic:test PartitionCount:1 ReplicationFactor:1 Configs:Let's publish a few messages to our new topic:
--> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ++> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... -my test message 1 -my test message 2 -^C +my test message 1 +my test message 2 +^CNow let's consume these messages:
--> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ++> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 -^C +^CNow let's test out fault-tolerance. Broker 1 was acting as the leader so let's kill it:
--> ps aux | grep server-1.properties -7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java... -> kill -9 7564 ++> ps aux | grep server-1.properties +7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java... +> kill -9 7564On Windows use: --> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties" -java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.11-0.10.2.0.jar" kafka.Kafka config\server-1.properties 644 -> taskkill /pid 644 /f ++> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties" +java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.11-0.10.2.0.jar" kafka.Kafka config\server-1.properties 644 +> taskkill /pid 644 /fLeadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:
--> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic ++> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0But the messages are still available for consumption even though the leader that took the writes originally is down:
--> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ++> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 -^C +^C@@ -221,8 +221,8 @@ Kafka topic to a file.First, we'll start by creating some seed data to test with:
--> echo -e "foo\nbar" > test.txt ++> echo -e "foo\nbar" > test.txtNext, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated @@ -231,8 +231,8 @@ process, containing common configuration such as the Kafka brokers to connect to The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.
--> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties ++> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties@@ -250,8 +250,8 @@ by examining the contents of the output file:
--> cat test.sink.txt ++> cat test.sink.txt foo bar@@ -262,8 +262,8 @@ data in the topic (or use custom consumer code to process it): --> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning ++> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} ... @@ -271,8 +271,8 @@ data in the topic (or use custom consumer code to process it):The connectors continue to process data, so we can add data to the file and see it move through the pipeline:
--> echo "Another line" >> test.txt ++> echo "Another line" >> test.txtYou should see the line appear in the console consumer output and in the sink file.
@@ -284,7 +284,7 @@ Kafka Streams is a client library of Kafka for real-time stream processing and a This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist of theWordCountDemo
example code (converted to use Java 8 lambda expressions for easy reading). -+// Serializers/deserializers (serde) for String and Long types final Serde<String> stringSerde = Serdes.String(); final Serde<Long> longSerde = Serdes.Long(); @@ -333,14 +333,14 @@ As the first step, we will prepare input data to a Kafka topic, which will subse --> --> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt ++> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txtOr on Windows: --> echo all streams lead to kafka> file-input.txt -> echo hello kafka streams>> file-input.txt -> echo|set /p=join kafka summit>> file-input.txt ++> echo all streams lead to kafka> file-input.txt +> echo hello kafka streams>> file-input.txt +> echo|set /p=join kafka summit>> file-input.txt@@ -349,25 +349,25 @@ which reads the data from STDIN line-by-line, and publishes each line as a separ stream data will likely be flowing continuously into Kafka where the application will be up and running):
--> bin/kafka-topics.sh --create \ - --zookeeper localhost:2181 \ - --replication-factor 1 \ - --partitions 1 \ - --topic streams-file-input ++> bin/kafka-topics.sh --create \ + --zookeeper localhost:2181 \ + --replication-factor 1 \ + --partitions 1 \ + --topic streams-file-input--> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt ++> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txtWe can now run the WordCount demo application to process the input data:
--> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo ++> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo@@ -380,22 +380,22 @@ The demo will run for a few seconds and then, unlike typical stream processing a We can now inspect the output of the WordCount demo application by reading from its output topic:
--> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ - --topic streams-wordcount-output \ - --from-beginning \ - --formatter kafka.tools.DefaultMessageFormatter \ - --property print.key=true \ - --property print.value=true \ - --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ - --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer ++> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ + --topic streams-wordcount-output \ + --from-beginning \ + --formatter kafka.tools.DefaultMessageFormatter \ + --property print.key=true \ + --property print.value=true \ + --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ + --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializerwith the following output data being printed to the console:
-+all 1 lead 1 to 1 diff --git a/docs/security.html b/docs/security.html index b2d47598887..09a6c332e43 100644 --- a/docs/security.html +++ b/docs/security.html @@ -42,7 +42,7 @@Generate SSL key and certificate for each Kafka broker
The first step of deploying HTTPS is to generate the key and the certificate for each machine in the cluster. You can use Java's keytool utility to accomplish this task. We will generate the key into a temporary keystore initially so that we can export and sign it later with CA. -+keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkeyYou need to specify two parameters in the above command: @@ -53,7 +53,7 @@
Note: By default the propertyssl.endpoint.identification.algorithm
is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property: -ssl.endpoint.identification.algorithm=HTTPS+ssl.endpoint.identification.algorithm=HTTPSOnce enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:@@ -62,43 +62,43 @@
Both fields are valid; however, RFC-2818 recommends the use of SAN. SAN is also more flexible, allowing for multiple DNS entries to be declared. Another advantage is that the CN can be set to a more meaningful value for authorization purposes. To add a SAN field, append the following argument
to the keytool command: -+keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -ext SAN=DNS:{FQDN}The following command can be run afterwards to verify the contents of the generated certificate: -+keytool -list -v -keystore server.keystore.jksCreating your own CA
After the first step, each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. CA works likes a government that issues passports—the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines. -
- openssl req -new -x509 -keyout ca-key -out ca-cert -days 365++ openssl req -new -x509 -keyout ca-key -out ca-cert -days 365The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.
The next step is to add the generated CA to the **clients' truststore** so that the clients can trust this CA: -+keytool -keystore client.truststore.jks -alias CARoot -import -file ca-certNote: If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" on the Kafka brokers config then you must provide a truststore for the Kafka brokers as well and it should have all the CA certificates that clients' keys were signed by. -- keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert++ keytool -keystore server.truststore.jks -alias CARoot -import -file ca-certIn contrast to the keystore in step 1 that stores each machine's own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one's truststore also means trusting all certificates that are signed by that certificate. As the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.Signing the certificate
The next step is to sign all certificates generated by step 1 with the CA generated in step 2. First, you need to export the certificate from the keystore: -+keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-fileThen sign it with the CA: -+openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}Finally, you need to import both the certificate of the CA and the signed certificate into the keystore: -+keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed@@ -132,11 +132,11 @@listenersIf SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary. -+listeners=PLAINTEXT://host.name:port,SSL://host.name:portFollowing SSL configs are needed on the broker side -+ssl.keystore.location=/var/private/ssl/server.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234 @@ -189,7 +189,7 @@@@ -278,7 +278,7 @@ SCRAM. For example, GSSAPI credentials may be configured as: - Configuring Kafka Clients
SSL is supported only for the new Kafka Producer and Consumer, the older API is not supported. The configs for SSL will be the same for both producer and consumer.
If client authentication is not required in the broker, then the following is a minimal configuration example: -+security.protocol=SSL ssl.truststore.location=/var/private/ssl/client.truststore.jks ssl.truststore.password=test1234@@ -197,7 +197,7 @@ Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set access to the truststore is still available, but integrity checking is disabled. If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured: -+ssl.keystore.location=/var/private/ssl/client.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234@@ -212,7 +212,7 @@
Examples using console-producer and console-consumer: -+kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties+KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true @@ -288,7 +288,7 @@ };Pass the JAAS config file location as JVM parameter to each client JVM. For example: - +-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf@@ -350,7 +350,7 @@Create Kerberos Principals
If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools). If you have installed your own Kerberos, you will need to create these principals yourself using the following commands: -+sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}' sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"Make sure all hosts can be reachable using hostnames - it is a Kerberos requirement that all your hosts can be resolved with their FQDNs. @@ -358,7 +358,7 @@Configuring Kafka Brokers
- Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example (note that each broker should have its own keytab): -
+KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true @@ -384,7 +384,7 @@ -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf- Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting kafka broker.
-- Configure SASL port and SASL mechanisms in server.properties as described here.
listeners=SASL_PLAINTEXT://host.name:port security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=GSSAPI @@ -422,7 +422,6 @@ as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.
-Djava.security.krb5.conf=/etc/kafka/krb5.conf
+KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" @@ -458,7 +457,7 @@ those from other brokers using these properties.
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
listeners=SASL_SSL://host.name:port security.inter.broker.protocol=SASL_SSL sasl.mechanism.inter.broker.protocol=PLAIN @@ -472,7 +471,7 @@
+sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="alice" \ password="alice-secret";@@ -538,23 +537,23 @@ before Kafka brokers are started. Client credentials may be created and updated dynamically and updated credentials will be used to authenticate new connections.Create SCRAM credentials for user alice with password alice-secret: -
- bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice ++ > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name aliceThe default iteration count of 4096 is used if iterations are not specified. A random salt is created and the SCRAM identity consisting of salt, iterations, StoredKey and ServerKey are stored in Zookeeper. See RFC 5802 for details on SCRAM identity and the individual fields.
The following examples also require a user admin for inter-broker communication, which can be created using:
- bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
+ > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

Existing credentials may be listed using the --describe option:
- bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice
+ > bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice

Credentials may be deleted for one or more SCRAM mechanisms using the --delete option:
- bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
+ > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="alice" \
        password="alice-secret";
@@ -599,7 +598,6 @@
JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.
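As an illustration of the client side (not taken from the patch), a Java consumer for SASL_SSL with SCRAM-SHA-256 might be configured as follows; the broker address, truststore, group id, and credentials are placeholders:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ScramConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder broker address; point this at the SASL_SSL listener.
            props.put("bootstrap.servers", "broker1:9093");
            props.put("group.id", "test-group");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            // Security settings matching the SCRAM configuration described above.
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "SCRAM-SHA-256");
            props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"alice\" password=\"alice-secret\";");
            // Placeholder truststore so the client can verify the broker's SSL certificate.
            props.put("ssl.truststore.location", "/var/private/ssl/client.truststore.jks");
            props.put("ssl.truststore.password", "test1234");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test"));
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }

Switching to SCRAM-SHA-512 is just a matter of changing sasl.mechanism, provided the corresponding credential was created as shown above.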
    security.protocol=SASL_SSL
@@ -791,25 +789,25 @@
- bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
+ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic

By default, all principals that don't have an explicit acl allowing access to a resource for an operation are denied. In the rare cases where an allow acl grants access to everyone except some principal, we have to use the --deny-principal and --deny-host options. For example, if we want to allow all users to Read from Test-topic but deny only User:BadBob from IP 198.51.100.3, we can do so using the following commands:
- bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
+ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic

Note that ``--allow-host`` and ``--deny-host`` only support IP addresses (hostnames are not supported). The above examples add acls to a topic by specifying --topic [topic-name] as the resource option. Similarly, users can add acls to a cluster by specifying --cluster and to a consumer group by specifying --group [group-name].
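ACLs can also be managed programmatically rather than through the shell tool. The sketch below is only a rough illustration, assuming the AdminClient ACL classes as they existed in this release line (an AclBinding built from a Resource and an AccessControlEntry; later releases use ResourcePattern instead), and it mirrors the Bob/Read example above:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.Resource;
    import org.apache.kafka.common.resource.ResourceType;

    public class AclExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder broker address; the admin client talks to the brokers, not to ZooKeeper.
            props.put("bootstrap.servers", "broker1:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Allow User:Bob to Read from Test-topic from host 198.51.100.0, as in the CLI example.
                AclBinding binding = new AclBinding(
                    new Resource(ResourceType.TOPIC, "Test-topic"),
                    new AccessControlEntry("User:Bob", "198.51.100.0", AclOperation.READ, AclPermissionType.ALLOW));
                admin.createAcls(Collections.singleton(binding)).all().get();
            }
        }
    }

Listing and removing ACLs are likewise available on the same client via describeAcls and deleteAcls.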
- bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
+ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
- bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
+ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
- bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
+ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic

Similarly, to add Alice as a consumer of Test-topic with consumer group Group-1 we just have to pass the --consumer option:
- bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
+ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1

Note that for the consumer option we must also specify the consumer group. To remove a principal from the producer or consumer role, we just need to pass the --remove option.
- listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
+ listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092

We then restart the clients, changing their config to point at the newly opened, secured port:
    bootstrap.servers = [broker1:9092,...]
    security.protocol = SSL
-   ...etc
+   ...etc

In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):
    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
-   security.inter.broker.protocol=SSL
+   security.inter.broker.protocol=SSL

In the final bounce we secure the cluster by closing the PLAINTEXT port:
    listeners=SSL://broker1:9092
-   security.inter.broker.protocol=SSL
+   security.inter.broker.protocol=SSL

Alternatively, we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for both broker-broker and broker-client communication) but we'd also like to add SASL authentication to the broker-client connection. We would achieve this by opening two additional ports during the first bounce:
- listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
+ listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093

We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port:
    bootstrap.servers = [broker1:9093,...]
    security.protocol = SASL_SSL
-   ...etc
+   ...etc

The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened on port 9092:
    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
-   security.inter.broker.protocol=SSL
+   security.inter.broker.protocol=SSL

The final bounce secures the cluster by closing the PLAINTEXT port.
    listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
-   security.inter.broker.protocol=SSL
+   security.inter.broker.protocol=SSL

ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section 7.6.2.
@@ -907,11 +913,11 @@
    ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181

Run this to see the full list of parameters:
    ./bin/zookeeper-security-migration.sh --help

7.6.3 Migrating the ZooKeeper ensemble
diff --git a/docs/streams.html b/docs/streams.html
index bff9bf07dbd..8192bf7e7e4 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -306,7 +306,7 @@
The following example Processor implementation defines a simple word-count algorithm:

    public class MyProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private KeyValueStore<String, Long> kvStore;
@@ -380,7 +380,7 @@ public class MyProcessor implements Processor<String, String> {

by connecting these processors together:

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("SOURCE", "src-topic")
@@ -424,7 +424,7 @@ builder.addSource("SOURCE", "src-topic")

In the following example, a persistent key-value store named “Counts” with key type String and value type Long is created.

    StateStoreSupplier countStore = Stores.create("Counts")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
@@ -438,7 +438,7 @@ StateStoreSupplier countStore = Stores.create("Counts")

state store with the existing processor nodes through TopologyBuilder.connectProcessorAndStateStores.

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("SOURCE", "src-topic")
@@ -526,7 +526,7 @@ builder.addSource("SOURCE", "src-topic")

from a single topic).

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
@@ -606,7 +606,7 @@ GlobalKTable<String, GenericRecord> source2 = builder.globalTable("topic4"

    // written in Java 8+, using lambda expressions
    KStream<String, GenericRecord> mapped = source1.mapValues(record -> record.get("category"));
@@ -620,7 +620,7 @@ KStream<String, GenericRecord> mapped = source1.mapValues(record -> record.

based on them.

    // written in Java 8+, using lambda expressions
    KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
        () -> 0L, // initial value
@@ -641,7 +641,7 @@ KStream<String, String> joined = source1.leftJoin(source2,

KStream.to and KTable.to.

    joined.to("topic4");
@@ -649,7 +649,7 @@ joined.to("topic4");

to a topic via to above, one option is to construct a new stream that reads from the output topic; Kafka Streams provides a convenience method called through:

    // equivalent to
    //
    // joined.to("topic4");
@@ -677,7 +677,7 @@ KStream<String, String> materialized = joined.through("topic4");

set the necessary parameters, and construct a StreamsConfig instance from the Properties instance.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
@@ -702,7 +702,7 @@ StreamsConfig config = new StreamsConfig(settings);

If you want to set different values for consumer and producer for such a parameter, you can prefix the parameter name with consumer. or producer.:

    Properties settings = new Properties();
    // Example of a "normal" setting for Kafka Streams
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
@@ -750,7 +750,7 @@ settings.put(StreamsConfig.producerConfig(ProducerConfig.RECEIVE_BUFFER_CONFIG),

that is used to define a topology; the second argument is an instance of StreamsConfig mentioned above.

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -778,7 +778,7 @@ KafkaStreams streams = new KafkaStreams(builder, config);

At this point, internal structures have been initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the start() method:

    // Start the Kafka Streams instance
    streams.start();
@@ -787,7 +787,7 @@ streams.start();

To catch any unexpected exceptions, you may set a java.lang.Thread.UncaughtExceptionHandler before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:

    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        public void uncaughtException(Thread t, Throwable e) {
            // here you should examine the exception and perform an appropriate action!
@@ -799,7 +799,7 @@ streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

To stop the application instance, call the close() method:

    // Stop the Kafka Streams instance
    streams.close();
@@ -807,7 +807,7 @@ streams.close();

Now it's time to execute your application that uses the Kafka Streams library, which can be run just like any other Java application – there is no special magic or requirement on the side of Kafka Streams. For example, you can package your Java application as a fat jar file and then start the application via:

    # Start the application in class `com.example.MyStreamsApp`
    # from the fat jar named `path-to-app-fatjar.jar`.
    $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
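For reference, here is a compact, self-contained sketch (not part of the patch) that puts the lifecycle pieces above together using the KStreamBuilder-based API of this release; the application id, topic names, and bootstrap server are placeholder values:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class MyStreamsApp {
        public static void main(String[] args) {
            Properties settings = new Properties();
            // Placeholder application id and bootstrap server.
            settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
            settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            StreamsConfig config = new StreamsConfig(settings);

            // A trivial topology: copy records from one placeholder topic to another.
            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> source = builder.stream("src-topic");
            source.to("sink-topic");

            KafkaStreams streams = new KafkaStreams(builder, config);
            // Log unexpected stream-thread failures instead of dying silently.
            streams.setUncaughtExceptionHandler((t, e) ->
                System.err.println("Stream thread " + t.getName() + " died: " + e));
            streams.start();

            // Close the instance cleanly on JVM shutdown.
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

Packaged as a fat jar, a class like this can be launched exactly as in the java -cp example above.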