An attempt to refactor the current coordinator logic.
Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Konstantine Karantasis <konstantine@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
A `Partition` object contains one or more `Replica` objects. Each replica
object in turn can hold the "log" if the replica corresponds to the
local node. Code in Partition or ReplicaManager that needs to operate on
the log has to peek into the replica object to fetch it. Because a
replica object can represent either a local or a remote replica, this
leads to a lot of "if-else" code in the log fetch and offset update paths.
NOTE: In addition to the "log" that is in use during normal operation, if
an alter log directory command is issued, we also create a future log
object. This object catches up with the local log, and then we switch the log
directory, so a Partition can temporarily have two local logs. Before
this change both logs lived inside replica objects.
This change is an attempt to untangle that relationship. In particular,
it moves `Log` from `Replica` into `Partition`. So a partition contains
a local log to which all writes go and, if the partition is being moved
between directories, possibly a "future log". Additionally, it maintains
a list of remote replicas holding the offset and "caught-up time" data that it
uses for the replication protocol.
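A minimal Java sketch of the resulting shape (the actual classes are Scala, in `kafka.cluster` and `kafka.log`; names and fields here are simplified for illustration):
```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch: the partition owns the local log(s); replicas only track remote follower state.
class Partition {
    private final Log log;            // the local log all writes go to
    private Log futureLog;            // present only while an alter-log-dirs move is in flight
    private final Map<Integer, Replica> remoteReplicas = new HashMap<>(); // brokerId -> follower state

    Partition(Log log) {
        this.log = log;
    }

    long appendAsLeader(byte[] records) {
        return log.append(records);   // no per-replica if/else needed to find the local log
    }

    void updateFollowerState(int brokerId, long fetchOffset, long fetchTimeMs) {
        remoteReplicas.get(brokerId).updateFetchState(fetchOffset, fetchTimeMs);
    }
}

class Replica {                        // now purely remote-follower bookkeeping
    private long logEndOffset = -1L;
    private long lastCaughtUpTimeMs = 0L;

    void updateFetchState(long fetchOffset, long fetchTimeMs) {
        this.logEndOffset = fetchOffset;
        this.lastCaughtUpTimeMs = fetchTimeMs;
    }
}

class Log {
    long append(byte[] records) { return -1L; } // placeholder for the real append path
}
```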
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
Include the topic config `segment.bytes`.
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Gwen Shapira
Closes #6945 from vahidhashemian/minor/update_streams_quickstart_doc
1. Add new fields to the subscription / assignment and bump the consumer protocol to v2.
2. Update tests to make sure the old versioned protocol can be deserialized successfully, and the new versioned protocol can be deserialized by old byte code.
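For illustration only (this is not the actual `ConsumerProtocol` wire layout, and the field names are hypothetical), the compatibility rule being tested looks roughly like this: an old reader decodes a newer buffer by reading only the fields it knows about and ignoring any trailing bytes.
```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical layout: [version][topicCount][len][topicBytes]...[fields appended by newer versions]
final class OldSubscriptionReader {
    static String readFirstTopic(ByteBuffer buffer) {
        short version = buffer.getShort();   // an old reader accepts old and new buffers alike
        int topicCount = buffer.getInt();
        short len = buffer.getShort();
        byte[] topic = new byte[len];
        buffer.get(topic);
        // Fields appended by the newer protocol version are simply never read on this code path.
        return new String(topic, StandardCharsets.UTF_8);
    }
}
```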
Reviewers: Boyang Chen <boyang@confluent.io>, Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
The OffsetFetch API requires Topic Describe permission. If a client does not have it, we return TOPIC_AUTHORIZATION_FAILED at the partition level. Currently the consumer does not handle this error explicitly, but raises it as a generic `KafkaException`. For consistency with other APIs and to fix transient test failures in `PlaintextEndToEndAuthorizationTest`, we should raise `TopicAuthorizationException` instead.
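A rough sketch of the intended handling (not the actual `Fetcher`/`ConsumerCoordinator` code): collect the topics that failed authorization and raise the dedicated exception instead of a generic one.
```java
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.protocol.Errors;

class OffsetFetchErrorHandling {
    // Record topics whose partitions came back with TOPIC_AUTHORIZATION_FAILED.
    static void checkPartitionError(TopicPartition tp, Errors error, Set<String> unauthorized) {
        if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
            unauthorized.add(tp.topic());
    }

    // After processing the response, surface the dedicated exception to the caller.
    static void maybeThrow(Set<String> unauthorized) {
        if (!unauthorized.isEmpty())
            throw new TopicAuthorizationException(unauthorized);
    }
}
```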
Reviewers: Ismael Juma <ismael@juma.me.uk>
De-duplicate the common case in which the prior value is the same as the old value.
Reviewers: Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
- The timeout occurred due to slow initial rebalancing.
- Added code to wait until the `KafkaStreams` instance is in state RUNNING before checking registration of metrics, and in state NOT_RUNNING before checking deregistration of metrics.
- Removed all other wait conditions, because they are not needed once the `KafkaStreams` instance is in the right state.
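A minimal sketch of that wait, assuming a configured (not yet started) `KafkaStreams` instance; this is illustrative test code, not the actual integration test:
```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;

class WaitForRunning {
    static void startAndAwaitRunning(KafkaStreams streams, Duration timeout) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // The listener must be registered before start().
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING)
                latch.countDown();
        });
        streams.start();
        if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS))
            throw new AssertionError("Streams did not reach RUNNING within " + timeout);
    }
}
```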
Reviewers: Guozhang Wang <wangguoz@gmail.com>
The idempotent producer attempts to detect spurious UNKNOWN_PRODUCER_ID errors and handle them by reassigning sequence numbers to the inflight batches. The inflight batches are tracked in a PriorityQueue. The problem is that the reassignment of sequence numbers depends on the iteration order of PriorityQueue, which does not guarantee any ordering. So this can result in sequence numbers being assigned in the wrong order. This patch fixes the problem by using a sorted set instead of a priority queue so that the iteration order preserves the sequence order. Note that resetting sequence numbers is an exceptional case.
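The underlying Java behavior can be seen in a small standalone example: a `PriorityQueue` only guarantees that `poll()` returns the smallest element, while a `TreeSet` also iterates in sorted order (the ints here stand in for batch base sequences).
```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.TreeSet;

class IterationOrderDemo {
    public static void main(String[] args) {
        PriorityQueue<Integer> pq = new PriorityQueue<>(Comparator.naturalOrder());
        TreeSet<Integer> sorted = new TreeSet<>(Comparator.naturalOrder());
        for (int seq : new int[] {5, 1, 4, 2, 3}) {
            pq.add(seq);
            sorted.add(seq);
        }
        System.out.println("PriorityQueue iteration: " + pq);     // heap order, e.g. [1, 2, 4, 5, 3]
        System.out.println("TreeSet iteration:       " + sorted); // always [1, 2, 3, 4, 5]
    }
}
```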
This patch also fixes KAFKA-8484, which can cause an IllegalStateException when the producerId is reset while there are pending produce requests inflight. The solution is to ensure that sequence numbers are only reset if the producerId of a failed batch corresponds to the current producerId.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Makes HostedPartition a sealed trait and makes all of the match cases explicit.
Reviewers: Vikas Singh <soondenana@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This PR fixes a bug in static group membership. Previously we limited `member.id` replacement in JoinGroup to cases where the group is in the Stable state. This is error-prone and could potentially allow duplicate consumers reading from the same topic. For example, imagine a case where two unknown members join in the `PrepareRebalance` stage at the same time.
The PR fixes the following things:
1. Replace the `member.id` whenever a known static member rejoins the group with an unknown member.id.
2. Immediately fence any ongoing join/sync group callback to terminate the duplicate member early.
3. Clearly handle Dead/Empty cases as exceptional.
4. Return the old leader id when the static member leader rejoins, to avoid triggering a trivial member assignment.
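A conceptual Java sketch of points 1 and 2 (the real logic lives in the Scala GroupCoordinator/GroupMetadata; all names here are illustrative):
```java
import java.util.HashMap;
import java.util.Map;

class StaticMembershipSketch {
    private final Map<String, String> staticMemberIds = new HashMap<>(); // group.instance.id -> member.id

    // A known static member rejoined with a fresh member.id: swap ids regardless of group state.
    String onStaticMemberJoin(String groupInstanceId, String newMemberId) {
        String oldMemberId = staticMemberIds.put(groupInstanceId, newMemberId);
        if (oldMemberId != null)
            fence(oldMemberId); // complete any pending join/sync callback for the old id
        return newMemberId;
    }

    private void fence(String memberId) {
        // In the real coordinator, pending callbacks are completed with a fenced error
        // so the duplicate member terminates early instead of consuming alongside the new one.
    }
}
```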
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
Messages containing the key and value were moved to the TRACE logging level; however, the exception still includes the key and value.
This commit removes the key and value from the StreamsException.
Reviewers: Bill Bejeck <bbejeck@gmail.com>
Currently we load the high watermark checkpoint separately for every replica that we load. This patch makes this loading logic lazy and caches the loaded map while a LeaderAndIsr request is being handled.
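A simplified Java sketch of the caching idea (the actual code is Scala in the broker; the loader and map types here are placeholders): the checkpoint file is read at most once per LeaderAndIsr request and reused for every partition handled by that request.
```java
import java.util.Map;
import java.util.function.Supplier;

class LazyCheckpointCache {
    private final Supplier<Map<String, Long>> loader; // e.g. parses the high watermark checkpoint file
    private Map<String, Long> cached;

    LazyCheckpointCache(Supplier<Map<String, Long>> loader) {
        this.loader = loader;
    }

    synchronized long highWatermarkFor(String topicPartition) {
        if (cached == null)
            cached = loader.get();   // loaded lazily, only if at least one partition needs it
        return cached.getOrDefault(topicPartition, 0L);
    }
}
```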
Reviewers: Jun Rao <junrao@gmail.com>
The ResetIntegrationTest has experienced several failures, and it seems the current timeout of 10 seconds may not be enough time.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Boyang Chen <boyang@confluent.io>
We have seen this failing recently due to the following error:
```
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:366)
at scala.None$.get(Option.scala:364)
at kafka.admin.PreferredReplicaLeaderElectionCommandTest.getLeader(PreferredReplicaLeaderElectionCommandTest.scala:101)
at kafka.admin.PreferredReplicaLeaderElectionCommandTest.testNoopElection(PreferredReplicaLeaderElectionCommandTest.scala:240)
```
We need to wait for the leader to be available.
Reviewers: David Arthur <mumrah@gmail.com>
We've seen `ReplicaVerificationToolTest.test_replica_lags` fail occasionally due to errors such as the following:
```
RemoteCommandError: ubuntuworker7: Command 'kill -15 2896' returned non-zero exit status 1. Remote error message: bash: line 0: kill: (2896) - No such process
```
The problem seems to be a shutdown race condition when using `max_messages` with the producer. The process may already be gone, which causes the signal to fail.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Gwen Shapira
Closes #6906 from hachikuji/fix-failing-replicat-verification-test
We see the upgrade test failing from time to time. I looked into it and found that the root cause is basically that the test throughput can be too high for the 0.9 producer to make progress. Eventually it reaches a point where it has a huge backlog of timed out requests in the accumulator which all have to be expired. We see a long run of messages like this in the output:
```
{"exception":"class org.apache.kafka.common.errors.TimeoutException","time_ms":1559907386132,"name":"producer_send_error","topic":"test_topic","message":"Batch Expired","class":"class org.apache.kafka.tools.VerifiableProducer","value":"335160","key":null}
{"exception":"class org.apache.kafka.common.errors.TimeoutException","time_ms":1559907386132,"name":"producer_send_error","topic":"test_topic","message":"Batch Expired","class":"class org.apache.kafka.tools.VerifiableProducer","value":"335163","key":null}
{"exception":"class org.apache.kafka.common.errors.TimeoutException","time_ms":1559907386133,"name":"producer_send_error","topic":"test_topic","message":"Batch Expired","class":"class org.apache.kafka.tools.VerifiableProducer","value":"335166","key":null}
{"exception":"class org.apache.kafka.common.errors.TimeoutException","time_ms":1559907386133,"name":"producer_send_error","topic":"test_topic","message":"Batch Expired","class":"class org.apache.kafka.tools.VerifiableProducer","value":"335169","key":null}
```
This can continue for a long time (I have observed up to 1 min) and prevents the producer from successfully writing any new data. While it is busy expiring the batches, no data is getting delivered to the consumer, which causes it to eventually raise a timeout.
```
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:50)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:109)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
```
The fix here is to reduce the throughput. This seems reasonable since the purpose of the test is to verify the upgrade, which does not demand heavy load. Note that I investigated several failing instances of this test going back to 1.0 and saw a similar pattern, so there does not appear to be a regression.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Gwen Shapira
Closes #6907 from hachikuji/lower-throughput-for-upgrade-test
As the title suggests, we start a streams job with three instances and a one-minute session timeout, and once the group is stable we do a couple of rolling bounces of the entire cluster. Every rejoin triggered by a restart should cause no generation bump on the client side.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
We see this failure from time to time:
```
java.lang.AssertionError: expected:<1> but was:<0>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:647)
at org.junit.Assert.assertEquals(Assert.java:633)
at kafka.api.TransactionsTest.testFencingOnTransactionExpiration(TransactionsTest.scala:512)
```
The cause is probably that we are using `consumeRecordsFor`, which has no expectation on the number of records to fetch and a timeout of just 1s. This patch changes the code to use `consumeRecords` and the default 15s timeout.
Note that we have also fixed a bug in the test case itself: it was using the wrong topic for the second write, which meant it could never have failed in the anticipated way.
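A hedged sketch of the difference, using a plain consumer poll loop rather than the actual Scala `TestUtils` helpers: instead of "poll for a fixed 1s and take whatever arrived", poll until the expected number of records shows up or a generous deadline passes.
```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class ConsumeExactly {
    static <K, V> List<ConsumerRecord<K, V>> consumeRecords(Consumer<K, V> consumer,
                                                            int expected,
                                                            Duration timeout) {
        List<ConsumerRecord<K, V>> records = new ArrayList<>();
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (records.size() < expected && System.currentTimeMillis() < deadline) {
            consumer.poll(Duration.ofMillis(100)).forEach(records::add);
        }
        if (records.size() < expected)
            throw new AssertionError("Expected " + expected + " records but got " + records.size());
        return records;
    }
}
```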
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Gwen Shapira
Closes #6905 from hachikuji/fix-flaky-transaction-test
This patch adds test cases for the leader election command added in KIP-460.
Reviewers: Vikas Singh, David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
In RocksDBTimestampedStore#openRocksDB we try to open a DB with two column families. If this succeeds but the first column family is empty (`db.newIterator.seekToFirst.isValid() == false`), we never actually close its ColumnFamilyHandle.
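A simplified sketch of the scenario and the required cleanup, using the RocksDB Java API directly (this is not the actual `RocksDBTimestampedStore` code, and the column family name is illustrative):
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

class ColumnFamilyHandleClose {
    static void open(DBOptions dbOptions, String path) throws RocksDBException {
        List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
            new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes()));
        List<ColumnFamilyHandle> handles = new ArrayList<>(descriptors.size());
        RocksDB db = RocksDB.open(dbOptions, path, descriptors, handles);

        ColumnFamilyHandle noTimestampColumnFamily = handles.get(0);
        try (RocksIterator iter = db.newIterator(noTimestampColumnFamily)) {
            iter.seekToFirst();
            if (!iter.isValid()) {
                // Old format is empty: only the timestamped column family will be used,
                // but the handle we just obtained still has to be closed, otherwise it leaks.
                noTimestampColumnFamily.close();
            }
        }
    }
}
```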
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Quick tech debt cleanup. For some reason StreamsPartitionAssignor uses an InternalTopicMetadata class which wraps an InternalTopicConfig object along with the number of partitions. But InternalTopicConfig already has a numPartitions field, so we should just use it directly instead.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
This patch checks for errors when handling a fetch request before updating follower state. Previously we were unsafely passing the failed `LogReadResult`, with most fields set to -1, into `Replica` to update follower state. Additionally, this patch attempts to improve the test coverage for the ISR shrinking and expansion logic in `Partition`.
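A conceptual Java sketch of the guard (the real code is Scala; types and fields are simplified): only apply follower state updates for reads that completed without error, so sentinel values such as -1 never leak into the ISR bookkeeping.
```java
class FollowerStateGuard {
    static void maybeUpdateFollower(FollowerState follower, LogReadResult result) {
        if (result.error != null)
            return; // a failed read carries no usable offsets; skip the update
        follower.update(result.fetchOffset, result.fetchTimeMs);
    }

    static final class LogReadResult {
        Exception error;          // null when the read succeeded
        long fetchOffset = -1L;   // sentinel values on failure
        long fetchTimeMs = -1L;
    }

    static final class FollowerState {
        long logEndOffset;
        long lastFetchTimeMs;
        void update(long offset, long timeMs) {
            logEndOffset = offset;
            lastFetchTimeMs = timeMs;
        }
    }
}
```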
Reviewers: Guozhang Wang <wangguoz@gmail.com>
The Dead state in the coordinator is used for groups which are either pending deletion or migration to a new coordinator. Currently requests received while in this state result in an UNKNOWN_MEMBER_ID which causes consumers to reset the memberId. This is a problem for KIP-345 since it can cause an older member to fence a newer member. This patch changes the error code returned in this state to COORDINATOR_NOT_AVAILABLE, which causes the consumer to rediscover the coordinator, but not reset the memberId.
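A hedged sketch of the behavioral change (the real logic is in the Scala group coordinator; the state enum here is illustrative):
```java
import org.apache.kafka.common.protocol.Errors;

class DeadGroupErrorSketch {
    enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    static Errors errorForJoin(GroupState state) {
        if (state == GroupState.DEAD)
            return Errors.COORDINATOR_NOT_AVAILABLE; // previously UNKNOWN_MEMBER_ID, which reset the memberId
        return Errors.NONE; // other states handled elsewhere in the real coordinator
    }
}
```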
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit makes the following changes:
- Adds a constructor for NewTopic(String, Optional<Integer>, Optional<Short>)
which allows users to specify Optional.empty() for numPartitions or
replicationFactor in order to use the broker default.
- Changes AdminManager to accept -1 as a valid value for replication
factor and numPartitions (resolving to the broker defaults).
- Makes --partitions and --replication-factor optional arguments when creating
topics using kafka-topics.sh.
- Adds a dependency on the scalaJava8Compat library to make it simpler to
convert a Scala Option to a Java Optional.
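A usage sketch of the new constructor, assuming a broker reachable at localhost:9092; empty Optionals let the broker apply its num.partitions and default.replication.factor settings.
```java
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithDefaults {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Both partition count and replication factor fall back to the broker defaults.
            NewTopic topic = new NewTopic("example-topic", Optional.empty(), Optional.empty());
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```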
Reviewers: Ismael Juma <ismael@juma.me.uk>, Ryanne Dolan <ryannedolan@gmail.com>, Jason Gustafson <jason@confluent.io>
The goals of this small diff are:
1. Give users guidance if they want to relax the commit timeout threshold.
2. Indicate the code path where the timeout exception was caught.
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Boyang Chen <boyang@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>
When Connect forwards a REST request from one worker to another, the Authorization header was not being forwarded. This commit changes the Connect framework to include the authorization header when forwarding requests to other workers.
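A simplified illustration of the idea (the real change is in Connect's REST client code, which uses Jetty; this sketch uses `HttpURLConnection` for brevity): copy the incoming Authorization header onto the request that is forwarded to the other worker.
```java
import java.net.HttpURLConnection;
import java.net.URL;
import javax.servlet.http.HttpServletRequest;

class ForwardWithAuth {
    static HttpURLConnection forward(HttpServletRequest incoming, String targetUrl) throws Exception {
        HttpURLConnection out = (HttpURLConnection) new URL(targetUrl).openConnection();
        out.setRequestMethod(incoming.getMethod());
        String auth = incoming.getHeader("Authorization");
        if (auth != null) {
            out.setRequestProperty("Authorization", auth); // previously this header was dropped
        }
        return out;
    }
}
```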
Author: Hai-Dang Dam <damquanghaidang@gmail.com>
Reviewers: Robert Yokota <rayokota@gmail.com>, Randall Hauch <rhauch@gmail.com>
* StreamsMetricsImpl wraps the Kafka Streams metrics registry and provides logic to create
and register sensors and their corresponding metrics. An example of such logic can be found in
threadLevelSensor(). Furthermore, StreamsMetricsImpl keeps track of the sensors on the
different levels of an application, i.e., thread, task, etc., and provides logic to remove sensors per
level, e.g., removeAllThreadLevelSensors(). There is one StreamsMetricsImpl object per
application instance.
* ThreadMetrics contains only static methods that specify all built-in thread-level sensors and
metrics and provides logic to register and retrieve those thread-level sensors, e.g., commitSensor().
* From anywhere inside the code base with access to StreamsMetricsImpl, thread-level sensors can be accessed by using ThreadMetrics.
* ThreadMetrics does not inherit from StreamsMetricsImpl anymore.
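A hedged sketch of the pattern described above; the real method signatures in StreamsMetricsImpl and ThreadMetrics may differ, the point being that all thread-level sensors are defined in one static holder and created through the StreamsMetricsImpl wrapper.
```java
import org.apache.kafka.common.metrics.Sensor;

class ThreadMetricsSketch {
    private static final String COMMIT = "commit";

    // Static factory: any code with access to the StreamsMetricsImpl wrapper can ask
    // for the thread-level commit sensor without knowing how it is registered.
    static Sensor commitSensor(StreamsMetricsImplSketch streamsMetrics, String threadId) {
        return streamsMetrics.threadLevelSensor(threadId, COMMIT, Sensor.RecordingLevel.INFO);
    }

    // Stand-in for the subset of StreamsMetricsImpl described in this commit message.
    interface StreamsMetricsImplSketch {
        Sensor threadLevelSensor(String threadId, String sensorName, Sensor.RecordingLevel level);
        void removeAllThreadLevelSensors(String threadId);
    }
}
```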
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>