From aea53108a7d9573b8976da83235639a981c09af3 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 12 Jun 2017 18:07:27 -0700 Subject: [PATCH] KAFKA-5428; Transactional producer should only abort batches in fatal error state Author: Jason Gustafson Reviewers: Apurva Mehta , Ismael Juma Closes #3298 from hachikuji/KAFKA-5428 --- .../kafka/clients/producer/KafkaProducer.java | 3 - .../producer/internals/ProducerBatch.java | 12 +- .../internals/ProducerIdAndEpoch.java | 2 +- .../producer/internals/RecordAccumulator.java | 2 +- .../clients/producer/internals/Sender.java | 9 +- .../internals/TransactionManager.java | 22 +- .../internals/TransactionManagerTest.java | 240 +++++++++++++++++- .../main/scala/kafka/cluster/Partition.scala | 24 +- ...actionMarkerRequestCompletionHandler.scala | 3 +- .../scala/kafka/tools/DumpLogSegments.scala | 3 +- .../kafka/api/AuthorizerIntegrationTest.scala | 10 +- 11 files changed, 288 insertions(+), 42 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a7336d8da2e..4f155a46ef9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -607,9 +607,6 @@ public class KafkaProducer implements Producer { * Implementation of asynchronously send a record to a topic. */ private Future doSend(ProducerRecord record, Callback callback) { - if (transactionManager != null) - transactionManager.failIfNotReadyForSend(); - TopicPartition tp = null; try { // first make sure the metadata for the topic is available diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index b33e080e107..9d8b82d04ec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -163,9 +163,15 @@ public final class ProducerBatch { * @param exception The exception that occurred (or null if the request was successful) */ public void done(long baseOffset, long logAppendTime, RuntimeException exception) { - log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.", - topicPartition, baseOffset, exception); - FinalState finalState = exception != null ? 
FinalState.FAILED : FinalState.SUCCEEDED; + final FinalState finalState; + if (exception == null) { + log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset); + finalState = FinalState.SUCCEEDED; + } else { + log.trace("Failed to produce messages to {}.", topicPartition, exception); + finalState = FinalState.FAILED; + } + if (!this.finalState.compareAndSet(null, finalState)) { if (this.finalState.get() == FinalState.ABORTED) { log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java index 293bb51c035..ebd5cc3281c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java @@ -36,6 +36,6 @@ class ProducerIdAndEpoch { @Override public String toString() { - return "(producerId=" + producerId + ", epoch='" + epoch + ")"; + return "(producerId=" + producerId + ", epoch=" + epoch + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 76cc6bde625..505417c1f4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -622,7 +622,7 @@ public final class RecordAccumulator { } } - void abortUnclosedBatches(RuntimeException reason) { + void abortOpenBatches(RuntimeException reason) { for (ProducerBatch batch : incomplete.all()) { Deque dq = getDeque(batch.topicPartition); boolean aborted = false; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index b8d4ab92fa0..c9a1292a845 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -209,12 +209,14 @@ public class Sender implements Runnable { // do not continue sending if the transaction manager is in a failed state or if there // is no producer id (for the idempotent case). - if (transactionManager.hasError() || !transactionManager.hasProducerId()) { + if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) { RuntimeException lastError = transactionManager.lastError(); if (lastError != null) maybeAbortBatches(lastError); client.poll(retryBackoffMs, now); return; + } else if (transactionManager.hasAbortableError()) { + accumulator.abortOpenBatches(transactionManager.lastError()); } } @@ -260,7 +262,6 @@ public class Sender implements Runnable { } List expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now); - boolean needsTransactionStateReset = false; // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics // for expired batches. 
see the documentation of @TransactionState.resetProducerId to understand why @@ -298,7 +299,6 @@ public class Sender implements Runnable { sendProduceRequests(batches, now); return pollTimeout; - } private boolean maybeSendTransactionalRequest(long now) { @@ -308,7 +308,7 @@ public class Sender implements Runnable { // If the transaction is being aborted, then we can clear any unsent produce requests if (transactionManager.isAborting()) - accumulator.abortUnclosedBatches(new KafkaException("Failing batch since transaction was aborted")); + accumulator.abortOpenBatches(new KafkaException("Failing batch since transaction was aborted")); // There may still be requests left which are being retried. Since we do not know whether they had // been successfully appended to the broker log, we must resend them until their final status is clear. @@ -553,7 +553,6 @@ public class Sender implements Runnable { } else { completeBatch(batch, response); - } // Unmute the completed partition. diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index a26c3b7b5e8..49738b21f62 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -218,8 +218,7 @@ public class TransactionManager { } public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) { - if (currentState != State.IN_TRANSACTION) - throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState); + failIfNotReadyForSend(); if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition)) return; @@ -540,6 +539,8 @@ public class TransactionManager { transitionTo(State.READY); lastError = null; transactionStarted = false; + newPartitionsInTransaction.clear(); + pendingPartitionsInTransaction.clear(); partitionsInTransaction.clear(); } @@ -757,15 +758,22 @@ public class TransactionManager { } } + Set partitions = errors.keySet(); + + // Remove the partitions from the pending set regardless of the result. We use the presence + // of partitions in the pending set to know when it is not safe to send batches. However, if + // the partitions failed to be added and we enter an error state, we expect the batches to be + // aborted anyway. In this case, we must be able to continue sending the batches which are in + // retry for partitions that were successfully added. 
+ pendingPartitionsInTransaction.removeAll(partitions); + if (!unauthorizedTopics.isEmpty()) { abortableError(new TopicAuthorizationException(unauthorizedTopics)); } else if (hasPartitionErrors) { - abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors")); + abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors)); } else { - Set addedPartitions = errors.keySet(); - log.debug("{}Successfully added partitions {} to transaction", logPrefix, addedPartitions); - partitionsInTransaction.addAll(addedPartitions); - pendingPartitionsInTransaction.removeAll(addedPartitions); + log.debug("{}Successfully added partitions {} to transaction", logPrefix, partitions); + partitionsInTransaction.addAll(partitions); transactionStarted = true; result.done(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index c4abd3c84c6..bdad483b9f4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -316,7 +316,7 @@ public class TransactionManagerTest { transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); } - @Test(expected = IllegalStateException.class) + @Test(expected = KafkaException.class) public void testMaybeAddPartitionToTransactionAfterAbortableError() { long pid = 13131L; short epoch = 1; @@ -326,7 +326,7 @@ public class TransactionManagerTest { transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); } - @Test(expected = IllegalStateException.class) + @Test(expected = KafkaException.class) public void testMaybeAddPartitionToTransactionAfterFatalError() { long pid = 13131L; short epoch = 1; @@ -836,6 +836,11 @@ public class TransactionManagerTest { assertTrue(transactionManager.hasError()); assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException); + assertFalse(transactionManager.isPartitionPendingAdd(tp0)); + assertFalse(transactionManager.isPartitionPendingAdd(tp1)); + assertFalse(transactionManager.isPartitionAdded(tp0)); + assertFalse(transactionManager.isPartitionAdded(tp1)); + assertFalse(transactionManager.hasPartitionsToAdd()); TopicAuthorizationException exception = (TopicAuthorizationException) transactionManager.lastError(); assertEquals(singleton(tp0.topic()), exception.unauthorizedTopics()); @@ -843,6 +848,198 @@ public class TransactionManagerTest { assertAbortableError(TopicAuthorizationException.class); } + @Test + public void testRecoveryFromAbortableErrorTransactionNotStarted() throws Exception { + final long pid = 13131L; + final short epoch = 1; + final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0); + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition); + + Future responseFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED)); + sender.run(time.milliseconds()); + + assertTrue(transactionManager.hasAbortableError()); + transactionManager.beginAbortingTransaction(); + 
sender.run(time.milliseconds()); + assertTrue(responseFuture.isDone()); + assertFutureFailed(responseFuture); + + // No partitions added, so no need to prepare EndTxn response + sender.run(time.milliseconds()); + assertTrue(transactionManager.isReady()); + assertFalse(transactionManager.hasPartitionsToAdd()); + assertFalse(accumulator.hasUnflushedBatches()); + + // ensure we can now start a new transaction + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE)); + sender.run(time.milliseconds()); + assertTrue(transactionManager.isPartitionAdded(tp0)); + assertFalse(transactionManager.hasPartitionsToAdd()); + + transactionManager.beginCommittingTransaction(); + prepareProduceResponse(Errors.NONE, pid, epoch); + sender.run(time.milliseconds()); + + assertTrue(responseFuture.isDone()); + assertNotNull(responseFuture.get()); + + prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch); + sender.run(time.milliseconds()); + + assertTrue(transactionManager.isReady()); + } + + @Test + public void testRecoveryFromAbortableErrorTransactionStarted() throws Exception { + final long pid = 13131L; + final short epoch = 1; + final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0); + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + prepareAddPartitionsToTxn(tp0, Errors.NONE); + + Future authorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), + "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + assertTrue(transactionManager.isPartitionAdded(tp0)); + + transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition); + Future unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), + "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED)); + sender.run(time.milliseconds()); + assertTrue(transactionManager.hasAbortableError()); + assertTrue(transactionManager.isPartitionAdded(tp0)); + assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition)); + assertFalse(authorizedTopicProduceFuture.isDone()); + assertFalse(unauthorizedTopicProduceFuture.isDone()); + + prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch); + transactionManager.beginAbortingTransaction(); + sender.run(time.milliseconds()); + // neither produce request has been sent, so they should both be failed immediately + assertFutureFailed(authorizedTopicProduceFuture); + assertFutureFailed(unauthorizedTopicProduceFuture); + assertTrue(transactionManager.isReady()); + assertFalse(transactionManager.hasPartitionsToAdd()); + assertFalse(accumulator.hasUnflushedBatches()); + + // ensure we can now start a new transaction + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + 
prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE)); + sender.run(time.milliseconds()); + assertTrue(transactionManager.isPartitionAdded(tp0)); + assertFalse(transactionManager.hasPartitionsToAdd()); + + transactionManager.beginCommittingTransaction(); + prepareProduceResponse(Errors.NONE, pid, epoch); + sender.run(time.milliseconds()); + + assertTrue(nextTransactionFuture.isDone()); + assertNotNull(nextTransactionFuture.get()); + + prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch); + sender.run(time.milliseconds()); + + assertTrue(transactionManager.isReady()); + } + + @Test + public void testRecoveryFromAbortableErrorProduceRequestInRetry() throws Exception { + final long pid = 13131L; + final short epoch = 1; + final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0); + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + prepareAddPartitionsToTxn(tp0, Errors.NONE); + + Future authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(), + "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + assertTrue(transactionManager.isPartitionAdded(tp0)); + + accumulator.beginFlush(); + prepareProduceResponse(Errors.REQUEST_TIMED_OUT, pid, epoch); + sender.run(time.milliseconds()); + assertFalse(authorizedTopicProduceFuture.isDone()); + assertTrue(accumulator.hasUnflushedBatches()); + + transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition); + Future unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), + "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED)); + sender.run(time.milliseconds()); + assertTrue(transactionManager.hasAbortableError()); + assertTrue(transactionManager.isPartitionAdded(tp0)); + assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition)); + assertFalse(authorizedTopicProduceFuture.isDone()); + + prepareProduceResponse(Errors.NONE, pid, epoch); + sender.run(time.milliseconds()); + assertFutureFailed(unauthorizedTopicProduceFuture); + assertTrue(authorizedTopicProduceFuture.isDone()); + assertNotNull(authorizedTopicProduceFuture.get()); + + prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch); + transactionManager.beginAbortingTransaction(); + sender.run(time.milliseconds()); + // neither produce request has been sent, so they should both be failed immediately + assertTrue(transactionManager.isReady()); + assertFalse(transactionManager.hasPartitionsToAdd()); + assertFalse(accumulator.hasUnflushedBatches()); + + // ensure we can now start a new transaction + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE)); + sender.run(time.milliseconds()); + assertTrue(transactionManager.isPartitionAdded(tp0)); + assertFalse(transactionManager.hasPartitionsToAdd()); + + transactionManager.beginCommittingTransaction(); + prepareProduceResponse(Errors.NONE, pid, epoch); + sender.run(time.milliseconds()); + + assertTrue(nextTransactionFuture.isDone()); + 
assertNotNull(nextTransactionFuture.get()); + + prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch); + sender.run(time.milliseconds()); + + assertTrue(transactionManager.isReady()); + } + @Test public void testTransactionalIdAuthorizationFailureInAddPartitions() { final long pid = 13131L; @@ -1479,6 +1676,33 @@ public class TransactionManagerTest { assertTrue(drainedBatches.get(node1.id()).isEmpty()); } + @Test + public void resendFailedProduceRequestAfterAbortableError() throws Exception { + final long pid = 13131L; + final short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.beginTransaction(); + + transactionManager.maybeAddPartitionToTransaction(tp0); + + Future responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); + prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch); + sender.run(time.milliseconds()); // AddPartitions + sender.run(time.milliseconds()); // Produce + + assertFalse(responseFuture.isDone()); + + transactionManager.transitionToAbortableError(new KafkaException()); + prepareProduceResponse(Errors.NONE, pid, epoch); + + sender.run(time.milliseconds()); + assertTrue(responseFuture.isDone()); + assertNotNull(responseFuture.get()); + } + @Test public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException, ExecutionException { final long pid = 13131L; @@ -1866,4 +2090,16 @@ public class TransactionManagerTest { assertTrue(transactionManager.hasError()); } } + + private void assertFutureFailed(Future future) throws InterruptedException { + assertTrue(future.isDone()); + + try { + future.get(); + fail("Expected produce future to throw"); + } catch (ExecutionException e) { + // expected + } + } + } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index aa11ba18ba6..32b6865c32c 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -268,8 +268,8 @@ class Partition(val topic: String, if (leaderLWIncremented || leaderHWIncremented) tryCompleteDelayedRequests() - debug("Recorded replica %d log end offset (LEO) position %d for partition %s." - .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition)) + debug("Recorded replica %d log end offset (LEO) position %d." 
+ .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset)) case None => throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" + " is not recognized to be one of the assigned replicas %s for partition %s.") @@ -302,7 +302,7 @@ class Partition(val topic: String, assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset.offsetDiff(leaderHW) >= 0) { val newInSyncReplicas = inSyncReplicas + replica - info(s"Expanding ISR for partition $topicPartition from ${inSyncReplicas.map(_.brokerId).mkString(",")} " + + info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " + s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}") // update ISR in ZK and cache updateIsr(newInSyncReplicas) @@ -333,7 +333,7 @@ class Partition(val topic: String, def numAcks = curInSyncReplicas.count { r => if (!r.isLocal) if (r.logEndOffset.messageOffset >= requiredOffset) { - trace(s"Replica ${r.brokerId} of ${topic}-${partitionId} received offset $requiredOffset") + trace(s"Replica ${r.brokerId} received offset $requiredOffset") true } else @@ -342,7 +342,7 @@ class Partition(val topic: String, true /* also count the local (leader) replica */ } - trace(s"$numAcks acks satisfied for ${topic}-${partitionId} with acks = -1") + trace(s"$numAcks acks satisfied with acks = -1") val minIsr = leaderReplica.log.get.config.minInSyncReplicas @@ -388,11 +388,11 @@ class Partition(val topic: String, val oldHighWatermark = leaderReplica.highWatermark if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) { leaderReplica.highWatermark = newHighWatermark - debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark)) + debug(s"High watermark updated to $newHighWatermark") true - } else { - debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s" - .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(","))) + } else { + debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark." 
+ + s"All LEOs are ${allLogEndOffsets.mkString(",")}") false } } @@ -427,8 +427,8 @@ class Partition(val topic: String, if(outOfSyncReplicas.nonEmpty) { val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas assert(newInSyncReplicas.nonEmpty) - info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId, - inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(","))) + info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","), + newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in zk and in cache updateIsr(newInSyncReplicas) // we may need to increment high watermark since ISR could be down to 1 @@ -464,7 +464,7 @@ class Partition(val topic: String, val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs) if (laggingReplicas.nonEmpty) - debug("Lagging replicas for partition %s are %s".format(topicPartition, laggingReplicas.map(_.brokerId).mkString(","))) + debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(","))) laggingReplicas } diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala index c34ea72e922..4abaada8552 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala @@ -87,7 +87,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, } } } else { - trace(s"Received response $response from node ${response.destination} with correlation id $correlationId") + debug(s"Received WriteTxnMarker response $response from node ${response.destination} with correlation id $correlationId") val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse] @@ -135,7 +135,6 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, for ((topicPartition: TopicPartition, error: Errors) <- errors) { error match { case Errors.NONE => - txnMetadata.removePartition(topicPartition) case Errors.CORRUPT_MESSAGE | diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 3680d10b922..9179fecfccd 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -283,7 +283,8 @@ object DumpLogSegments { s"producerEpoch:${txnMetadata.producerEpoch}," + s"state=${txnMetadata.state}," + s"partitions=${txnMetadata.topicPartitions}," + - s"lastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}" + s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," + + s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}" (Some(keyString), Some(valueString)) } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 22699a94d34..bd04c7b61ca 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -926,10 +926,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { try { // the second time, the call to send itself should fail (the producer becomes unusable // if no producerId can be obtained) - producer.send(new ProducerRecord[Array[Byte], 
Array[Byte]](topic, "hi".getBytes)) + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get() fail("Should have raised ClusterAuthorizationException") } catch { - case e: KafkaException => + case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException]) } } @@ -959,10 +959,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { try { // the second time, the call to send itself should fail (the producer becomes unusable // if no producerId can be obtained) - producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)) + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get() fail("Should have raised ClusterAuthorizationException") } catch { - case e: KafkaException => + case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException]) } } @@ -1061,7 +1061,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { try { producer.send(new ProducerRecord(deleteTopic, 0, "1".getBytes, "1".getBytes)).get } catch { - case e : ExecutionException => + case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException]) } // now rollback
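
Note: the recovery path exercised by the new testRecoveryFromAbortableError* tests corresponds to the standard transactional-producer usage pattern. The following is a minimal sketch, not part of this patch; the broker address, topic name, and transactional.id are placeholders, and the error handling follows the documented KafkaProducer contract: fatal errors (e.g. ProducerFencedException) require closing the producer, while other KafkaExceptions are abortable, so the application can call abortTransaction() and then begin a new transaction with the same producer.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder broker address
        props.put("transactional.id", "example-txn-id");     // placeholder transactional id
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the producer can no longer be used and must be closed.
            producer.close();
        } catch (KafkaException e) {
            // Abortable errors: abort the current transaction; the producer remains
            // usable and a new transaction can be started afterwards.
            producer.abortTransaction();
        }
    }
}

With this change, the Sender only fails batches proactively when the transaction manager is in the fatal-error state; after an abortable error, any open batches are resolved as part of the abort itself, which is what allows the subsequent beginTransaction() in the sketch (and in the tests above) to proceed on the same producer.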