* Implements start and stop of task executors
* Introduce flush operation to keep consumer operations out of the processing threads
* Fixes corner case: handle requested unassignment during shutdown
* Fixes corner case: handle race between voluntary unassignment and requested unassignment
* Fixes corner case: task locking future completes for the empty set
* Fixes corner case: we should not reassign a task with an uncaught exception to a task executor
* Improved logging
* Number of threads is controlled from outside of the TaskManager
Reviewers: Bruno Cadonna <bruno@confluent.io>
The Javadoc for `GroupMetadataManager#throwIfMemberEpochIsInvalid` suggests that it throws a `NotCoordinatorException` when the member epoch in the consumer heartbeat request is greater than the one known by this coordinator for the given member, which could happen if the heartbeating member got a higher epoch from another coordinator. However, the method throws `FencedMemberEpochException` even in this case. This PR corrects the Javadoc accordingly.
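To make the corrected contract concrete, a minimal sketch with a hypothetical, simplified signature (the real method lives in GroupMetadataManager and works on the stored member state):

```java
import org.apache.kafka.common.errors.FencedMemberEpochException;

final class MemberEpochValidation {
    static void throwIfMemberEpochIsInvalid(int knownEpoch, int requestEpoch) {
        if (requestEpoch != knownEpoch) {
            // Thrown even when requestEpoch > knownEpoch, i.e. when the member
            // obtained a higher epoch from another coordinator.
            throw new FencedMemberEpochException("The member epoch (" + requestEpoch
                + ") does not match the epoch known by the coordinator (" + knownEpoch + ").");
        }
    }
}
```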
Reviewers: David Jacot <djacot@confluent.io>
As a first step, we plan to release a preview of the new consumer group rebalance protocol without the client side assignor. This patch removes all the related fields from the ConsumerGroupHeartbeat API for now. Removing fields is fine here because this API is not released yet and not exposed by default. We will add them back while bumping the version of the request when we release this part in the future.
Reviewers: Justine Olshan <jolshan@confluent.io>
A bug in the RemoteIndexCache leads to a situation where the cache does not replace the corrupted index with a new index instance fetched from remote storage. This commit fixes the bug by adding correct handling for `CorruptIndexException`.
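A minimal sketch of the corrected behavior, with generic types standing in for the actual RemoteIndexCache internals (the helper names here are illustrative, not the real API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

// Generic sketch: when a cached index turns out to be corrupt (e.g. its
// sanity check throws CorruptIndexException), evict it and replace it with
// a fresh instance fetched from remote storage.
final class CorruptEntryReplacingCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();

    V getOrReplace(K key, Function<K, V> fetchFromRemoteStorage, Predicate<V> isCorrupt) {
        V cached = cache.computeIfAbsent(key, fetchFromRemoteStorage);
        if (isCorrupt.test(cached)) {
            cache.remove(key, cached);                                    // drop the corrupted instance
            cached = cache.computeIfAbsent(key, fetchFromRemoteStorage);  // re-fetch from remote storage
        }
        return cached;
    }
}
```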
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <duprie@amazon.com>
Currently, when a consumer starts up, a log message like the following is emitted:
[2023-08-11 15:47:17,713] INFO [Consumer clientId=console-consumer, groupId=console-consumer-28605] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
This confuses users and makes them think something is wrong in their consumer application. Since we already log the need to re-join with the given member id in the joinGroupResponseHandler and have already requested the rejoin, we can skip this confusing log message.
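A hedged sketch of the change (hypothetical method name; the real code sits in the consumer coordinator's rebalance-failure handling):

```java
import org.apache.kafka.common.errors.MemberIdRequiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RebalanceFailureLogging {
    private static final Logger log = LoggerFactory.getLogger(RebalanceFailureLogging.class);

    // Sketch: MemberIdRequiredException is an expected step of the join
    // protocol, so don't log it as a rebalance failure; the response handler
    // already logs the rejoin with the assigned member id.
    static void logRebalanceFailure(RuntimeException cause) {
        if (cause instanceof MemberIdRequiredException)
            return;
        log.info("Request joining group due to: rebalance failed due to '{}' ({})",
            cause.getMessage(), cause.getClass().getSimpleName());
    }
}
```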
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Paolo Patierno <ppatiern@redhat.com>, vamossagar12 <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
This patch moves the TopicIdPartition from the metadata module to the server-common module so it can be used by the group-coordinator module as well.
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
This patch fixes the version of the `AuthorizedOperations` field. The schema is not used yet so the bug had no impact.
Reviewers: Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>
When a fetch response has no records for a partition, validBytes is 0. We shouldn't set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala since there are no records and it is Optional.empty. We should use currentFetchState.lastFetchedEpoch instead.
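A minimal sketch of the corrected choice, with Optional&lt;Integer&gt; standing in for the epoch types on both sides:

```java
import java.util.Optional;

final class LastFetchedEpoch {
    // If the response carried no records (validBytes == 0), the response's
    // lastLeaderEpoch is Optional.empty, so keep the epoch from the current
    // fetch state instead of overwriting it.
    static Optional<Integer> next(long validBytes,
                                  Optional<Integer> lastLeaderEpochOfResponse,
                                  Optional<Integer> currentLastFetchedEpoch) {
        return validBytes > 0 ? lastLeaderEpochOfResponse : currentLastFetchedEpoch;
    }
}
```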
Reviewers: Divij Vaidya <diviv@amazon.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
DeleteSegmentsDueToLogStartOffsetBreach configures the segment such that it can hold at most two record batches, and it asserts the local-log-start-offset based on the assumption that each segment contains exactly two messages.
During a leader switch, the segment can get rotated and may not always contain two records. Previously, we checked whether the expected local-log-start-offset equals the base-offset-of-the-first-local-log-segment. With this patch, we scan the first local-log-segment for the expected offset.
Reviewers: Divij Vaidya <diviv@amazon.com>
When Streams completes a rebalance, it unlocks the state directories of
all unassigned tasks. Unfortunately, when the state updater is enabled,
Streams does not look into the state updater to determine the
unassigned tasks.
This commit corrects this by adding the check.
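A hedged sketch of the corrected computation, with hypothetical set names standing in for Streams' task bookkeeping:

```java
import java.util.HashSet;
import java.util.Set;

final class UnassignedTasks {
    // Tasks owned by the state updater are not unassigned: their state
    // directories must stay locked. The last subtraction is the missing check.
    static <T> Set<T> compute(Set<T> lockedTaskDirectories,
                              Set<T> assignedTasks,
                              Set<T> stateUpdaterTasks) {
        Set<T> unassigned = new HashSet<>(lockedTaskDirectories);
        unassigned.removeAll(assignedTasks);
        unassigned.removeAll(stateUpdaterTasks);
        return unassigned;
    }
}
```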
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Bump snappy-java version to 1.1.10.4 and add more tests to verify that compressed data can be correctly decompressed and read.
For LogCleanerParameterizedIntegrationTest, we increased the message size for snappy decompression since the new snappy version yields a larger decompressed size than the previous one. But since the compression algorithm is not in Kafka's scope, all we need to do is make sure the compressed data can be successfully decompressed and parsed/read.
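The essence of the added verification, sketched against the public snappy-java API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.xerial.snappy.Snappy;

public final class SnappyRoundTripCheck {
    public static void main(String[] args) throws Exception {
        byte[] original = "a kafka record payload".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = Snappy.compress(original);
        // Sizes may shift between snappy versions; correctness only requires
        // that decompression reproduces the original bytes.
        byte[] decompressed = Snappy.uncompress(compressed);
        if (!Arrays.equals(original, decompressed))
            throw new AssertionError("snappy round trip failed");
    }
}
```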
Reviewers: Divij Vaidya <diviv@amazon.com>, Ismael Juma <ismael@juma.me.uk>, Josep Prat <josep.prat@aiven.io>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Add support for --bootstrap-controller in the following command-line tools:
- kafka-cluster.sh
- kafka-configs.sh
- kafka-features.sh
- kafka-metadata-quorum.sh
To implement this, the following AdminClient APIs now support the new bootstrap.controllers
configuration:
- Admin.alterConfigs
- Admin.describeCluster
- Admin.describeConfigs
- Admin.describeFeatures
- Admin.describeMetadataQuorum
- Admin.incrementalAlterConfigs
- Admin.updateFeatures
Command-line tool changes:
- Add CommandLineUtils.initializeBootstrapProperties to handle parsing --bootstrap-controller
in addition to --bootstrap-server.
- Add --bootstrap-controller to ConfigCommand.scala, ClusterTool.java, FeatureCommand.java, and
MetadataQuorumCommand.java.
KafkaAdminClient changes:
- Add the AdminBootstrapAddresses class to handle extracting bootstrap.servers or
bootstrap.controllers from the config map for KafkaAdminClient.
- In AdminMetadataManager, store the new usingBootstrapControllers boolean. Generalize
authException to encompass the concept of fatal exceptions in general. (For example, the
fatal exception where we talked to the wrong node type.) Treat
MismatchedEndpointTypeException and UnsupportedEndpointTypeException as fatal exceptions.
- Extend NodeProvider to include information about whether bootstrap.controllers is supported.
- Modify the APIs described above to support bootstrap.controllers.
Server-side changes:
- Support DescribeConfigsRequest on kcontrollers.
- Add KRaftMetadataCache to the kcontroller to simplify implementing describeConfigs (and
probably more APIs in the future). It's mainly a wrapper around MetadataImage, so there is
essentially no extra resource consumption.
- Split RuntimeLoggerManager out of ConfigAdminManager to handle the incrementalAlterConfigs
support for BROKER_LOGGER. This is now supported on kcontrollers as well as brokers.
- Fix bug in AuthHelper.computeDescribeClusterResponse that resulted in us always sending back
BROKER as the endpoint type, even on the kcontroller.
Miscellaneous:
- Fix a few places in exceptions and log messages where we wrote "broker" instead of "node".
For example, an exception in NodeApiVersions.java, and a log message in NetworkClient.java.
- Fix the slf4j log prefix used by KafkaRequestHandler logging so that request handlers on a
controller don't look like they're on a broker.
- Make the FinalizedVersionRange constructor public for the sake of a junit test.
- Add unit and integration tests for the above.
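A minimal usage sketch of the new configuration with one of the supported APIs (the controller address is illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterResult;

public final class BootstrapControllersExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Point the Admin client at the controller quorum instead of brokers;
        // bootstrap.controllers is used in place of bootstrap.servers.
        props.put("bootstrap.controllers", "localhost:9093");
        try (Admin admin = Admin.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            System.out.println("cluster id: " + cluster.clusterId().get());
        }
    }
}
```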
Reviewers: David Arthur <mumrah@gmail.com>, Doguscan Namal <namal.doguscan@gmail.com>
The process method inside the tasks needs to be called from within
the processing threads. However, it currently interacts with the
consumer in two ways:
* It resumes processing when the PartitionGroup buffers are empty
* It fetches the lag from the consumer
We introduce updateLags() and
resumePollingForPartitionsWithAvailableSpace() methods that call into
the task from the polling thread, in order to set up the consumer
correctly for the next poll, and extract metadata from the consumer
after the poll.
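A hedged sketch of the resulting split (only the two method names come from this change; the rest is illustrative scaffolding):

```java
// Illustrative scaffolding around the two new methods.
interface TaskManagerView {
    void resumePollingForPartitionsWithAvailableSpace();
    void updateLags();
}

final class PollingLoopSketch {
    private final TaskManagerView tasks;
    private final Runnable pollConsumer; // the only consumer interaction

    PollingLoopSketch(TaskManagerView tasks, Runnable pollConsumer) {
        this.tasks = tasks;
        this.pollConsumer = pollConsumer;
    }

    void runOnce() {
        // Before the poll: resume partitions whose PartitionGroup buffers
        // have space again, so the consumer fetches for them.
        tasks.resumePollingForPartitionsWithAvailableSpace();
        pollConsumer.run();
        // After the poll: pull metadata such as lags out of the consumer.
        tasks.updateLags();
    }
}
```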
Reviewer: Bruno Cadonna <bruno@confluent.io>
When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transitions to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means Streams
executes the poll loop in state PARTITIONS_REVOKED.
With the state updater enabled, processing is also executed in states
other than RUNNING, and so processing is also executed when the
stream thread is in state PARTITIONS_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.
This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.
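A minimal sketch of the guard, with a simplified state enum:

```java
enum ThreadState { RUNNING, PARTITIONS_ASSIGNED, PARTITIONS_REVOKED }

final class ProcessingGuard {
    // With the state updater enabled, processing may run in states other than
    // RUNNING, but never while a revocation is in flight: the consumer has no
    // current assignment for the revoked partitions.
    static boolean canProcess(ThreadState state) {
        return state != ThreadState.PARTITIONS_REVOKED;
    }
}
```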
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
The state updater can get into a busy loop when all tasks are paused, because the changelogReader will never report that all changelogs have been read completely. Fix this by awaiting if updatingTasks is empty.
Related and included: if we are restoring and all tasks are paused, we should return immediately from StoreChangelogReader.
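A hedged sketch of the waiting logic using a standard lock/condition pair (the actual state updater has its own synchronization):

```java
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class UpdaterWaitSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition tasksAdded = lock.newCondition();
    private final Set<T> updatingTasks;

    UpdaterWaitSketch(Set<T> updatingTasks) {
        this.updatingTasks = updatingTasks;
    }

    // All tasks paused means updatingTasks is empty: block until a task is
    // resumed instead of spinning on a changelog reader that can never
    // report completion.
    void maybeAwaitTasks() throws InterruptedException {
        lock.lock();
        try {
            while (updatingTasks.isEmpty())
                tasksAdded.await();
        } finally {
            lock.unlock();
        }
    }
}
```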
Reviewer: Bruno Cadonna <cadonna@apache.org>
The state directory throws a lock exception during initialization if a task state directory is still locked by the stream thread that previously owned the task. When this happens, Streams catches the lock exception, ignores it, and tries to initialize the task again on the next attempt.
In the state updater code path, we missed catching the lock exception when Streams recycles a task. That leads to the lock exception being thrown to the exception handler, which is unexpected and leads to test failures.
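A hedged sketch of the missing handling (Task and the retry bookkeeping are simplified; LockException is Streams' org.apache.kafka.streams.errors.LockException):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.errors.LockException;

final class RecycleSketch {
    interface Task {
        void initializeIfNeeded();
    }

    private final List<Task> tasksToRetry = new ArrayList<>();

    // Recycling initializes state like any other task creation, so a state
    // directory still locked by the previous owner must be caught and
    // retried, not propagated to the exception handler.
    void recycle(Task task) {
        try {
            task.initializeIfNeeded();
        } catch (LockException swallowed) {
            tasksToRetry.add(task); // retry on the next attempt
        }
    }
}
```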
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
LogCaptureAppender sets the log level in various tests to check whether a certain log message is produced. However, the log level is never reverted, which changes the log level across the board and introduces flakiness through non-determinism, since the effective log level depends on execution order. Some log messages change the timing inside tests significantly.
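The usual remedy, sketched with the log4j 1.x API: remember the previous level and restore it when the scope is closed:

```java
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// Sketch: pair every temporary log-level change with a restore, so one test's
// level choice cannot leak into tests that run after it.
final class LevelRestoringScope implements AutoCloseable {
    private final Logger logger;
    private final Level previousLevel;

    LevelRestoringScope(Class<?> clazz, Level temporaryLevel) {
        this.logger = Logger.getLogger(clazz);
        this.previousLevel = logger.getLevel(); // may be null (inherited)
        logger.setLevel(temporaryLevel);
    }

    @Override
    public void close() {
        logger.setLevel(previousLevel); // revert, restoring inheritance if null
    }
}
```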
Reviewer: Bruno Cadonna <cadonna@apache.org>
We have observed an issue where the inter-broker SSL listener does not come up when running with TLSv1.3 on JDK 17.
SSL debug logs show that TLSv1.3 post-handshake messages larger than 16K are not getting read, causing the SslEngineValidator process to get stuck while validating the provided trust/key store.
- Right now, WRAP returns early if there is already data in the buffer. But if more data needs to be wrapped for UNWRAP to succeed, we end up looping forever. To fix this, we now always attempt WRAP and only return early on BUFFER_OVERFLOW.
- Update SslEngineValidator to unwrap post-handshake messages from peer when local handshake status is FINISHED.
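A hedged sketch of the adjusted wrap step (handshake bookkeeping and buffer management elided):

```java
import java.nio.ByteBuffer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;

final class WrapStepSketch {
    private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);

    // Always attempt wrap(), even if netBuffer already holds data; returning
    // early in that case is what made >16K post-handshake flows loop forever.
    // Only BUFFER_OVERFLOW (netBuffer full) justifies returning to the caller,
    // which must drain netBuffer to the peer and then call again.
    static void wrapWhileNeeded(SSLEngine engine, ByteBuffer netBuffer) throws SSLException {
        while (engine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_WRAP) {
            SSLEngineResult result = engine.wrap(EMPTY, netBuffer);
            if (result.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW)
                return;
        }
    }
}
```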
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
We are currently encoding an empty hostNames array into the subjectAltName extension in the keystore. While parsing the certificates in the test, this causes the issue: Unparseable SubjectAlternativeName extension due to java.io.IOException: No data available in passed DER encoded value. Up to Java 17, this parsing error was ignored. This PR assigns subjectAltName to null if hostNames is empty.
Co-authored-by: Ismael Juma <ismael@juma.me.uk>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This patch refactors the ReplicaManager.appendRecords method and the AddPartitionsToTxnManager class in order to move the logic that identifies the transaction coordinator based on the transaction id from the former to the latter. While working on KAFKA-14505, I found it pretty annoying that we have to pass the transaction state partition to appendRecords because we would have to do the same from the group coordinator. It seems preferable to delegate that job to the AddPartitionsToTxnManager.
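For context, locating the coordinator from a transaction id boils down to a partition computation like the following (simplified; the real code then resolves the partition's current leader to find the coordinator node):

```java
final class TransactionCoordinatorPartition {
    // Mirrors Kafka's Utils.abs(hashCode) % partitionCount scheme used to map
    // a transactional id onto a __transaction_state partition.
    static int partitionFor(String transactionalId, int transactionTopicPartitionCount) {
        return (transactionalId.hashCode() & 0x7fffffff) % transactionTopicPartitionCount;
    }
}
```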
Reviewers: Justine Olshan <jolshan@confluent.io>
* Update CI to build with Java 21 instead of Java 20
* Disable spotbugs when building with Java 21 as it doesn't support it yet (filed KAFKA-15492 for
addressing this)
* Disable SslTransportLayerTest.testValidEndpointIdentificationCN with Java 21 (same as Java 20)
Reviewers: Divij Vaidya <diviv@amazon.com>
When a remote log segment contains multiple epochs, it gets considered multiple times during the retention breach checks by size/time/start-offset. This affects deletion by remote log retention size, as fewer segments than expected get deleted. This is a follow-up of KAFKA-15352.
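A hedged sketch of the dedup idea (hypothetical types; the real fix operates on remote log segment metadata during the retention checks):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RetentionDedupSketch {
    interface SegmentView {
        String segmentId();
        long sizeInBytes();
    }

    // Iterating epoch by epoch surfaces a segment once per epoch it spans;
    // count each segment only once when accumulating retained size.
    static long retainedSize(List<SegmentView> segmentsInEpochOrder) {
        Set<String> seen = new HashSet<>();
        long total = 0;
        for (SegmentView segment : segmentsInEpochOrder) {
            if (seen.add(segment.segmentId()))
                total += segment.sizeInBytes();
        }
        return total;
    }
}
```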
Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
Scala 2.13.12 offers a quickfix action for certain errors, includes a number of bug fixes, and
introduces a new warning by default (https://github.com/scala/scala/pull/10462).
In addition to the scala version bump, we also fix the new compiler warnings and
bump the scalafmt version (the previous version failed with the new scala version).
Release notes: https://github.com/scala/scala/releases/tag/v2.13.12
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
This is one of the steps required for Kafka to compile with Java 21.
For each case, one of the following fixes was applied:
1. Suppress the warning if fixing it would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.
In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.
See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
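For reference, the pattern the new lint flags, and fix 2 from the list above applied to it:

```java
// The Java 21 lint warns when a constructor lets `this` escape before the
// object is fully constructed, e.g. by calling an overridable method:
class Base {
    Base() {
        init(); // 'this' escapes: a subclass override runs on a half-built object
    }

    void init() { }
}

// Making the method final removes the escape, since it can no longer be
// overridden by a subclass.
class SafeBase {
    SafeBase() {
        init();
    }

    final void init() { }
}
```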
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
This PR is part of #13247
It contains changes to rewrite a single test in Java.
The intention is to reduce changes in the parent PR.
Reviewers: Luke Chen <showuon@gmail.com>, Taras Ledkov <tledkov@apache.org>
TopicMetadataRequestManager is responsible for sending topic metadata requests. The manager manages the API requests and builds them accordingly. Topic metadata requests for the same topic are chained to avoid sending duplicate requests.
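A hedged sketch of the chaining idea (generic types; the real manager plugs into the consumer's network layer):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Requests for the same topic share one in-flight future, so concurrent
// callers do not each send their own metadata request.
final class ChainedTopicMetadataRequests<M> {
    private final Map<String, CompletableFuture<M>> inflight = new ConcurrentHashMap<>();

    CompletableFuture<M> request(String topic, Function<String, CompletableFuture<M>> send) {
        CompletableFuture<M> existing = inflight.get(topic);
        if (existing != null)
            return existing;
        CompletableFuture<M> created = new CompletableFuture<>();
        CompletableFuture<M> raced = inflight.putIfAbsent(topic, created);
        if (raced != null)
            return raced;
        send.apply(topic).whenComplete((metadata, error) -> {
            inflight.remove(topic, created); // allow a fresh request afterwards
            if (error != null)
                created.completeExceptionally(error);
            else
                created.complete(metadata);
        });
        return created;
    }
}
```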
Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
Co-authored-by: Kirk True <kirk@kirktrue.pro>
Reviewers: Kirk True <kirk@kirktrue.pro>, Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
Endpoint information provided by KafkaConfig can be incomplete in two ways. One is that endpoints
using ephemeral ports will show up as using port 0. Another is that endpoints binding to 0.0.0.0
will show up with a null or blank hostname. Because we were not accounting for this in controller
registration, it was leading to a null pointer dereference when we tried to register a controller
using an endpoint defined as PLAINTEXT://:9092.
This PR adds a ListenerInfo class which can fix both of the causes of incomplete endpoint
information. It also handles serialization to and from various RPC and record formats.
This allows us to remove a lot of boilerplate code and standardize the handling of listeners
between BrokerServer and ControllerServer.
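A hedged sketch of the two normalizations involved (hypothetical helper; the real ListenerInfo class also handles the RPC and record formats):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class EndpointNormalizationSketch {
    // Two incomplete forms must be fixed up before registration: port 0
    // (ephemeral) is replaced with the actually bound port, and a null/blank
    // host (bound to 0.0.0.0) with the machine's canonical hostname.
    static String normalize(String host, int configuredPort, int boundPort)
            throws UnknownHostException {
        String effectiveHost = (host == null || host.trim().isEmpty())
            ? InetAddress.getLocalHost().getCanonicalHostName()
            : host;
        int effectivePort = configuredPort == 0 ? boundPort : configuredPort;
        return effectiveHost + ":" + effectivePort;
    }
}
```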
Reviewers: David Arthur <mumrah@gmail.com>
In this PR, we noticed failed tests caused by the verification of the log start offset and the local log start offset. See the investigation in #14347 (comment) and
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14347/13/#showFailuresLink
It turned out that Scala 2.12 cannot recognize the overriding method of maybeWaitForAtLeastOneSegmentUpload since it uses varargs; there must be some gaps between Java and Scala that cause this issue. We can fix it by using a Seq instead of varargs.
This PR adds back the log start offset and local log start offset verification, and makes sure all tests pass.
Reviewers: Satish Duggana <satishd@apache.org>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
The kafka-acls CLI prints the following error message on an invalid operation:
$ kafka-acls --bootstrap-server xxx:9095 --remove --allow-principal "User:abc" --allow-host * --operation DESCRIBE_CONFIGS --topic xyz
ResourceType TOPIC only supports operations WRITE,DESCRIBE,ALL,READ,CREATE,ALTER,DELETE,DESCRIBE_CONFIGS,ALTER_CONFIGS
The following command actually works:
$ kafka-acls --bootstrap-server xxx:9095 --remove --allow-principal "User:abc" --allow-host * --operation DescribeConfigs --topic xyz
This PR fixes the invalid formatting of operations in the error message and adds a space after each comma.
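A hedged sketch of the formatting change (hypothetical helper; it renders the enum names in the camel-case form the --operation flag accepts and joins them with a comma and a space):

```java
import java.util.List;
import java.util.stream.Collectors;

final class OperationNameFormatting {
    // DESCRIBE_CONFIGS -> DescribeConfigs, joined with ", " for readability.
    static String format(List<String> enumNames) {
        return enumNames.stream()
            .map(OperationNameFormatting::toCamelCase)
            .collect(Collectors.joining(", "));
    }

    private static String toCamelCase(String enumName) {
        StringBuilder sb = new StringBuilder();
        for (String part : enumName.split("_"))
            sb.append(part.charAt(0)).append(part.substring(1).toLowerCase());
        return sb.toString();
    }
}
```

For example, format(List.of("WRITE", "DESCRIBE_CONFIGS")) yields "Write, DescribeConfigs".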