`loadClass` needs to be synchronized to protect subsequent calls to `defineClass`.
Details in the javadoc of this PR as well as here too: https://docs.oracle.com/javase/7/docs/technotes/guides/lang/cl-mt.html
/cc ewencp rhauch
Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4428 from kkonstantine/KAFKA-6277-Make-loadClass-thread-safe-for-class-loaders-of-Connect-plugins
When the coordinator is marked unknown, we explicitly disconnect its connection and cancel pending requests. Currently the disconnect happens before the coordinator state is set to null, which means that callbacks which inspect the coordinator state will see it still as active. If there are offset commit requests which need to be cancelled, each request callback will inspect the coordinator state and attempt to mark the coordinator dead again. In the worst case, if there are many pending offset commit requests that need to be cancelled, this can cause a stack overflow. To fix the problem, we need to set the coordinator to null prior to cancelling pending requests.
I have added a test case which reproduced the stack overflow with many pending offset commits. I have also added a basic test case to verify that callbacks for in-flight or unsent requests see the coordinator as unknown which prevents them from attempting to resend.
This patch also includes some minor cleanups which were noticed along the way.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This is a bug fix that is composed of two parts:
1. The major part is, for all operators that is generating a KTable, we should construct its value getter based on whether the KTable itself is materialized.
1.a If yes, then query the materialized store directly for value getter.
1.b If not, then hand over to its parents value getter (recursively) and apply the computation to return.
2. The minor part is, in KStreamImpl, when joining with a table, we should connect with table's `valueGetterSupplier().storeNames()`, not the `internalStoreName()` as the latter always assume that the KTable is materialized, but that is not always true.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Closes#4421 from guozhangwang/K6398-KTableValueGetter
add check to KafkaApis, add unit test specific to follower fetch
developed with @mimaison
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>
Making clear that implementations of poll() shouldn't block indefinitely in order to allow the task instance to transition to PAUSED state.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Author: RichardYuSTUG <yohan.richard.yu2@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#4339 from ConcurrencyPractitioner/kafka-6238
Minor edits on description
1. Replaced KStreamKTableJoinIntegrationTest with the abstract based StreamTableJoinIntegrationTest. Added details on per-step verifications.
2. Minor renaming on GlobalKTableIntegrationTest.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>
Closes#4419 from guozhangwang/KMinor-join-integration-tests-II
This PR is the partial implementation for KIP-149. As the discussion for this KIP is still ongoing, I made a PR on the "safe" portions of the KIP (so that it can be included in the next release) which are 1) `ValueMapperWithKey`, 2) `ValueTransformerWithKeySupplier`, and 3) `ValueTransformerWithKey`.
Author: Jeyhun Karimov <je.karimov@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4309 from jeyhunkarimov/KIP-149_hope_last
- Add capability to create delegation token
- Add authentication based on delegation token.
- Add capability to renew/expire delegation tokens.
- Add units tests and integration tests
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#3616 from omkreddy/KAFKA-4541
This is the implementation of KIP-225.
It marks the previous metrics as deprecated in the documentation and adds new metrics using tags.
Testing verifies that both the new and the old metric report the same value.
Author: cmolter <cmolter@apple.com>
Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>
Closes#4362 from lahabana/kafka-5890
0. Rename `JoinIntegrationTest` to `StreamStreamJoinIntegrationTest`, which is only for KStream-KStream joins.
1. Extract the `AbstractJoinIntegrationTest` which is going to be used for all the join integration test classes, parameterized with and without caching.
2. Merge `KStreamRepartitionJoinTest.java` into `StreamStreamJoinIntegrationTest.java` with augmented stream-stream join.
3. Add `TableTableJoinIntegrationTest` with detailed per-step expected results and removed `KTableKTableJoinIntegrationTest`.
Findings of the integration test:
1. Confirmed KAFKA-4309 with caching turned on.
2. Found bug KAFKA-6398.
3. Found bug KAFKA-6443.
4. Found a bug that in CachingKeyValueStore, we would flush before putting the record into the underlying store, when the store is going to be used in the downstream processors with flushing it would result in incorrect results, fixed the issue along with this PR.
5. Consider a new optimization described in KAFKA-6286.
Future works including stream-table joins will be in other PRs.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>
Closes#4331 from guozhangwang/KMinor-join-integration-tests
When using Kafka Connect with a cluster that doesn't allow the user to
create topics (due to ACL configuration), Connect fails when trying to
create its internal topics even if these topics already exist. This is
incorrect behavior according to the documentation, which mentions that
R/W access should be enough.
This happens specifically when using Aiven Kafka, which does not permit
creation of topics via the Kafka Admin Client API.
The patch ignores the returned error, similar to the behavior for older
brokers that don't support the API.
A spinoff of original pull request #4340 for resolving conflicts.
Author: RichardYuSTUG <yohan.richard.yu2@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4413 from ConcurrencyPractitioner/kafka-6265-2
*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*
*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*
Author: Rohan Desai <desai.p.rohan@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4382 from rodesai/KAFKA-6383
1. Include the parent's queryable store name in KTable.filter if this operator is not materialized.
2. Augment InternalTopologyBuilder checking on null processor / store names from the enum.
3. Unit test.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Closes#4384 from guozhangwang/K6398-topology-builder-exception
Currently CachingKeyValueStore methods are synchronized at method level.
It seems we can use read lock for getter and write lock for put / delete methods.
For getInternal(), if the underlying thread is streamThread, the getInternal() may trigger eviction. This can be handled by obtaining write lock at the beginning of the method for streamThread.
The jmh patch is attached to JIRA:
https://issues.apache.org/jira/secure/attachment/12905140/6412-jmh.v1.txt
Author: tedyu <yuzhihong@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>
Closes#4372 from tedyu/6412
Different fix for problem addressed by https://github.com/apache/kafka/pull/1953. Should prevent the CLASSPATH environment variable from being prefixed by a single colon before the JVM is invoked in the run-class script, which will then prevent the current working directory from being unintentionally included in the classpath when using the Reflections library.
If the current working directory should still be included in the classpath, it just needs to be explicitly specified either with its fully-qualified pathname or as a single dot (".").
Author: Chris Egerton <chrise@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Jun Rao <junrao@gmail.com>
Closes#4406 from C0urante/fix-colon-prefixed-classpath
* Implement MockAdminClient.deleteTopics
* Use MockAdminClient instead of MockKafkaAdminClientEnv in StreamsResetterTest
* Rename MockKafkaAdminClientEnv to AdminClientUnitTestEnv
* Use MockAdminClient instead of MockKafkaAdminClientEnv in TopicAdminTest
* Rename KafkaAdminClient to AdminClientUnitTestEnv in KafkaAdminClientTest.java
* Migrate StreamThreadTest to MockAdminClient
* Fix style errors
* Address review comments
* Fix MockAdminClient call
Reviewers: Matthias J. Sax <matthias@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Handle InvalidTypeIdException as NOT_IMPLEMENTED and add unit tests for all exceptions.
Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
We are closing the metricGroups created in a Worker, Source task and Sink task before populating them with new metrics. This helps in cases where an Exception is thrown when previously created groups were not cleaned up correctly.
Signed-off-by: Arjun Satish <arjunconfluent.io>
Author: Arjun Satish <arjun@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4397 from wicknicks/KAFKA-6252
When the source file of `FileStreamSource` is a large file, `FileStreamSourceTask.poll()` will result in OOM. This pull request added `batch.size` parameter which can restrict the poll size.
*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*
*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*
Author: Study <ph.study@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4356 from phstudy/KAFKA-4335
- Add missing locking/volatile in MetadataCache.aliveEndPoint
- Fix topic metadata not to throw BrokerNotAvailableException
when listeners are inconsistent. Add test verifying the fix. As
part of this fix, renamed Broker methods to follow Map
convention where the `get` version returns `Option`.
Reviewers: Jason Gustafson <jason@confluent.io>
LogContext to have two different implementations of Logger. One will be picked based on availability of LocationAwareLogger API.
Reviewers: Jason Gustafson <jason@confluent.io>
- Fix zk session state and session change rate metric names: type
should be SessionExpireListener instead of KafkaHealthCheck. Test
verifying the fix was included.
- Handle missing controller in controlled shutdown in the same way as if
the broker is not registered (i.e. retry after backoff).
- Restructure BrokerInfo to reduce duplication. It now contains a
Broker instance and the JSON serde is done in BrokerIdZNode
since `Broker` does not contain all the fields.
- Remove dead code from `ZooKeeperClient.initialize` and remove
redundant `close` calls.
- Move ACL handling and persistent paths definition from ZkUtils to
ZkData (and call ZkData from ZkUtils).
- Remove ZooKeeperClientWrapper and ZooKeeperClientMetrics from
ZkUtils (avoids metrics clash if third party users create a ZkUtils
instance in the same process as the broker).
- Introduce factory method in KafkaZkClient that creates
ZooKeeperClient and remove metric name defaults from
ZooKeeperClient.
- Fix a few instances where ZooKeeperClient was not closed in tests.
- Update a few TestUtils methods to use KafkaZkClient instead of
ZkUtils.
- Add test verifying SessionState metric.
- Various clean-ups.
Testing: mostly relying on existing tests, but added a couple
of new tests as mentioned above.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Jun Rao <junrao@gmail.com>
Closes#4359 from ijuma/kafka-6320-kafka-health-zk-metrics-follow-up
When a log entry is appended to a Kafka topic using `KafkaLog4jAppender`, the producer.send operation may block waiting for metadata. This can result in deadlocks in a couple of scenarios if a log entry from the producer network thread is also at a log level that results in the entry being appended to a Kafka topic.
1. Producer's network thread will attempt to send data to a Kafka topic and this is unsafe since producer.send may block waiting for metadata, causing a deadlock since the thread will not process the metadata request/response.
2. `KafkaLog4jAppender#append` is invoked while holding the lock of the logger. So the thread waiting for metadata in the initial send will be holding the logger lock. If the producer network thread has.a log entry that needs to be appended, it will attempt to acquire the logger lock and deadlock.
This is a temporary workaround to avoid deadlocks in system tests by setting log level to WARN for `Metadata` in `VerifiableLog4jAppender`. The fix has been verified using the system tests log4j_appender_test.py which started failing when the info-level log entry was introduced.
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Satish Duggana <satish.duggana@gmail.com>, tedyu <yuzhihong@gmail.com>
Closes#4375 from rajinisivaram/KAFKA-6415-log4jappender
Increase commit interval to make it less likely that we flush the cache in-between.
To make it fool-proof, only compare the "final" result records if cache is enabled.
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4364 from mjsax/kafka-6256-flaky-kstream-ktable-join-with-caching-test
* Removed code duplicate from GlobalProcessorContextImpl and ProcessorContextImpl to parent class AbstractProcessorContext
* Exchanged concrete implementations with interfaces to make code more maintainable
* Refactored major code duplicates in InternalTopologyBuilder
* Formatted function parameters as per code review
Added final to code introduced in this PR
* Added missing finals to putNodeGroupName function
Rearranged parameters for resetTopicsPattern function
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>