
MINOR: Rename InitPidRequest/InitPidResponse to InitProducerIdRequest/InitProducerIdResponse

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2997 from hachikuji/minor-rename-initpid
Jason Gustafson, 8 years ago
Parent commit: a1c8e7d941
26 changed files:

1. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (2 changes)
2. clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java (4 changes)
3. clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java (6 changes)
4. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (12 changes)
5. clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (24 changes)
6. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (64 changes)
7. clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java (2 changes)
8. clients/src/main/java/org/apache/kafka/common/protocol/Errors.java (3 changes)
9. clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java (2 changes)
10. clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java (2 changes)
11. clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java (20 changes)
12. clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java (12 changes)
13. clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (20 changes)
14. clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (26 changes)
15. clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java (8 changes)
16. core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala (119 changes)
17. core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala (90 changes)
18. core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala (7 changes)
19. core/src/main/scala/kafka/log/ProducerStateManager.scala (84 changes)
20. core/src/main/scala/kafka/server/KafkaApis.scala (29 changes)
21. core/src/main/scala/kafka/server/KafkaConfig.scala (2 changes)
22. core/src/main/scala/kafka/utils/ZkUtils.scala (6 changes)
23. core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala (14 changes)
24. core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala (16 changes)
25. core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala (50 changes)
26. core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala (4 changes)

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (2 changes)

@@ -688,7 +688,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (transactionManager == null)
return;
if (transactionManager.isTransactional() && !transactionManager.hasPid())
if (transactionManager.isTransactional() && !transactionManager.hasProducerId())
throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.");
if (transactionManager.isFenced())
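The check above implies a required call order for transactional producers. A minimal hedged usage sketch (standard KafkaProducer API; the producer and record variables are assumed to be constructed elsewhere):

// A transactional producer must complete initTransactions(), which obtains the
// producerId and epoch, before any send() is allowed.
producer.initTransactions();
producer.beginTransaction();
producer.send(record);          // would throw IllegalStateException without initTransactions()
producer.commitTransaction();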

clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java (4 changes)

@@ -231,8 +231,8 @@ public final class ProducerBatch {
return recordsBuilder.isFull();
}
public void setProducerState(PidAndEpoch pidAndEpoch, int baseSequence) {
recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence);
public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence) {
recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence);
}
/**

clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java → clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java (6 changes)

@@ -19,13 +19,13 @@ package org.apache.kafka.clients.producer.internals;
import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
class PidAndEpoch {
static final PidAndEpoch NONE = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
class ProducerIdAndEpoch {
static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
public final long producerId;
public final short epoch;
PidAndEpoch(long producerId, short epoch) {
ProducerIdAndEpoch(long producerId, short epoch) {
this.producerId = producerId;
this.epoch = epoch;
}
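For reference, a self-contained sketch of the renamed value class. The hunk does not show isValid() or the sentinel constants, so those parts are assumptions modeled on RecordBatch.NO_PRODUCER_ID and NO_PRODUCER_EPOCH:

// Hypothetical standalone reconstruction of ProducerIdAndEpoch.
class ProducerIdAndEpoch {
    static final long NO_PRODUCER_ID = -1L;    // assumed sentinel, mirrors RecordBatch.NO_PRODUCER_ID
    static final short NO_PRODUCER_EPOCH = -1; // assumed sentinel, mirrors RecordBatch.NO_PRODUCER_EPOCH
    static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);

    public final long producerId;
    public final short epoch;

    ProducerIdAndEpoch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    // Assumed: the pair is valid once the producerId is no longer the sentinel.
    boolean isValid() {
        return producerId != NO_PRODUCER_ID;
    }
}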

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (12 changes)

@@ -444,16 +444,16 @@ public final class RecordAccumulator {
// request
break;
} else {
PidAndEpoch pidAndEpoch = null;
ProducerIdAndEpoch producerIdAndEpoch = null;
if (transactionManager != null) {
pidAndEpoch = transactionManager.pidAndEpoch();
if (!pidAndEpoch.isValid())
producerIdAndEpoch = transactionManager.pidAndEpoch();
if (!producerIdAndEpoch.isValid())
// we cannot send the batch until we have refreshed the PID
break;
}
ProducerBatch batch = deque.pollFirst();
if (pidAndEpoch != null && !batch.inRetry()) {
if (producerIdAndEpoch != null && !batch.inRetry()) {
// If the batch is in retry, then we should not change the pid and
// sequence number, since this may introduce duplicates. In particular,
// the previous attempt may actually have been accepted, and if we change
@@ -461,9 +461,9 @@ public final class RecordAccumulator {
// a duplicate.
int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
node, pidAndEpoch.producerId, pidAndEpoch.epoch,
node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
batch.topicPartition, sequenceNumber);
batch.setProducerState(pidAndEpoch, sequenceNumber);
batch.setProducerState(producerIdAndEpoch, sequenceNumber);
}
batch.close();
size += batch.sizeInBytes();
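Since the hunks above interleave old and new lines, here is the new side consolidated (a sketch; surrounding names such as transactionManager, deque, and log are as in the file):

ProducerIdAndEpoch producerIdAndEpoch = null;
if (transactionManager != null) {
    producerIdAndEpoch = transactionManager.pidAndEpoch();
    if (!producerIdAndEpoch.isValid())
        break; // cannot send the batch until the producerId has been refreshed
}
ProducerBatch batch = deque.pollFirst();
if (producerIdAndEpoch != null && !batch.inRetry()) {
    // Assign producer state only on the first attempt: a retried batch may already
    // have been accepted, and re-assigning its sequence number could turn the retry
    // into an undetected duplicate.
    int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
    batch.setProducerState(producerIdAndEpoch, sequenceNumber);
}
batch.close();
size += batch.sizeInBytes();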

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (24 changes)

@@ -42,8 +42,8 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.InitPidRequest;
import org.apache.kafka.common.requests.InitPidResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.Time;
@@ -357,7 +357,7 @@ public class Sender implements Runnable {
private ClientResponse sendAndAwaitInitPidRequest(Node node) throws IOException {
String nodeId = node.idString();
InitPidRequest.Builder builder = new InitPidRequest.Builder(null);
InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null);
return NetworkClientUtils.sendAndReceive(client, request, time);
}
@@ -376,28 +376,28 @@ public class Sender implements Runnable {
if (transactionManager == null || transactionManager.isTransactional())
return;
while (!transactionManager.hasPid()) {
while (!transactionManager.hasProducerId()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeout);
if (node != null) {
ClientResponse response = sendAndAwaitInitPidRequest(node);
if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
PidAndEpoch pidAndEpoch = new PidAndEpoch(
initPidResponse.producerId(), initPidResponse.epoch());
transactionManager.setPidAndEpoch(pidAndEpoch);
if (response.hasResponse() && (response.responseBody() instanceof InitProducerIdResponse)) {
InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
} else {
log.error("Received an unexpected response type for an InitPidRequest from {}. " +
log.error("Received an unexpected response type for an InitProducerIdRequest from {}. " +
"We will back off and try again.", node);
}
} else {
log.debug("Could not find an available broker to send InitPidRequest to. " +
log.debug("Could not find an available broker to send InitProducerIdRequest to. " +
"We will back off and try again.");
}
} catch (Exception e) {
log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
}
log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
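Consolidating the interleaved lines of the hunk above, the post-rename retry loop reads as follows (new side only; a sketch, with awaitLeastLoadedNodeReady and sendAndAwaitInitPidRequest as defined elsewhere in this file):

while (!transactionManager.hasProducerId()) {
    try {
        Node node = awaitLeastLoadedNodeReady(requestTimeout);
        if (node != null) {
            ClientResponse response = sendAndAwaitInitPidRequest(node);
            if (response.hasResponse() && (response.responseBody() instanceof InitProducerIdResponse)) {
                InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
                ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
                        initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
            } else {
                log.error("Received an unexpected response type for an InitProducerIdRequest from {}. " +
                        "We will back off and try again.", node);
            }
        } else {
            log.debug("Could not find an available broker to send InitProducerIdRequest to. " +
                    "We will back off and try again.");
        }
    } catch (Exception e) {
        log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
    }
    log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
    time.sleep(retryBackoffMs);
    metadata.requestUpdate();
}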

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (64 changes)

@@ -35,8 +35,8 @@ import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitPidRequest;
import org.apache.kafka.common.requests.InitPidResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
@@ -79,7 +79,7 @@ public class TransactionManager {
private volatile State currentState = State.UNINITIALIZED;
private volatile Exception lastError = null;
private volatile PidAndEpoch pidAndEpoch;
private volatile ProducerIdAndEpoch producerIdAndEpoch;
private enum State {
UNINITIALIZED,
@@ -130,7 +130,7 @@ public class TransactionManager {
}
public TransactionManager(String transactionalId, int transactionTimeoutMs) {
this.pidAndEpoch = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
this.sequenceNumbers = new HashMap<>();
this.transactionalId = transactionalId;
this.transactionTimeoutMs = transactionTimeoutMs;
@@ -155,10 +155,10 @@ public class TransactionManager {
public synchronized TransactionalRequestResult initializeTransactions() {
ensureTransactional();
transitionTo(State.INITIALIZING);
setPidAndEpoch(PidAndEpoch.NONE);
setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
this.sequenceNumbers.clear();
InitPidRequest.Builder builder = new InitPidRequest.Builder(transactionalId, transactionTimeoutMs);
InitPidHandler handler = new InitPidHandler(builder);
InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
InitProducerIdHandler handler = new InitProducerIdHandler(builder);
pendingRequests.add(handler);
return handler.result;
}
@@ -190,8 +190,8 @@ public class TransactionManager {
}
TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT;
EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, pidAndEpoch.producerId,
pidAndEpoch.epoch, transactionResult);
EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, transactionResult);
EndTxnHandler handler = new EndTxnHandler(builder);
pendingRequests.add(handler);
return handler.result;
@@ -206,7 +206,7 @@ public class TransactionManager {
"active transaction");
AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
pidAndEpoch.producerId, pidAndEpoch.epoch, consumerGroupId);
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
pendingRequests.add(handler);
return handler.result;
@@ -226,8 +226,8 @@ public class TransactionManager {
return transactionalId;
}
public boolean hasPid() {
return pidAndEpoch.isValid();
public boolean hasProducerId() {
return producerIdAndEpoch.isValid();
}
public boolean isTransactional() {
@@ -262,20 +262,20 @@ public class TransactionManager {
}
/**
* Get the current pid and epoch without blocking. Callers must use {@link PidAndEpoch#isValid()} to
* Get the current pid and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
* verify that the result is valid.
*
* @return the current PidAndEpoch.
* @return the current ProducerIdAndEpoch.
*/
PidAndEpoch pidAndEpoch() {
return pidAndEpoch;
ProducerIdAndEpoch pidAndEpoch() {
return producerIdAndEpoch;
}
/**
* Set the pid and epoch atomically.
*/
void setPidAndEpoch(PidAndEpoch pidAndEpoch) {
this.pidAndEpoch = pidAndEpoch;
void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
this.producerIdAndEpoch = producerIdAndEpoch;
}
/**
@@ -299,7 +299,7 @@ public class TransactionManager {
if (isTransactional())
throw new IllegalStateException("Cannot reset producer state for a transactional producer. " +
"You must either abort the ongoing transaction or reinitialize the transactional producer instead");
setPidAndEpoch(PidAndEpoch.NONE);
setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
this.sequenceNumbers.clear();
}
@@ -448,7 +448,7 @@ public class TransactionManager {
pendingPartitionsToBeAddedToTransaction.addAll(newPartitionsToBeAddedToTransaction);
newPartitionsToBeAddedToTransaction.clear();
AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId,
pidAndEpoch.producerId, pidAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction));
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction));
return new AddPartitionsToTxnHandler(builder);
}
@@ -461,7 +461,7 @@ public class TransactionManager {
pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
}
TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId,
pidAndEpoch.producerId, pidAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME,
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME,
pendingTxnOffsetCommits);
return new TxnOffsetCommitHandler(result, builder);
}
@@ -487,7 +487,7 @@ public class TransactionManager {
void fenced() {
log.error("Producer has become invalid, which typically means another producer with the same " +
"transactional.id has been started: producerId: {}. epoch: {}.",
pidAndEpoch.producerId, pidAndEpoch.epoch);
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
result.setError(Errors.INVALID_PRODUCER_EPOCH.exception());
transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception());
result.done();
@@ -548,15 +548,15 @@ public class TransactionManager {
abstract Priority priority();
}
private class InitPidHandler extends TxnRequestHandler {
private final InitPidRequest.Builder builder;
private class InitProducerIdHandler extends TxnRequestHandler {
private final InitProducerIdRequest.Builder builder;
private InitPidHandler(InitPidRequest.Builder builder) {
private InitProducerIdHandler(InitProducerIdRequest.Builder builder) {
this.builder = builder;
}
@Override
InitPidRequest.Builder requestBuilder() {
InitProducerIdRequest.Builder requestBuilder() {
return builder;
}
@@ -567,11 +567,11 @@ public class TransactionManager {
@Override
public void handleResponse(AbstractResponse response) {
InitPidResponse initPidResponse = (InitPidResponse) response;
Errors error = initPidResponse.error();
InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response;
Errors error = initProducerIdResponse.error();
if (error == Errors.NONE) {
PidAndEpoch pidAndEpoch = new PidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
setPidAndEpoch(pidAndEpoch);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
setProducerIdAndEpoch(producerIdAndEpoch);
transitionTo(State.READY);
lastError = null;
result.done();
@@ -581,7 +581,7 @@ public class TransactionManager {
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
reenqueue();
} else {
fatal(new KafkaException("Unexpected error in InitPidResponse; " + error.message()));
fatal(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
}
}
}
@@ -616,7 +616,7 @@ public class TransactionManager {
reenqueue();
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
reenqueue();
} else if (error == Errors.INVALID_PID_MAPPING) {
} else if (error == Errors.INVALID_PRODUCER_ID_MAPPING) {
fatal(new KafkaException(error.exception()));
} else if (error == Errors.INVALID_TXN_STATE) {
fatal(new KafkaException(error.exception()));

clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java (2 changes)

@@ -17,7 +17,7 @@
package org.apache.kafka.common.errors;
/**
* The transaction coordinator returns this error code if the timeout received via the InitPidRequest is larger than
* The transaction coordinator returns this error code if the timeout received via the InitProducerIdRequest is larger than
* the `max.transaction.timeout.ms` config value.
*/
public class InvalidTxnTimeoutException extends ApiException {
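A hedged sketch of the check this exception documents. The helper below is hypothetical; in the real broker the validation lives in the transaction coordinator and surfaces as Errors.INVALID_TRANSACTION_TIMEOUT rather than a thrown exception:

// Hypothetical broker-side validation of the timeout carried by InitProducerIdRequest.
static void validateTransactionTimeoutMs(int requestedTimeoutMs, int maxTransactionTimeoutMs) {
    // maxTransactionTimeoutMs is backed by the `max.transaction.timeout.ms` broker config.
    if (requestedTimeoutMs > maxTransactionTimeoutMs)
        throw new InvalidTxnTimeoutException("Requested transaction timeout " + requestedTimeoutMs +
                " ms exceeds the maximum allowed " + maxTransactionTimeoutMs + " ms.");
}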

clients/src/main/java/org/apache/kafka/common/protocol/Errors.java (3 changes)

@@ -439,7 +439,8 @@ public enum Errors {
return new InvalidTxnStateException(message);
}
}),
INVALID_PID_MAPPING(49, "The PID mapping is invalid",
INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producerId which is not currently assigned to " +
"its transactionalId",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
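Only the constant name and message change here; the wire error code stays 49, so old and new clients interpret the same byte. A small sketch using Errors helpers that appear elsewhere in this diff (forCode, exception, message):

// Resolving error code 49 after the rename still yields the same exception type.
Errors error = Errors.forCode((short) 49);   // now named INVALID_PRODUCER_ID_MAPPING
if (error != Errors.NONE)
    throw error.exception();                 // built by the ApiExceptionBuilder above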

clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java (2 changes)

@@ -179,7 +179,7 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
request = new DeleteRecordsRequest(struct, version);
break;
case INIT_PRODUCER_ID:
request = new InitPidRequest(struct, version);
request = new InitProducerIdRequest(struct, version);
break;
case OFFSET_FOR_LEADER_EPOCH:
request = new OffsetsForLeaderEpochRequest(struct, version);

clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java (2 changes)

@@ -95,7 +95,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case DELETE_RECORDS:
return new DeleteRecordsResponse(struct);
case INIT_PRODUCER_ID:
return new InitPidResponse(struct);
return new InitProducerIdResponse(struct);
case OFFSET_FOR_LEADER_EPOCH:
return new OffsetsForLeaderEpochResponse(struct);
case ADD_PARTITIONS_TO_TXN:

clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java → clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java (20 changes)

@@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class InitPidRequest extends AbstractRequest {
public class InitProducerIdRequest extends AbstractRequest {
public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
@@ -31,7 +31,7 @@ public class InitPidRequest extends AbstractRequest {
private final String transactionalId;
private final int transactionTimeoutMs;
public static class Builder extends AbstractRequest.Builder<InitPidRequest> {
public static class Builder extends AbstractRequest.Builder<InitProducerIdRequest> {
private final String transactionalId;
private final int transactionTimeoutMs;
@@ -53,24 +53,24 @@ public class InitPidRequest extends AbstractRequest {
}
@Override
public InitPidRequest build(short version) {
return new InitPidRequest(version, transactionalId, transactionTimeoutMs);
public InitProducerIdRequest build(short version) {
return new InitProducerIdRequest(version, transactionalId, transactionTimeoutMs);
}
@Override
public String toString() {
return "(type=InitPidRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
return "(type=InitProducerIdRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
transactionTimeoutMs + ")";
}
}
public InitPidRequest(Struct struct, short version) {
public InitProducerIdRequest(Struct struct, short version) {
super(version);
this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
}
private InitPidRequest(short version, String transactionalId, int transactionTimeoutMs) {
private InitProducerIdRequest(short version, String transactionalId, int transactionTimeoutMs) {
super(version);
this.transactionalId = transactionalId;
this.transactionTimeoutMs = transactionTimeoutMs;
@@ -78,11 +78,11 @@ public class InitPidRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new InitPidResponse(throttleTimeMs, Errors.forException(e));
return new InitProducerIdResponse(throttleTimeMs, Errors.forException(e));
}
public static InitPidRequest parse(ByteBuffer buffer, short version) {
return new InitPidRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
public static InitProducerIdRequest parse(ByteBuffer buffer, short version) {
return new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
}
public String transactionalId() {

clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java → clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java (12 changes)

@@ -23,7 +23,7 @@ import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
public class InitPidResponse extends AbstractResponse {
public class InitProducerIdResponse extends AbstractResponse {
/**
* Possible Error codes:
* OK
@@ -38,21 +38,21 @@ public class InitPidResponse extends AbstractResponse {
private final long producerId;
private final short epoch;
public InitPidResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
public InitProducerIdResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
this.throttleTimeMs = throttleTimeMs;
this.error = error;
this.producerId = producerId;
this.epoch = epoch;
}
public InitPidResponse(Struct struct) {
public InitProducerIdResponse(Struct struct) {
this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
this.epoch = struct.getShort(EPOCH_KEY_NAME);
}
public InitPidResponse(int throttleTimeMs, Errors errors) {
public InitProducerIdResponse(int throttleTimeMs, Errors errors) {
this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
}
@@ -82,8 +82,8 @@ public class InitPidResponse extends AbstractResponse {
return struct;
}
public static InitPidResponse parse(ByteBuffer buffer, short version) {
return new InitPidResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
public static InitProducerIdResponse parse(ByteBuffer buffer, short version) {
return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
}
}

clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (20 changes)

@@ -37,9 +37,9 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.InitPidRequest;
import org.apache.kafka.common.requests.InitPidResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
@@ -382,11 +382,11 @@ public class SenderTest {
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
return body instanceof InitPidRequest;
return body instanceof InitProducerIdRequest;
}
}, new InitPidResponse(0, Errors.NONE, producerId, (short) 0));
}, new InitProducerIdResponse(0, Errors.NONE, producerId, (short) 0));
sender.run(time.milliseconds());
assertTrue(transactionManager.hasPid());
assertTrue(transactionManager.hasProducerId());
assertEquals(producerId, transactionManager.pidAndEpoch().producerId);
assertEquals((short) 0, transactionManager.pidAndEpoch().epoch);
}
@@ -395,7 +395,7 @@ public class SenderTest {
public void testSequenceNumberIncrement() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
setupWithTransactionState(transactionManager);
client.setNode(new Node(1, "localhost", 33343));
@@ -448,7 +448,7 @@ public class SenderTest {
public void testAbortRetryWhenPidChanges() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
setupWithTransactionState(transactionManager);
client.setNode(new Node(1, "localhost", 33343));
@@ -480,7 +480,7 @@ public class SenderTest {
assertEquals(0, client.inFlightRequestCount());
assertFalse("Client ready status should be false", client.isReady(node, 0L));
transactionManager.setPidAndEpoch(new PidAndEpoch(producerId + 1, (short) 0));
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0));
sender.run(time.milliseconds()); // receive error
sender.run(time.milliseconds()); // reconnect
sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
@@ -497,7 +497,7 @@ public class SenderTest {
public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
setupWithTransactionState(transactionManager);
client.setNode(new Node(1, "localhost", 33343));
@@ -528,7 +528,7 @@ public class SenderTest {
sender.run(time.milliseconds());
assertTrue(responseFuture.isDone());
assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasPid());
assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
}
private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {

clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (26 changes)

@@ -44,8 +44,8 @@ import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitPidRequest;
import org.apache.kafka.common.requests.InitPidResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.TransactionResult;
@@ -163,7 +163,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // get pid.
assertTrue(transactionManager.hasPid());
assertTrue(transactionManager.hasProducerId());
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -275,7 +275,7 @@ public class TransactionManagerTest {
assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
assertFalse(initPidResult.isCompleted());
assertFalse(transactionManager.hasPid());
assertFalse(transactionManager.hasProducerId());
prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
sender.run(time.milliseconds());
@@ -285,7 +285,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // get pid and epoch
assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
assertTrue(transactionManager.hasPid());
assertTrue(transactionManager.hasProducerId());
assertEquals(pid, transactionManager.pidAndEpoch().producerId);
assertEquals(epoch, transactionManager.pidAndEpoch().epoch);
}
@@ -308,7 +308,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // get pid.
assertTrue(transactionManager.hasPid());
assertTrue(transactionManager.hasProducerId());
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -365,7 +365,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // get pid.
assertTrue(transactionManager.hasPid());
assertTrue(transactionManager.hasProducerId());
transactionManager.beginTransaction();
// User does one producer.send
transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -428,7 +428,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // get pid.
assertTrue(transactionManager.hasPid());
assertTrue(transactionManager.hasProducerId());
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -463,7 +463,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // get pid.
assertTrue(transactionManager.hasPid());
assertTrue(transactionManager.hasProducerId());
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -530,12 +530,12 @@ public class TransactionManagerTest {
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
InitPidRequest initPidRequest = (InitPidRequest) body;
assertEquals(initPidRequest.transactionalId(), transactionalId);
assertEquals(initPidRequest.transactionTimeoutMs(), transactionTimeoutMs);
InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
return true;
}
}, new InitPidResponse(0, error, pid, epoch), shouldDisconnect);
}, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
}
private void prepareProduceResponse(Errors error, final long pid, final short epoch) {

clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java (8 changes)

@@ -879,12 +879,12 @@ public class RequestResponseTest {
return new DeleteTopicsResponse(errors);
}
private InitPidRequest createInitPidRequest() {
return new InitPidRequest.Builder(null, 100).build();
private InitProducerIdRequest createInitPidRequest() {
return new InitProducerIdRequest.Builder(null, 100).build();
}
private InitPidResponse createInitPidResponse() {
return new InitPidResponse(0, Errors.NONE, 3332, (short) 3);
private InitProducerIdResponse createInitPidResponse() {
return new InitProducerIdResponse(0, Errors.NONE, 3332, (short) 3);
}

core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala (119 changes)

@@ -20,49 +20,49 @@ import kafka.common.KafkaException
import kafka.utils.{Json, Logging, ZkUtils}
/**
* ProducerIdManager is the part of the transaction coordinator that provides ProducerIds (PIDs) in a unique way
* such that the same PID will not be assigned twice across multiple transaction coordinators.
* ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way
* such that the same producerId will not be assigned twice across multiple transaction coordinators.
*
* PIDs are managed via ZooKeeper, where the latest pid block is written on the corresponding ZK path by the manager who
* claims the block, where the written block_start_pid and block_end_pid are both inclusive.
* ProducerIds are managed via ZooKeeper, where the latest producerId block is written on the corresponding ZK
* path by the manager who claims the block, where the written block_start and block_end are both inclusive.
*/
object ProducerIdManager extends Logging {
val CurrentVersion: Long = 1L
val PidBlockSize: Long = 1000L
def generatePidBlockJson(pidBlock: ProducerIdBlock): String = {
def generateProducerIdBlockJson(producerIdBlock: ProducerIdBlock): String = {
Json.encode(Map("version" -> CurrentVersion,
"broker" -> pidBlock.brokerId,
"block_start" -> pidBlock.blockStartPid.toString,
"block_end" -> pidBlock.blockEndPid.toString)
"broker" -> producerIdBlock.brokerId,
"block_start" -> producerIdBlock.blockStartId.toString,
"block_end" -> producerIdBlock.blockEndId.toString)
)
}
def parsePidBlockData(jsonData: String): ProducerIdBlock = {
def parseProducerIdBlockData(jsonData: String): ProducerIdBlock = {
try {
Json.parseFull(jsonData).flatMap { m =>
val pidBlockInfo = m.asInstanceOf[Map[String, Any]]
val brokerId = pidBlockInfo("broker").asInstanceOf[Int]
val blockStartPID = pidBlockInfo("block_start").asInstanceOf[String].toLong
val blockEndPID = pidBlockInfo("block_end").asInstanceOf[String].toLong
Some(ProducerIdBlock(brokerId, blockStartPID, blockEndPID))
}.getOrElse(throw new KafkaException(s"Failed to parse the pid block json $jsonData"))
val producerIdBlockInfo = m.asInstanceOf[Map[String, Any]]
val brokerId = producerIdBlockInfo("broker").asInstanceOf[Int]
val blockStart = producerIdBlockInfo("block_start").asInstanceOf[String].toLong
val blockEnd = producerIdBlockInfo("block_end").asInstanceOf[String].toLong
Some(ProducerIdBlock(brokerId, blockStart, blockEnd))
}.getOrElse(throw new KafkaException(s"Failed to parse the producerId block json $jsonData"))
} catch {
case e: java.lang.NumberFormatException =>
// this should never happen: the written data has exceeded long type limit
fatal(s"Read jason data $jsonData contains pids that have exceeded long type limit")
fatal(s"Read jason data $jsonData contains producerIds that have exceeded long type limit")
throw e
}
}
}
case class ProducerIdBlock(brokerId: Int, blockStartPid: Long, blockEndPid: Long) {
case class ProducerIdBlock(brokerId: Int, blockStartId: Long, blockEndId: Long) {
override def toString: String = {
val pidBlockInfo = new StringBuilder
pidBlockInfo.append("(brokerId:" + brokerId)
pidBlockInfo.append(",blockStartPID:" + blockStartPid)
pidBlockInfo.append(",blockEndPID:" + blockEndPid + ")")
pidBlockInfo.toString()
val producerIdBlockInfo = new StringBuilder
producerIdBlockInfo.append("(brokerId:" + brokerId)
producerIdBlockInfo.append(",blockStartProducerId:" + blockStartId)
producerIdBlockInfo.append(",blockEndProducerId:" + blockEndId + ")")
producerIdBlockInfo.toString()
}
}
@@ -70,84 +70,85 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging
this.logIdent = "[ProducerId Manager " + brokerId + "]: "
private var currentPIDBlock: ProducerIdBlock = null
private var nextPID: Long = -1L
private var currentProducerIdBlock: ProducerIdBlock = null
private var nextProducerId: Long = -1L
// grab the first block of PIDs
// grab the first block of producerIds
this synchronized {
getNewPidBlock()
nextPID = currentPIDBlock.blockStartPid
getNewProducerIdBlock()
nextProducerId = currentProducerIdBlock.blockStartId
}
private def getNewPidBlock(): Unit = {
private def getNewProducerIdBlock(): Unit = {
var zkWriteComplete = false
while (!zkWriteComplete) {
// refresh current pid block from zookeeper again
val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
// refresh current producerId block from zookeeper again
val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath)
// generate the new pid block
currentPIDBlock = dataOpt match {
// generate the new producerId block
currentProducerIdBlock = dataOpt match {
case Some(data) =>
val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
debug(s"Read current pid block $currPIDBlock, Zk path version $zkVersion")
val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
if (currPIDBlock.blockEndPid > Long.MaxValue - ProducerIdManager.PidBlockSize) {
// we have exhausted all pids (wow!), treat it as a fatal error
fatal(s"Exhausted all pids as the next block's end pid is will has exceeded long type limit (current block end pid is ${currPIDBlock.blockEndPid})")
throw new KafkaException("Have exhausted all pids.")
if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
// we have exhausted all producerIds (wow!), treat it as a fatal error
fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
throw new KafkaException("Have exhausted all producerIds.")
}
ProducerIdBlock(brokerId, currPIDBlock.blockEndPid + 1L, currPIDBlock.blockEndPid + ProducerIdManager.PidBlockSize)
ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
case None =>
debug(s"There is no pid block yet (Zk path version $zkVersion), creating the first block")
debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
}
val newPIDBlockData = ProducerIdManager.generatePidBlockJson(currentPIDBlock)
val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
// try to write the new pid block into zookeeper
val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.PidBlockPath, newPIDBlockData, zkVersion, Some(checkPidBlockZkData))
// try to write the new producerId block into zookeeper
val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.ProducerIdBlockPath,
newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
zkWriteComplete = succeeded
if (zkWriteComplete)
info(s"Acquired new pid block $currentPIDBlock by writing to Zk with path version $version")
info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
}
}
private def checkPidBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
private def checkProducerIdBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
try {
val expectedPidBlock = ProducerIdManager.parsePidBlockData(expectedData)
val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath)
dataOpt match {
case Some(data) =>
val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
(currPIDBlock.equals(expectedPidBlock), zkVersion)
val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
(currProducerIdBlock == expectedPidBlock, zkVersion)
case None =>
(false, -1)
}
} catch {
case e: Exception =>
warn(s"Error while checking for pid block Zk data on path $path: expected data $expectedData", e)
warn(s"Error while checking for producerId block Zk data on path $path: expected data $expectedData", e)
(false, -1)
}
}
def nextPid(): Long = {
def generateProducerId(): Long = {
this synchronized {
// grab a new block of PIDs if this block has been exhausted
if (nextPID > currentPIDBlock.blockEndPid) {
getNewPidBlock()
nextPID = currentPIDBlock.blockStartPid + 1
// grab a new block of producerIds if this block has been exhausted
if (nextProducerId > currentProducerIdBlock.blockEndId) {
getNewProducerIdBlock()
nextProducerId = currentProducerIdBlock.blockStartId + 1
} else {
nextPID += 1
nextProducerId += 1
}
nextPID - 1
nextProducerId - 1
}
}
def shutdown() {
info(s"Shutdown complete: last PID assigned $nextPID")
info(s"Shutdown complete: last producerId assigned $nextProducerId")
}
}
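The scaladoc at the top of this file describes the allocation scheme: claim an inclusive [block_start, block_end] range through a conditional ZooKeeper write, then hand out ids locally until the block is exhausted. A minimal Java sketch of that pattern, with an AtomicLong as a hypothetical stand-in for the shared ZooKeeper path and its conditional update:

import java.util.concurrent.atomic.AtomicLong;

class BlockedProducerIdAllocator {
    static final long BLOCK_SIZE = 1000L;          // mirrors ProducerIdManager.PidBlockSize
    private final AtomicLong sharedNextBlockStart; // stand-in for the shared ZK path
    private long nextProducerId = -1L;
    private long blockEndId = -1L;

    BlockedProducerIdAllocator(AtomicLong sharedNextBlockStart) {
        this.sharedNextBlockStart = sharedNextBlockStart;
    }

    synchronized long generateProducerId() {
        if (nextProducerId < 0 || nextProducerId > blockEndId) {
            // claim a fresh inclusive block [start, start + BLOCK_SIZE - 1]; the real
            // manager does this with a versioned (conditional) ZK write, retrying on conflict
            long start = sharedNextBlockStart.getAndAdd(BLOCK_SIZE);
            nextProducerId = start;
            blockEndId = start + BLOCK_SIZE - 1;
        }
        return nextProducerId++;
    }
}

Two allocators sharing one counter never hand out the same id, which is the same uniqueness guarantee the ZooKeeper block gives coordinators running on different brokers.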

core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala (90 changes)

@@ -47,20 +47,20 @@ object TransactionCoordinator {
config.transactionTopicMinISR,
config.transactionTransactionsExpiredTransactionCleanupIntervalMs)
val pidManager = new ProducerIdManager(config.brokerId, zkUtils)
val producerIdManager = new ProducerIdManager(config.brokerId, zkUtils)
val txnStateManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId, reaperEnabled = false)
val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager, txnMarkerPurgatory, time)
new TransactionCoordinator(config.brokerId, scheduler, pidManager, txnStateManager, txnMarkerChannelManager, txnMarkerPurgatory, time)
new TransactionCoordinator(config.brokerId, scheduler, producerIdManager, txnStateManager, txnMarkerChannelManager, txnMarkerPurgatory, time)
}
private def initTransactionError(error: Errors): InitPidResult = {
InitPidResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error)
private def initTransactionError(error: Errors): InitProducerIdResult = {
InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error)
}
private def initTransactionMetadata(txnMetadata: TransactionMetadataTransition): InitPidResult = {
InitPidResult(txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE)
private def initTransactionMetadata(txnMetadata: TransactionMetadataTransition): InitProducerIdResult = {
InitProducerIdResult(txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE)
}
}
@@ -74,7 +74,7 @@ object TransactionCoordinator {
*/
class TransactionCoordinator(brokerId: Int,
scheduler: Scheduler,
pidManager: ProducerIdManager,
producerIdManager: ProducerIdManager,
txnManager: TransactionStateManager,
txnMarkerChannelManager: TransactionMarkerChannelManager,
txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
@@ -83,22 +83,21 @@ class TransactionCoordinator(brokerId: Int,
import TransactionCoordinator._
type InitPidCallback = InitPidResult => Unit
type InitProducerIdCallback = InitProducerIdResult => Unit
type AddPartitionsCallback = Errors => Unit
type EndTxnCallback = Errors => Unit
/* Active flag of the coordinator */
private val isActive = new AtomicBoolean(false)
def handleInitPid(transactionalId: String,
transactionTimeoutMs: Int,
responseCallback: InitPidCallback): Unit = {
def handleInitProducerId(transactionalId: String,
transactionTimeoutMs: Int,
responseCallback: InitProducerIdCallback): Unit = {
if (transactionalId == null || transactionalId.isEmpty) {
// if the transactional id is not specified, then always blindly accept the request
// and return a new pid from the pid manager
val pid = pidManager.nextPid()
responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
// and return a new producerId from the producerId manager
val producerId = producerIdManager.generateProducerId()
responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
} else if (!txnManager.isCoordinatorFor(transactionalId)) {
// check if it is the assigned coordinator for the transactional id
responseCallback(initTransactionError(Errors.NOT_COORDINATOR))
@@ -108,12 +107,12 @@ class TransactionCoordinator(brokerId: Int,
// check transactionTimeoutMs is not larger than the broker configured maximum allowed value
responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
} else {
// only try to get a new pid and update the cache if the transactional id is unknown
val result: Either[InitPidResult, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
// only try to get a new producerId and update the cache if the transactional id is unknown
val result: Either[InitProducerIdResult, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
case None =>
val pid = pidManager.nextPid()
val producerId = producerIdManager.generateProducerId()
val now = time.milliseconds()
val createdMetadata = new TransactionMetadata(producerId = pid,
val createdMetadata = new TransactionMetadata(producerId = producerId,
producerEpoch = 0,
txnTimeoutMs = transactionTimeoutMs,
state = Empty,
@@ -129,7 +128,7 @@ class TransactionCoordinator(brokerId: Int,
// in this case we will treat it as the metadata has existed already
txnMetadata synchronized {
if (!txnMetadata.eq(createdMetadata)) {
initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
initProducerIdWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
} else {
Right(coordinatorEpoch, txnMetadata.prepareNewPid(time.milliseconds()))
}
@@ -140,13 +139,13 @@ class TransactionCoordinator(brokerId: Int,
val txnMetadata = existingEpochAndMetadata.transactionMetadata
txnMetadata synchronized {
initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
initProducerIdWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
}
}
result match {
case Left(pidResult) =>
responseCallback(pidResult)
case Left(producerIdResult) =>
responseCallback(producerIdResult)
case Right((coordinatorEpoch, newMetadata)) =>
if (newMetadata.txnState == Ongoing) {
@@ -178,11 +177,10 @@ class TransactionCoordinator(brokerId: Int,
}
}
private def initPidWithExistingMetadata(transactionalId: String,
transactionTimeoutMs: Int,
coordinatorEpoch: Int,
txnMetadata: TransactionMetadata): Either[InitPidResult, (Int, TransactionMetadataTransition)] = {
private def initProducerIdWithExistingMetadata(transactionalId: String,
transactionTimeoutMs: Int,
coordinatorEpoch: Int,
txnMetadata: TransactionMetadata): Either[InitProducerIdResult, (Int, TransactionMetadataTransition)] = {
if (txnMetadata.pendingTransitionInProgress) {
// return a retriable exception to let the client backoff and retry
Left(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
@@ -216,8 +214,8 @@ class TransactionCoordinator(brokerId: Int,
def handleAddPartitionsToTransaction(transactionalId: String,
pid: Long,
epoch: Short,
producerId: Long,
producerEpoch: Short,
partitions: collection.Set[TopicPartition],
responseCallback: AddPartitionsCallback): Unit = {
val error = validateTransactionalId(transactionalId)
@@ -225,10 +223,10 @@ class TransactionCoordinator(brokerId: Int,
responseCallback(error)
} else {
// try to update the transaction metadata and append the updated metadata to txn log;
// if there is no such metadata treat it as invalid pid mapping error.
// if there is no such metadata treat it as invalid producerId mapping error.
val result: Either[Errors, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
case None =>
Left(Errors.INVALID_PID_MAPPING)
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
case Some(epochAndMetadata) =>
val coordinatorEpoch = epochAndMetadata.coordinatorEpoch
@@ -236,9 +234,9 @@ class TransactionCoordinator(brokerId: Int,
// generate the new transaction metadata with added partitions
txnMetadata synchronized {
if (txnMetadata.producerId != pid) {
Left(Errors.INVALID_PID_MAPPING)
} else if (txnMetadata.producerEpoch != epoch) {
if (txnMetadata.producerId != producerId) {
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
} else if (txnMetadata.producerEpoch != producerEpoch) {
Left(Errors.INVALID_PRODUCER_EPOCH)
} else if (txnMetadata.pendingTransitionInProgress) {
// return a retriable exception to let the client backoff and retry
@@ -274,8 +272,8 @@ class TransactionCoordinator(brokerId: Int,
}
def handleEndTransaction(transactionalId: String,
pid: Long,
epoch: Short,
producerId: Long,
producerEpoch: Short,
txnMarkerResult: TransactionResult,
responseCallback: EndTxnCallback): Unit = {
val error = validateTransactionalId(transactionalId)
@@ -284,16 +282,16 @@ class TransactionCoordinator(brokerId: Int,
else {
val preAppendResult: Either[Errors, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
case None =>
Left(Errors.INVALID_PID_MAPPING)
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
case Some(epochAndTxnMetadata) =>
val txnMetadata = epochAndTxnMetadata.transactionMetadata
val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
txnMetadata synchronized {
if (txnMetadata.producerId != pid)
Left(Errors.INVALID_PID_MAPPING)
else if (txnMetadata.producerEpoch != epoch)
if (txnMetadata.producerId != producerId)
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
else if (txnMetadata.producerEpoch != producerEpoch)
Left(Errors.INVALID_PRODUCER_EPOCH)
else if (txnMetadata.pendingTransitionInProgress)
Left(Errors.CONCURRENT_TRANSACTIONS)
@@ -343,9 +341,9 @@ class TransactionCoordinator(brokerId: Int,
val txnMetadata = epochAndMetadata.transactionMetadata
txnMetadata synchronized {
if (txnMetadata.producerId != pid)
Left(Errors.INVALID_PID_MAPPING)
else if (txnMetadata.producerEpoch != epoch)
if (txnMetadata.producerId != producerId)
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
else if (txnMetadata.producerEpoch != producerEpoch)
Left(Errors.INVALID_PRODUCER_EPOCH)
else if (txnMetadata.pendingTransitionInProgress)
Left(Errors.CONCURRENT_TRANSACTIONS)
@@ -452,11 +450,11 @@ class TransactionCoordinator(brokerId: Int,
isActive.set(false)
scheduler.shutdown()
txnMarkerPurgatory.shutdown()
pidManager.shutdown()
producerIdManager.shutdown()
txnManager.shutdown()
txnMarkerChannelManager.shutdown()
info("Shutdown complete.")
}
}
case class InitPidResult(pid: Long, epoch: Short, error: Errors)
case class InitProducerIdResult(producerId: Long, producerEpoch: Short, error: Errors)
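Consolidated from the hunks above, the entry branches of handleInitProducerId read roughly as follows. This is a hedged Java re-expression of the Scala flow; ProducerIdSource and the exception are simplified stand-ins, and only the new-transactional-id path is sketched:

// Sketch: how the coordinator decides what to return for an InitProducerId request.
interface ProducerIdSource { long generateProducerId(); }

static long initProducerId(String transactionalId, int transactionTimeoutMs,
                           int maxTransactionTimeoutMs, ProducerIdSource producerIdManager) {
    if (transactionalId == null || transactionalId.isEmpty()) {
        // No transactional id: blindly accept and return a fresh producerId with epoch 0.
        return producerIdManager.generateProducerId();
    }
    if (transactionTimeoutMs > maxTransactionTimeoutMs)
        throw new IllegalArgumentException("INVALID_TRANSACTION_TIMEOUT"); // stand-in for the error result
    // Unknown transactional id (the only path sketched here): create metadata around a
    // new producerId. A known id instead keeps its producerId and bumps the epoch via
    // initProducerIdWithExistingMetadata (see the hunks above).
    return producerIdManager.generateProducerId();
}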

core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala (7 changes)

@@ -251,7 +251,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
}
def addTxnMarkersToBrokerQueue(transactionalId: String, pid: Long, epoch: Short, result: TransactionResult, coordinatorEpoch: Int, topicPartitions: immutable.Set[TopicPartition]): Unit = {
def addTxnMarkersToBrokerQueue(transactionalId: String, producerId: Long, producerEpoch: Short,
result: TransactionResult, coordinatorEpoch: Int,
topicPartitions: immutable.Set[TopicPartition]): Unit = {
val txnTopicPartition = txnStateManager.partitionFor(transactionalId)
val partitionsByDestination: immutable.Map[Node, immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
var brokerNode: Option[Node] = None
@@ -269,7 +271,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
}
for ((broker: Node, topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, new TxnMarkerEntry(pid, epoch, coordinatorEpoch, result, topicPartitions.toList.asJava))
val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)
addMarkersForBroker(broker, txnTopicPartition, txnIdAndMarker)
}

core/src/main/scala/kafka/log/ProducerStateManager.scala (84 changes)

@@ -73,28 +73,28 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
private var coordinatorEpoch = initialEntry.coordinatorEpoch
private val transactions = ListBuffer.empty[TxnMetadata]
def this(pid: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) =
this(pid, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
def this(producerId: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) =
this(producerId, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers: Boolean) = {
if (this.producerEpoch > epoch) {
private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers: Boolean) = {
if (this.producerEpoch > producerEpoch) {
throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
s"with a newer epoch. $epoch (request epoch), ${this.producerEpoch} (server epoch)")
s"with a newer epoch. $producerEpoch (request epoch), ${this.producerEpoch} (server epoch)")
} else if (shouldValidateSequenceNumbers) {
if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < epoch) {
if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < producerEpoch) {
if (firstSeq != 0)
throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " +
throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
s"(request epoch), $firstSeq (seq. number)")
} else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
// the epoch was bumped by a control record, so we expect the sequence number to be reset
throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), found $firstSeq " +
throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " +
s"(incoming seq. number), but expected 0")
} else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
throw new DuplicateSequenceNumberException(s"Duplicate sequence number: pid: $producerId, (incomingBatch.firstSeq, " +
throw new DuplicateSequenceNumberException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " +
s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), (lastEntry.firstSeq, lastEntry.lastSeq): " +
s"(${this.firstSeq}, ${this.lastSeq}).")
} else if (firstSeq != this.lastSeq + 1L) {
throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), $firstSeq " +
throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " +
s"(incoming seq. number), ${this.lastSeq} (current end sequence number)")
}
}
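To make the branches above concrete, a worked example (illustrative values, not part of the commit): suppose the last appended batch was (firstSeq=3, lastSeq=5) at producer epoch 7.

incoming anything at epoch 6          -> ProducerFencedException (request epoch older than server epoch 7)
incoming (firstSeq=6, lastSeq=9) at epoch 7 -> accepted, since firstSeq == lastSeq + 1
incoming (firstSeq=3, lastSeq=5) at epoch 7 -> DuplicateSequenceNumberException (exact retry of the last batch)
incoming (firstSeq=8, lastSeq=9) at epoch 7 -> OutOfOrderSequenceException (gap after lastSeq=5)
incoming (firstSeq=1, lastSeq=4) at epoch 8 -> OutOfOrderSequenceException (a bumped epoch must restart at sequence 0)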
@@ -202,25 +202,25 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
}
object ProducerStateManager {
private val PidSnapshotVersion: Short = 1
private val ProducerSnapshotVersion: Short = 1
private val VersionField = "version"
private val CrcField = "crc"
private val PidField = "pid"
private val ProducerIdField = "producer_id"
private val LastSequenceField = "last_sequence"
private val ProducerEpochField = "epoch"
private val LastOffsetField = "last_offset"
private val OffsetDeltaField = "offset_delta"
private val TimestampField = "timestamp"
private val PidEntriesField = "pid_entries"
private val ProducerEntriesField = "producer_entries"
private val CoordinatorEpochField = "coordinator_epoch"
private val CurrentTxnFirstOffsetField = "current_txn_first_offset"
private val VersionOffset = 0
private val CrcOffset = VersionOffset + 2
private val PidEntriesOffset = CrcOffset + 4
private val ProducerEntriesOffset = CrcOffset + 4
val PidSnapshotEntrySchema = new Schema(
new Field(PidField, Type.INT64, "The producer ID"),
val ProducerSnapshotEntrySchema = new Schema(
new Field(ProducerIdField, Type.INT64, "The producer ID"),
new Field(ProducerEpochField, Type.INT16, "Current epoch of the producer"),
new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"),
new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"),
@@ -231,33 +231,33 @@ object ProducerStateManager {
val PidSnapshotMapSchema = new Schema(
new Field(VersionField, Type.INT16, "Version of the snapshot file"),
new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table"))
new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), "The entries in the producer table"))
def readSnapshot(file: File): Iterable[ProducerIdEntry] = {
val buffer = Files.readAllBytes(file.toPath)
val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
val version = struct.getShort(VersionField)
if (version != PidSnapshotVersion)
if (version != ProducerSnapshotVersion)
throw new IllegalArgumentException(s"Unhandled snapshot file version $version")
val crc = struct.getUnsignedInt(CrcField)
val computedCrc = Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset)
val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset)
if (crc != computedCrc)
throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). " +
s"Stored crc: $crc. Computed crc: $computedCrc")
struct.getArray(PidEntriesField).map { pidEntryObj =>
val pidEntryStruct = pidEntryObj.asInstanceOf[Struct]
val pid: Long = pidEntryStruct.getLong(PidField)
val epoch = pidEntryStruct.getShort(ProducerEpochField)
val seq = pidEntryStruct.getInt(LastSequenceField)
val offset = pidEntryStruct.getLong(LastOffsetField)
val timestamp = pidEntryStruct.getLong(TimestampField)
val offsetDelta = pidEntryStruct.getInt(OffsetDeltaField)
val coordinatorEpoch = pidEntryStruct.getInt(CoordinatorEpochField)
val currentTxnFirstOffset = pidEntryStruct.getLong(CurrentTxnFirstOffsetField)
val newEntry = ProducerIdEntry(pid, epoch, seq, offset, offsetDelta, timestamp,
struct.getArray(ProducerEntriesField).map { producerEntryObj =>
val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
val seq = producerEntryStruct.getInt(LastSequenceField)
val offset = producerEntryStruct.getLong(LastOffsetField)
val timestamp = producerEntryStruct.getLong(TimestampField)
val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
newEntry
}
@@ -265,12 +265,12 @@ object ProducerStateManager {
private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) {
val struct = new Struct(PidSnapshotMapSchema)
struct.set(VersionField, PidSnapshotVersion)
struct.set(VersionField, ProducerSnapshotVersion)
struct.set(CrcField, 0L) // we'll fill this after writing the entries
val entriesArray = entries.map {
case (pid, entry) =>
val pidEntryStruct = struct.instance(PidEntriesField)
pidEntryStruct.set(PidField, pid)
case (producerId, entry) =>
val producerEntryStruct = struct.instance(ProducerEntriesField)
producerEntryStruct.set(ProducerIdField, producerId)
.set(ProducerEpochField, entry.producerEpoch)
.set(LastSequenceField, entry.lastSeq)
.set(LastOffsetField, entry.lastOffset)
@@ -278,16 +278,16 @@ object ProducerStateManager {
.set(TimestampField, entry.timestamp)
.set(CoordinatorEpochField, entry.coordinatorEpoch)
.set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
pidEntryStruct
producerEntryStruct
}.toArray
struct.set(PidEntriesField, entriesArray)
struct.set(ProducerEntriesField, entriesArray)
val buffer = ByteBuffer.allocate(struct.sizeOf)
struct.writeTo(buffer)
buffer.flip()
// now fill in the CRC
val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset)
val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit - ProducerEntriesOffset)
ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
val fos = new FileOutputStream(file)
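As a rough sketch of the checksum handling above, using java.util.zip.CRC32C as a stand-in for Kafka's internal Crc32C and assuming a heap-backed buffer: the CRC covers only the serialized producer entries, which is why it can be patched into the header after the entries are written.

import java.nio.ByteBuffer
import java.util.zip.CRC32C

val VersionOffset = 0
val CrcOffset = VersionOffset + 2          // version is an int16
val ProducerEntriesOffset = CrcOffset + 4  // crc is an unsigned int32

def fillCrc(snapshot: ByteBuffer): Unit = {
  val crc = new CRC32C
  crc.update(snapshot.array, ProducerEntriesOffset, snapshot.limit - ProducerEntriesOffset)
  // Write the low 32 bits into the header slot; matches the unsigned-int write above.
  snapshot.putInt(CrcOffset, crc.getValue.toInt)
}

Serializing the entries first and back-patching the CRC avoids a second serialization pass over the entry data.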
@@ -404,10 +404,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
// visible for testing
private[log] def loadProducerEntry(entry: ProducerIdEntry): Unit = {
val pid = entry.producerId
producers.put(pid, entry)
val producerId = entry.producerId
producers.put(producerId, entry)
entry.currentTxnFirstOffset.foreach { offset =>
ongoingTxns.put(offset, new TxnMetadata(pid, offset))
ongoingTxns.put(offset, new TxnMetadata(producerId, offset))
}
}
@@ -418,7 +418,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
* Expire any PIDs which have been idle longer than the configured maximum expiration timeout.
*/
def removeExpiredProducers(currentTimeMs: Long) {
producers.retain { case (pid, lastEntry) =>
producers.retain { case (producerId, lastEntry) =>
!isExpired(currentTimeMs, lastEntry)
}
}
@@ -496,7 +496,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
/**
* When we remove the head of the log due to retention, we need to clean up the id map. This method takes
* the new start offset and removes all pids which have a smaller last written offset.
* the new start offset and removes all producerIds which have a smaller last written offset.
*/
def evictUnretainedProducers(logStartOffset: Long) {
val evictedProducerEntries = producers.filter(_._2.lastOffset < logStartOffset)

29
core/src/main/scala/kafka/server/KafkaApis.scala

@@ -32,7 +32,7 @@ import kafka.common.Topic.{GroupMetadataTopicName, TransactionStateTopicName, is
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.controller.KafkaController
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitPidResult, TransactionCoordinator}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.{Log, LogManager, TimestampOffset}
import kafka.network.{RequestChannel, RequestOrResponseSend}
import kafka.network.RequestChannel.{Response, Session}
@@ -110,7 +110,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
@@ -1386,20 +1386,20 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleInitPidRequest(request: RequestChannel.Request): Unit = {
val initPidRequest = request.body[InitPidRequest]
val transactionalId = initPidRequest.transactionalId
def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
val initProducerIdRequest = request.body[InitProducerIdRequest]
val transactionalId = initProducerIdRequest.transactionalId
// Send response callback
def sendResponseCallback(result: InitPidResult): Unit = {
def sendResponseCallback(result: InitProducerIdResult): Unit = {
def createResponse(throttleTimeMs: Int): AbstractResponse = {
val responseBody: InitPidResponse = new InitPidResponse(throttleTimeMs, result.error, result.pid, result.epoch)
trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch)
trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
}
txnCoordinator.handleInitPid(transactionalId, initPidRequest.transactionTimeoutMs, sendResponseCallback)
txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
}
def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
@@ -1408,7 +1408,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def sendResponseCallback(error: Errors) {
def createResponse(throttleTimeMs: Int): AbstractResponse = {
val responseBody = new EndTxnResponse(throttleTimeMs, error)
trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.")
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
@@ -1433,23 +1433,22 @@ class KafkaApis(val requestChannel: RequestChannel,
return
}
def sendResponseCallback(pid: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
errors.put(pid, responseStatus.mapValues(_.error).asJava)
def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
errors.put(producerId, responseStatus.mapValues(_.error).asJava)
val successfulPartitions = responseStatus.filter { case (_, partitionResponse) =>
partitionResponse.error == Errors.NONE
}.keys.toSeq
try {
groupCoordinator.handleTxnCompletion(producerId = pid, topicPartitions = successfulPartitions, transactionResult = result)
groupCoordinator.handleTxnCompletion(producerId = producerId, topicPartitions = successfulPartitions, transactionResult = result)
} catch {
case e: Exception =>
error(s"Received an exception while trying to update the offsets cache on transaction completion: $e")
val producerIdErrors = errors.get(pid)
val producerIdErrors = errors.get(producerId)
successfulPartitions.foreach(producerIdErrors.put(_, Errors.UNKNOWN))
}
if (numAppends.decrementAndGet() == 0)
sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
}

2
core/src/main/scala/kafka/server/KafkaConfig.scala

@@ -590,7 +590,7 @@ object KafkaConfig {
/** ********* Transaction management configuration ***********/
val TransactionalIdExpirationMsDoc = "The maximum amount of time in ms that the transaction coordinator will wait before proactively expire a producer's transactional id without receiving any transaction status updates from it."
val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
"If a client’s requested transaction time exceed this, then the broker will return an error in InitPidRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
"If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic."
val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading pid and transactions into the cache."
val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " +

6
core/src/main/scala/kafka/utils/ZkUtils.scala

@@ -64,7 +64,7 @@ object ZkUtils {
val BrokerSequenceIdPath = s"$BrokersPath/seqid"
val ConfigChangesPath = s"$ConfigPath/changes"
val ConfigUsersPath = s"$ConfigPath/users"
val PidBlockPath = "/latest_pid_block"
val ProducerIdBlockPath = "/latest_pid_block"
// Important: it is necessary to add any new top level Zookeeper path to the Seq
val SecureZkRootPaths = Seq(AdminPath,
BrokersPath,
@@ -75,7 +75,7 @@ object ZkUtils {
IsrChangeNotificationPath,
KafkaAclPath,
KafkaAclChangesPath,
PidBlockPath)
ProducerIdBlockPath)
// Important: it is necessary to add any new top level Zookeeper path that contains
// sensitive information that should not be world readable to the Seq
@@ -239,7 +239,7 @@ class ZkUtils(val zkClient: ZkClient,
DeleteTopicsPath,
BrokerSequenceIdPath,
IsrChangeNotificationPath,
PidBlockPath)
ProducerIdBlockPath)
// Visible for testing
val zkPath = new ZkPath(zkClient)

14
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala

@@ -68,22 +68,22 @@ class ProducerIdManagerTest {
val manager1: ProducerIdManager = new ProducerIdManager(0, zkUtils)
val manager2: ProducerIdManager = new ProducerIdManager(1, zkUtils)
val pid1 = manager1.nextPid()
val pid2 = manager2.nextPid()
val pid1 = manager1.generateProducerId()
val pid2 = manager2.generateProducerId()
assertEquals(0, pid1)
assertEquals(ProducerIdManager.PidBlockSize, pid2)
for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
assertEquals(pid1 + i, manager1.nextPid())
assertEquals(pid1 + i, manager1.generateProducerId())
}
for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
assertEquals(pid2 + i, manager2.nextPid())
assertEquals(pid2 + i, manager2.generateProducerId())
}
assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.nextPid())
assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.nextPid())
assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.generateProducerId())
assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.generateProducerId())
}
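The arithmetic these assertions rely on, as a rough in-memory model (BlockAllocator and claimNextBlockStart are illustrative names; the real manager claims each block through ZooKeeper at ProducerIdBlockPath):

class BlockAllocator(claimNextBlockStart: () => Long, blockSize: Long) {
  private var next: Long = 0L
  private var blockEnd: Long = -1L  // exclusive end of the currently claimed block

  def generateProducerId(): Long = synchronized {
    if (blockEnd < 0 || next >= blockEnd) {
      // Claim a fresh block; concurrent managers therefore hand out disjoint ranges.
      val start = claimNextBlockStart()
      next = start
      blockEnd = start + blockSize
    }
    val id = next
    next += 1
    id
  }
}

Two managers claim disjoint blocks, so the first hands out ids 0 until PidBlockSize and the second PidBlockSize until 2 * PidBlockSize, which is exactly what the assertions check.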
@Test(expected = classOf[KafkaException])
@@ -91,7 +91,7 @@ class ProducerIdManagerTest {
EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
.andAnswer(new IAnswer[(Option[String], Int)] {
override def answer(): (Option[String], Int) = {
(Some(ProducerIdManager.generatePidBlockJson(ProducerIdBlock(0,
(Some(ProducerIdManager.generateProducerIdBlockJson(ProducerIdBlock(0,
Long.MaxValue - ProducerIdManager.PidBlockSize,
Long.MaxValue))), 0)
}

16
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala

@@ -47,19 +47,19 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness {
val tc = servers.head.transactionCoordinator
var initPidResult: InitPidResult = null
def callback(result: InitPidResult): Unit = {
initPidResult = result
var initProducerIdResult: InitProducerIdResult = null
def callback(result: InitProducerIdResult): Unit = {
initProducerIdResult = result
}
val txnId = "txn"
tc.handleInitPid(txnId, 900000, callback)
tc.handleInitProducerId(txnId, 900000, callback)
while(initPidResult == null) {
while(initProducerIdResult == null) {
Utils.sleep(1)
}
Assert.assertEquals(Errors.NONE, initPidResult.error)
Assert.assertEquals(Errors.NONE, initProducerIdResult.error)
@volatile var addPartitionErrors: Errors = null
def addPartitionsCallback(errors: Errors): Unit = {
@@ -67,8 +67,8 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness {
}
tc.handleAddPartitionsToTransaction(txnId,
initPidResult.pid,
initPidResult.epoch,
initProducerIdResult.producerId,
initProducerIdResult.producerEpoch,
Set[TopicPartition](new TopicPartition(topic, 0)),
addPartitionsCallback
)

50
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala

@@ -58,11 +58,11 @@ class TransactionCoordinatorTest {
txnMarkerPurgatory,
time)
var result: InitPidResult = _
var result: InitProducerIdResult = _
var error: Errors = Errors.NONE
private def mockPidManager(): Unit = {
EasyMock.expect(pidManager.nextPid())
EasyMock.expect(pidManager.generateProducerId())
.andAnswer(new IAnswer[Long] {
override def answer(): Long = {
nextPid += 1
@@ -90,10 +90,10 @@ class TransactionCoordinatorTest {
mockPidManager()
EasyMock.replay(pidManager)
coordinator.handleInitPid("", txnTimeoutMs, initPidMockCallback)
assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
coordinator.handleInitPid("", txnTimeoutMs, initPidMockCallback)
assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result)
coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result)
}
@Test
@@ -101,10 +101,10 @@ class TransactionCoordinatorTest {
mockPidManager()
EasyMock.replay(pidManager)
coordinator.handleInitPid(null, txnTimeoutMs, initPidMockCallback)
assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
coordinator.handleInitPid(null, txnTimeoutMs, initPidMockCallback)
assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
coordinator.handleInitProducerId(null, txnTimeoutMs, initProducerIdMockCallback)
assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result)
coordinator.handleInitProducerId(null, txnTimeoutMs, initProducerIdMockCallback)
assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result)
}
@Test
@@ -143,16 +143,16 @@ class TransactionCoordinatorTest {
.anyTimes()
EasyMock.replay(pidManager, transactionManager)
coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback)
assertEquals(InitPidResult(nextPid - 1, 0, Errors.NONE), result)
coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, initProducerIdMockCallback)
assertEquals(InitProducerIdResult(nextPid - 1, 0, Errors.NONE), result)
}
@Test
def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinatorForId(): Unit = {
mockPidManager()
EasyMock.replay(pidManager)
coordinator.handleInitPid("some-pid", txnTimeoutMs, initPidMockCallback)
assertEquals(InitPidResult(-1, -1, Errors.NOT_COORDINATOR), result)
coordinator.handleInitProducerId("some-pid", txnTimeoutMs, initProducerIdMockCallback)
assertEquals(InitProducerIdResult(-1, -1, Errors.NOT_COORDINATOR), result)
}
@Test
@@ -165,7 +165,7 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager)
coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback)
assertEquals(Errors.INVALID_PID_MAPPING, error)
assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
}
@Test
@@ -299,7 +299,7 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager)
coordinator.handleEndTransaction(transactionalId, 0, 0, TransactionResult.COMMIT, errorsCallback)
assertEquals(Errors.INVALID_PID_MAPPING, error)
assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
EasyMock.verify(transactionManager)
}
@@ -312,7 +312,7 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager)
coordinator.handleEndTransaction(transactionalId, 0, 0, TransactionResult.COMMIT, errorsCallback)
assertEquals(Errors.INVALID_PID_MAPPING, error)
assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
EasyMock.verify(transactionManager)
}
@@ -513,9 +513,9 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager, transactionMarkerChannelManager)
coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback)
coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, initProducerIdMockCallback)
assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
EasyMock.verify(transactionManager)
}
@@ -568,7 +568,7 @@ class TransactionCoordinatorTest {
EasyMock.expect(transactionManager.transactionsToExpire())
.andReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, pid, epoch)))
EasyMock.replay(transactionManager, transactionMarkerChannelManager)
coordinator.startup(false)
@@ -589,9 +589,9 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager)
coordinator.handleInitPid(transactionalId, 10, initPidMockCallback)
coordinator.handleInitProducerId(transactionalId, 10, initProducerIdMockCallback)
assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
}
private def validateIncrementEpochAndUpdateMetadata(state: TransactionState) = {
@@ -620,9 +620,9 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager)
val newTxnTimeoutMs = 10
coordinator.handleInitPid(transactionalId, newTxnTimeoutMs, initPidMockCallback)
coordinator.handleInitProducerId(transactionalId, newTxnTimeoutMs, initProducerIdMockCallback)
assertEquals(InitPidResult(pid, (epoch + 1).toShort, Errors.NONE), result)
assertEquals(InitProducerIdResult(pid, (epoch + 1).toShort, Errors.NONE), result)
assertEquals(newTxnTimeoutMs, metadata.txnTimeoutMs)
assertEquals(time.milliseconds(), metadata.txnLastUpdateTimestamp)
assertEquals((epoch + 1).toShort, metadata.producerEpoch)
@@ -704,7 +704,7 @@ class TransactionCoordinatorTest {
completedMetadata
}
def initPidMockCallback(ret: InitPidResult): Unit = {
def initProducerIdMockCallback(ret: InitProducerIdResult): Unit = {
result = ret
}

4
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@@ -252,7 +252,7 @@ class RequestQuotaTest extends BaseRequestTest {
new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
case ApiKeys.INIT_PRODUCER_ID =>
new InitPidRequest.Builder("abc")
new InitProducerIdRequest.Builder("abc")
case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
new OffsetsForLeaderEpochRequest.Builder().add(tp, 0)
@@ -353,7 +353,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.CREATE_TOPICS => new CreateTopicsResponse(response).throttleTimeMs
case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs
case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
case ApiKeys.INIT_PRODUCER_ID => new InitPidResponse(response).throttleTimeMs
case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs
case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs
case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs
case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs
