https://issues.apache.org/jira/browse/KAFKA-6685
Added an exception message in `WorkerSinkTask.convertMessages` to distinguish the message key from the value during deserialization to the Kafka Connect format.
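A minimal sketch of the idea, not the actual `WorkerSinkTask` code: the key and the value are converted separately, so the thrown message can name the failing field. `BytesConverter`, `convertField` and the use of `IllegalStateException` are hypothetical stand-ins chosen for illustration.

```java
// Hypothetical stand-in for a Connect converter; not the real Converter interface.
interface BytesConverter {
    Object toConnectData(String topic, byte[] raw);
}

final class ConvertMessagesSketch {
    // Converts one raw field and, on failure, reports whether it was the key or the value.
    static Object convertField(BytesConverter converter, String topic,
                               byte[] raw, String fieldName) {
        try {
            return converter.toConnectData(topic, raw);
        } catch (RuntimeException e) {
            throw new IllegalStateException(
                "Error converting message " + fieldName + " in topic '" + topic + "'", e);
        }
    }

    // Converts key and value independently so that each failure is attributed correctly.
    static Object[] convertRecord(BytesConverter keyConverter, BytesConverter valueConverter,
                                  String topic, byte[] key, byte[] value) {
        Object convertedKey = convertField(keyConverter, topic, key, "key");
        Object convertedValue = convertField(valueConverter, topic, value, "value");
        return new Object[] {convertedKey, convertedValue};
    }
}
```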
Author: Jagadesh Adireddi <adireddijagadesh@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes #4765 from jadireddi/KAFKA-6685---log-message-should-distinguish-key-from-value
test_broker_type_bounce_at_start tries to validate that when the controller is down, the streams client will always fail when trying to create the topic. With the current behavior of the admin client this is not always true: the actual behavior depends on the admin client internals as well as on when the controller becomes unavailable during the leader assign partitions phase. I'd suggest at least ignoring this test for now until the admin client is more stable (personally I'd even suggest removing this test, as its coverage benefit is smaller than the issues it introduces).
Also adding a few more log4j entries as a result of investigating this issue.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Due to #4644 the consumer connector logs will be much cleaner, with fewer "broker may not be available" entries. We need to reduce the required frequency from 100 to a smaller number.
I've thought about reducing to just 1, but it may still be transient (i.e. even if broker is starting up you may see a few entries) so I reduced it to 10.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
* The consumer groups API should expose group state and coordinator information. This information is needed by administrative tools and scripts that access consumer groups.
* The partition assignment will be empty when the group is rebalancing. Fix an issue where the adminclient attempted to deserialize this empty buffer.
* Remove nulls from the API and make all collections immutable.
* DescribeConsumerGroupsResult#all should return a result as expected, rather than Void
* Fix exception text for GroupIdNotFoundException, GroupNotEmptyException. It was being filled in as "The group id The group id does not exist was not found" and similar.
Reviewers: Attila Sasvari <asasvari@apache.org>, Andras Beni <andrasbeni@cloudera.com>, Dong Lin <lindong28@gmail.com>, Jason Gustafson <jason@confluent.io>
Type inference doesn't currently work for the join functions in Scala, as the compiler doesn't yet know the types of the given KStream[K, V] or KTable[K, V].
The fix here is to curry the joiner function. I personally prefer this notation, but it also means the Scala API differs more from the Java API.
I believe the difference from the Java API is worth it in this case, as it not only solves the type inference but also better fits the Scala way of coding (e.g. fold).
Moreover, any Scala dev will stumble on these functions and spend some time trying to understand why the type inference is not working, then get frustrated at being obliged to be explicit where the types could harmlessly be inferred.
Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>, Ismael Juma <ismael@juma.me.uk>
This is a follow-up to #5022 which added documentation to the Processor
interface. This commit adds similar documentation to Transformer and
ValueTransformer.
Also, s/processor/transformer/ in the close() docs.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This is a change to improve resource cleanup for sink tasks and source tasks. Now `Task.stop()` is called from both `WorkerSinkTask.close()` and `WorkerSourceTask.close()`.
It is called from `WorkerXXXTask.close()` since this method is called in the `finally` block of `WorkerTask.run()`, and Connect developers use `stop()` to clean up resources.
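The cleanup path described above can be sketched as follows. This is a simplified illustration, not the Connect runtime code: `WorkerTaskSketch` and its nested `Task` interface are hypothetical, and only the `run()` / `finally` / `close()` / `stop()` relationship is taken from the description.

```java
final class WorkerTaskSketch {
    // Hypothetical stand-in for a connector task that releases resources in stop().
    interface Task {
        void stop();
    }

    private final Task task;

    WorkerTaskSketch(Task task) {
        this.task = task;
    }

    // close() runs in the finally block, so it is reached even when execute fails.
    void run(Runnable execute) {
        try {
            execute.run();
        } finally {
            close();
        }
    }

    // Calling stop() here guarantees the task's own cleanup hook is invoked.
    void close() {
        task.stop();
    }
}
```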
Author: Robert Yokota <rayokota@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes #5020 from rayokota/K6566-improve-connect-resource-cleanup
The wakeup-based strategy caused more problems than it
solved, so we'll instead focus on KIP-266.
Revert commit 2d8049b.
Keep the metrics addition and the new test util.
Also keep the tests for shutdown, although they must be ignored until
poll(Duration) is done in the scope of KIP-266.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
1. In InternalTopologyBuilder#topicGroups, which is used in StreamsPartitionAssignor, look in the book-kept storeToChangelogTopic map before creating a new internal changelog topic. In this way, if the source KTable is created, its source topic stored in storeToChangelogTopic will be used.
2. Added unit test (confirmed that without 1) it will fail).
3. MINOR: removed TODOs that are related to removed KStreamBuilder.
4. MINOR: removed TODOs in StreamsBuilderTest util functions and replaced with TopologyWrapper.
5. MINOR: removed StreamsBuilderTest#testFrom as it is already covered by TopologyTest#shouldNotAllowToAddSourcesWithSameName, plus it requires KStreamImpl.SOURCE_NAME which should be a package private field of the KStreamImpl.
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
We would also like to export the producer metrics from StreamThread, just like the consumer metrics, so that we gain more visibility into the streams application. The approach is to pass the threadProducer into the StreamThread so that its metrics can be exported dynamically.
Note that this is a pure internal change that doesn't require a KIP, and in the future we also want to export admin client metrics. A followup KIP for admin client will be created once this is merged.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch adds a few metrics that are useful for monitoring controller health. See KIP-237 for more detail.
Author: Dong Lin <lindong28@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes #4392 from lindong28/KAFKA-3473
1. Remove TopologyBuilder, TopologyBuilderException, KStreamBuilder.
2. Completed the leftover work of https://issues.apache.org/jira/browse/KAFKA-5660 by removing TopologyBuilderException.
3. Added MockStoreBuilder to replace MockStateStoreSupplier, remove all XXStoreSupplier except StateStoreSupplier as it is still referenced in the logical streams graph.
4. Minor: rename KStreamsFineGrainedAutoResetIntegrationTest.java to FineGrainedAutoResetIntegrationTest.java.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Removed usage of deprecated AdminClient from StreamsResetter
No additional tests are required.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Also include a few clean-ups:
* Method/variable/parameter renames to make them consistent with
the class name
* Return `ApiVersion` from `minSupportedFor`
* Use `values` to remove some code duplication
* Reduce duplication in `ApiVersion` by introducing the `shortVersion`
method and building the versions map programmatically
* Avoid unnecessary `regex` in `ApiVersion.apply`
* Added scaladoc to a few methods
Some of these were originally discussed in:
https://github.com/apache/kafka/pull/4583#pullrequestreview-98089400
Added a test for `ApiVersion.shortVersion`. Relying on existing tests
for the rest since there is no change in behaviour.
Reviewers: Jason Gustafson <jason@confluent.io>
Also removed the InternalValueTransformerWithKey / Supplier which is used to mock away the deprecated punctuate function.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Implementation of KIP-279 as described here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over
In summary:
- Added leader_epoch to OFFSET_FOR_LEADER_EPOCH_RESPONSE
- The leader replies with the pair (largest epoch less than or equal to the requested epoch, end offset of this epoch)
- If the follower does not know about the leader epoch that the leader replies with, it truncates to the end offset of the largest leader epoch it has that is less than the epoch the leader replied with, and sends another OffsetForLeaderEpoch request. That request contains this largest leader epoch.
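The leader-side lookup in the second bullet can be sketched as follows. This is an illustration under assumptions, not the broker implementation: `LeaderEpochSketch`, the in-memory map from epoch to start offset, and the undefined sentinels are all hypothetical. The end offset of an epoch is taken to be the start offset of the next epoch, or the log end offset for the latest epoch. The follower-side truncation loop is not shown.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.TreeMap;

final class LeaderEpochSketch {
    static final int UNDEFINED_EPOCH = -1;
    static final long UNDEFINED_OFFSET = -1L;

    // Hypothetical cache: leader epoch -> start offset of that epoch.
    private final TreeMap<Integer, Long> epochStartOffsets;
    private final long logEndOffset;

    LeaderEpochSketch(Map<Integer, Long> epochStartOffsets, long logEndOffset) {
        this.epochStartOffsets = new TreeMap<>(epochStartOffsets);
        this.logEndOffset = logEndOffset;
    }

    // Returns (largest epoch <= requestedEpoch, end offset of that epoch).
    Map.Entry<Integer, Long> endOffsetFor(int requestedEpoch) {
        Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(requestedEpoch);
        if (floor == null) {
            // No known epoch at or below the requested one.
            return new AbstractMap.SimpleImmutableEntry<Integer, Long>(
                UNDEFINED_EPOCH, UNDEFINED_OFFSET);
        }
        // An epoch ends where the next epoch starts, or at the log end if it is the latest.
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(floor.getKey());
        long endOffset = (next == null) ? logEndOffset : next.getValue();
        return new AbstractMap.SimpleImmutableEntry<Integer, Long>(floor.getKey(), endOffset);
    }
}
```

With epochs 0, 2 and 4 on the leader, a request for epoch 3 is answered with epoch 2, which is the case where the reply carries an epoch different from the one requested.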
Reviewers: Dong Lin <lindong28@gmail.com>, Jun Rao <junrao@gmail.com>
1. Remove the deprecated StateStoreSuppliers, and the corresponding Stores.create() functions and factories: only the base StateStoreSupplier and MockStoreSupplier were still preserved as they are needed by the deprecated TopologyBuilder and KStreamBuilder. Will remove them in a follow-up PR.
2. Add TopologyWrapper.java as the original InternalTopologyBuilderAccessor was removed, but I realized it is still needed as of now.
3. Minor: removed StateStoreTestUtils.java and inline its logic in its callers since now with StoreBuilder it is just a one-liner.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This is a continuation of #4978.
From Guozhang:
I think to fix this issue, in init we could consider switching the steps of 1 and 2:
initInternal(context);
underlying.init(context, root);
since
volatile boolean open = false;
it should be sufficient. In this case the check on step 3) will fail if underlying.init is not completed and we will throw InvalidStateStoreException.
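A rough sketch of the suggested ordering, assuming that the check in step 3 is an open check performed before reads. The class names are hypothetical, and `IllegalStateException` stands in for InvalidStateStoreException so that the example stays self-contained.

```java
final class InitOrderSketch {
    // Hypothetical inner store; `open` only becomes true once init() has completed.
    static final class UnderlyingStore {
        private volatile boolean open = false;

        void init() {
            // ... restoration work would happen here ...
            open = true;
        }

        boolean isOpen() {
            return open;
        }
    }

    // Hypothetical wrapper that initialises itself first and the inner store second.
    static final class WrapperStore {
        private final UnderlyingStore underlying = new UnderlyingStore();
        private boolean internalReady = false;

        void init() {
            initInternal();     // step 1: wrapper-internal state
            underlying.init();  // step 2: inner store, which flips `open` last
        }

        private void initInternal() {
            internalReady = true;
        }

        String get(String key) {
            // The open check fails until underlying.init() has completed.
            if (!underlying.isOpen()) {
                throw new IllegalStateException("Store is not open");
            }
            return "value-for-" + key;
        }
    }
}
```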
Reviewers: Guozhang Wang <wangguoz@gmail.com>
When AdminClient gets a NOT_CONTROLLER error, it should refresh its metadata and retry the request, rather than making the end-user deal with NotControllerException.
Move AdminClient's metadata management outside of NetworkClient and into AdminMetadataManager. This will make it easier to do more sophisticated metadata management in the future, such as implementing a NodeProvider which fetches the leaders for topics.
Rather than manipulating newCalls directly, the AdminClient service thread now drains it directly into pendingCalls. This minimizes the amount of locking we have to do, since pendingCalls is only accessed from the service thread.
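The hand-off between the two collections might look roughly like the sketch below. It is not the AdminClient code: `CallQueueSketch` is hypothetical and calls are modelled as plain strings. The point is that the lock is held only for the drain, while `pendingCalls` is touched by the service thread alone.

```java
import java.util.ArrayList;
import java.util.List;

final class CallQueueSketch {
    // Written by client threads, so every access is guarded by `this`.
    private final List<String> newCalls = new ArrayList<>();
    // Accessed only from the service thread, so no locking is needed.
    private final List<String> pendingCalls = new ArrayList<>();

    // Called from any client thread.
    synchronized void enqueue(String call) {
        newCalls.add(call);
    }

    // Called only from the service thread: one short critical section per iteration.
    void drainNewCalls() {
        synchronized (this) {
            pendingCalls.addAll(newCalls);
            newCalls.clear();
        }
    }

    // Called only from the service thread.
    List<String> pendingCalls() {
        return pendingCalls;
    }
}
```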
Grow buffers in log cleaner to hold one message set after sanity check even if message set is bigger than max.message.bytes.
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
This patch removes a completedFetch from the completedFetches queue upon a failed parse if it contains no records. The following scenario, which uses TopicAuthorizationException as one instance of this case, explains why this is needed.
0. Let's assume a scenario, in which the consumer is attempting to read from a topic without the necessary read permission.
1. In Fetcher#fetchedRecords(), after peeking the completedFetches, the Fetcher#parseCompletedFetch(CompletedFetch) throws a TopicAuthorizationException (as expected).
2. Fetcher#fetchedRecords() passes the TopicAuthorizationException up without having a chance to poll completedFetches. So, the same completedFetch remains at the completedFetches queue.
3. Upon subsequent calls to Fetcher#fetchedRecords(), peeking the completedFetches will always return the same completedFetch, independent of any updates to the ACLs of the topic that the consumer is trying to read from.
4. Hence, despite the creation of an ACL with correct permissions, once the consumer sees the TopicAuthorizationException, it will be unable to recover without a bounce.
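The scenario above can be sketched with a plain queue. This is a simplified model, not the actual Fetcher: `FetcherSketch`, its `CompletedFetch` class and the string records are hypothetical, and a parse failure is modelled as an exception stored on the fetch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class FetcherSketch {
    // Hypothetical fetch result: either some records or an error raised when parsed.
    static final class CompletedFetch {
        final List<String> records;
        final RuntimeException error;

        CompletedFetch(List<String> records, RuntimeException error) {
            this.records = records;
            this.error = error;
        }
    }

    private final Queue<CompletedFetch> completedFetches = new ArrayDeque<>();

    void add(CompletedFetch fetch) {
        completedFetches.add(fetch);
    }

    int pending() {
        return completedFetches.size();
    }

    List<String> fetchedRecords() {
        List<String> fetched = new ArrayList<>();
        CompletedFetch head;
        while ((head = completedFetches.peek()) != null) {
            try {
                fetched.addAll(parse(head));
            } catch (RuntimeException e) {
                // The fix: drop a failed fetch that carries no records, so that the
                // next call does not peek and re-parse the same entry forever.
                if (head.records.isEmpty()) {
                    completedFetches.poll();
                }
                throw e;
            }
            completedFetches.poll();
        }
        return fetched;
    }

    private static List<String> parse(CompletedFetch fetch) {
        if (fetch.error != null) {
            throw fetch.error;
        }
        return fetch.records;
    }
}
```

Without the `poll()` in the catch block, every later call would throw the same exception, which matches steps 2 to 4.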
Author: Adem Efe Gencer <agencer@linkedin.com>
Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>
Closes #4974 from efeg/fix/parseCompletedFetchRemainsInQueue
Switch from sum operations to subtraction to avoid type casting in checks and integer overflow in `FileLogInputStream`, especially in cases where the property `log.segment.bytes` is set close to `Integer.MAX_VALUE` and used as a `position` inside the `nextBatch()` function.
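To illustrate why the subtraction form is safer, here is a small sketch with hypothetical method names. It is not the `FileLogInputStream` code, and it assumes `size` and `end` are non-negative so that `end - size` cannot itself overflow.

```java
final class OverflowCheckSketch {
    // Overflow-prone: position + size wraps to a negative int near Integer.MAX_VALUE,
    // so the check passes even though the range does not fit.
    static boolean fitsUsingSum(int position, int size, int end) {
        return position + size <= end;
    }

    // Same check rearranged: end - size stays in range for non-negative inputs.
    static boolean fitsUsingSubtraction(int position, int size, int end) {
        return position <= end - size;
    }

    public static void main(String[] args) {
        int end = Integer.MAX_VALUE;
        int position = Integer.MAX_VALUE - 5;
        int size = 17;
        System.out.println(fitsUsingSum(position, size, end));          // true (wrong)
        System.out.println(fitsUsingSubtraction(position, size, end));  // false (correct)
    }
}
```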
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Fixes a deadlock in which the controller's beforeInitializingSession callback holds the ZooKeeper client initialization lock while awaiting completion of an asynchronous event that itself depends on the same lock.
Also catch and log callback exceptions to ensure the ZooKeeper reconnection takes place.
Finally, configure KafkaScheduler in ZooKeeperClient to have at least 1 thread.
Added tests that fail or hang without the changes in this PR.
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Serdes are confusing in the Scala wrapper:
* We have wrappers around Serializer, Deserializer and Serde which are not very useful.
* We have Serdes in two places, org.apache.kafka.common.serialization.Serde and DefaultSerdes; instead we should have only one place in which to find all the Serdes.
I wanted to do this PR before the release as this is a breaking change.
This shouldn't add anything new, so the current tests should be enough.
Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>