Messages containing the key and value were moved to the TRACE logging level; however, the exception still includes the key and value.
This commit removes the key and value from StreamsException.
Reviewers: Bill Bejeck <bbejeck@gmail.com>
Currently we load the high watermark checkpoint separately for every replica that we load. This patch makes this loading logic lazy and caches the loaded map while a LeaderAndIsr request is being handled.
Reviewers: Jun Rao <junrao@gmail.com>
The ResetIntegrationTest has experienced several failures, and it seems the current timeout of 10 seconds may not be enough time.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Boyang Chen <boyang@confluent.io>
We have seen this failing recently due to the following error:
```
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:366)
at scala.None$.get(Option.scala:364)
at kafka.admin.PreferredReplicaLeaderElectionCommandTest.getLeader(PreferredReplicaLeaderElectionCommandTest.scala:101)
at kafka.admin.PreferredReplicaLeaderElectionCommandTest.testNoopElection(PreferredReplicaLeaderElectionCommandTest.scala:240)
```
We need to wait for the leader to be available.
Reviewers: David Arthur <mumrah@gmail.com>
We've seen `ReplicaVerificationToolTest.test_replica_lags` fail occasionally due to errors such as the following:
```
RemoteCommandError: ubuntuworker7: Command 'kill -15 2896' returned non-zero exit status 1. Remote error message: bash: line 0: kill: (2896) - No such process
```
The problem seems to be a shutdown race condition when using `max_messages` with the producer. The process may already be gone, which causes the signal to fail.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Gwen Shapira
Closes #6906 from hachikuji/fix-failing-replicat-verification-test
We see the upgrade test failing from time to time. I looked into it and found that the root cause is basically that the test throughput can be too high for the 0.9 producer to make progress. Eventually it reaches a point where it has a huge backlog of timed out requests in the accumulator which all have to be expired. We see a long run of messages like this in the output:
```
{"exception":"class org.apache.kafka.common.errors.TimeoutException","time_ms":1559907386132,"name":"producer_send_error","topic":"test_topic","message":"Batch Expired","class":"class org.apache.kafka.tools.VerifiableProducer","value":"335160","key":null}
{"exception":"class org.apache.kafka.common.errors.TimeoutException","time_ms":1559907386132,"name":"producer_send_error","topic":"test_topic","message":"Batch Expired","class":"class org.apache.kafka.tools.VerifiableProducer","value":"335163","key":null}
{"exception":"class org.apache.kafka.common.errors.TimeoutException","time_ms":1559907386133,"name":"producer_send_error","topic":"test_topic","message":"Batch Expired","class":"class org.apache.kafka.tools.VerifiableProducer","value":"335166","key":null}
{"exception":"class org.apache.kafka.common.errors.TimeoutException","time_ms":1559907386133,"name":"producer_send_error","topic":"test_topic","message":"Batch Expired","class":"class org.apache.kafka.tools.VerifiableProducer","value":"335169","key":null}
```
This can continue for a long time (I have observed up to 1 min) and prevents the producer from successfully writing any new data. While it is busy expiring the batches, no data is getting delivered to the consumer, which causes it to eventually raise a timeout.
```
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:50)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:109)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
```
The fix here is to reduce the throughput, which seems reasonable since the purpose of the test is to verify the upgrade, which does not demand heavy load. Note that I investigated several failing instances of this test going back to 1.0 and saw a similar pattern, so there does not appear to be a regression.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Gwen Shapira
Closes #6907 from hachikuji/lower-throughput-for-upgrade-test
As the title suggests, we bring up a streams job with 3 stream instances and a one-minute session timeout, and once the group is stable, do a couple of rolling bounces of the entire cluster. Every rejoin caused by a restart should result in no generation bump on the client side.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
We see this failure from time to time:
```
java.lang.AssertionError: expected:<1> but was:<0>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:647)
at org.junit.Assert.assertEquals(Assert.java:633)
at kafka.api.TransactionsTest.testFencingOnTransactionExpiration(TransactionsTest.scala:512)
```
The cause is probably that we are using `consumeRecordsFor`, which has no expectation on the number of records to fetch and a timeout of just 1s. This patch changes the code to use `consumeRecords` with the default 15s timeout.
Note we have also fixed a bug in the test case itself: it was using the wrong topic for the second write, which meant it could never have failed in the anticipated way.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Gwen Shapira
Closes #6905 from hachikuji/fix-flaky-transaction-test
This patch adds test cases for the leader election command added in KIP-460.
Reviewers: Vikas Singh, David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
In RocksDBTimestampedStore#openRocksDB we try to open a DB with two column families. If this succeeds but the first column family is empty (db.newIterator.seekToFirst.isValid() == false), we never actually close its ColumnFamilyHandle.
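The shape of the fix, as a minimal sketch against the RocksDB Java API (this is not the actual store code; class, method, and variable names are illustrative):
```java
import java.util.ArrayList;
import java.util.List;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

class ColumnFamilyCloseSketch {
    static RocksDB open(final DBOptions dbOptions,
                        final String dbPath,
                        final List<ColumnFamilyDescriptor> descriptors) throws RocksDBException {
        final List<ColumnFamilyHandle> handles = new ArrayList<>(descriptors.size());
        final RocksDB db = RocksDB.open(dbOptions, dbPath, descriptors, handles);
        final ColumnFamilyHandle firstColumnFamily = handles.get(0);
        try (final RocksIterator iter = db.newIterator(firstColumnFamily)) {
            iter.seekToFirst();
            if (!iter.isValid()) {
                // the first column family holds no data and will not be used, so its
                // handle must be closed here rather than silently dropped (the leak)
                firstColumnFamily.close();
            }
        }
        return db;
    }
}
```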
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Quick tech debt cleanup. For some reason StreamsPartitionAssignor uses an InternalTopicMetadata class which wraps an InternalTopicConfig object along with the number of partitions. But InternalTopicConfig already has a numPartitions field, so we should just use it directly instead.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
This patch checks for errors handling a fetch request before updating follower state. Previously we were unsafely passing the failed `LogReadResult` with most fields set to -1 into `Replica` to update follower state. Additionally, this patch attempts to improve the test coverage for ISR shrinking and expansion logic in `Partition`.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
The Dead state in the coordinator is used for groups which are either pending deletion or migration to a new coordinator. Currently requests received while in this state result in an UNKNOWN_MEMBER_ID which causes consumers to reset the memberId. This is a problem for KIP-345 since it can cause an older member to fence a newer member. This patch changes the error code returned in this state to COORDINATOR_NOT_AVAILABLE, which causes the consumer to rediscover the coordinator, but not reset the memberId.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit makes the following changes:
- Adds a constructor for NewTopic(String, Optional<Integer>, Optional<Short>)
which allows users to specify Optional.empty() for numPartitions or
replicationFactor in order to use the broker default (see the sketch after this list).
- Changes AdminManager to accept -1 as a valid value for replication
factor and numPartitions (resolving to the broker defaults).
- Makes --partitions and --replication-factor optional arguments when creating
topics using kafka-topics.sh.
- Adds a dependency on scalaJava8Compat library to make it simpler to
convert Scala Option to Java Optional
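A minimal sketch of the new constructor (the topic name and bootstrap address are illustrative; error handling is omitted):
```java
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithDefaults {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Optional.empty() for both arguments means "use the broker defaults"
            // (num.partitions and default.replication.factor)
            final NewTopic topic = new NewTopic("my-topic", Optional.empty(), Optional.empty());
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```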
Reviewers: Ismael Juma <ismael@juma.me.uk>, Ryanne Dolan <ryannedolan@gmail.com>, Jason Gustafson <jason@confluent.io>
The goals for this small diff are:
1. Give users guidance if they want to relax the commit timeout threshold
2. Indicate the code path where the timeout exception was caught
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Boyang Chen <boyang@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>
When Connect forwards a REST request from one worker to another, the Authorization header was not forwarded. This commit changes the Connect framework to include the authorization header when forwarding requests to other workers.
Author: Hai-Dang Dam <damquanghaidang@gmail.com>
Reviewers: Robert Yokota <rayokota@gmail.com>, Randall Hauch <rhauch@gmail.com>
* StreamsMetricsImpl wraps the Kafka Streams metrics registry and provides logic to create
and register sensors and their corresponding metrics. An example of such logic can be found in
threadLevelSensor(). Furthermore, StreamsMetricsImpl keeps track of the sensors on the
different levels of an application, i.e., thread, task, etc., and provides logic to remove sensors per
level, e.g., removeAllThreadLevelSensors(). There is one StreamsMetricsImpl object per
application instance.
* ThreadMetrics contains only static methods that specify all built-in thread-level sensors and
metrics and provide logic to register and retrieve those thread-level sensors, e.g., commitSensor().
* From anywhere inside the code base with access to StreamsMetricsImpl, thread-level sensors can be accessed by using ThreadMetrics.
* ThreadMetrics does not inherit from StreamsMetricsImpl anymore.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Temporarily restore the SslFactory.sslContext() function, which some connectors use. This function is not a public API and it will be removed eventually. For now, we will mark it as deprecated.
Restart task on reconfiguration under incremental cooperative rebalancing, and keep execution paths separate for config updates between eager and cooperative. Include the group generation in the log message when the worker receives its assignment.
Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
According to KIP-297, a parameter is passed to a ConfigProvider using the syntax "config.providers.{name}.param.{param-name}". Currently AbstractConfig allows parameters of the format "config.providers.{name}.{param-name}". With this fix, AbstractConfig is consistent with the KIP-297 syntax.
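For example, with this fix a configuration along these lines passes the parameter through to the provider (a sketch; the provider name "vault" and the provider class are hypothetical):
```java
import java.util.HashMap;
import java.util.Map;

class ConfigProviderParamsSketch {
    static Map<String, String> providerProps() {
        final Map<String, String> props = new HashMap<>();
        props.put("config.providers", "vault");
        props.put("config.providers.vault.class", "com.example.VaultConfigProvider"); // hypothetical provider
        // provider parameters must use the ".param." segment per KIP-297
        props.put("config.providers.vault.param.path", "/secrets/kafka");
        return props;
    }
}
```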
Reviewers: Robert Yokota <rayokota@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
Since the originals map passed to the AbstractConfig constructor may be immutable, avoid updating this map while resolving indirect config variables. Instead, a new ResolvingMap instance is now used to store the resolved configs.
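A minimal illustration of the failure mode being avoided (not the actual AbstractConfig code): writing resolved values back into an unmodifiable originals map throws at runtime.
```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ImmutableOriginalsSketch {
    static void resolveInPlace() {
        final Map<String, Object> caller = new HashMap<>();
        caller.put("config.providers", "file");
        final Map<String, Object> originals = Collections.unmodifiableMap(caller);
        // mutating the caller's map while resolving config variables fails here
        originals.put("resolved.key", "resolved.value"); // throws UnsupportedOperationException
    }
}
```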
Reviewers: Randall Hauch <rhauch@gmail.com>, Boyang Chen <bchen11@outlook.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
When the restored record value is null, we are in danger of an NPE during the restoration phase.
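As an illustration only (this is not the fixed code path; the callback below is hypothetical), restoration logic needs an explicit guard for tombstones, i.e. records whose value is null:
```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.state.KeyValueStore;

class NullValueRestoreSketch {
    static StateRestoreCallback callback(final KeyValueStore<Bytes, byte[]> store) {
        return (key, value) -> {
            if (value == null) {
                // a null value is a delete marker; dereferencing it (e.g. value.length)
                // during restoration would throw a NullPointerException
                store.delete(Bytes.wrap(key));
            } else {
                store.put(Bytes.wrap(key), value);
            }
        };
    }
}
```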
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Sub-task required to allow defining custom processor names with the KStreams DSL (KIP-307):
- overload methods for stateless operations to accept a Named parameter (filter, filterNot, map, mapValues, foreach, peek, branch, transform, transformValue, flatTransform); see the sketch after this list
- overload process method to accept a Named parameter
- overload join/leftJoin/outerJoin methods
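For example, with these overloads a custom name can be attached to stateless operations (a minimal sketch; the topic and processor names are illustrative):
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

class NamedOperationsSketch {
    static void build() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .filter((key, value) -> value != null, Named.as("drop-null-values"))
               .mapValues(value -> value.toUpperCase(), Named.as("upper-case-values"))
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
    }
}
```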
Reviewers: John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>,
Bill Bejeck <bbejeck@gmail.com>
See also #6684
KTable processors must be supplied with a KTableProcessorSupplier, which in turn requires implementing a ValueGetter, for use with joins and groupings.
For suppression, a correct view only includes the previously emitted values (not the currently buffered ones), so this change also involves pushing the Change value type into the suppression buffer's interface, so that it can get the prior value upon first buffering (which is also the previously emitted value).
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Remove processedKeys / processedValues / processedWithTimestamps as they are already covered by processed.
Reviewers: Matthias J. Sax <mjsax@apache.org>, John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>
It is possible for the offset of a partition to be changed while we are in the middle of validation. If the OffsetForLeaderEpoch request is in flight and the offset changes, we need to redo the validation after it returns. We had a check for this situation previously, but it only checked whether the current leader epoch had changed. This patch fixes this and moves the validation into `SubscriptionState`, where it can be protected with a lock.
Additionally, this patch adds test cases for the SubscriptionState validation API. We also fix a small bug in handling broker downgrades: basically, we should skip validation if the latest metadata does not include leader epoch information.
Reviewers: David Arthur <mumrah@gmail.com>
Fix KAFKA-8187: State store record loss across multiple reassignments when using standby tasks.
Do not let the thread transition to RUNNING until all tasks (including standby tasks) are ready.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
Now that we can configure RocksDB to bound its total memory, we should include docs describing how, as well as touching on some options that should be considered when taking advantage of this feature.
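A sketch along the lines of what such docs could show (the sizes are illustrative, not recommendations): a single shared block cache and write buffer manager bound the off-heap memory used across all RocksDB instances of an application instance.
```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    private static final long TOTAL_OFF_HEAP_MEMORY = 512 * 1024 * 1024L; // illustrative
    private static final long TOTAL_MEMTABLE_MEMORY = 128 * 1024 * 1024L; // illustrative

    // static so the same objects are shared by every store in the instance
    private static final Cache cache = new LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, 0.1);
    private static final WriteBufferManager writeBufferManager =
            new WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        // count index and filter blocks against the block cache so they are bounded too
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setWriteBufferManager(writeBufferManager);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // the shared cache and write buffer manager must not be closed per store
    }
}
```
The setter would then be registered via the rocksdb.config.setter config (StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG).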
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jim Galasyn <jim.galasyn@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
A lot of confusion seems to have arisen from the StreamsBuilder#addGlobalStore(...ProcessorSupplier) method. Users have assumed they can safely use this to transform records before populating their global state store; unfortunately, this results in corrupted data because on restore the records are read directly from the source topic changelog, bypassing their custom processor.
We should probably provide a means to do this at some point, but for the time being we should clarify the proper use of #addGlobalStore as it currently functions.
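To illustrate the intended usage (a minimal sketch with illustrative store, topic, and class names): the processor supplied to addGlobalStore should write records into the store unmodified, because restoration reads the source topic directly and bypasses this processor entirely.
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

class GlobalStoreSketch {
    @SuppressWarnings("unchecked")
    static void build(final StreamsBuilder builder) {
        builder.addGlobalStore(
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("global-store"),
                Serdes.String(), Serdes.String()).withLoggingDisabled(),
            "global-topic",
            Consumed.with(Serdes.String(), Serdes.String()),
            () -> new Processor<String, String>() {
                private KeyValueStore<String, String> store;

                @Override
                public void init(final ProcessorContext context) {
                    store = (KeyValueStore<String, String>) context.getStateStore("global-store");
                }

                @Override
                public void process(final String key, final String value) {
                    // write records through unmodified; any transformation applied here is
                    // skipped on restore and would leave the store inconsistent
                    store.put(key, value);
                }

                @Override
                public void close() { }
            });
    }
}
```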
Reviewers: Matthias J. Sax <mjsax@apache.org>, Bruno Cadonna <bruno@confluent.io>