
KAFKA-5428; Transactional producer should only abort batches in fatal error state

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3298 from hachikuji/KAFKA-5428
Jason Gustafson authored 8 years ago
commit aea53108a7
11 changed files (lines changed in parentheses):

 1. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (3)
 2. clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java (12)
 3. clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java (2)
 4. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (2)
 5. clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (9)
 6. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (22)
 7. clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (240)
 8. core/src/main/scala/kafka/cluster/Partition.scala (24)
 9. core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala (3)
10. core/src/main/scala/kafka/tools/DumpLogSegments.scala (3)
11. core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala (10)

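At a high level, this patch narrows when the producer abandons batches: previously the Sender aborted every incomplete batch whenever the TransactionManager was in any error state, whereas now only a fatal error does that. An abortable error fails just the open (not yet sent) batches, lets in-flight retries reach a final status, and leaves the application free to abort the transaction and begin a new one. The following is a minimal sketch of that recovery loop against the public producer API, not code from this patch; the bootstrap address, topic name, and transactional.id are placeholders.

```java
// Minimal recovery sketch using the public producer API. Placeholders:
// localhost:9092, example-topic, my-txn-id. Fatal errors such as
// ProducerFencedException cannot be recovered with abortTransaction();
// the producer must be closed instead.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalRecoveryExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-txn-id");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // fatal: the producer cannot recover, only close
            producer.close();
        } catch (KafkaException e) {
            // abortable: abort, then this same producer may begin a new transaction
            producer.abortTransaction();
        }
        producer.close();
    }
}
```

This distinction between the two catch branches is exactly why the Sender now asks hasFatalError() versus hasAbortableError() instead of a single hasError().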
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (3 lines changed)

```diff
@@ -607,9 +607,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Implementation of asynchronously send a record to a topic.
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
-        if (transactionManager != null)
-            transactionManager.failIfNotReadyForSend();
         TopicPartition tp = null;
         try {
             // first make sure the metadata for the topic is available
```

clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java (12 lines changed)

```diff
@@ -163,9 +163,15 @@ public final class ProducerBatch {
      * @param exception The exception that occurred (or null if the request was successful)
      */
     public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
-        log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
-                topicPartition, baseOffset, exception);
-        FinalState finalState = exception != null ? FinalState.FAILED : FinalState.SUCCEEDED;
+        final FinalState finalState;
+        if (exception == null) {
+            log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
+            finalState = FinalState.SUCCEEDED;
+        } else {
+            log.trace("Failed to produce messages to {}.", topicPartition, exception);
+            finalState = FinalState.FAILED;
+        }
         if (!this.finalState.compareAndSet(null, finalState)) {
             if (this.finalState.get() == FinalState.ABORTED) {
                 log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition);
```

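The rewritten done() splits success and failure logging, but the invariant it protects is unchanged: a batch's final state is resolved at most once via compareAndSet, so a ProduceResponse arriving after the batch was aborted is merely logged. A standalone sketch of that complete-once pattern follows; the class and member names are illustrative, not the real ProducerBatch internals.

```java
// Standalone sketch of the complete-once pattern done() relies on; names
// here are illustrative, not the real ProducerBatch internals.
import java.util.concurrent.atomic.AtomicReference;

class CompleteOnceSketch {
    enum FinalState { SUCCEEDED, FAILED, ABORTED }

    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

    /** Returns true only for the first caller; later completions lose the race. */
    boolean tryComplete(FinalState state) {
        return finalState.compareAndSet(null, state);
    }

    FinalState result() {
        return finalState.get();
    }

    public static void main(String[] args) {
        CompleteOnceSketch batch = new CompleteOnceSketch();
        System.out.println(batch.tryComplete(FinalState.ABORTED));   // true: abort resolved the batch
        System.out.println(batch.tryComplete(FinalState.SUCCEEDED)); // false: a late ProduceResponse cannot override it
        System.out.println(batch.result());                          // ABORTED
    }
}
```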
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java (2 lines changed)

```diff
@@ -36,6 +36,6 @@ class ProducerIdAndEpoch {
 
     @Override
     public String toString() {
-        return "(producerId=" + producerId + ", epoch='" + epoch + ")";
+        return "(producerId=" + producerId + ", epoch=" + epoch + ")";
     }
 }
```

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (2 lines changed)

```diff
@@ -622,7 +622,7 @@ public final class RecordAccumulator {
         }
     }
 
-    void abortUnclosedBatches(RuntimeException reason) {
+    void abortOpenBatches(RuntimeException reason) {
         for (ProducerBatch batch : incomplete.all()) {
             Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
             boolean aborted = false;
```

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (9 lines changed)

```diff
@@ -209,12 +209,14 @@ public class Sender implements Runnable {
             // do not continue sending if the transaction manager is in a failed state or if there
             // is no producer id (for the idempotent case).
-            if (transactionManager.hasError() || !transactionManager.hasProducerId()) {
+            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                 RuntimeException lastError = transactionManager.lastError();
                 if (lastError != null)
                     maybeAbortBatches(lastError);
                 client.poll(retryBackoffMs, now);
                 return;
+            } else if (transactionManager.hasAbortableError()) {
+                accumulator.abortOpenBatches(transactionManager.lastError());
             }
         }
@@ -260,7 +262,6 @@
         }
 
         List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
-
         boolean needsTransactionStateReset = false;
         // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
         // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
@@ -298,7 +299,6 @@
         sendProduceRequests(batches, now);
         return pollTimeout;
     }
-
     private boolean maybeSendTransactionalRequest(long now) {
@@ -308,7 +308,7 @@
         // If the transaction is being aborted, then we can clear any unsent produce requests
         if (transactionManager.isAborting())
-            accumulator.abortUnclosedBatches(new KafkaException("Failing batch since transaction was aborted"));
+            accumulator.abortOpenBatches(new KafkaException("Failing batch since transaction was aborted"));
 
         // There may still be requests left which are being retried. Since we do not know whether they had
         // been successfully appended to the broker log, we must resend them until their final status is clear.
@@ -553,7 +553,6 @@
         } else {
             completeBatch(batch, response);
         }
-
         // Unmute the completed partition.
```

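The first hunk above is the heart of the fix: hasError() was split into hasFatalError(), which still stops the Sender and fails every incomplete batch, and hasAbortableError(), which only aborts open batches via the renamed RecordAccumulator.abortOpenBatches(). Below is a self-contained sketch of that policy difference; all names are illustrative, not the real Sender/RecordAccumulator fields.

```java
// Self-contained sketch of the Sender's new abort policy. Names are
// illustrative; the real logic lives in Sender.run and
// RecordAccumulator.abortOpenBatches.
import java.util.ArrayList;
import java.util.List;

class AbortPolicySketch {
    static class Batch {
        final String partition;
        final boolean sentToBroker; // true once a produce request is in flight or in retry
        RuntimeException failure;
        Batch(String partition, boolean sentToBroker) {
            this.partition = partition;
            this.sentToBroker = sentToBroker;
        }
    }

    // Abortable error: fail only open (never-sent) batches; in-flight retries
    // keep draining until their final status is known.
    static void abortOpenBatches(List<Batch> incomplete, RuntimeException reason) {
        for (Batch b : incomplete)
            if (!b.sentToBroker)
                b.failure = reason;
    }

    // Fatal error: fail every incomplete batch, sent or not.
    static void abortAllBatches(List<Batch> incomplete, RuntimeException reason) {
        for (Batch b : incomplete)
            b.failure = reason;
    }

    public static void main(String[] args) {
        List<Batch> incomplete = new ArrayList<>();
        incomplete.add(new Batch("tp0", true));    // already sent once, now in retry
        incomplete.add(new Batch("foo-0", false)); // still open in the accumulator
        abortOpenBatches(incomplete, new RuntimeException("abortable error"));
        for (Batch b : incomplete)
            System.out.println(b.partition + " failed=" + (b.failure != null));
        // prints: tp0 failed=false, foo-0 failed=true
    }
}
```

Keeping sent batches alive matters for the reason stated in the Sender's own comment: since we do not know whether they were appended to the broker log, they must be resent until their final status is clear.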
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (22 lines changed)

```diff
@@ -218,8 +218,7 @@ public class TransactionManager {
     }
 
     public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
-        if (currentState != State.IN_TRANSACTION)
-            throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState);
+        failIfNotReadyForSend();
 
         if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
             return;
@@ -540,6 +539,8 @@
         transitionTo(State.READY);
         lastError = null;
         transactionStarted = false;
+        newPartitionsInTransaction.clear();
+        pendingPartitionsInTransaction.clear();
         partitionsInTransaction.clear();
     }
@@ -757,15 +758,22 @@
             }
         }
 
+        Set<TopicPartition> partitions = errors.keySet();
+
+        // Remove the partitions from the pending set regardless of the result. We use the presence
+        // of partitions in the pending set to know when it is not safe to send batches. However, if
+        // the partitions failed to be added and we enter an error state, we expect the batches to be
+        // aborted anyway. In this case, we must be able to continue sending the batches which are in
+        // retry for partitions that were successfully added.
+        pendingPartitionsInTransaction.removeAll(partitions);
+
         if (!unauthorizedTopics.isEmpty()) {
             abortableError(new TopicAuthorizationException(unauthorizedTopics));
         } else if (hasPartitionErrors) {
-            abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors"));
+            abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors));
         } else {
-            Set<TopicPartition> addedPartitions = errors.keySet();
-            log.debug("{}Successfully added partitions {} to transaction", logPrefix, addedPartitions);
-            partitionsInTransaction.addAll(addedPartitions);
-            pendingPartitionsInTransaction.removeAll(addedPartitions);
+            log.debug("{}Successfully added partitions {} to transaction", logPrefix, partitions);
+            partitionsInTransaction.addAll(partitions);
             transactionStarted = true;
             result.done();
         }
```

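The last hunk's comment states the invariant this bookkeeping maintains: a partition leaves pendingPartitionsInTransaction as soon as its AddPartitionsToTxn result is known, success or failure, but joins partitionsInTransaction only on success. A toy model of that handling follows; the set names mirror the patch, while the String keys and boolean result map are simplifications for illustration.

```java
// Toy model of the pending/added bookkeeping in TransactionManager's
// AddPartitionsToTxn response handling. Set names mirror the patch; the
// method shape and String partition keys are simplifications.
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class AddPartitionsBookkeepingSketch {
    final Set<String> pendingPartitionsInTransaction = new HashSet<>();
    final Set<String> partitionsInTransaction = new HashSet<>();

    void handleResponse(Map<String, Boolean> resultByPartition) {
        Set<String> partitions = resultByPartition.keySet();
        // Always drop the partitions from the pending set: if any failed, the
        // manager enters an abortable error state and open batches get aborted,
        // but batches already in retry for successfully added partitions must
        // not be blocked by a stale pending entry.
        pendingPartitionsInTransaction.removeAll(partitions);
        boolean allOk = !resultByPartition.containsValue(false);
        if (allOk)
            partitionsInTransaction.addAll(partitions);
    }

    /** A batch for this partition may be drained once it is no longer pending. */
    boolean isSendAllowed(String partition) {
        return !pendingPartitionsInTransaction.contains(partition);
    }
}
```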
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (240 lines changed)

```diff
@@ -316,7 +316,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = KafkaException.class)
     public void testMaybeAddPartitionToTransactionAfterAbortableError() {
         long pid = 13131L;
         short epoch = 1;
@@ -326,7 +326,7 @@
         transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = KafkaException.class)
     public void testMaybeAddPartitionToTransactionAfterFatalError() {
         long pid = 13131L;
         short epoch = 1;
@@ -836,6 +836,11 @@
         assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException);
+        assertFalse(transactionManager.isPartitionPendingAdd(tp0));
+        assertFalse(transactionManager.isPartitionPendingAdd(tp1));
+        assertFalse(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.isPartitionAdded(tp1));
+        assertFalse(transactionManager.hasPartitionsToAdd());
 
         TopicAuthorizationException exception = (TopicAuthorizationException) transactionManager.lastError();
         assertEquals(singleton(tp0.topic()), exception.unauthorizedTopics());
@@ -843,6 +848,198 @@
         assertAbortableError(TopicAuthorizationException.class);
     }
 
+    @Test
+    public void testRecoveryFromAbortableErrorTransactionNotStarted() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.hasAbortableError());
+        transactionManager.beginAbortingTransaction();
+        sender.run(time.milliseconds());
+        assertTrue(responseFuture.isDone());
+        assertFutureFailed(responseFuture);
+
+        // No partitions added, so no need to prepare EndTxn response
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.isReady());
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        assertFalse(accumulator.hasUnflushedBatches());
+
+        // ensure we can now start a new transaction
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.hasPartitionsToAdd());
+
+        transactionManager.beginCommittingTransaction();
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(responseFuture.isDone());
+        assertNotNull(responseFuture.get());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isReady());
+    }
+
+    @Test
+    public void testRecoveryFromAbortableErrorTransactionStarted() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxn(tp0, Errors.NONE);
+
+        Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+
+        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
+        Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+        prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.hasAbortableError());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
+        assertFalse(authorizedTopicProduceFuture.isDone());
+        assertFalse(unauthorizedTopicProduceFuture.isDone());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+        transactionManager.beginAbortingTransaction();
+        sender.run(time.milliseconds());
+
+        // neither produce request has been sent, so they should both be failed immediately
+        assertFutureFailed(authorizedTopicProduceFuture);
+        assertFutureFailed(unauthorizedTopicProduceFuture);
+        assertTrue(transactionManager.isReady());
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        assertFalse(accumulator.hasUnflushedBatches());
+
+        // ensure we can now start a new transaction
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.hasPartitionsToAdd());
+
+        transactionManager.beginCommittingTransaction();
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(nextTransactionFuture.isDone());
+        assertNotNull(nextTransactionFuture.get());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isReady());
+    }
+
+    @Test
+    public void testRecoveryFromAbortableErrorProduceRequestInRetry() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxn(tp0, Errors.NONE);
+
+        Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+
+        accumulator.beginFlush();
+        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, pid, epoch);
+        sender.run(time.milliseconds());
+        assertFalse(authorizedTopicProduceFuture.isDone());
+        assertTrue(accumulator.hasUnflushedBatches());
+
+        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
+        Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+        prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.hasAbortableError());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
+        assertFalse(authorizedTopicProduceFuture.isDone());
+
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertFutureFailed(unauthorizedTopicProduceFuture);
+        assertTrue(authorizedTopicProduceFuture.isDone());
+        assertNotNull(authorizedTopicProduceFuture.get());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+        transactionManager.beginAbortingTransaction();
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isReady());
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        assertFalse(accumulator.hasUnflushedBatches());
+
+        // ensure we can now start a new transaction
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.hasPartitionsToAdd());
+
+        transactionManager.beginCommittingTransaction();
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(nextTransactionFuture.isDone());
+        assertNotNull(nextTransactionFuture.get());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isReady());
+    }
+
     @Test
     public void testTransactionalIdAuthorizationFailureInAddPartitions() {
         final long pid = 13131L;
@@ -1479,6 +1676,33 @@
         assertTrue(drainedBatches.get(node1.id()).isEmpty());
     }
 
+    @Test
+    public void resendFailedProduceRequestAfterAbortableError() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+        prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
+        sender.run(time.milliseconds()); // AddPartitions
+        sender.run(time.milliseconds()); // Produce
+
+        assertFalse(responseFuture.isDone());
+
+        transactionManager.transitionToAbortableError(new KafkaException());
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(responseFuture.isDone());
+        assertNotNull(responseFuture.get());
+    }
+
     @Test
     public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException, ExecutionException {
         final long pid = 13131L;
@@ -1866,4 +2090,16 @@
             assertTrue(transactionManager.hasError());
         }
     }
+
+    private void assertFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
+        assertTrue(future.isDone());
+
+        try {
+            future.get();
+            fail("Expected produce future to throw");
+        } catch (ExecutionException e) {
+            // expected
+        }
+    }
 }
```

Note: in testRecoveryFromAbortableErrorTransactionStarted, the first append targets tp0 (the authorized, successfully added partition) to match the authorizedTopicProduceFuture name; since neither produce request is sent before the abort, both futures fail when the transaction is aborted.

core/src/main/scala/kafka/cluster/Partition.scala (24 lines changed)

```diff
@@ -268,8 +268,8 @@ class Partition(val topic: String,
       if (leaderLWIncremented || leaderHWIncremented)
         tryCompleteDelayedRequests()
 
-      debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
-        .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition))
+      debug("Recorded replica %d log end offset (LEO) position %d."
+        .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset))
     case None =>
       throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
         " is not recognized to be one of the assigned replicas %s for partition %s.")
@@ -302,7 +302,7 @@
             assignedReplicas.map(_.brokerId).contains(replicaId) &&
             replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
           val newInSyncReplicas = inSyncReplicas + replica
-          info(s"Expanding ISR for partition $topicPartition from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
+          info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
             s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
           // update ISR in ZK and cache
           updateIsr(newInSyncReplicas)
@@ -333,7 +333,7 @@
         def numAcks = curInSyncReplicas.count { r =>
           if (!r.isLocal)
             if (r.logEndOffset.messageOffset >= requiredOffset) {
-              trace(s"Replica ${r.brokerId} of ${topic}-${partitionId} received offset $requiredOffset")
+              trace(s"Replica ${r.brokerId} received offset $requiredOffset")
               true
             }
             else
@@ -342,7 +342,7 @@
             true /* also count the local (leader) replica */
         }
 
-        trace(s"$numAcks acks satisfied for ${topic}-${partitionId} with acks = -1")
+        trace(s"$numAcks acks satisfied with acks = -1")
 
         val minIsr = leaderReplica.log.get.config.minInSyncReplicas
@@ -388,11 +388,11 @@
     val oldHighWatermark = leaderReplica.highWatermark
     if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
       leaderReplica.highWatermark = newHighWatermark
-      debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
+      debug(s"High watermark updated to $newHighWatermark")
       true
     } else {
-      debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
-        .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
+      debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark. " +
+        s"All LEOs are ${allLogEndOffsets.mkString(",")}")
       false
     }
   }
@@ -427,8 +427,8 @@
         if(outOfSyncReplicas.nonEmpty) {
           val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
           assert(newInSyncReplicas.nonEmpty)
-          info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
-            inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
+          info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
+            newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in zk and in cache
           updateIsr(newInSyncReplicas)
           // we may need to increment high watermark since ISR could be down to 1
@@ -464,7 +464,7 @@
     val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
     if (laggingReplicas.nonEmpty)
-      debug("Lagging replicas for partition %s are %s".format(topicPartition, laggingReplicas.map(_.brokerId).mkString(",")))
+      debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))
 
     laggingReplicas
   }
```

core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala (3 lines changed)

```diff
@@ -87,7 +87,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
         }
       }
     } else {
-      trace(s"Received response $response from node ${response.destination} with correlation id $correlationId")
+      debug(s"Received WriteTxnMarker response $response from node ${response.destination} with correlation id $correlationId")
 
       val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
@@ -135,7 +135,6 @@
           for ((topicPartition: TopicPartition, error: Errors) <- errors) {
             error match {
               case Errors.NONE =>
-
                 txnMetadata.removePartition(topicPartition)
 
               case Errors.CORRUPT_MESSAGE |
```

core/src/main/scala/kafka/tools/DumpLogSegments.scala (3 lines changed)

```diff
@@ -283,7 +283,8 @@ object DumpLogSegments {
           s"producerEpoch:${txnMetadata.producerEpoch}," +
           s"state=${txnMetadata.state}," +
           s"partitions=${txnMetadata.topicPartitions}," +
-          s"lastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}"
+          s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
+          s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
         (Some(keyString), Some(valueString))
       }
```

core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala (10 lines changed)

```diff
@@ -926,10 +926,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     try {
       // the second time, the call to send itself should fail (the producer becomes unusable
       // if no producerId can be obtained)
-      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()
       fail("Should have raised ClusterAuthorizationException")
     } catch {
-      case e: KafkaException =>
+      case e: ExecutionException =>
         assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
     }
   }
@@ -959,10 +959,10 @@
     try {
       // the second time, the call to send itself should fail (the producer becomes unusable
      // if no producerId can be obtained)
-      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()
       fail("Should have raised ClusterAuthorizationException")
     } catch {
-      case e: KafkaException =>
+      case e: ExecutionException =>
         assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
     }
   }
@@ -1061,7 +1061,7 @@
     try {
       producer.send(new ProducerRecord(deleteTopic, 0, "1".getBytes, "1".getBytes)).get
     } catch {
-      case e : ExecutionException =>
+      case e: ExecutionException =>
         assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
     }
     // now rollback
```

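The AuthorizerIntegrationTest changes follow from moving failIfNotReadyForSend() out of KafkaProducer.doSend(): the failure now surfaces through the Future returned by send() rather than as a synchronous KafkaException, so the tests block on get() and unwrap an ExecutionException. A minimal sketch of that unwrapping pattern; the producer instance and topic name are supplied by the caller.

```java
// Minimal sketch of unwrapping an asynchronous send failure. The producer
// and topic are placeholders supplied by the caller.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class SendErrorUnwrapSketch {
    static void sendAndReport(Producer<byte[], byte[]> producer, String topic) throws InterruptedException {
        Future<RecordMetadata> future =
                producer.send(new ProducerRecord<>(topic, "hi".getBytes()));
        try {
            future.get(); // blocks; a failed send is rethrown wrapped in ExecutionException
        } catch (ExecutionException e) {
            System.out.println("send failed with: " + e.getCause());
        }
    }
}
```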