Before we used the metadata cache to determine whether or not to use topic IDs. Unfortunately, metadata cache updates with ZK controllers are in a separate request and may be too slow for the fetcher thread. This results in switching between topic names and topic IDs for topics that could just use IDs.
This patch adds topic IDs to FetcherState created in LeaderAndIsr requests. It also supports updating this state for follower threads as soon as a LeaderAndIsr request provides a topic ID.
We've opted to only update replica fetcher threads. AlterLogDir threads will use either topic name or topic ID depending on what was present when they were created.
Reviewers: David Jacot <djacot@confluent.io>
The ReplicaManager, LogManager, and KafkaApis class all have many
constructor parameters. It is often difficult to add or remove a
parameter, since there are so many locations that need to be updated. In
order to address this problem, we should use named parameters when
constructing these objects from Scala code. This will make it easy to
add new optional parameters without modifying many test cases. It will
also make it easier to read git diffs and PRs, since the parameters will
have names next to them. Since Java does not support named paramters,
this PR adds several Builder classes which can be used to achieve the
same effect.
ReplicaManager also had a secondary constructor, which this PR removes.
The function of the secondary constructor was just to provide some
default parameters for the main constructor. However, it is simpler just
to actually use default parameters.
Reviewers: David Arthur <mumrah@gmail.com>
The leader assumes that there is always an in-memory snapshot at the last
committed offset. This means that the controller needs to generate an in-memory
snapshot when getting promoted from inactive to active. This PR adds that
code. This fixes a bug where sometimes we would try to look for that in-memory
snapshot and not find it.
The controller always starts inactive, and there is no requirement that there
exists an in-memory snapshot at the last committed offset when the controller
is inactive. Therefore we can remove the initial snapshot at offset -1.
We should also optimize when a snapshot is cancelled or completes, by deleting
all in-memory snapshots less that the last committed offset.
SnapshotRegistry's createSnapshot should allow the creating of a snapshot if
the last snapshot's offset is the given offset. This allows for simpler client
code. Finally, this PR renames createSnapshot to getOrCreateSnapshot.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Also:
* Deprecate `log.message.format.version` and `message.format.version`.
* Log broker warning if the deprecated config values are ignored due to
the inter-broker protocol version.
* Log warning if `message.format.version` is set via `ConfigCommand`.
* Always down-convert if fetch version is v3 or lower.
* Add tests to verify new message format version based on the
inter-broker protocol version.
* Adjust existing tests that create topics with an older message format to
have the inter-broker protocol set to 2.8.
* Add upgrade note.
Note that the log compaction change to always write new segments with
record format v2 if the IBP is 3.0 or higher will be done as part of
KAFKA-13093 (with Kafka 3.1 as the target release version).
Reviewers: David Jacot <djacot@confluent.io>, David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
After noticing increased LISR times, we discovered a lot of time was spent synchronously flushing the partition metadata file. This PR changes the code so we asynchronously flush the files.
We ensure files are flushed before appending, renaming or closing the log to ensure we have the partition metadata information on disk. Three new tests have been added to address these cases.
Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jun Rao <junrao@gmail.com>
Updated FetchRequest and FetchResponse to use topic IDs rather than topic names.
Some of the complicated code is found in FetchSession and FetchSessionHandler.
We need to be able to store topic IDs and maintain a cache on the broker for IDs that may not have been resolved. On incremental fetch requests, we will try to resolve them or remove them if in toForget.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
This PR implements broker-side KRaft snapshots, including both saving and
loading. The code for triggering a periodic broker-side snapshot will come in a
follow-on PR. Loading should work with just this PR. It also implements
reloading broker snapshots after initialization.
In order to facilitate snapshots, this PR introduces the concept of
MetadataImage and MetadataDelta. MetadataImage represents the metadata state
retained in memory. It is basically a generalization of MetadataCache that
includes a few things that MetadataCache does not (such as features and client
quotas.) KRaftMetadataCache is now an accessor for the data stored in this object.
Similarly, MetadataImage replaces CacheConfigRespository and ClientQuotaCache.
It also subsumes kafka.server.metadata.MetadataImage and related classes.
MetadataDelta represents a change to a MetadataImage. When a KRaft snapshot is
loaded, we will accumulate all the changes into a MetadataDelta first, prior to
applying it. If we must reload a snapshot because we fell too far behind while
consuming metadata, the resulting MetadataDelta will contain all the changes
needed to catch us up. During normal operation, MetadataDelta is also used to
accumulate the changes of each incoming batch of metadata records. These
incremental deltas should be relatively small.
I have removed the logic for updating the various manager objects from
BrokerMetadataListener and placed it into BrokerMetadataPublisher. This makes
it easier to unit test BrokerMetadataListener.
Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
Use MockConfigRepository rather than CachedConfigRepository in unit
tests. This is useful for an upcoming change that will remove
CachedConfigRepository.
Reviewers: David Arthur <mumrah@gmail.com>
Use a caching `BufferSupplier` per request handler thread so that
decompression buffers are cached if supported by the underlying
`CompressionType`. This achieves a similar outcome as #9220, but
with less contention.
We introduce a `RequestLocal` class to make it easier to introduce
new request scoped stateful instances (one example we discussed
previously was an `ActionQueue` that could be used to avoid
some of the complex group coordinator locking).
This is a small win for zstd (no synchronization or soft references) and
a more significant win for lz4. In particular, it reduces allocations
significantly when the number of partitions is high. The decompression
buffer size is typically 64 KB, so a produce request with 1000 partitions
results in 64 MB of allocations even if each produce batch is small (likely,
when there are so many partitions).
I did a quick producer perf local test with 5000 partitions, 1 KB record
size,
1 broker, lz4 and ~0.5 for the producer compression rate metric:
Before this change:
> 20000000 records sent, 346314.349535 records/sec (330.27 MB/sec),
148.33 ms avg latency, 2267.00 ms max latency, 115 ms 50th, 383 ms 95th, 777 ms 99th, 1514 ms 99.9th.
After this change:
> 20000000 records sent, 431956.113259 records/sec (411.95 MB/sec),
117.79 ms avg latency, 1219.00 ms max latency, 99 ms 50th, 295 ms 95th, 440 ms 99th, 662 ms 99.9th.
That's a 25% throughput improvement and p999 latency was reduced to
under half (in this test).
Default arguments will be removed in a subsequent PR.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch adds support for running the ZooKeeper-based
kafka.security.authorizer.AclAuthorizer with KRaft clusters. Set the
authorizer.class.name config as well as the zookeeper.connect config while also
setting the typical KRaft configs (node.id, process.roles, etc.), and the
cluster will use KRaft for metadata and ZooKeeper for ACL storage. A system
test that exercises the authorizer is included.
This patch also changes "Raft" to "KRaft" in several system test files. It also
fixes a bug where system test admin clients were unable to connect to a cluster
with broker credentials via the SSL security protocol when the broker was using
that for inter-broker communication and SASL for client communication.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
Support sorting the elements in ImplicitLinkedHashCollection.
This is useful sometimes in unit tests for comparing collections.
Reviewers: Ismael Juma <ismael@juma.me.uk>
KIP-516 introduced partition.metadata file to persist the topic ID on the broker. It is created through handling the LeaderAndIsrRequest in ReplicaManager. (See https://github.com/apache/kafka/pull/10143 for the code path.) RaftReplicaManager was missing the analogue code path for Kip-500 code. Like in ReplicaManager, RaftReplicaManager will now check the partition.metadata file when handling metadata records.
However, since we know that all raft topics will have topic IDs, we can simply set the ID in the log upon the log's creation.
Updated the ReplicaManager path to do the same on newly created topics.
There are also some tweaks to the checking logic to better handle the scenario when the log exists but is not yet associated to Partition (for example, upon startup after a shutdown).
Tests added to ensure the file is created and that the correct error is thrown when the id is inconsistent.
Added tests for creating the log with the new topic ID parameter.
Also adds a few methods to get topic ID from MetadataImageBuilder as this is the most convenient way to get topic ID from RaftReplicaManager.
Reviewers: Ron Dagostino <rdagostino@confluent.io>, Jason Gustafson <jason@confluent.io>
Gradle 7.0 is required for Java 16 compatibility and it removes a number of
deprecated APIs. Fix most issues preventing the upgrade to Gradle 7.0.
The remaining ones are more complicated and should be handled
in a separate PR. Details of the changes:
* Release tarball no longer includes includes test, sources, javadoc and test sources jars (these
are still published to the Maven Central repository).
* Replace `compile` with `api` or `implementation` - note that `implementation`
dependencies appear with `runtime` scope in the pom file so this is a (positive)
change in behavior
* Add missing dependencies that were uncovered by the usage of `implementation`
* Replace `testCompile` with `testImplementation`
* Replace `runtime` with `runtimeOnly` and `testRuntime` with `testRuntimeOnly`
* Replace `configurations.runtime` with `configurations.runtimeClasspath`
* Replace `configurations.testRuntime` with `configurations.testRuntimeClasspath` (except for
the usage in the `streams` project as that causes a cyclic dependency error)
* Use `java-library` plugin instead of `java`
* Use `maven-publish` plugin instead of deprecated `maven` plugin - this changes the
commands used to publish and to install locally, but task aliases for `install` and
`uploadArchives` were added for backwards compatibility
* Removed `-x signArchives` line from the readme since it was wrong (it was a
no-op before and it fails now, however)
* Replaces `artifacts` block with an approach that works with the `maven-publish` plugin
* Don't publish `jmh-benchmark` module - the shadow jar is pretty large and not
particularly useful (before this PR, we would publish the non shadow jars)
* Replace `version` with `archiveVersion`, `baseName` with `archiveBaseName` and
`classifier` with `archiveClassifier`
* Update Gradle and plugins to the latest stable version (7.0 is not stable yet)
* Use `plugin` DSL to configure plugins
* Updated notable changes for 3.0
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Randall Hauch <rhauch@gmail.com>
1. rename INVALID_HIGHWATERMARK to INVALID_HIGH_WATERMARK
2. replace FetchResponse.AbortedTransaction by FetchResponseData.AbortedTransaction
3. remove redundant constructors from FetchResponse.PartitionData
4. rename recordSet to records
5. add helpers "recordsOrFail" and "recordsSize" to FetchResponse to process record casting
Reviewers: Ismael Juma <ismael@juma.me.uk>
Previously all APIs were accessible on every listener exposed by the broker, but
with KIP-500, that is no longer true. We now have more complex requirements for
API accessibility.
For example, the KIP-500 controller exposes some APIs which are not exposed by
brokers, such as BrokerHeartbeatRequest, and does not expose most client APIs,
such as JoinGroupRequest, etc. Similarly, the KIP-500 broker does not implement
some APIs that the ZK-based broker does, such as LeaderAndIsrRequest and
UpdateFeaturesRequest.
All of this means that we need more sophistication in how we expose APIs and
keep them consistent with the ApiVersions API. Up until now, we have been
working around this using the controllerOnly flag inside ApiKeys, but this is
not rich enough to support all of the cases listed above. This PR introduces a
new "listeners" field to the request schema definitions. This field is an array
of strings which indicate the listener types in which the API should be exposed.
We currently support "zkBroker", "broker", and "controller". ("broker"
indicates the KIP-500 broker, whereas zkBroker indicates the old broker).
This PR also creates ApiVersionManager to encapsulate the creation of the
ApiVersionsResponse based on the listener type. Additionally, it modifies
SocketServer to check the listener type of received requests before forwarding
them to the request handler.
Finally, this PR also fixes a bug in the handling of the ApiVersionsResponse
prior to authentication. Previously a static response was sent, which means that
changes to features would not get reflected. This also meant that the logic to
ensure that only the intersection of version ranges supported by the controller
would get exposed did not work. I think this is important because some clients
rely on the initial pre-authenticated ApiVersions response rather than doing a
second round after authentication as the Java client does.
One final cleanup note: I have removed the expectation that envelope requests
are only allowed on "privileged" listeners. This made sense initially because
we expected to use forwarding before the KIP-500 controller was available. That
is not the case anymore and we expect the Envelope API to only be exposed on the
controller listener. I have nevertheless preserved the existing workarounds to
allow verification of the forwarding behavior in integration testing.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
This PR adds the KIP-500 BrokerServer and ControllerServer classes and
makes some related changes to get them working. Note that the ControllerServer
does not instantiate a QuorumController object yet, since that will be added in
PR #10070.
* Add BrokerServer and ControllerServer
* Change ApiVersions#computeMaxUsableProduceMagic so that it can handle
endpoints which do not support PRODUCE (such as KIP-500 controller nodes)
* KafkaAdminClientTest: fix some lingering references to decommissionBroker
that should be references to unregisterBroker.
* Make some changes to allow SocketServer to be used by ControllerServer as
we as by the broker.
* We now return a random active Broker ID as the Controller ID in
MetadataResponse for the Raft-based case as per KIP-590.
* Add the RaftControllerNodeProvider
* Add EnvelopeUtils
* Add MetaLogRaftShim
* In ducktape, in config_property.py: use a KIP-500 compatible cluster ID.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
We don't really need it and it causes problems in older Android versions
and GraalVM native image usage (there are workarounds for the latter).
Move the logic to separate classes that are only invoked when the
relevant compression library is actually used. Place such classes
in their own package and enforce via checkstyle that only these
classes refer to compression library packages.
To avoid cyclic dependencies, moved `BufferSupplier` to the `utils`
package.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Currently the partition.metadata file is created when the log is created. However, clusters with older inter-broker protocols will never use this file. This PR moves the creation of the file to when we write to the file.
This PR also deletes the partition.metadata file on startup if the IBP version is lower than 2.8.
Reviewers: Jun Rao <junrao@gmail.com>
Currently log recovery begins as soon as we instantiate `LogManager`, but when using a
Raft-based metadata quorum we won't have configs until after we catch up on the metadata
log. We therefore defer log recovery until we actually invoke `startup()` on the `LogManager`
instance. This timing difference has no effect when using ZooKeeper because we
immediately invoke `startup()` on the instantiated instance, but it gives us the necessary
flexibility for accurate log recovery with updated configs when using a Raft-based metadata
quorum.
The `LogCleaner` is currently instantiated during construction just after log recovery
completes, and then it is started in `startup()`. As an extra precaution, since we are
no longer performing recovery during construction, we both instantiate and start the
log cleaner in `startup()` after log recovery completes.
We also convert `LogManager` to use a `ConfigRepository` to load topic configs
(which can override the default log configs) instead of having a hard-coded
dependency on ZooKeeper. We retrieve the topic configs when we invoke `startup()`
-- which again is effectively no different from a timing perspective than what we do
today for the ZooKeeper case.
One subtlety is that currently we create the log configs for every topic at this point
-- if a topic has no config overrides then we associate a copy of the default
configuration with the topic inside a map, and we retrieve the log configs for that
topic's partitions from from that map during recovery. This PR makes a change to
this series of events as follows. We do not associate a copy of the the default
configuration with a topic in the map if the topic has no configs set when we query
for them. This saves some memory -- we don't unnecessarily copy the default
config many times -- but it also means we have use the default log configs for
that topic later on when recovery for each of its partitions begins.
The difference is that the default configs are dynamically reconfigurable, and they
could potentially change between the time when we invoke `startup()` and when
log recovery begins (log recovery can begin quite some time after `startup()` is
invoked if shutdown was unclean). Prior to this patch such a change would not
be used; with this patch they could be if they happen before recovery begins.
This actually is better -- we are performing log recovery with the most recent
known defaults when a topic had no overrides at all. Also, `Partition.createLog`
has logic to handle missed config updates, so the behavior is eventually the same.
The transition of the broker state from `STARTING` to `RECOVERY` currently
happens within the `LogManager`, and it only occurs if the shutdown was
unclean. We move this transition into the broker as it avoids passing a
reference to the broker state into the `LogManager`. We also now always
transition the broker into the `RECOVERY` state as dictated by [the KIP-631 broker state machine](https://cwiki.apache.org/confluence/display/KAFKA/KIP-631%3A+The+Quorumbased+Kafka+Controller#KIP631:TheQuorumbasedKafkaController-TheBrokerStateMachine).
Finally, a few clean-ups were included. One worth highlighting is that `Partition`
no longer requires a `ConfigRepository`.
Reviewers: David Arthur <david.arthur@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Consolidate auto topic creation logic to either forward a CreateTopicRequest or handling the creation directly as AutoTopicCreationManager, when handling FindCoordinator/Metadata request.
Co-authored-by: Jason Gustafson <jason@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>
Refactor the MetadataCache into two implementations that both implement a common trait. This will let us
continue to use the existing implementation when using ZK, but use the new implementation when in kip-500 mode.
Reviewers: Colin McCabe <cmccabe@apache.org>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
`KafkaApis` is configured differently when it is used in a broker with a Raft-based controller quorum vs. a ZooKeeper quorum. For example, when using Raft, `ForwardingManager` is required rather than optional, and there is no `AdminManager`, `KafkaController`, or `KafkaZkClient`. This PR introduces `MetadataSupport` to abstract the two possibilities: `ZkSupport` and `RaftSupport`. This provides a fluent way to decide what to do based on the type of support that `KafkaApis` has been configured with. Certain types of requests are not supported when using raft (`AlterIsrRequest`, `UpdateMetadataRequest`, etc.), and `MetadataSupport` gives us an intuitive way to identify the constraints and requirements associated with the different configurations and react accordingly.
Existing tests are sufficient to detect bugs and regressions.
Reviewers: José Armando García Sancio <jsancio@gmail.com>, Jason Gustafson <jason@confluent.io>
`Partition` objects are able to retrieve topic configs when creating their log, and currently they do so with an implementation of `trait TopicConfigFetcher` that uses ZooKeeper. ZooKeeper is not available when using a Raft-based metadata log, so we need to abstract the retrieval of configs so it can work either with or without ZooKeeper. This PR introduces `trait ConfigRepository` with `ZkConfigRepository` and `CachedConfigRepository` implementations. `Partition` objects now use a provided `ConfigRepository` to retrieve topic configs, and we eliminate `TopicConfigFetcher` as it is no longer needed.
`ReplicaManager` now contains an instance of `ConfigRepository` so it can provide it when creating `Partition` instances.
`KafkaApis` needs to be able to handle describe-config requests; it currently delegates that to `ZkAdminManager`, which of course queries ZooKeeper. To make this work with or without ZooKeeper we move the logic from `ZkAdminManager` into a new `ConfigHelper` class that goes through a `ConfigRepository` instance. We provide `KafkaApis` with such an instance, and it creates an instance of the helper so it can use that instead of going through `ZkAdminManager`.
Existing tests are sufficient to identify bugs and regressions in `Partition`, `ReplicaManager`, `KafkaApis`, and `ConfigHelper`. The `ConfigRepository` implementations have their own unit tests.
Reviewers: Jason Gustafson <jason@confluent.io>
Replace BrokerStates.scala with BrokerState.java, to make it easier to use from Java code if needed. This also makes it easier to go from a numeric type to an enum.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The primary purpose of this patch is to remove the internal `enable.metadata.quorum` configuration. Instead, we rely on `process.roles` to determine if the self-managed quorum has been enabled. As a part of this, I've done the following:
1. Replace the notion of "disabled" APIs with "controller-only" APIs. We previously marked some APIs which were intended only for the KIP-500 as "disabled" so that they would not be unintentionally exposed. For example, the Raft quorum APIs were disabled. Marking them as "controller-only" carries the same effect, but makes the intent that they should be only exposed by the KIP-500 controller clearer.
2. Make `ForwardingManager` optional in `KafkaServer` and `KafkaApis`. Previously we used `null` if forwarding was enabled and relied on the metadata quorum check.
3. Make `zookeeper.connect` an optional configuration if `process.roles` is defined.
4. Update raft README to remove reference to `zookeeper.conntect`
Reviewers: Colin Patrick McCabe <cmccabe@confluent.io>, Boyang Chen <boyang@confluent.io>
Rename AdminManager to ZkAdminManager to emphasize the fact that it is not used by the KIP-500 code paths.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Boyang Chen <boyang@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
ISR-related cleanup in ReplicaManager and Partition. Removes ISR change logic from ReplicaManager and adds a new ZkIsrManager class which adheres to a new AlterIsrManager trait. Unifies all of the ISR logic in Partition so we don't have separate code paths for ZK vs AlterIsr. Also removes PartitionStateStore
Includes:
- Bump the version of MetadataRequest and MetadataResponse, add topicId in MetadataResponse
- Alter describeTopic in AdminClientTopicService and ZookeeperTopicService
- TopicMetadata is cached in MetadataCache, so we need to add topicId to MetadataCache
- MetadataCache is updated by UpdateMetadataRequest, bump the version of UpdateMetadataReq and UpdateMetadataResp, add topicId in UpdateMetadataReq.
Reviewers: Justine Olshan <jolshan@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
Includes:
- New API to authorize by resource type
- Default implementation for the method that supports super users and ACLs
- Optimized implementation in AclAuthorizer that supports ACLs, super users and allow.everyone.if.no.acl.found
- Benchmarks and tests
- InitProducerIdRequest authorized for Cluster:IdempotentWrite or WRITE to any topic, ProduceRequest authorized only for topic even if idempotent
Reviewers: Lucas Bradstreet <lucas@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
This patch follows up https://github.com/apache/kafka/pull/9547. It refactors AbstractFetcherThread and its descendants to use `OffsetForLeaderEpochRequestData.OffsetForLeaderPartition` instead of `OffsetsForLeaderEpochRequest.PartitionData`. The patch relies on existing tests.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
* The naming for `ListOffsets` was inconsistent, in some places it was `ListOffset` and in others
it was `ListOffsets`. Picked the latter since it was used in metrics and the protocol documentation
and made it consistent.
* Removed unused methods in ApiKeys.
* Deleted `CommonFields`.
* Added `lowestSupportedVersion` and `highestSupportedVersion` to `ApiMessageType`
* Removed tests in `MessageTest` that are no longer relevant.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch updates the request logger to output request and response payloads in JSON. Payloads are converted to JSON based on their auto-generated schema.
Reviewers: Lucas Bradstreet <lucas@confluent.io>, David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>
Also updated the jmh readme to make it easier for new people to know
what's possible and best practices.
There were some changes in the generated benchmarking code that
required adjusting `spotbugs-exclude.xml` and for a `javac` warning
to be suppressed for the benchmarking module. I took the chance
to make the spotbugs exclusion mode maintainable via a regex
pattern.
Tested the commands on Linux and macOS with zsh.
JMH highlights:
* async-profiler integration. Can be used with -prof async,
pass -prof async:help to look for the accepted options.
* perf c2c [2] integration. Can be used with -prof perfc2c,
if available.
* JFR profiler integration. Can be used with -prof jfr, pass
-prof jfr:help to look for the accepted options.
Full details:
* 1.24: https://mail.openjdk.java.net/pipermail/jmh-dev/2020-August/002982.html
* 1.25: https://mail.openjdk.java.net/pipermail/jmh-dev/2020-August/002987.html
* 1.26: https://mail.openjdk.java.net/pipermail/jmh-dev/2020-October/003024.html
* 1.27: https://mail.openjdk.java.net/pipermail/jmh-dev/2020-December/003096.html
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Lucas Bradstreet <lucasbradstreet@gmail.com>
Connection id is now only present in `NetworkSend`, which is now
the class used by `Selector`/`NetworkClient`/`KafkaChannel` (which
works well since `NetworkReceive` is the class used for
received data).
The previous `NetworkSend` was also responsible for adding a size
prefix. This logic is already present in `SendBuilder`, but for the
minority of cases where `SendBuilder` is not used (including
a number of tests), we now have `ByteBufferSend.sizePrefixed()`.
With regards to the request/message utilities:
* Renamed `toByteBuffer`/`toBytes` in `MessageUtil` to
`toVersionPrefixedByteBuffer`/`toVersionPrefixedBytes` for clarity.
* Introduced new `MessageUtil.toByteBuffer` that does not include
the version as the prefix.
* Renamed `serializeBody` in `AbstractRequest/Response` to
`serialize` for symmetry with `parse`.
* Introduced `RequestTestUtils` and moved relevant methods from
`TestUtils`.
* Moved `serializeWithHeader` methods that were only used in
tests to `RequestTestUtils`.
* Deleted `MessageTestUtil`.
Finally, a couple of changes to simplify coding patterns:
* Added `flip()` and `buffer()` to `ByteBufferAccessor`.
* Added `MessageSizeAccumulator.sizeExcludingZeroCopy`.
* Used lambdas instead of `TestCondition`.
* Used `Arrays.copyOf` instead of `System.arraycopy` in `MessageUtil`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
Generated request/response classes have code to serialize/deserialize directly to
`ByteBuffer` so the intermediate conversion to `Struct` can be skipped for them.
We have recently completed the transition to generated request/response classes,
so we can also remove the `Struct` based fallbacks.
Additional noteworthy changes:
* `AbstractRequest.parseRequest` has a more efficient computation of request size that
relies on the received buffer instead of the parsed `Struct`.
* Use `SendBuilder` for `AbstractRequest/Response` `toSend`, made the superclass
implementation final and removed the overrides that are no longer necessary.
* Removed request/response constructors that assume latest version as they are unsafe
outside of tests.
* Removed redundant version fields in requests/responses.
* Removed unnecessary work in `OffsetFetchResponse`'s constructor when version >= 2.
* Made `AbstractResponse.throttleTimeMs()` abstract.
* Using `toSend` in `SaslClientAuthenticator` instead of `serialize`.
* Various changes in Request/Response classes to make them more consistent and to
rely on the Data classes as much as possible when it comes to their state.
* Remove the version argument from `AbstractResponse.toString`.
* Fix `getErrorResponse` for `ProduceRequest` and `DescribeClientQuotasRequest` to
use `ApiError` which processes the error message sent back to the clients. This was
uncovered by an accidental fix to a `RequestResponseTest` test (it was calling
`AbstractResponse.toString` instead of `AbstractResponse.toString(short)`).
Rely on existing protocol tests to ensure this refactoring does not change
observed behavior (aside from improved performance).
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Add small interface to Partition.scala that allows AlterIsr and ZK code paths to update the ISR metrics managed by ReplicaManager. This opens the door for consolidating even more code between the two ISR update code paths.
This patch follows up https://github.com/apache/kafka/pull/9547. It refactors KafkaApis, ReplicaManager and Partition to use `OffsetForLeaderEpochResponseData.EpochEndOffset` instead of `EpochEndOffset`. In the mean time, it removes `OffsetsForLeaderEpochRequest#epochsByTopicPartition` and `OffsetsForLeaderEpochResponse#responses` and replaces their usages to use the automated protocol directly. Finally, it removes old constructors in `OffsetsForLeaderEpochResponse`. The patch relies on existing tests.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
From IBP 2.7 onwards, fetch responses include diverging epoch and offset in fetch responses if lastFetchedEpoch is provided in the fetch request. This PR uses that information for truncation and avoids the additional OffsetForLeaderEpoch requests in followers when lastFetchedEpoch is known.
Co-authored-by: Jason Gustafson <jason@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>, Nikhil Bhatia <rite2nikhil@gmail.com>
This patch changes the grouping of `Send` objects created by `SendBuilder` in order to reduce the number of generated `Send` objects and thereby the number of system writes.
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This PR adds support for forwarding of the following RPCs:
AlterConfigs
IncrementalAlterConfigs
AlterClientQuotas
CreateTopics
Co-authored-by: Jason Gustafson <jason@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>
This reverts commit 21dc5231ce as we decide to use Envelope for redirection instead of initial principal.
Reviewers: Jason Gustafson <jason@confluent.io>
Summary:
In this PR, I have implemented the write path of the feature versioning system (KIP-584). Here is a summary of what's in this PR:
New APIs in org.apache.kafka.clients.admin.Admin interface, and their client and server implementations. These APIs can be used to describe features and update finalized features. These APIs are: Admin#describeFeatures and Admin#updateFeatures.
The write path is provided by the Admin#updateFeatures API. The corresponding server-side implementation is provided in KafkaApis and KafkaController classes. This can be a good place to start the code review.
The write path is supplemented by Admin#describeFeatures client API. This does not translate 1:1 to a server-side API. Instead, under the hood the API makes an explicit ApiVersionsRequest to the Broker to fetch the supported and finalized features.
Implemented a suite of integration tests in UpdateFeaturesTest.scala that thoroughly exercises the various cases in the write path.
Other changes:
The data type of the FinalizedFeaturesEpoch field in ApiVersionsResponse has been modified from int32 to int64. This change is to conform with the latest changes to the KIP explained in the voting thread.
Along the way, the class SupportedFeatures has been renamed to be called BrokerFeatures, and, it now holds both supported features as well as default minimum version levels.
For the purpose of testing, both the BrokerFeatures and FinalizedFeatureCache classes have been changed to be no longer singleton in implementation. Instead, these are now instantiated once and maintained in KafkaServer. The singleton instances are passed around to various classes, as needed.
Reviewers: Boyang Chen <boyang@confluent.io>, Jun Rao <junrao@gmail.com>
There are no checks on the header key so instantiating key (bytes to string) is unnecessary.
One implication is that conversion failures will be detected a bit later, but this is consistent
with how we handle the header value.
**JMH RESULT**
1. ops: +12%
1. The optimization of memory usage is very small as the cost of creating extra ```ByteBuffer``` is
almost same to byte array copy (used to construct ```String```). Using large key results in better
improvement but I don't think large key is common case.
**BEFORE**
```
Benchmark (bufferSupplierStr) (bytes) (compressionType) (headerKeySize) (maxBatchSize) (maxHeaderSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
RecordBatchIterationBenchmark.measureValidation NO_CACHING RANDOM NONE 10 200 5 1000 2 thrpt 15 2035938.174 ± 1653.566 ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm NO_CACHING RANDOM NONE 10 200 5 1000 2 thrpt 15 2040.000 ± 0.001 B/op
```
```
Benchmark (bufferSupplierStr) (bytes) (compressionType) (headerKeySize) (maxBatchSize) (maxHeaderSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
RecordBatchIterationBenchmark.measureValidation NO_CACHING RANDOM NONE 30 200 5 1000 2 thrpt 15 1979193.376 ± 1239.286 ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm NO_CACHING RANDOM NONE 30 200 5 1000 2 thrpt 15 2120.000 ± 0.001 B/op
```
**AFTER**
```
Benchmark (bufferSupplierStr) (bytes) (compressionType) (headerKeySize) (maxBatchSize) (maxHeaderSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
RecordBatchIterationBenchmark.measureValidation NO_CACHING RANDOM NONE 10 200 5 1000 2 thrpt 15 2289115.973 ± 2661.856 ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm NO_CACHING RANDOM NONE 10 200 5 1000 2 thrpt 15 2032.000 ± 0.001 B/op
```
```
Benchmark (bufferSupplierStr) (bytes) (compressionType) (headerKeySize) (maxBatchSize) (maxHeaderSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
RecordBatchIterationBenchmark.measureValidation NO_CACHING RANDOM NONE 30 200 5 1000 2 thrpt 15 2222625.706 ± 908.358 ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm NO_CACHING RANDOM NONE 30 200 5 1000 2 thrpt 15 2040.000 ± 0.001 B/op
```
Reviewers: Ismael Juma <ismael@juma.me.uk>