diff --git a/docs/api.html b/docs/api.html
index 5f47c3aa28f..7966b901c1a 100644
--- a/docs/api.html
+++ b/docs/api.html
@@ -35,7 +35,7 @@

To use the producer, you can use the following maven dependency:
-

+	
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-clients</artifactId>
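As a quick orientation (not part of this patch), a minimal producer sketch built against this dependency might look as follows; the broker address, topic name, and String key/value types are illustrative placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // Send a single record, then close (which also flushes any buffered records).
            Producer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.close();
        }
    }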
@@ -51,7 +51,7 @@
 	javadocs.
 	

To use the consumer, you can use the following maven dependency:
-

+	
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-clients</artifactId>
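As a quick orientation (not part of this patch), a minimal consumer sketch built against this dependency might look as follows; the broker address, group id, and topic are placeholders, and poll(long) matches the 0.10.x-era clients this documentation describes:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
            props.put("group.id", "test-group");              // placeholder consumer group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("my-topic"));    // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
        }
    }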
@@ -70,7 +70,7 @@
 	

To use Kafka Streams you can use the following maven dependency:
-

+	
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-streams</artifactId>
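As a quick orientation (not part of this patch), a minimal Streams sketch built against this dependency might look as follows; it assumes the KStreamBuilder API of the 0.10.x/0.11 releases (newer releases use StreamsBuilder), and the application id, broker address, and topic names are placeholders:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class StreamsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            // Pipe records from one topic to another unchanged, using the default byte-array serdes.
            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic");

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }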
@@ -92,7 +92,7 @@
 	The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects.
 	

To use the AdminClient API, add the following Maven dependency:
-

+	
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-clients</artifactId>
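As a quick orientation (not part of this patch), a minimal AdminClient sketch built against this dependency might look as follows; the broker address is a placeholder:

    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class AdminClientSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            // List the topic names known to the cluster, then release the client's resources.
            AdminClient admin = AdminClient.create(props);
            Set<String> topics = admin.listTopics().names().get();
            System.out.println(topics);
            admin.close();
        }
    }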
diff --git a/docs/configuration.html b/docs/configuration.html
index 2cad283c428..ea4a5bb231e 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -36,23 +36,24 @@
   Topic-level configuration
 
   Configurations pertinent to topics have both a server default as well an optional per-topic override. If no per-topic configuration is given the server default is used. The override can be set at topic creation time by giving one or more --config options. This example creates a topic named my-topic with a custom max message size and flush rate:
-  
-   > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
-          --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
+  
+  > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
+      --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
   
Overrides can also be changed or set later using the alter configs command. This example updates the max message size for my-topic:
-
-   > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000
+  
+  > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
+      --alter --add-config max.message.bytes=128000
   
To check overrides set on the topic you can do
-
-   > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
+  
+  > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
   
To remove an override you can do
-
-   > bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
+  
+  > bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
   
The following are the topic-level configurations. The server's default configuration for this property is given under the Server Default Property heading. A given server default config value only applies to a topic if it does not have an explicit topic config override.
diff --git a/docs/connect.html b/docs/connect.html
index 48c51395409..57505d6fa0b 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -40,7 +40,7 @@
In standalone mode all work is performed in a single process. This configuration is simpler to set up and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:
-
+    
     > bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
     
@@ -62,7 +62,7 @@
Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data. Execution is very similar to standalone mode:
-
+    
     > bin/connect-distributed.sh config/connect-distributed.properties
     
@@ -118,7 +118,7 @@

Throughout the example we'll use schemaless JSON data format. To use schemaless format, we changed the following two lines in connect-standalone.properties from true to false:

-
+    
         key.converter.schemas.enable
         value.converter.schemas.enable
     
@@ -131,7 +131,7 @@
After adding the transformations, the connect-file-source.properties file looks as follows:
-
+    
         name=local-file-source
         connector.class=FileStreamSource
         tasks.max=1
@@ -149,7 +149,7 @@
 
     When we ran the file source connector on my sample file without the transformations, and then read them using kafka-console-consumer.sh, the results were:
 
-    
+    
         "foo"
         "bar"
         "hello world"
@@ -157,7 +157,7 @@
 
     We then create a new file connector, this time after adding the transformations to the configuration file. This time, the results will be:
 
-    
+    
         {"line":"foo","data_source":"test-file-source"}
         {"line":"bar","data_source":"test-file-source"}
         {"line":"hello world","data_source":"test-file-source"}
@@ -247,7 +247,7 @@
 
     We'll cover the SourceConnector as a simple example. SinkConnector implementations are very similar. Start by creating the class that inherits from SourceConnector and add a couple of fields that will store parsed configuration information (the filename to read from and the topic to send data to):
 
-    
+    
     public class FileStreamSourceConnector extends SourceConnector {
         private String filename;
         private String topic;
@@ -255,7 +255,7 @@
 
     The easiest method to fill in is getTaskClass(), which defines the class that should be instantiated in worker processes to actually read the data:
 
-    
+    
     @Override
     public Class<? extends Task> getTaskClass() {
         return FileStreamSourceTask.class;
@@ -264,7 +264,7 @@
 
     We will define the FileStreamSourceTask class below. Next, we add some standard lifecycle methods, start() and stop():
 
-    
+    
     @Override
     public void start(Map<String, String> props) {
         // The complete version includes error handling as well.
@@ -282,7 +282,7 @@
     handling a single file, so even though we may be permitted to generate more tasks as per the
     maxTasks argument, we return a list with only one entry:
 
-    
+    
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         ArrayList<Map<String, String>> configs = new ArrayList<>();
@@ -310,7 +310,7 @@
     Just as with the connector, we need to create a class inheriting from the appropriate base Task class. It also has some standard lifecycle methods:
 
 
-    
+    
     public class FileStreamSourceTask extends SourceTask {
         String filename;
         InputStream stream;
@@ -333,7 +333,7 @@
 
     Next, we implement the main functionality of the task, the poll() method which gets events from the input system and returns a List<SourceRecord>:
 
-    
+    
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         try {
@@ -365,7 +365,7 @@
 
     The previous section described how to implement a simple SourceTask. Unlike SourceConnector and SinkConnector, SourceTask and SinkTask have very different interfaces because SourceTask uses a pull interface and SinkTask uses a push interface. Both share the common lifecycle methods, but the SinkTask interface is quite different:
 
-    
+    
     public abstract class SinkTask implements Task {
         public void initialize(SinkTaskContext context) {
             this.context = context;
@@ -388,7 +388,7 @@
 
     To correctly resume upon startup, the task can use the SourceContext passed into its initialize() method to access the offset data. In initialize(), we would add a bit more code to read the offset (if it exists) and seek to that position:
 
-    
+    
         stream = new FileInputStream(filename);
         Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
         if (offset != null) {
@@ -406,7 +406,7 @@
 
     Source connectors need to monitor the source system for changes, e.g. table additions/deletions in a database. When they pick up changes, they should notify the framework via the ConnectorContext object that reconfiguration is necessary. For example, in a SourceConnector:
 
-    
+    
         if (inputsChanged())
             this.context.requestTaskReconfiguration();
     
@@ -423,7 +423,7 @@
The following code in FileStreamSourceConnector defines the configuration and exposes it to the framework.
-
+    
         private static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
             .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
@@ -445,7 +445,7 @@
 
     The API documentation provides a complete reference, but here is a simple example creating a Schema and Struct:
 
-    
+    
     Schema schema = SchemaBuilder.struct().name(NAME)
         .field("name", Schema.STRING_SCHEMA)
         .field("age", Schema.INT_SCHEMA)
@@ -473,7 +473,7 @@
     When a connector is first submitted to the cluster, the workers rebalance the full set of connectors in the cluster and their tasks so that each worker has approximately the same amount of work. This same rebalancing procedure is also used when connectors increase or decrease the number of tasks they require, or when a connector's configuration is changed. You can use the REST API to view the current status of a connector and its tasks, including the id of the worker to which each was assigned. For example, querying the status of a file source (using GET /connectors/file-source/status) might produce output like the following:
     

-
+    
     {
     "name": "file-source",
     "connector": {
diff --git a/docs/design.html b/docs/design.html
index c061cf453ae..0373a2d197c 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -417,7 +417,7 @@
     

Let's discuss a concrete example of such a stream. Say we have a topic containing user email addresses; every time a user updates their email address we send a message to this topic using their user id as the primary key. Now say we send the following messages over some time period for a user with id 123, each message corresponding to a change in email address (messages for other ids are omitted):
-

+    
         123 => bill@microsoft.com
                 .
                 .
@@ -510,11 +510,11 @@
 
     The log cleaner is enabled by default. This will start the pool of cleaner threads. 
     To enable log cleaning on a particular topic you can add the log-specific property
-    
  log.cleanup.policy=compact
+
  log.cleanup.policy=compact
This can be done either at topic creation time or using the alter topic command.

The log cleaner can be configured to retain a minimum amount of the uncompacted "head" of the log. This is enabled by setting the compaction time lag.
-

  log.cleaner.min.compaction.lag.ms
+
  log.cleaner.min.compaction.lag.ms
This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.
diff --git a/docs/implementation.html b/docs/implementation.html
index 48602f26301..5ff75ec5053 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -22,8 +22,8 @@

The Producer API that wraps the 2 low-level producers - kafka.producer.SyncProducer and kafka.producer.async.AsyncProducer.
-

-    class Producer {
+    
+    class Producer<T> {
 
     /* Sends the data, partitioned by key to the topic using either the */
     /* synchronous or the asynchronous producer */
@@ -48,7 +48,7 @@
     

• handles the serialization of data through a user-specified Encoder:
-
    +    
         interface Encoder<T> {
         public Message toMessage(T data);
         }
    @@ -58,7 +58,7 @@
         
  • provides software load balancing through an optionally user-specified Partitioner:

The routing decision is influenced by the kafka.producer.Partitioner.
-

    +    
         interface Partitioner<T> {
         int partition(T key, int numPartitions);
         }
    @@ -78,7 +78,7 @@
         

    Low-level API
    -
    +    
         class SimpleConsumer {
     
         /* Send fetch request to a broker and get back a set of messages. */
    @@ -101,7 +101,7 @@
         The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers which have particular requirements around maintaining state.
     
         
    High-level API
    -
    +    
     
         /* create a connection to the cluster */
         ConsumerConnector connector = Consumer.create(consumerConfig);
    @@ -156,8 +156,8 @@
     
         

    5.4 Message Format

    -
    -        /**
    +    
    +       /**
             * 1. 4 byte CRC32 of the message
             * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
             * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
    @@ -185,7 +185,7 @@
         

    The exact binary format for messages is versioned and maintained as a standard interface so message sets can be transferred between producer, broker, and client without recopying or conversion when desirable. This format is as follows:

    -
    +    
         On-disk format of a message
     
         offset         : 8 bytes 
    @@ -220,7 +220,7 @@
     
         

The following is the format of the results sent to the consumer.
-

    +    
         MessageSetSend (fetch result)
     
         total length     : 4 bytes
    @@ -230,7 +230,7 @@
         message n        : x bytes
         
    -
    +    
         MultiMessageSetSend (multiFetch result)
     
         total length       : 4 bytes
    @@ -292,7 +292,7 @@
         

    Broker Node Registry

    -
    +    
         /brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)
         

@@ -302,7 +302,7 @@
Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shut down or dies (thus notifying consumers it is no longer available).

    Broker Topic Registry

    -
    +    
         /brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)
         
    @@ -327,7 +327,7 @@

    Consumer Id Registry

In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes. Consumer ids are registered in the following directory.
-

    +    
         /consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} (ephemeral node)
         
Each of the consumers in the group registers under its group and creates a znode with its consumer_id. The value of the znode contains a map of <topic, #streams>. This id is simply used to identify each of the consumers which is currently active within a group. This is an ephemeral node so it will disappear if the consumer process dies.
@@ -337,7 +337,7 @@

    Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory if offsets.storage=zookeeper.

    -
    +    
         /consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)
         
@@ -347,7 +347,7 @@
Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming.

    -
    +    
         /consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)
         
    @@ -389,7 +389,7 @@

    Each consumer does the following during rebalancing:

    -
    +    
         1. For each topic T that Ci subscribes to
         2.   let PT be all partitions producing topic T
         3.   let CG be all consumers in the same group as Ci that consume topic T
    diff --git a/docs/ops.html b/docs/ops.html
    index 46edb194c0b..94b6c2dc453 100644
    --- a/docs/ops.html
    +++ b/docs/ops.html
    @@ -27,7 +27,7 @@
       You have the option of either adding topics manually or having them be created automatically when data is first published to a non-existent topic. If topics are auto-created then you may want to tune the default topic configurations used for auto-created topics.
       

Topics are added and modified using the topic tool:
-

    +  
       > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
             --partitions 20 --replication-factor 3 --config x=y
       
@@ -44,26 +44,26 @@
You can change the configuration or partitioning of a topic using the same topic tool.

To add partitions you can do
-

    +  
       > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name
             --partitions 40
       
Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn't change the partitioning of existing data, so this may disturb consumers if they rely on that partition. That is, if data is partitioned by hash(key) % number_of_partitions then this partitioning will potentially be shuffled by adding partitions, but Kafka will not attempt to automatically redistribute data in any way.
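To make that caveat concrete, here is a small illustration (not part of the original page) of how a key can land on a different partition once partitions are added; it uses a simplified hash in place of Kafka's murmur2-based default partitioner:

    public class PartitionShuffleExample {
        // Simplified stand-in for key-hash partitioning; Kafka's default partitioner
        // uses murmur2 rather than String.hashCode(), but the effect is the same.
        static int partitionFor(String key, int numPartitions) {
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }

        public static void main(String[] args) {
            String key = "user-123";                   // hypothetical record key
            System.out.println(partitionFor(key, 8));  // partition chosen with 8 partitions
            System.out.println(partitionFor(key, 12)); // often a different partition after adding more
        }
    }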

To add configs:
-

    +  
       > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
       
To remove a config:
-
    +  
       > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --delete-config x
       
And finally deleting a topic:
-
    +  
       > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
       
The topic deletion option is disabled by default. To enable it, set the server config
-
    delete.topic.enable=true
    +
    delete.topic.enable=true

    Kafka does not currently support reducing the number of partitions for a topic.

@@ -80,7 +80,7 @@
Syncing the logs will happen automatically whenever the server is stopped other than by a hard kill, but the controlled leadership migration requires using a special setting:
-

    +  
           controlled.shutdown.enable=true
       
Note that controlled shutdown will only succeed if all the partitions hosted on the broker have replicas (i.e. the replication factor is greater than 1 and at least one of these replicas is alive). This is generally what you want since shutting down the last replica would make that topic partition unavailable.
@@ -90,12 +90,12 @@
Whenever a broker stops or crashes, leadership for that broker's partitions transfers to other replicas. This means that by default when the broker is restarted it will only be a follower for all its partitions, meaning it will not be used for client reads and writes.

To avoid this imbalance, Kafka has a notion of preferred replicas. If the list of replicas for a partition is 1,5,9 then node 1 is preferred as the leader to either node 5 or 9 because it is earlier in the replica list. You can have the Kafka cluster try to restore leadership to the restored replicas by running the command:
-

    +  
       > bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
       
Since running this command can be tedious, you can also configure Kafka to do this automatically by setting the following configuration:
-
    +  
           auto.leader.rebalance.enable=true
       
@@ -103,7 +103,7 @@
The rack awareness feature spreads replicas of the same partition across different racks. This extends the guarantees Kafka provides for broker-failure to cover rack-failure, limiting the risk of data loss should all the brokers on a rack fail at once. The feature can also be applied to other broker groupings such as availability zones in EC2.

You can specify that a broker belongs to a particular rack by adding a property to the broker config:
-
       broker.rack=my-rack-id
    +
       broker.rack=my-rack-id
    When a topic is created, modified or replicas are redistributed, the rack constraint will be honoured, ensuring replicas span as many racks as they can (a partition will span min(#racks, replication-factor) different racks).

The algorithm used to assign replicas to brokers ensures that the number of leaders per broker will be constant, regardless of how brokers are distributed across racks. This ensures balanced throughput.
@@ -123,7 +123,7 @@
The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same. For this reason the mirror cluster is not really intended as a fault-tolerance mechanism (as the consumer position will be different); for that we recommend using normal in-cluster replication. The mirror maker process will, however, retain and use the message key for partitioning so order is preserved on a per-key basis.

Here is an example showing how to mirror a single topic (named my-topic) from an input cluster:
-

    +  
       > bin/kafka-mirror-maker.sh
             --consumer.config consumer.properties
             --producer.config producer.properties --whitelist my-topic
    @@ -139,7 +139,7 @@
     
       

    Checking consumer position

Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. Running this tool on a consumer group named my-group consuming a topic named my-topic would look like this:
-
    +  
       > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
       Group           Topic                          Pid Offset          logSize         Lag             Owner
       my-group        my-topic                       0   0               0               0               test_jkreps-mn-1394154511599-60744496-0
    @@ -157,7 +157,7 @@
     
       For example, to list all consumer groups across all topics:
     
    -  
    +  
       > bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
     
       test-consumer-group
    @@ -165,7 +165,7 @@
     
       To view offsets as in the previous example with the ConsumerOffsetChecker, we "describe" the consumer group like this:
     
    -  
    +  
       > bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group
     
       TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
    @@ -175,7 +175,7 @@
       If you are using the old high-level consumer and storing the group metadata in ZooKeeper (i.e. offsets.storage=zookeeper), pass
       --zookeeper instead of bootstrap-server:
     
    -  
    +  
       > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
       
@@ -199,7 +199,7 @@
For instance, the following example will move all partitions for topics foo1,foo2 to the new set of brokers 5,6. At the end of this move, all partitions for topics foo1 and foo2 will only exist on brokers 5,6.

Since the tool accepts the input list of topics as a json file, you first need to identify the topics you want to move and create the json file as follows:
-

    +  
       > cat topics-to-move.json
       {"topics": [{"topic": "foo1"},
                   {"topic": "foo2"}],
    @@ -207,7 +207,7 @@
       }
       
Once the json file is ready, use the partition reassignment tool to generate a candidate assignment:
-
    +  
       > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
       Current partition replica assignment
     
    @@ -233,7 +233,7 @@
       

The tool generates a candidate assignment that will move all partitions from topics foo1,foo2 to brokers 5,6. Note, however, that at this point the partition movement has not started; the tool merely tells you the current assignment and the proposed new assignment. The current assignment should be saved in case you want to roll back to it. The new assignment should be saved in a json file (e.g. expand-cluster-reassignment.json) to be input to the tool with the --execute option as follows:
-

    +  
       > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
       Current partition replica assignment
     
    @@ -259,7 +259,7 @@
       

Finally, the --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option:
-

    +  
       > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
       Status of partition reassignment:
       Reassignment of partition [foo1,0] completed successfully
    @@ -276,12 +276,12 @@
       For instance, the following example moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3:
       

The first step is to hand craft the custom reassignment plan in a json file:
-

    +  
       > cat custom-reassignment.json
       {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
       
Then, use the json file with the --execute option to start the reassignment process:
-
    +  
       > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
       Current partition replica assignment
     
    @@ -299,8 +299,8 @@
       

The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same custom-reassignment.json (used with the --execute option) should be used with the --verify option:
-

    -  bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
    +  
    +  > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
       Status of partition reassignment:
       Reassignment of partition [foo1,0] completed successfully
       Reassignment of partition [foo2,1] completed successfully
    @@ -315,13 +315,13 @@
       For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
       

The first step is to hand craft the custom reassignment plan in a json file:
-

    +  
       > cat increase-replication-factor.json
       {"version":1,
       "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
       
Then, use the json file with the --execute option to start the reassignment process:
-
    +  
       > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
       Current partition replica assignment
     
    @@ -335,13 +335,13 @@
       

The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option:
-

    -  bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
    +  
    +  > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
       Status of partition reassignment:
       Reassignment of partition [foo,0] completed successfully
       
You can also verify the increase in replication factor with the kafka-topics tool:
-
    +  
       > bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
       Topic:foo	PartitionCount:1	ReplicationFactor:3	Configs:
         Topic: foo	Partition: 0	Leader: 5	Replicas: 5,6,7	Isr: 5,6,7
    @@ -353,13 +353,13 @@
       There are two interfaces that can be used to engage a throttle. The simplest, and safest, is to apply a throttle when invoking the kafka-reassign-partitions.sh, but kafka-configs.sh can also be used to view and alter the throttle values directly.
       

So for example, if you were to execute a rebalance with the below command, it would move partitions at no more than 50MB/s.
-
    $ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181--execute --reassignment-json-file bigger-cluster.json —throttle 50000000
    +
$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000
When you execute this script, you will see the throttle engage:
-
    +  
       The throttle limit was set to 50000000 B/s
       Successfully started reassignment of partitions.

Should you wish to alter the throttle during a rebalance, say to increase the throughput so that it completes more quickly, you can do so by re-running the execute command, passing the same reassignment-json-file:

    -
    $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
    +  
    $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
       There is an existing assignment running.
       The throttle limit was set to 700000000 B/s
@@ -369,7 +369,8 @@
the --verify option. Failure to do so could cause regular replication traffic to be throttled.

    When the --verify option is executed, and the reassignment has completed, the script will confirm that the throttle was removed:

    -
    $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify --reassignment-json-file bigger-cluster.json
    +  
    +  > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify --reassignment-json-file bigger-cluster.json
       Status of partition reassignment:
       Reassignment of partition [my-topic,1] completed successfully
       Reassignment of partition [mytopic,0] completed successfully
    @@ -379,19 +380,20 @@
           configuration used to manage the throttling process. The throttle value itself. This is configured, at a broker
           level, using the dynamic properties: 

    -
    leader.replication.throttled.rate
    +  
    leader.replication.throttled.rate
       follower.replication.throttled.rate

    There is also an enumerated set of throttled replicas:

    -
    leader.replication.throttled.replicas
    +  
    leader.replication.throttled.replicas
       follower.replication.throttled.replicas

These are configured per topic. All four config values are automatically assigned by kafka-reassign-partitions.sh (discussed below).

    To view the throttle limit configuration:

    -
    $ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
    +  
    +  > bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
       Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
       Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
    @@ -400,7 +402,8 @@

    To view the list of throttled replicas:

    -
    $ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
    +  
    +  > bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
       Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
           follower.replication.throttled.replicas=1:101,0:102
@@ -448,19 +451,19 @@
It is possible to set custom quotas for each (user, client-id), user or client-id group.

Configure custom quota for (user=user1, client-id=clientA):
-

    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
       Updated config for entity: user-principal 'user1', client-id 'clientA'.
       
Configure custom quota for user=user1:
-
    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
       Updated config for entity: user-principal 'user1'.
       
Configure custom quota for client-id=clientA:
-
    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
       Updated config for entity: client-id 'clientA'.
       
@@ -468,53 +471,53 @@
It is possible to set default quotas for each (user, client-id), user or client-id group by specifying the --entity-default option instead of --entity-name.

Configure default client-id quota for user=user1:
-

    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
       Updated config for entity: user-principal 'user1', default client-id.
       
Configure default quota for user:
-
    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
       Updated config for entity: default user-principal.
       
Configure default quota for client-id:
-
    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
       Updated config for entity: default client-id.
       
Here's how to describe the quota for a given (user, client-id):
-
    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
       Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
       
Describe quota for a given user:
-
    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-name user1
       Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
       
Describe quota for a given client-id:
-
    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA
       Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
       
If entity name is not specified, all entities of the specified type are described. For example, describe all users:
-
    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users
       Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
       Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
       
Similarly for (user, client):
-
    +  
       > bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-type clients
       Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
       Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
       

It is possible to set default quotas that apply to all client-ids by setting these configs on the brokers. These properties are applied only if quota overrides or defaults are not configured in ZooKeeper. By default, each client-id receives an unlimited quota. The following sets the default quota per producer and consumer client-id to 10MB/sec.
-

    +  
         quota.producer.default=10485760
         quota.consumer.default=10485760
       
    @@ -556,7 +559,7 @@

    A Production Server Config

Here is an example production server configuration:
-
    +  
       # ZooKeeper
       zookeeper.connect=[list of ZooKeeper servers]
     
    @@ -582,7 +585,7 @@
       LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector. If you decide to use the G1 collector (the current default) and you are still on JDK 1.7, make sure you are on u51 or newer. LinkedIn tried out u21 in testing, but they had a number of problems with the GC implementation in that version.
     
       LinkedIn's tuning looks like this:
    -  
    +  
       -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
       -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
       -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
    @@ -648,9 +651,7 @@
When Pdflush cannot keep up with the rate of data being written, it will eventually cause the writing process to block, incurring latency in the writes to slow down the accumulation of data.
       

You can see the current state of OS memory usage by doing
-

    -    > cat /proc/meminfo
    -  
    +
     > cat /proc/meminfo 
The meaning of these values is described in the link above.

Using pagecache has several advantages over an in-process cache for storing data that will be written out to disk:
diff --git a/docs/quickstart.html b/docs/quickstart.html
index 461e18474dc..b8722a6b68c 100644
--- a/docs/quickstart.html
+++ b/docs/quickstart.html
@@ -27,9 +27,9 @@ Since Kafka console scripts are different for Unix-based and Windows platforms,
Download the 0.10.2.0 release and un-tar it.
-

    -> tar -xzf kafka_2.11-0.10.2.0.tgz
    -> cd kafka_2.11-0.10.2.0
    +
    +> tar -xzf kafka_2.11-0.10.2.0.tgz
    +> cd kafka_2.11-0.10.2.0
     

    Step 2: Start the server

@@ -38,15 +38,15 @@ Since Kafka console scripts are different for Unix-based and Windows platforms,
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.

    -
    -> bin/zookeeper-server-start.sh config/zookeeper.properties
    +
    +> bin/zookeeper-server-start.sh config/zookeeper.properties
     [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
     ...
     

    Now start the Kafka server:

    -
    -> bin/kafka-server-start.sh config/server.properties
    +
    +> bin/kafka-server-start.sh config/server.properties
     [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
     [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
     ...
    @@ -55,13 +55,13 @@ Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't
     

    Step 3: Create a topic

    Let's create a topic named "test" with a single partition and only one replica:

    -
    -> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    +
    +> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
     

    We can now see that topic if we run the list topic command:

    -
    -> bin/kafka-topics.sh --list --zookeeper localhost:2181
    +
    +> bin/kafka-topics.sh --list --zookeeper localhost:2181
     test
     

    Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.

    @@ -72,18 +72,18 @@ test

    Run the producer and then type a few messages into the console to send to the server.

    -
    -> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    -This is a message
    -This is another message
    +
    +> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    +This is a message
    +This is another message
     

    Step 5: Start a consumer

    Kafka also has a command line consumer that will dump out messages to standard output.

    -
    -> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    +
    +> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
     This is a message
     This is another message
     
    @@ -100,15 +100,15 @@ All of the command line tools have additional options; running the command with

    First we make a config file for each of the brokers (on Windows use the copy command instead):

    -
    -> cp config/server.properties config/server-1.properties
    -> cp config/server.properties config/server-2.properties
    +
    +> cp config/server.properties config/server-1.properties
    +> cp config/server.properties config/server-2.properties
     

    Now edit these new files and set the following properties:

    -
    +
     
     config/server-1.properties:
         broker.id=1
    @@ -124,21 +124,21 @@ config/server-2.properties:
     

We already have ZooKeeper and our single node started, so we just need to start the two new nodes:

    -
    -> bin/kafka-server-start.sh config/server-1.properties &
    +
    +> bin/kafka-server-start.sh config/server-1.properties &
     ...
    -> bin/kafka-server-start.sh config/server-2.properties &
    +> bin/kafka-server-start.sh config/server-2.properties &
     ...
     

    Now create a new topic with a replication factor of three:

    -
    -> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    +
    +> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
     

Okay, but now that we have a cluster, how can we know which broker is doing what? To see that, run the "describe topics" command:

    -
    -> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    +
    +> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
     Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
     	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
     
    @@ -152,8 +152,8 @@ Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:

    We can run the same command on the original topic we created to see where it is:

    -
    -> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    +
    +> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
     Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
     	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
     
    @@ -161,50 +161,50 @@ Topic:test PartitionCount:1 ReplicationFactor:1 Configs:

    Let's publish a few messages to our new topic:

    -
    -> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    +
    +> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
     ...
    -my test message 1
    -my test message 2
    -^C
    +my test message 1
    +my test message 2
    +^C
     

    Now let's consume these messages:

    -
    -> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    +
    +> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
     ...
     my test message 1
     my test message 2
    -^C
    +^C
     

    Now let's test out fault-tolerance. Broker 1 was acting as the leader so let's kill it:

    -
    -> ps aux | grep server-1.properties
    -7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
    -> kill -9 7564
    +
    +> ps aux | grep server-1.properties
    +7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
    +> kill -9 7564
     
On Windows use:
-
    -> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
    -java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.11-0.10.2.0.jar"  kafka.Kafka config\server-1.properties    644
    -> taskkill /pid 644 /f
    +
    +> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
    +java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.11-0.10.2.0.jar"  kafka.Kafka config\server-1.properties    644
    +> taskkill /pid 644 /f
     

    Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:

    -
    -> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    +
    +> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
     Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
     	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0
     

    But the messages are still available for consumption even though the leader that took the writes originally is down:

    -
    -> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    +
    +> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
     ...
     my test message 1
     my test message 2
    -^C
    +^C
     
    @@ -221,8 +221,8 @@ Kafka topic to a file.

    First, we'll start by creating some seed data to test with:

    -
    -> echo -e "foo\nbar" > test.txt
    +
    +> echo -e "foo\nbar" > test.txt
     

Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated
@@ -231,8 +231,8 @@ process, containing common configuration such as the Kafka brokers to connect to
The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

    -
    -> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
    +
    +> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
     

    @@ -250,8 +250,8 @@ by examining the contents of the output file:

    -
    -> cat test.sink.txt
    +
    +> cat test.sink.txt
     foo
     bar
     
    @@ -262,8 +262,8 @@ data in the topic (or use custom consumer code to process it):

    -
    -> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
    +
    +> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
     {"schema":{"type":"string","optional":false},"payload":"foo"}
     {"schema":{"type":"string","optional":false},"payload":"bar"}
     ...
    @@ -271,8 +271,8 @@ data in the topic (or use custom consumer code to process it):
     
     

    The connectors continue to process data, so we can add data to the file and see it move through the pipeline:

    -
    -> echo "Another line" >> test.txt
    +
    +> echo "Another line" >> test.txt
     

    You should see the line appear in the console consumer output and in the sink file.

@@ -284,7 +284,7 @@ Kafka Streams is a client library of Kafka for real-time stream processing and a
This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist of the WordCountDemo example code (converted to use Java 8 lambda expressions for easy reading).

    -
    +
     // Serializers/deserializers (serde) for String and Long types
     final Serde<String> stringSerde = Serdes.String();
     final Serde<Long> longSerde = Serdes.Long();
    @@ -333,14 +333,14 @@ As the first step, we will prepare input data to a Kafka topic, which will subse
     
     -->
     
    -
    -> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
    +
    +> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
     
Or on Windows:
-
    -> echo all streams lead to kafka> file-input.txt
    -> echo hello kafka streams>> file-input.txt
    -> echo|set /p=join kafka summit>> file-input.txt
    +
    +> echo all streams lead to kafka> file-input.txt
    +> echo hello kafka streams>> file-input.txt
    +> echo|set /p=join kafka summit>> file-input.txt
     

@@ -349,25 +349,25 @@ which reads the data from STDIN line-by-line, and publishes each line as a separ
stream data will likely be flowing continuously into Kafka where the application will be up and running):

    -
    -> bin/kafka-topics.sh --create \
    -            --zookeeper localhost:2181 \
    -            --replication-factor 1 \
    -            --partitions 1 \
    -            --topic streams-file-input
    +
    +> bin/kafka-topics.sh --create \
    +    --zookeeper localhost:2181 \
    +    --replication-factor 1 \
    +    --partitions 1 \
    +    --topic streams-file-input
     
    -
    -> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
    +
    +> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
     

    We can now run the WordCount demo application to process the input data:

    -
    -> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
    +
    +> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
     

@@ -380,22 +380,22 @@ The demo will run for a few seconds and then, unlike typical stream processing a
We can now inspect the output of the WordCount demo application by reading from its output topic:

    -
    -> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    -            --topic streams-wordcount-output \
    -            --from-beginning \
    -            --formatter kafka.tools.DefaultMessageFormatter \
    -            --property print.key=true \
    -            --property print.value=true \
    -            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    -            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
    +
    +> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    +    --topic streams-wordcount-output \
    +    --from-beginning \
    +    --formatter kafka.tools.DefaultMessageFormatter \
    +    --property print.key=true \
    +    --property print.value=true \
    +    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    +    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
     

    with the following output data being printed to the console:

    -
    +
     all     1
     lead    1
     to      1
    diff --git a/docs/security.html b/docs/security.html
    index b2d47598887..09a6c332e43 100644
    --- a/docs/security.html
    +++ b/docs/security.html
    @@ -42,7 +42,7 @@
             
  • Generate SSL key and certificate for each Kafka broker

The first step of deploying HTTPS is to generate the key and the certificate for each machine in the cluster. You can use Java's keytool utility to accomplish this task. We will generate the key into a temporary keystore initially so that we can export and sign it later with the CA.
-
    +            
                 keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey
You need to specify two parameters in the above command:
@@ -53,7 +53,7 @@
Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:
-
    	ssl.endpoint.identification.algorithm=HTTPS 
    +
    	ssl.endpoint.identification.algorithm=HTTPS 
    Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:
      @@ -62,43 +62,43 @@

Both fields are valid; however, RFC-2818 recommends the use of SAN. SAN is also more flexible, allowing for multiple DNS entries to be declared. Another advantage is that the CN can be set to a more meaningful value for authorization purposes. To add a SAN field, append the following argument -ext SAN=DNS:{FQDN} to the keytool command:
-
    +        
             keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -ext SAN=DNS:{FQDN}
             
The following command can be run afterwards to verify the contents of the generated certificate:
-
    +        
             keytool -list -v -keystore server.keystore.jks
             
  • Creating your own CA

    After the first step, each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.

Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. A CA works like a government that issues passports: the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.
-

    -            openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    +
    +            openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.
The next step is to add the generated CA to the clients' truststore so that the clients can trust this CA:
-
    +            
                 keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
Note: If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" in the Kafka brokers config, then you must provide a truststore for the Kafka brokers as well, and it should have all the CA certificates that clients' keys were signed by.
-
    -            keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
    +
    +            keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
    In contrast to the keystore in step 1 that stores each machine's own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one's truststore also means trusting all certificates that are signed by that certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.
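    For example, you can confirm that the CA certificate was imported into a truststore with the same keytool -list check used for the keystore above (a quick sanity check, not an additional required step):
             keytool -list -v -keystore client.truststore.jks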
  • Signing the certificate

    The next step is to sign all certificates generated by step 1 with the CA generated in step 2. First, you need to export the certificate from the keystore: -
    +            
                 keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
    Then sign it with the CA: -
    +            
                 openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
    Finally, you need to import both the certificate of the CA and the signed certificate into the keystore: -
    +            
                 keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
                 keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
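    Putting the above steps together, a minimal end-to-end sketch looks like the following (assuming a 365-day validity and the example password test1234 used elsewhere in this section; substitute your own values):
             # 1. generate the broker key pair in a keystore
             keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
             # 2. create the CA
             openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
             # 3. import the CA into the broker and client truststores
             keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
             keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
             # 4. export the unsigned certificate, sign it with the CA, then import the CA and the signed certificate into the keystore
             keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
             openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
             keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
             keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed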
    @@ -132,11 +132,11 @@
    listeners
    If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary. -
    +            
                 listeners=PLAINTEXT://host.name:port,SSL://host.name:port
    The following SSL configs are needed on the broker side -
    +            
                 ssl.keystore.location=/var/private/ssl/server.keystore.jks
                 ssl.keystore.password=test1234
                 ssl.key.password=test1234
    @@ -189,7 +189,7 @@
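    To check quickly whether the broker's keystore and truststore are set up correctly, you can open a TLS connection to the SSL port with openssl's s_client (a sanity-check sketch; it assumes the SSL listener is on localhost:9093):
             openssl s_client -debug -connect localhost:9093 -tls1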
             
  • Configuring Kafka Clients

    SSL is supported only for the new Kafka Producer and Consumer; the older APIs are not supported. The SSL configs are the same for both producer and consumer.
    If client authentication is not required in the broker, then the following is a minimal configuration example: -
    +            
                 security.protocol=SSL
                 ssl.truststore.location=/var/private/ssl/client.truststore.jks
                 ssl.truststore.password=test1234
    @@ -197,7 +197,7 @@
    Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled. If client authentication is required, then a keystore must be created as in step 1 and the following must also be configured: -
    +            
                 ssl.keystore.location=/var/private/ssl/client.keystore.jks
                 ssl.keystore.password=test1234
                 ssl.key.password=test1234
    @@ -212,7 +212,7 @@
    Examples using console-producer and console-consumer: -
    +            
                 kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
                 kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties
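    For reference, the client-ssl.properties file passed to the console tools above would simply combine the client settings shown earlier, for example:
             security.protocol=SSL
             ssl.truststore.location=/var/private/ssl/client.truststore.jks
             ssl.truststore.password=test1234
             # only needed if the broker requires client authentication (ssl.client.auth=required)
             ssl.keystore.location=/var/private/ssl/client.keystore.jks
             ssl.keystore.password=test1234
             ssl.key.password=test1234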
  • @@ -278,7 +278,7 @@ SCRAM.
    For example, GSSAPI credentials may be configured as: -
    +                    
             KafkaClient {
             com.sun.security.auth.module.Krb5LoginModule required
             useKeyTab=true
    @@ -288,7 +288,7 @@
         };
  • Pass the JAAS config file location as JVM parameter to each client JVM. For example: -
        -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
  • +
        -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
  • @@ -350,7 +350,7 @@
  • Create Kerberos Principals
    If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools).
    If you have installed your own Kerberos, you will need to create these principals yourself using the following commands: -
    +                
             sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
             sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
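    Optionally, you can verify that a keytab and principal work before starting the broker by obtaining a ticket with kinit (a quick sanity check; adjust the keytab path and principal to your environment):
             sudo kinit -k -t /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}
             sudo klist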
  • Make sure all hosts are reachable using hostnames - it is a Kerberos requirement that all your hosts can be resolved with their FQDNs.
  • @@ -358,7 +358,7 @@
  • Configuring Kafka Brokers
    1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; let's call it kafka_server_jaas.conf for this example (note that each broker should have its own keytab): -
      +            
               KafkaServer {
                   com.sun.security.auth.module.Krb5LoginModule required
                   useKeyTab=true
      @@ -384,7 +384,7 @@
               -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    2. Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting the Kafka broker.
    3. -
    4. Configure SASL port and SASL mechanisms in server.properties as described here.
  • For example: +
  • Configure SASL port and SASL mechanisms in server.properties as described here. For example:
        listeners=SASL_PLAINTEXT://host.name:port
             security.inter.broker.protocol=SASL_PLAINTEXT
             sasl.mechanism.inter.broker.protocol=GSSAPI
    @@ -422,7 +422,6 @@
                        as described here. Clients use the login section named
                        KafkaClient. This option allows only one user for all client connections from a JVM.
  • Make sure the keytabs configured in the JAAS configuration are readable by the operating system user who is starting the Kafka client.
  • -
  • Optionally pass the krb5 file locations as JVM parameters to each client JVM (see here for more details):
        -Djava.security.krb5.conf=/etc/kafka/krb5.conf
  • Configure the following properties in producer.properties or consumer.properties:
    @@ -443,7 +442,7 @@
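    For GSSAPI clients, a minimal sketch of these properties might look as follows (assuming a SASL_PLAINTEXT listener and the Kerberos service name kafka; adjust to your listener and realm):
          security.protocol=SASL_PLAINTEXT
          sasl.mechanism=GSSAPI
          sasl.kerberos.service.name=kafka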
  • Configuring Kafka Brokers
    1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory; let's call it kafka_server_jaas.conf for this example: -
      +                
               KafkaServer {
                   org.apache.kafka.common.security.plain.PlainLoginModule required
                   username="admin"
      @@ -458,7 +457,7 @@
                       those from other brokers using these properties.
    2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
          -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    3. -
    4. Configure SASL port and SASL mechanisms in server.properties as described here.
  • For example: +
  • Configure SASL port and SASL mechanisms in server.properties as described here. For example:
        listeners=SASL_SSL://host.name:port
             security.inter.broker.protocol=SASL_SSL
             sasl.mechanism.inter.broker.protocol=PLAIN
    @@ -472,7 +471,7 @@
                 
  • Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how clients such as the producer and consumer connect to the Kafka broker. The following is an example configuration for a client for the PLAIN mechanism: -
    +                
         sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
             username="alice" \
             password="alice-secret";
    @@ -538,23 +537,23 @@
    before Kafka brokers are started. Client credentials may be created and updated dynamically, and updated credentials will be used to authenticate new connections.

    Create SCRAM credentials for user alice with password alice-secret: -

    -    bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
    +        
    +    > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
             

    The default iteration count of 4096 is used if iterations are not specified. A random salt is created, and the SCRAM identity consisting of salt, iterations, StoredKey and ServerKey is stored in ZooKeeper. See RFC 5802 for details on SCRAM identity and the individual fields.

    The following examples also require a user admin for inter-broker communication which can be created using: -

    -    bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
    +        
    +    > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
             

    Existing credentials may be listed using the --describe option: -

    -   bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice
    +        
    +    > bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice
             

    Credentials may be deleted for one or more SCRAM mechanisms using the --delete option: -

    -   bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
    +        
    +    > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
             
  • Configuring Kafka Brokers
    @@ -586,7 +585,7 @@
  • Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how clients such as the producer and consumer connect to the Kafka broker. The following is an example configuration for a client for the SCRAM mechanisms: -
    +                
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
             username="alice" \
             password="alice-secret";
    @@ -599,7 +598,6 @@

    JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

  • -
  • Configure the following properties in producer.properties or consumer.properties:
         security.protocol=SASL_SSL
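    A fuller client configuration for SCRAM might look like the following sketch (assuming SCRAM-SHA-256 and the truststore settings from the SSL section; the sasl.jaas.config line is the one shown above):
         security.protocol=SASL_SSL
         sasl.mechanism=SCRAM-SHA-256
         ssl.truststore.location=/var/private/ssl/client.truststore.jks
         ssl.truststore.password=test1234
         sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
             username="alice" \
             password="alice-secret";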
    @@ -791,25 +789,25 @@
         
    • Adding Acls
      Suppose you want to add an acl "Principals User:Bob and User:Alice are allowed to perform Operation Read and Write on Topic Test-Topic from IP 198.51.100.0 and IP 198.51.100.1". You can do that by executing the CLI with the following options: -
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
      +
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
      By default, all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal, we will have to use the --deny-principal and --deny-host options. For example, if we want to allow all users to Read from Test-topic but deny only User:BadBob from IP 198.51.100.3, we can do so using the following commands: -
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
      +
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
      Note that --allow-host and --deny-host only support IP addresses (hostnames are not supported). The above examples add acls to a topic by specifying --topic [topic-name] as the resource option. Similarly, users can add acls to a cluster by specifying --cluster and to a consumer group by specifying --group [group-name].
    • Removing Acls
      Removing acls is pretty much the same. The only difference is that instead of the --add option, users have to specify the --remove option. To remove the acls added by the first example above, we can execute the CLI with the following options: -
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic 
    • +
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic 
    • List Acls
      We can list acls for any resource by specifying the --list option with the resource. To list all acls for Test-topic, we can execute the CLI with the following options: -
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
    • +
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
    • Adding or removing a principal as producer or consumer
      The most common use case for acl management is adding/removing a principal as a producer or consumer, so we added convenience options to handle these cases. In order to add User:Bob as a producer of Test-topic, we can execute the following command: -
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
      +
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
      Similarly, to add User:Bob as a consumer of Test-topic with consumer group Group-1, we just have to pass the --consumer option: -
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 
      +
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 
      Note that for the consumer option we must also specify the consumer group. In order to remove a principal from the producer or consumer role, we just need to pass the --remove option.
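      For example, the following sketch (mirroring the add command above) removes User:Bob from the producer role of Test-topic:
       bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --producer --topic Test-topic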
    @@ -835,50 +833,58 @@

    As an example, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental bounce, an SSL port is opened on each node:
    -            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
    + listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092 +
    We then restart the clients, changing their config to point at the newly opened, secured port:
                 bootstrap.servers = [broker1:9092,...]
                 security.protocol = SSL
    -            ...etc
    + ...etc +
  • In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):
                 listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
    -            security.inter.broker.protocol=SSL
    + security.inter.broker.protocol=SSL +
    In the final bounce we secure the cluster by closing the PLAINTEXT port:
                 listeners=SSL://broker1:9092
    -            security.inter.broker.protocol=SSL
    + security.inter.broker.protocol=SSL +
    Alternatively, we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wish to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but we would also like to add SASL authentication to the broker-client connection. We would achieve this by opening two additional ports during the first bounce:
    -            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
    + listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093 +
    We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port:
                 bootstrap.servers = [broker1:9093,...]
                 security.protocol = SASL_SSL
    -            ...etc
    + ...etc +
    The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened on port 9092:
                 listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
    -            security.inter.broker.protocol=SSL
    + security.inter.broker.protocol=SSL +
    The final bounce secures the cluster by closing the PLAINTEXT port.
             listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
    -        security.inter.broker.protocol=SSL
    + security.inter.broker.protocol=SSL +
    ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section 7.6.2.
    @@ -907,11 +913,11 @@
  • Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file
  • Here is an example of how to run the migration tool: -
    +    
         ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181
         

    Run this to see the full list of parameters:

    -
    +    
         ./bin/zookeeper-security-migration.sh --help
         

    7.6.3 Migrating the ZooKeeper ensemble

diff --git a/docs/streams.html b/docs/streams.html
index bff9bf07dbd..8192bf7e7e4 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -306,7 +306,7 @@
 The following example Processor implementation defines a simple word-count algorithm:

    -
    +
     public class MyProcessor implements Processor<String, String> {
         private ProcessorContext context;
         private KeyValueStore<String, Long> kvStore;
    @@ -380,7 +380,7 @@ public class MyProcessor implements Processor<String, String> {
             by connecting these processors together:
             

    -
    +
     TopologyBuilder builder = new TopologyBuilder();
     
     builder.addSource("SOURCE", "src-topic")
    @@ -424,7 +424,7 @@ builder.addSource("SOURCE", "src-topic")
             In the following example, a persistent key-value store named “Counts” with key type String and value type Long is created.
             

    -
    +
     StateStoreSupplier countStore = Stores.create("Counts")
         .withKeys(Serdes.String())
         .withValues(Serdes.Long())
    @@ -438,7 +438,7 @@ StateStoreSupplier countStore = Stores.create("Counts")
             state store with the existing processor nodes through TopologyBuilder.connectProcessorAndStateStores.
             

    -
    +
     TopologyBuilder builder = new TopologyBuilder();
     
     builder.addSource("SOURCE", "src-topic")
    @@ -526,7 +526,7 @@ builder.addSource("SOURCE", "src-topic")
             from a single topic).
             

    -
    +
     KStreamBuilder builder = new KStreamBuilder();
     
     KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
    @@ -606,7 +606,7 @@ GlobalKTable<String, GenericRecord> source2 = builder.globalTable("topic4"
     
             

    -
    +
     // written in Java 8+, using lambda expressions
 KStream<String, Object> mapped = source1.mapValues(record -> record.get("category"));
     
    @@ -620,7 +620,7 @@ KStream<String, GenericRecord> mapped = source1.mapValue(record -> record.
    based on them.

    -
    +
     // written in Java 8+, using lambda expressions
     KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
         () -> 0L,  // initial value
    @@ -641,7 +641,7 @@ KStream<String, String> joined = source1.leftJoin(source2,
             KStream.to and KTable.to.
             

    -
    +
     joined.to("topic4");
     
    @@ -649,7 +649,7 @@ joined.to("topic4");
    to a topic via to above, one option is to construct a new stream that reads from the output topic; Kafka Streams provides a convenience method called through: -
    +
     // equivalent to
     //
     // joined.to("topic4");
    @@ -677,7 +677,7 @@ KStream<String, String> materialized = joined.through("topic4");
             set the necessary parameters, and construct a StreamsConfig instance from the Properties instance.
             

    -
    +
     import java.util.Properties;
     import org.apache.kafka.streams.StreamsConfig;
     
    @@ -702,7 +702,7 @@ StreamsConfig config = new StreamsConfig(settings);
             If you want to set different values for consumer and producer for such a parameter, you can prefix the parameter name with consumer. or producer.:
             

    -
    +
     Properties settings = new Properties();
     // Example of a "normal" setting for Kafka Streams
     settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
    @@ -750,7 +750,7 @@ settings.put(StremasConfig.producerConfig(ProducerConfig.RECEIVE_BUFFER_CONFIG),
    that is used to define a topology; the second argument is an instance of StreamsConfig mentioned above.
             

    -
    +
     import org.apache.kafka.streams.KafkaStreams;
     import org.apache.kafka.streams.StreamsConfig;
     import org.apache.kafka.streams.kstream.KStreamBuilder;
    @@ -778,7 +778,7 @@ KafkaStreams streams = new KafkaStreams(builder, config);
             At this point, internal structures have been initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the start() method:
             

    -
    +
     // Start the Kafka Streams instance
     streams.start();
     
    @@ -787,7 +787,7 @@ streams.start();
    To catch any unexpected exceptions, you may set a java.lang.Thread.UncaughtExceptionHandler before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:

    -
    +
     streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
     public void uncaughtException(Thread t, Throwable e) {
             // here you should examine the exception and perform an appropriate action!
    @@ -799,7 +799,7 @@ streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             To stop the application instance call the close() method:
             

    -
    +
     // Stop the Kafka Streams instance
     streams.close();
     
    @@ -807,7 +807,7 @@ streams.close();
    Now it's time to execute your application that uses the Kafka Streams library, which can be run just like any other Java application – there is no special magic or requirement on the side of Kafka Streams. For example, you can package your Java application as a fat jar file and then start the application via: -
    +
     # Start the application in class `com.example.MyStreamsApp`
     # from the fat jar named `path-to-app-fatjar.jar`.
     $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp