To make static consumer group members more persistent, we want to avoid kicking out unjoined members through rebalance timeout. Essentially we allow static members to participate in a rebalance using their old subscription without sending a JoinGroup. The only catch is that an unjoined static member might be the current group leader, and we may need to elect a different leader.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>
SslFactory: split the part of SslFactory that creates SSLEngine instances into SslEngineBuilder. When (re)configuring, we simply create a new SslEngineBuilder. This allows us to make all the builder fields immutable. It also simplifies the logic for reconfiguring. Because we sometimes need to test old SslEngine instances against new ones, being able to use both the old and the new builder at once is useful.
Create an enum named SslClientAuth which encodes the possible values for ssl.client.auth. This will simplify the handling of this configuration.
SslTransportLayer#maybeProcessHandshakeFailure should treat an SSLHandshakeException with a "Received fatal alert" message as a handshake error (and therefore an authentication error.)
SslFactoryTest: add some line breaks for very long lines.
ConfigCommand#main: when terminating the command due to an uncaught exception, log the exception using debug level in slf4j, in addition to printing it to stderr. This makes it easier to debug failing junit tests, where stderr may not be kept, or may be reordered with respect to other slf4j messages. The use of debug level is consistent with how we handle other types of exceptions in ConfigCommand#main.
StateChangeLogMerger#main: spell out the full name of scala.io.Source rather than abbreviating it as io.Source. This makes it clearer that it is part of the Scala standard library. It also avoids compiler errors when other libraries whose groupId starts with "io" are used in the broker.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Enable reconfiguration of SSL keystores and truststores in client-side channel builders used by brokers for controller, transaction coordinator and replica fetchers. This enables brokers using TLS mutual authentication for inter-broker listener to use short-lived certs that may be updated before expiry without restarting brokers.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
`ConsumerTopicCreationTest` relied on `KafkaConsumer#poll` to send a `MetadataRequest` within 100ms to verify if a topic is auto created or not. This is brittle and does not guarantee if the request made it to the broker or was processed successfully. This PR fixes the flaky test by adding another topic; we wait until we consume a previously produced record to this topic. This ensures MetadataRequest was processed and we could then check if the topic we're interested in was created or not.
Reviewers: Boyang Chen <bchen11@outlook.com>, Jason Gustafson <jason@confluent.io>
MINOR: update documentation for the log cleaner max compaction lag feature (KIP-354) implemented in KAFKA-7321
Author: Xiongqi Wu <xiowu@linkedin.com>
Reviewer: Joel Koshy <jjkoshy@gmail.com>
The main problem we are trying to solve here is the batching of StopReplica requests and the lack of test coverage for `ControllerChannelManager`. Addressing the first problem was straightforward, but the second problem required quite a bit of work because of the dependence on `KafkaController` for all of the events. It seemed to make sense to separate the events from the processing of events so that we could remove this dependence and improve testability. With the refactoring, I was able to add test cases covering most of the logic in `ControllerChannelManager` including the generation of requests and the expected response handling logic. Note that I have not actually changed any of the event handling logic in `KafkaController`.
While refactoring this logic, I found that the event queue time metric was not being correctly computed. The problem is that many of the controller events were singleton objects which inherited the `enqueueTimeMs` field from the `ControllerEvent` trait. This would never get updated, so queue time would be skewed.
Reviewers: Jun Rao <junrao@gmail.com>
KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)
Records become eligible for compaction after the specified time interval.
Author: Xiongqi Wu <xiowu@linkedin.com>
Reviewer: Joel Koshy <jjkoshy@gmail.com>
The log cleaner attempts to preserve the last entry for each producerId in order to ensure that sequence/epoch state is not lost. The current validation checks only the last sequence number for each producerId in order to decide whether a batch should be retained. There are two problems with this:
1. Sequence numbers are not unique alone. It is the tuple of sequence number and epoch which is uniquely defined.
2. The group coordinator always writes batches beginning with sequence number 0, which means there could be many batches which have the same sequence number.
The complete fix for the second issue would probably add proper sequence number bookkeeping in the coordinator. For now, we have left the coordinator implementation unchanged and changed the cleaner logic to use the last offset written by a producer instead of the last sequence number.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Because of how conversions between Java collections and Scala collections work, ImplicitLinkedHashMultiSet objects were being treated as unordered in some contexts where they shouldn't be. This broke JOIN_GROUP handling.
This patch renames ImplicitLinkedHashMultiSet to ImplicitLinkedHashMultCollection. The order of Collection objects will be preserved when converting to scala. Adding Set and List "views" to the Collection gives us a more elegant way of accessing that functionality when needed.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
* Fixed bug in Struct.equals where we returned prematurely and added tests
* Update RequestResponseTest to check that `equals` and `hashCode` of
the struct is the same after serialization/deserialization only when possible.
* Use `Objects.equals` and `Long.hashCode` to simplify code
* Removed deprecated usages of `JUnitTestSuite`
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Implements KIP-361 to provide a consumer configuration to specify whether subscribing or assigning a non-existent topic would result in it being automatically created or not.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch refactors the implementation of the --version option and moves it into the default command options. This has the benefit of automatically including it in the usage output of the command line tools. Several tools had to be manually updated because they did not use the common options.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
This patch ensures that the log end offset of each partition is initialized consistently with the checkpointed log start offset.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
During a partial message format upgrade, it is possible for the message format to flap between new and old versions. If we detect that data appended to the log is on an old format, we can clear the leader epoch cache so that we revert to truncation by high watermark. Once the upgrade completes and all replicas are on the same format, we will append to the epoch cache as usual. Note this is related to KAFKA-7897, which handles message format downgrades through configuration.
Reviewers: Jun Rao <junrao@gmail.com>
This patch fixes a bug in the sending of transactional requests. We need to call `KafkaClient.send` with an updated current time. Failing to do so can result in an `IllegalStateExcepton` which leaves the producer effectively dead since the in-flight correlation id has been set, but no request has been sent. To avoid the same problem in the future, we update the in flight correlationId only after sending the request.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This is the first diff for the implementation of JoinGroup logic for static membership. The goal of this diff contains:
* Add group.instance.id to be unique identifier for consumer instances, provided by end user;
Modify group coordinator to accept JoinGroupRequest with/without static membership, refactor the logic for readability and code reusability.
* Add client side support for incorporating static membership changes, including new config for group.instance.id, apply stream thread client id by default, and new join group exception handling.
* Increase max session timeout to 30 min for more user flexibility if they are inclined to tolerate partial unavailability than burdening rebalance.
* Unit tests for each module changes, especially on the group coordinator logic. Crossing the possibilities like:
6.1 Dynamic/Static member
6.2 Known/Unknown member id
6.3 Group stable/unstable
6.4 Leader/Follower
The rest of the 345 change will be broken down to 4 separate diffs:
* Avoid kicking out members through rebalance.timeout, only do the kick out through session timeout.
* Changes around LeaveGroup logic, including version bumping, broker logic, client logic, etc.
* Admin client changes to add ability to batch remove static members
* Deprecate group.initial.rebalance.delay
Reviewers: Liquan Pei <liquanpei@gmail.com>, Stanislav Kozlovski <familyguyuser192@windowslive.com>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
The controller maintains state across `ControllerContext`, `PartitionStateMachine`, `ReplicaStateMachine`, and `TopicDeletionManager`. None of this state is actually isolated from the rest. For example, topics undergoing deletion are intertwined with the partition and replica states. As a consequence of this, each of these components tends to be dependent on all the rest, which makes testing and reasoning about the system difficult. This is a first step toward untangling all the state. This patch moves it all into `ControllerContext` and removes many of the circular dependencies. So far, this is mostly a direct translation, but in the future we can add additional validation in `ControllerContext` to make sure that state is maintained consistently.
Additionally, this patch adds several mock objects to enable easier testing: `MockReplicaStateMachine` and `MockPartitionStateMachine`. These have simplified logic for updating the current state. This is used to create some new test cases for `TopicDeletionManager`.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jun Rao <junrao@gmail.com>
We suspect the problem might be a race condition after broker startup where the consumer has yet to find the coordinator and rebalance. The fix here rolls all the brokers first and then waits for the expected exception.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Gwen Shapira
Closes#6608 from hachikuji/KAFKA-7965
This patch fixes a bug in the append logic which can cause duplicate offsets to be appended to the log when the append to the transaction index fails. Rather than incrementing the log end offset after the index append, we do it immediately after the records are written to the log. If the index append later fails, we do two things:
1) We ensure that the last stable offset cannot advance. This guarantees that the aborted data will not be returned to the user until the transaction index contains the corresponding entry.
2) We skip updating the end offset of the producer state. When recovering the log, we will have to reprocess the log and write the index entries.
Reviewers: Jun Rao <junrao@gmail.com>
We should include partition/offset information when we raise exceptions during producer state validation. This saves a lot of the discovery work to figure out where the problem occurred. This patch also includes a new test case to verify additional coordinator fencing cases.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Most of the time, the group coordinator runs on broker 1. Occasionally the group coordinator will be placed on broker 2. If that's the case, the loop starting at line 320 have no chance to check and update `kickedOutConsumerIdx`. A quick fix is to safely do another round of loop to ensure `kickedOutConsumerIdx` always be checked after the last broker restart.
Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jason Gustafson <jason@confluent.io>
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Viktor Somogyi <viktorsomogyi@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
* Describe/Delete/Reset offsets on multiple consumer groups at a time (including each group by repeating `--group` parameter)
* Describe/Delete/Reset offsets on ALL consumer groups at a time (add new `--all-groups` option similar to `--all-topics`)
* Reset plan CSV file generation reworked: structure updated to support multiple consumer groups and make sure that CSV file generation is done properly since there are no restrictions on consumer group names and symbols like commas and quotes are allowed.
* Extending data output table format by adding `GROUP` column for all `--describe` queries
This patch updates the InitProducerId request API to use the generated sources. It also fixes a small bug in the DescribeAclsRequest class where we were using the wrong api key.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Colin McCabe <cmccabe@apache.org>
ConsumerBounceTest redundantly executes a couple test cases which were included in the abstract class `BaseConsumerTest`. We should try to keep a cleaner separation of testing logic and utility logic so that this does not happen (the build time is long enough without doing unnecessary work). This PR moves the cluster initialization and consumer utilities out of BaseConsumerTest and into a new class AbstractConsumerTest. We then let ConsumerBounceTest extend AbstractConsumerTest.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This PR should help address the flakiness in the ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup test (https://issues.apache.org/jira/browse/KAFKA-7965). I tested this locally and have verified it significantly reduces flakiness - 25/25 tests now pass. Running the test 25 times in trunk, I'd get `18/25` passes.
It does so by reusing the less-flaky consumer integration testing functionality inside `BaseConsumerTest`. Most notably, the test now makes use of the `ConsumerAssignmentPoller` class - each consumer now polls non-stop rather than the more batch-oriented polling we had in `ConsumerBounceTest#waitForRebalance()`.
Reviewers: Jason Gustafson <jason@confluent.io>
Ensure that modification time is checked against the file used to create the SSLContext that is in-use so that SSLContext is updated whenever file is modified and a config update request is received.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This change adds waits for metadata updates after killing the broker in order to make the tests more stable.
Author: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#6505 from viktorsomogyi/flaky-min-isr-test