Similar to KAFKA-8928, during consumer construction, some configs might be overridden (client.id for instance), but the actual values will not be reflected in the info log. It would be better to display the overridden values for those configs.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Adding (add-config) a default user, user, or <user, client-id> quota and then removing it via delete-config does not update the quota bound in ClientQuotaManager.Metrics for existing users or <user,client-id>. This causes brokers to continue to throttle with the previously set quotas until the brokers restart (or <user,client> stops sending traffic for some time and the sensor expires). This happens only when removing the user or user,client-id quota where there are no more quotas to fall back to. Common example where the issue happens: Initial no quota state --> add default user quota --> remove default user quota.
The cause of the issue was that `DefaultQuotaCallback.quotaLimit` was returning `null` when no default user quota was set, which caused `ClientQuotaManager.updateQuotaMetricConfigs` to skip updating the appropriate sensor, which left it unchanged with the previous quota. Since `null` is an acceptable return value for `ClientQuotaCallback.quotaLimit`, which is already treated as unlimited quota in other parts of the code, this PR ensures that `ClientQuotaManager.updateQuotaMetricConfigs` updates the quotas for which `ClientQuotaCallback.quotaLimit` returns `null` to unlimited quota.
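A minimal sketch of the idea, not the actual `ClientQuotaManager` code: the class `QuotaSketch`, its methods, and the `UNLIMITED` stand-in value are hypothetical. The only thing carried over from the description is that a `null` limit is mapped to an unlimited bound instead of being skipped.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; names and the UNLIMITED value are assumptions.
class QuotaSketch {
    static final double UNLIMITED = Double.MAX_VALUE;

    // Maps a possibly-null callback result to the bound a sensor should use.
    static double effectiveLimit(Double limitFromCallback) {
        return limitFromCallback == null ? UNLIMITED : limitFromCallback;
    }

    // Updates every sensor bound, including those whose quota was removed.
    static void updateBounds(Map<String, Double> sensorBounds, Map<String, Double> configuredQuotas) {
        for (Map.Entry<String, Double> entry : sensorBounds.entrySet()) {
            // Skipping null results here would leave the stale bound in place.
            entry.setValue(effectiveLimit(configuredQuotas.get(entry.getKey())));
        }
    }
}
```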
Reviewers: Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
We need to swallow exceptions from StateManagerUtil#close in dirtyClose for both active and standby tasks.
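As a rough sketch of what swallowing means here (the class `DirtyCloseSketch` and the use of `AutoCloseable` as a stand-in for the state manager are assumptions, not the Streams code):

```java
// Hypothetical illustration of a best-effort close.
class DirtyCloseSketch {
    // Returns true if the close succeeded, false if an exception was swallowed.
    static boolean closeDirty(AutoCloseable stateManager) {
        try {
            stateManager.close();
            return true;
        } catch (Exception e) {
            // Dirty close is best effort: report the failure and carry on.
            System.err.println("Ignoring exception during dirty close: " + e);
            return false;
        }
    }
}
```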
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Fixed issue with MockConsumer#updateEndOffsets where the input offsets were appended to existing ones instead of overwriting them.
Since there's no use for adding to existing end offsets currently, MockConsumer#updateEndOffsets is simplified and MockConsumer#getEndOffset is removed after changing the value type of the member field map 'endOffsets' to Long in MockConsumer
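The overwrite semantics can be sketched as follows. This is a simplified stand-in, not the real `MockConsumer`: the class `EndOffsetsSketch` is hypothetical and partitions are plain strings rather than `TopicPartition` objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the end-offset bookkeeping.
class EndOffsetsSketch {
    // One Long per partition, so a new value replaces the old one.
    private final Map<String, Long> endOffsets = new HashMap<>();

    // Overwrites any previously stored end offset for the given partitions.
    synchronized void updateEndOffsets(Map<String, Long> newOffsets) {
        endOffsets.putAll(newOffsets);
    }

    synchronized Long endOffset(String partition) {
        return endOffsets.get(partition);
    }
}
```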
Details in: https://issues.apache.org/jira/browse/KAFKA-9686
The following flaky test is fixed by this PR:
1. KafkaBasedLogTest.testSendAndReadToEnd
Reviewers: Jason Gustafson <jason@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>
Previously we may be updating the standby's limit offset as committed offsets to those source changelogs, and then inside the inner method we check if the state is in RESTORE_ACTIVE or not, which is a bug.
We should, instead, just check in the caller that we can skip restoring if:
1. we are in RESTORE_ACTIVE mode.
2. there are no source changelog partitions.
3. those partitions do not have any buffered records.
Also updated the unit test for this coverage.
Reviewers: Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This is redundant since `Sender` and `NetworkClient` handle throttling. It's
also confusing since the `RecordAccumulator` logic only applies when
`max.in.flight.requests.per.connection=1`.
In `Sender.sendProducerData`, the following code handles throttling:
```java
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
    }
}
```
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Just a minor refactoring of StreamsPartitionAssignor's endless assign method into logical chunks to hopefully improve readability. No logical changes, literally just moving code around and adding docs.
The hope is to make it easier to write and review KIP-441 PRs that dig into the assignment logic.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
After #7312, we could still return data during the rebalance phase, which means it is possible to find records without corresponding tasks. We have to fall back to the unsubscribe mode when a task is migrated, as the assignment should be cleared out to stay in sync with the task manager state.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Before we register the stores (and hence create the store dirs), we check if the task dir is empty except for the lock / checkpoint files. Then later, when loading the checkpoint files, if we do not find the offsets AND the store dirs are not empty, meaning that the stores may not be empty, we treat the task as corrupted.
Reviewers: John Roesler <vvcephei@apache.org>
`KafkaStreams.getAllMetadata()` returns `StreamsMetadataState.getAllMetadata()`. Although the latter method is `synchronized`, it returns a reference to internal mutable state. Not only does this break encapsulation, but it means any thread iterating over the returned collection when the metadata gets rebuilt will encounter a `ConcurrentModificationException`.
This change:
* switches from clearing and rebuilding `allMetadata` when `onChange` is called to building a new list and swapping this in. This is thread safe and has the benefit that the returned list is not empty during a rebuild: you either get the old or the new list.
* removes synchronisation from `getAllMetadata` and `getLocalMetadata`. These are returning member variables. Synchronisation adds nothing.
* changes `getAllMetadata` to wrap its return value in an unmodifiable wrapper to avoid breaking encapsulation.
* changes the getters in `StreamsMetadata` to wrap their return values in unmodifiable wrapper to avoid breaking encapsulation.
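The build-then-swap approach described in the list above can be sketched like this. It is an illustration only: the class `MetadataStateSketch` is hypothetical, the element type is simplified to `String`, and the use of a `volatile` field for publication is an assumption rather than something stated in this change.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of build-then-swap with an unmodifiable view.
class MetadataStateSketch {
    // volatile (an assumption) so readers see a fully built list after a swap.
    private volatile List<String> allMetadata = Collections.emptyList();

    // Builds the new list off to the side, then publishes it with a single write.
    void onChange(Collection<String> newHosts) {
        List<String> rebuilt = new ArrayList<>(newHosts);
        allMetadata = Collections.unmodifiableList(rebuilt);
    }

    // No synchronization: callers get either the old or the new list, never a partial one.
    Collection<String> getAllMetadata() {
        return allMetadata;
    }
}
```

A caller that is still iterating over the previously returned list is unaffected by a later `onChange`, because the old list is never mutated.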
Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
* Replace Prev/Standby task lists with a representation of the current position
of all tasks, where each task is encoded as the sum of the positions of all the
changelogs in that task.
* Only the protocol change is implemented, not actual positions, and the
assignor is updated to translate the new protocol back to lists of Prev/Standby
tasks so that the current assignment protocol still functions without modification.
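To illustrate the encoding only (the class `TaskOffsetSumSketch` and its method are hypothetical, and changelog partitions are plain strings here), a task's position as the sum of its changelog positions could look like:

```java
import java.util.Map;

// Hypothetical illustration; not the subscription-info code from this change.
class TaskOffsetSumSketch {
    // Encodes a task's position as the sum of the positions of its changelogs.
    static long taskOffsetSum(Map<String, Long> changelogPositions) {
        long sum = 0L;
        for (long position : changelogPositions.values()) {
            sum += position;
        }
        return sum;
    }
}
```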
Implements: KIP-441
Reviewers: John Roesler <vvcephei@apache.org>, Bruno Cadonna <bruno@confluent.io>
* Consolidate task/producer management. Now, exactly one component manages
the creation and destruction of Producers, whether they are per-thread or per-task.
* Add missing test coverage on TaskManagerTest
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Boyang Chen <boyang@confluent.io>
During the handle-corruption logic, we first remove the partitions from the changelog reader (and hence from the restore consumer), and then add them back during the task.revive() function. During this period the test main thread may happen to call addRecords which could throw IllegalStateException: a race condition.
The fix here is to wait for a position() call to return 0, which means the partitions have been added back to the restore consumer, and the seek-to-beginning has been called as well.
Reviewers: Matthias J. Sax <mjsax@apache.org>
We should have the following order:
1) close state stores
2) wipe out local directory
3) release directory lock
to avoid the issue. There's a known problem that on some file systems one cannot delete the lock file while the calling thread still holds the file lock, and this would be fixed in another ticket.
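The ordering above can be sketched as below. The class `CloseOrderSketch` and its three callbacks are hypothetical placeholders, and releasing the lock in a `finally` block is an assumption about how one might guarantee the last step, not a description of the actual change.

```java
// Hypothetical illustration of the close / wipe / unlock ordering.
class CloseOrderSketch {
    static void closeAndWipe(Runnable closeStores, Runnable wipeDirectory, Runnable releaseLock) {
        try {
            closeStores.run();    // 1) close state stores
            wipeDirectory.run();  // 2) wipe out local directory
        } finally {
            releaseLock.run();    // 3) release directory lock, always last
        }
    }
}
```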
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
I think the root cause of KAFKA-8893, KAFKA-8894, KAFKA-8895 and KSTREAMS-3779 is the same: some intermediate topics are not deleted in the setup logic before recreating the user topics, which could cause the waitForDeletion (that checks for an exact match of all existing topics) to fail, and could also cause more records to be returned because of the intermediate topics that are not deleted from the previous test case.
Also inspired by https://github.com/apache/kafka/pull/5418/files I used a longer timeout (120 secs) for deleting all topics.
Reviewers: John Roesler <vvcephei@apache.org>
These classes are used by `upgrade_test.py` with old Kafka versions so they can
only use functionality that exists in all Kafka versions. This change fixes the test
for Kafka versions older than 0.11.0.
Reviewers: Ismael Juma <ismael@juma.me.uk>
As described in KIP-568.
Waiting on acceptance of the KIP to write the tests, on the off chance something changes. But rest assured unit tests are coming ⚡️
Will also kick off existing Streams system tests which leverage this new API (eg version probing, sometimes broker bounce)
Reviewers: Boyang Chen <boyang@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
what/why
the throttling_test was broken by this PR (#7785) since it depends on the consumer having partitions-assigned before starting the producer
this PR provides the ability to wait for partitions to be assigned in the console consumer before considering it started.
caveat
this does not support starting up the JmxTool inside the console-consumer for custom metrics while using this wait_until_partitions_assigned flag since the code assumes one JmxTool running per node.
I think a proper fix for this would be to make JmxTool its own standalone single-node service
alternatives
we could use the EndToEnd test suite which uses the verifiable producer/consumer under the hood but I found that there were more changes necessary to get this working unfortunately (specifically doesn't seem like this test suite plays nicely with the ProducerPerformanceService)
Reviewers: Mathew Wong <mwong@confluent.io>, Bill Bejeck <bbejeck.com>
In the event that `CLASSPATH` does not have an ending ":", the shell
can expand the CLASSPATH globs to be space-separated list of paths/jars,
which is not how the JVM CLI accepts arguments to -cp switch. So
double quote the variable to prevent pattern expansion, and pass the
glob pattern directly to the JVM.
Reviewers: Ismael Juma <ismael@juma.me.uk>
This bug was found via the flaky SmokeTestDriverIntegrationTest. Without this PR the test fails once every 3-4 runs; with the fix we've run the test 20+ times locally without error.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>
The test_throttled_reassignment test fails because the consumer that is used to validate reassignment does not start on time to consume all messages. This does not seem like an issue with the throttling of the reassignment, since increasing the timeout allowed the test to pass multiple consecutive runs locally.
This test seemed to rely on the default JmxTool for the console consumer that was removed in this commit: 179d0d7
The console consumer would check to see if it had partitions assigned to it before beginning to consume. Although the test occasionally failed with the JmxTool, it began to fail much more after the removal.
Error messages of failures followed the below format with varying numbers of missed messages. They are the first messages by the producer.
535 acked message did not make it to the Consumer. They are: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19...plus 515 more. Total Acked: 192792, Total Consumed: 192259. We validated that the first 535 of these missing messages correctly made it into Kafka's data files. This suggests they were lost on their way to the consumer.
In the scope of the test, this error suggests that the test is falling into the race condition described in produce_consume_validate.py, which has the timeout to prevent the consumer from missing initial messages.
This can serve as a temporary fix until the logic of consumer startup is addressed further.
Reviewers: Jason Gustafson <jason@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
9.4.25 renamed closeOutput to completeOutput
(c5acf96506),
which is a method used by recent Jersey versions including the
latest (2.30.1). An example of the error:
> java.lang.NoSuchMethodError: org.eclipse.jetty.server.Response.closeOutput()V
> at org.glassfish.jersey.jetty.JettyHttpContainer$ResponseWriter.commit(JettyHttpContainer.java:326)
The request still completes, which is why no test fails. We should think about how
to improve the testing for this kind of problem, but I want to get the fix in before
2.5 RC0.
Credit to @rigelbm for finding this.
Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Andrew Choi <a24choi@edu.uwaterloo.ca>
Confirmed the test passes after this (minor) change.
Reviewers: Jason Gustafson <jason@confluent.io>, Boyang Chen <boyang@confluent.io>, Ismael Juma <ismael@juma.me.uk>
For handleRevocation, it is possible that previous onAssignment callback has cleaned up the stream tasks, which means no corresponding task could be found for given partitions. We should not throw here as this is expected behavior.
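A simplified sketch of the tolerant lookup, with hypothetical names (`RevocationSketch`, `tasksToSuspend`) and plain strings standing in for partitions and task ids:

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not the actual TaskManager code.
class RevocationSketch {
    // Collects the tasks that own the revoked partitions, skipping unknown partitions.
    static Set<String> tasksToSuspend(Collection<String> revokedPartitions,
                                      Map<String, String> taskForPartition) {
        Set<String> tasks = new LinkedHashSet<>();
        for (String partition : revokedPartitions) {
            String task = taskForPartition.get(partition);
            if (task != null) {
                tasks.add(task);
            }
            // A missing task is expected: an earlier onAssignment may have cleaned it up.
        }
        return tasks;
    }
}
```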
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
We only initialize the topology when transitioning from restoring -> running.
Also tighten some unit tests for this fix:
a. restoring -> suspended should just write checkpoint file without committing.
b. suspended -> restoring should not need any inner updates.
c. restoring -> running should always try to fetch committed offsets, and forward timeout exceptions.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>
Adds a couple of extra checks to the test-output-capturing logic in our gradle build.
Previously, we were seeing a lot of error logs while attempting to write output for a
test whose output file hadn't been initialized.
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
This PR adds a new lock that is used to prevent the follower replica from being updated while ReplicaAlterDirThread is executing maybeReplaceCurrentWithFutureReplica() to replace the follower replica with the future replica.
Now doAppendRecordsToFollowerOrFutureReplica() doesn't need to hold the lock on leaderIsrUpdateLock for local replica updates, and ongoing log appends on the follower will not delay the makeFollower() call.
**Benchmark results for Partition.makeFollower()**
Old:
```
Benchmark                                        Mode  Cnt     Score    Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt   15  2046.967 ± 22.842  ns/op
```
New:
```
Benchmark                                        Mode  Cnt     Score    Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt   15  1278.525 ±  5.354  ns/op
```
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes #8153 from omkreddy/KAFKA-9594-LAISR
This bug reproduces through the trunk stream test: the producer was closed unexpectedly when EOS is not turned on.
Will work on adding a unit test to guard this logic.
Reviewers: Guozhang Wang <wangguoz@gmail.com>