Browse Source

MINOR: Update TransactionManager to use LogContext

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3852 from hachikuji/minor-use-log-context-txn-manager
pull/3861/head
Jason Gustafson 7 years ago committed by Ismael Juma
parent
commit
2656659e0d
  1. 6
      clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  2. 12
      clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  3. 53
      clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  4. 24
      clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  5. 3
      clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

6
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@ -363,7 +363,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -363,7 +363,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.transactionManager = configureTransactionState(config, log);
this.transactionManager = configureTransactionState(config, logContext, log);
int retries = configureRetries(config, transactionManager != null, log);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null, log);
short acks = configureAcks(config, transactionManager != null, log);
@ -429,7 +429,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -429,7 +429,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
private static TransactionManager configureTransactionState(ProducerConfig config, Logger log) {
private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
TransactionManager transactionManager = null;
@ -453,7 +453,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -453,7 +453,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs, retryBackoffMs);
transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs);
if (transactionManager.isTransactional())
log.info("Instantiated a transactional producer.");
else

12
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

@ -346,15 +346,14 @@ public class Sender implements Runnable { @@ -346,15 +346,14 @@ public class Sender implements Runnable {
ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
requestBuilder, now, true, nextRequestHandler);
transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
log.debug("{}Sending transactional request {} to node {}",
transactionManager.logPrefix, requestBuilder, targetNode);
log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
client.send(clientRequest, now);
return true;
}
} catch (IOException e) {
log.debug("{}Disconnect from {} while trying to send request {}. Going " +
"to back off and retry", transactionManager.logPrefix, targetNode, requestBuilder);
log.debug("Disconnect from {} while trying to send request {}. Going " +
"to back off and retry", targetNode, requestBuilder);
if (nextRequestHandler.needsCoordinator()) {
// We break here so that we pick up the FindCoordinator request immediately.
transactionManager.lookupCoordinator(nextRequestHandler);
@ -372,10 +371,7 @@ public class Sender implements Runnable { @@ -372,10 +371,7 @@ public class Sender implements Runnable {
private void maybeAbortBatches(RuntimeException exception) {
if (accumulator.hasIncomplete()) {
String logPrefix = "";
if (transactionManager != null)
logPrefix = transactionManager.logPrefix;
log.error("{}Aborting producer batches due to fatal error", logPrefix, exception);
log.error("Aborting producer batches due to fatal error", exception);
accumulator.abortBatches(exception);
}
}

53
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

@ -41,8 +41,8 @@ import org.apache.kafka.common.requests.TransactionResult; @@ -41,8 +41,8 @@ import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
@ -59,14 +59,12 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID; @@ -59,14 +59,12 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
* A class which maintains state for transactions. Also keeps the state necessary to ensure idempotent production.
*/
public class TransactionManager {
private static final Logger log = LoggerFactory.getLogger(TransactionManager.class);
private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
private final Logger log;
private final String transactionalId;
private final int transactionTimeoutMs;
public final String logPrefix;
private final Map<TopicPartition, Integer> sequenceNumbers;
private final PriorityQueue<TxnRequestHandler> pendingRequests;
private final Set<TopicPartition> newPartitionsInTransaction;
@ -142,11 +140,11 @@ public class TransactionManager { @@ -142,11 +140,11 @@ public class TransactionManager {
}
}
public TransactionManager(String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
public TransactionManager(LogContext logContext, String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
this.sequenceNumbers = new HashMap<>();
this.transactionalId = transactionalId;
this.logPrefix = transactionalId == null ? "" : "[TransactionalId " + transactionalId + "] ";
this.log = logContext.logger(TransactionManager.class);
this.transactionTimeoutMs = transactionTimeoutMs;
this.transactionCoordinator = null;
this.consumerGroupCoordinator = null;
@ -165,7 +163,7 @@ public class TransactionManager { @@ -165,7 +163,7 @@ public class TransactionManager {
}
TransactionManager() {
this(null, 0, 100);
this(new LogContext(), null, 0, 100);
}
public synchronized TransactionalRequestResult initializeTransactions() {
@ -221,7 +219,7 @@ public class TransactionManager { @@ -221,7 +219,7 @@ public class TransactionManager {
throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " +
"active transaction");
log.debug("{}Begin adding offsets {} for consumer group {} to transaction", logPrefix, offsets, consumerGroupId);
log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, consumerGroupId);
AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
@ -235,7 +233,7 @@ public class TransactionManager { @@ -235,7 +233,7 @@ public class TransactionManager {
if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
return;
log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
log.debug("Begin adding new partition {} to transaction", topicPartition);
newPartitionsInTransaction.add(topicPartition);
}
@ -338,8 +336,7 @@ public class TransactionManager { @@ -338,8 +336,7 @@ public class TransactionManager {
* Set the producer id and epoch atomically.
*/
void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
log.info("{}ProducerId set to {} with epoch {}", logPrefix, producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch);
log.info("ProducerId set to {} with epoch {}", producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
this.producerIdAndEpoch = producerIdAndEpoch;
}
@ -403,23 +400,23 @@ public class TransactionManager { @@ -403,23 +400,23 @@ public class TransactionManager {
pendingRequests.poll();
if (maybeTerminateRequestWithError(nextRequestHandler)) {
log.trace("{}Not sending transactional request {} because we are in an error state",
logPrefix, nextRequestHandler.requestBuilder());
log.trace("Not sending transactional request {} because we are in an error state",
nextRequestHandler.requestBuilder());
return null;
}
if (nextRequestHandler.isEndTxn() && !transactionStarted) {
nextRequestHandler.result.done();
if (currentState != State.FATAL_ERROR) {
log.debug("{}Not sending EndTxn for completed transaction since no partitions " +
"or offsets were successfully added", logPrefix);
log.debug("Not sending EndTxn for completed transaction since no partitions " +
"or offsets were successfully added");
completeTransaction();
}
nextRequestHandler = pendingRequests.poll();
}
if (nextRequestHandler != null)
log.trace("{}Request {} dequeued for sending", logPrefix, nextRequestHandler.requestBuilder());
log.trace("Request {} dequeued for sending", nextRequestHandler.requestBuilder());
return nextRequestHandler;
}
@ -507,9 +504,9 @@ public class TransactionManager { @@ -507,9 +504,9 @@ public class TransactionManager {
}
if (lastError != null)
log.debug("{}Transition from state {} to error state {}", logPrefix, currentState, target, lastError);
log.debug("Transition from state {} to error state {}", currentState, target, lastError);
else
log.debug("{}Transition from state {} to {}", logPrefix, currentState, target);
log.debug("Transition from state {} to {}", currentState, target);
currentState = target;
}
@ -537,7 +534,7 @@ public class TransactionManager { @@ -537,7 +534,7 @@ public class TransactionManager {
}
private void enqueueRequest(TxnRequestHandler requestHandler) {
log.debug("{}Enqueuing transactional request {}", logPrefix, requestHandler.requestBuilder());
log.debug("Enqueuing transactional request {}", requestHandler.requestBuilder());
pendingRequests.add(requestHandler);
}
@ -634,15 +631,15 @@ public class TransactionManager { @@ -634,15 +631,15 @@ public class TransactionManager {
} else {
clearInFlightRequestCorrelationId();
if (response.wasDisconnected()) {
log.debug("{}Disconnected from {}. Will retry.", logPrefix, response.destination());
log.debug("Disconnected from {}. Will retry.", response.destination());
if (this.needsCoordinator())
lookupCoordinator(this.coordinatorType(), this.coordinatorKey());
reenqueue();
} else if (response.versionMismatch() != null) {
fatalError(response.versionMismatch());
} else if (response.hasResponse()) {
log.trace("{}Received transactional response {} for request {}", logPrefix,
response.responseBody(), requestBuilder());
log.trace("Received transactional response {} for request {}", response.responseBody(),
requestBuilder());
synchronized (TransactionManager.this) {
handleResponse(response.responseBody());
}
@ -781,10 +778,11 @@ public class TransactionManager { @@ -781,10 +778,11 @@ public class TransactionManager {
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
unauthorizedTopics.add(topicPartition.topic());
} else if (error == Errors.OPERATION_NOT_ATTEMPTED) {
log.debug("{}Did not attempt to add partition {} to transaction because other partitions in the batch had errors.", logPrefix, topicPartition);
log.debug("Did not attempt to add partition {} to transaction because other partitions in the " +
"batch had errors.", topicPartition);
hasPartitionErrors = true;
} else {
log.error("{}Could not add partition {} due to unexpected error {}", logPrefix, topicPartition, error);
log.error("Could not add partition {} due to unexpected error {}", topicPartition, error);
hasPartitionErrors = true;
}
}
@ -803,7 +801,7 @@ public class TransactionManager { @@ -803,7 +801,7 @@ public class TransactionManager {
} else if (hasPartitionErrors) {
abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors));
} else {
log.debug("{}Successfully added partitions {} to transaction", logPrefix, partitions);
log.debug("Successfully added partitions {} to transaction", partitions);
partitionsInTransaction.addAll(partitions);
transactionStarted = true;
result.done();
@ -956,8 +954,7 @@ public class TransactionManager { @@ -956,8 +954,7 @@ public class TransactionManager {
Errors error = addOffsetsToTxnResponse.error();
if (error == Errors.NONE) {
log.debug("{}Successfully added partition for consumer group {} to transaction", logPrefix,
builder.consumerGroupId());
log.debug("Successfully added partition for consumer group {} to transaction", builder.consumerGroupId());
// note the result is not completed until the TxnOffsetCommit returns
pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
@ -1019,7 +1016,7 @@ public class TransactionManager { @@ -1019,7 +1016,7 @@ public class TransactionManager {
TopicPartition topicPartition = entry.getKey();
Errors error = entry.getValue();
if (error == Errors.NONE) {
log.debug("{}Successfully added offsets {} from consumer group {} to transaction.", logPrefix,
log.debug("Successfully added offsets {} from consumer group {} to transaction.",
builder.offsets(), builder.consumerGroupId());
pendingTxnOffsetCommits.remove(topicPartition);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE

24
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

@ -104,7 +104,7 @@ public class SenderTest { @@ -104,7 +104,7 @@ public class SenderTest {
private RecordAccumulator accumulator = null;
private Sender sender = null;
private SenderMetricsRegistry senderMetricsRegistry = null;
private final LogContext loggerFactory = new LogContext();
private final LogContext logContext = new LogContext();
@Before
public void setup() {
@ -240,7 +240,7 @@ public class SenderTest { @@ -240,7 +240,7 @@ public class SenderTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000,
time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
time, true, new ApiVersions(), throttleTimeSensor, logContext);
short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
@ -277,7 +277,7 @@ public class SenderTest { @@ -277,7 +277,7 @@ public class SenderTest {
int maxRetries = 1;
Metrics m = new Metrics();
try {
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
// do a successful retry
Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@ -324,7 +324,7 @@ public class SenderTest { @@ -324,7 +324,7 @@ public class SenderTest {
int maxRetries = 1;
Metrics m = new Metrics();
try {
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
@ -576,7 +576,7 @@ public class SenderTest { @@ -576,7 +576,7 @@ public class SenderTest {
int maxRetries = 10;
Metrics m = new Metrics();
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@ -618,7 +618,7 @@ public class SenderTest { @@ -618,7 +618,7 @@ public class SenderTest {
int maxRetries = 10;
Metrics m = new Metrics();
SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, metricsRegistry, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@ -655,7 +655,7 @@ public class SenderTest { @@ -655,7 +655,7 @@ public class SenderTest {
int maxRetries = 10;
Metrics m = new Metrics();
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@ -684,7 +684,7 @@ public class SenderTest { @@ -684,7 +684,7 @@ public class SenderTest {
public void testTransactionalSplitBatchAndSend() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 60000, 100);
TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@ -705,10 +705,10 @@ public class SenderTest { @@ -705,10 +705,10 @@ public class SenderTest {
// Set a good compression ratio.
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
try (Metrics m = new Metrics()) {
accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
new ApiVersions(), txnManager);
SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, metricsRegistry, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
@ -826,10 +826,10 @@ public class SenderTest { @@ -826,10 +826,10 @@ public class SenderTest {
metricTags.put("client-id", CLIENT_ID);
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
this.metrics = new Metrics(metricConfig, time);
this.accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
apiVersions, transactionManager);
this.senderMetricsRegistry = new SenderMetricsRegistry(metricTags.keySet());
this.sender = new Sender(loggerFactory, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
MAX_RETRIES, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
}

3
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

@ -120,7 +120,8 @@ public class TransactionManagerTest { @@ -120,7 +120,8 @@ public class TransactionManagerTest {
int batchSize = 16 * 1024;
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
this.brokerNode = new Node(0, "localhost", 2211);
this.transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS);
this.transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
DEFAULT_RETRY_BACKOFF_MS);
Metrics metrics = new Metrics(metricConfig, time);
this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,

Loading…
Cancel
Save