If you are using multiple nodes, then you should run the format command on each node. Be sure to use the same cluster ID for each one.
This example configures the node as both a broker and controller (i.e. `process.roles=broker,controller`). It is also possible to run the broker and controller nodes separately.
Please see [here](https://github.com/apache/kafka/blob/trunk/config/kraft/broker.properties) and [here](https://github.com/apache/kafka/blob/trunk/config/kraft/controller.properties) for example configurations.
## Start the Kafka Server
Finally, you are ready to start the Kafka server on each node.
Each broker and each controller must set `controller.quorum.voters`. Note that the node ID supplied in the `controller.quorum.voters` configuration must match that supplied to the server.
So on controller1, node.id must be set to 1, and so forth. Note that there is no requirement for controller IDs to start at 0 or 1. However, the easiest and least confusing way to allocate
node IDs is probably just to give each server a numeric ID, starting from 0.
node IDs is probably just to give each server a numeric ID, starting from 0. Also note that each node ID must be unique across all the nodes in a particular cluster; no two nodes can have the same node ID regardless of their `process.roles` values.
Note that clients never need to configure `controller.quorum.voters`; only servers do.
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# The address the socket server listens on.
# Note that only the controller listeners are allowed here when `process.roles=controller`, and this listener should be consistent with `controller.quorum.voters` value.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9093
listeners=CONTROLLER://:9093
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
# Listener, host name, and port for the controller to advertise to the brokers. If
# this server is a controller, this listener must be configured.
# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
# name of the partitioner class for partitioning records;
# The default uses "sticky" partitioning logic which spreads the load evenly between partitions, but improves throughput by attempting to fill the batches sent to each partition.
#partitioner.class=
# the maximum amount of time the client will wait for the response of a request
"If it is not set, the metadata log is placed in the first log directory from log.dirs."
valMetadataSnapshotMaxNewRecordBytesDoc="This is the maximum number of bytes in the log between the latest snapshot and the high-watermark needed before generating a new snapshot."
valControllerListenerNamesDoc="A comma-separated list of the names of the listeners used by the controller. This is required "+
"if running in KRaft mode. The ZK-based controller will not use this configuration."
"if running in KRaft mode. When communicating with the controller quorum, the broker will always use the first listener in this list.\n "+
"Note: The ZK-based controller should not set this configuration."
valSaslMechanismControllerProtocolDoc="SASL mechanism used for communication with controllers. Default is GSSAPI."
valMetadataLogSegmentBytesDoc="The maximum size of a single metadata log file."
valMetadataLogSegmentMinBytesDoc="Override the minimum size for a single metadata log file. This should be used for testing only."