Browse Source
This patch adds the transaction tool specified in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. This includes all of the logic for describing transactional state and for aborting transactions. The only thing that is left out is the `--find-hanging` implementation, which will be left for a subsequent patch. Reviewers: Boyang Chen <boyang@apache.org>, David Jacot <djacot@confluent.io>pull/9590/merge
Jason Gustafson
3 years ago
committed by
GitHub
5 changed files with 1201 additions and 0 deletions
@ -0,0 +1,17 @@
@@ -0,0 +1,17 @@
|
||||
#!/bin/bash |
||||
# Licensed to the Apache Software Foundation (ASF) under one or more |
||||
# contributor license agreements. See the NOTICE file distributed with |
||||
# this work for additional information regarding copyright ownership. |
||||
# The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
# (the "License"); you may not use this file except in compliance with |
||||
# the License. You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TransactionsCommand "$@" |
@ -0,0 +1,17 @@
@@ -0,0 +1,17 @@
|
||||
@echo off |
||||
rem Licensed to the Apache Software Foundation (ASF) under one or more |
||||
rem contributor license agreements. See the NOTICE file distributed with |
||||
rem this work for additional information regarding copyright ownership. |
||||
rem The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
rem (the "License"); you may not use this file except in compliance with |
||||
rem the License. You may obtain a copy of the License at |
||||
rem |
||||
rem http://www.apache.org/licenses/LICENSE-2.0 |
||||
rem |
||||
rem Unless required by applicable law or agreed to in writing, software |
||||
rem distributed under the License is distributed on an "AS IS" BASIS, |
||||
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
rem See the License for the specific language governing permissions and |
||||
rem limitations under the License. |
||||
|
||||
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.TransactionsCommand %* |
@ -0,0 +1,52 @@
@@ -0,0 +1,52 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.tools; |
||||
|
||||
import net.sourceforge.argparse4j.inf.Argument; |
||||
import net.sourceforge.argparse4j.inf.ArgumentAction; |
||||
import net.sourceforge.argparse4j.inf.ArgumentParser; |
||||
import org.apache.kafka.common.utils.AppInfoParser; |
||||
import org.apache.kafka.common.utils.Exit; |
||||
|
||||
import java.util.Map; |
||||
|
||||
class PrintVersionAndExitAction implements ArgumentAction { |
||||
|
||||
@Override |
||||
public void run( |
||||
ArgumentParser parser, |
||||
Argument arg, |
||||
Map<String, Object> attrs, |
||||
String flag, |
||||
Object value |
||||
) { |
||||
String version = AppInfoParser.getVersion(); |
||||
String commitId = AppInfoParser.getCommitId(); |
||||
System.out.println(version + " (Commit:" + commitId + ")"); |
||||
Exit.exit(0); |
||||
} |
||||
|
||||
@Override |
||||
public void onAttach(Argument arg) { |
||||
|
||||
} |
||||
|
||||
@Override |
||||
public boolean consumeArgument() { |
||||
return false; |
||||
} |
||||
} |
@ -0,0 +1,623 @@
@@ -0,0 +1,623 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.tools; |
||||
|
||||
import net.sourceforge.argparse4j.ArgumentParsers; |
||||
import net.sourceforge.argparse4j.inf.ArgumentGroup; |
||||
import net.sourceforge.argparse4j.inf.ArgumentParser; |
||||
import net.sourceforge.argparse4j.inf.ArgumentParserException; |
||||
import net.sourceforge.argparse4j.inf.Namespace; |
||||
import net.sourceforge.argparse4j.inf.Subparser; |
||||
import net.sourceforge.argparse4j.inf.Subparsers; |
||||
import org.apache.kafka.clients.admin.AbortTransactionSpec; |
||||
import org.apache.kafka.clients.admin.Admin; |
||||
import org.apache.kafka.clients.admin.AdminClientConfig; |
||||
import org.apache.kafka.clients.admin.DescribeProducersOptions; |
||||
import org.apache.kafka.clients.admin.DescribeProducersResult; |
||||
import org.apache.kafka.clients.admin.ProducerState; |
||||
import org.apache.kafka.clients.admin.TransactionDescription; |
||||
import org.apache.kafka.clients.admin.TransactionListing; |
||||
import org.apache.kafka.common.TopicPartition; |
||||
import org.apache.kafka.common.utils.Exit; |
||||
import org.apache.kafka.common.utils.Time; |
||||
import org.apache.kafka.common.utils.Utils; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.io.IOException; |
||||
import java.io.PrintStream; |
||||
import java.util.ArrayList; |
||||
import java.util.Arrays; |
||||
import java.util.Collection; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
import java.util.OptionalLong; |
||||
import java.util.Properties; |
||||
import java.util.concurrent.ExecutionException; |
||||
import java.util.function.Function; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import static java.util.Collections.singleton; |
||||
import static java.util.Collections.singletonList; |
||||
import static net.sourceforge.argparse4j.impl.Arguments.store; |
||||
|
||||
public abstract class TransactionsCommand { |
||||
private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class); |
||||
|
||||
protected final Time time; |
||||
|
||||
protected TransactionsCommand(Time time) { |
||||
this.time = time; |
||||
} |
||||
|
||||
/** |
||||
* Get the name of this command (e.g. `describe-producers`). |
||||
*/ |
||||
abstract String name(); |
||||
|
||||
/** |
||||
* Specify the arguments needed for this command. |
||||
*/ |
||||
abstract void addSubparser(Subparsers subparsers); |
||||
|
||||
/** |
||||
* Execute the command logic. |
||||
*/ |
||||
abstract void execute(Admin admin, Namespace ns, PrintStream out) throws Exception; |
||||
|
||||
|
||||
static class AbortTransactionCommand extends TransactionsCommand { |
||||
|
||||
AbortTransactionCommand(Time time) { |
||||
super(time); |
||||
} |
||||
|
||||
@Override |
||||
String name() { |
||||
return "abort"; |
||||
} |
||||
|
||||
@Override |
||||
void addSubparser(Subparsers subparsers) { |
||||
Subparser subparser = subparsers.addParser(name()) |
||||
.help("abort a hanging transaction (requires administrative privileges)"); |
||||
|
||||
subparser.addArgument("--topic") |
||||
.help("topic name") |
||||
.action(store()) |
||||
.type(String.class) |
||||
.required(true); |
||||
|
||||
subparser.addArgument("--partition") |
||||
.help("partition number") |
||||
.action(store()) |
||||
.type(Integer.class) |
||||
.required(true); |
||||
|
||||
ArgumentGroup newBrokerArgumentGroup = subparser |
||||
.addArgumentGroup("Brokers on versions 3.0 and above") |
||||
.description("For newer brokers, only the start offset of the transaction " + |
||||
"to be aborted is required"); |
||||
|
||||
newBrokerArgumentGroup.addArgument("--start-offset") |
||||
.help("start offset of the transaction to abort") |
||||
.action(store()) |
||||
.type(Long.class); |
||||
|
||||
ArgumentGroup olderBrokerArgumentGroup = subparser |
||||
.addArgumentGroup("Brokers on versions older than 3.0") |
||||
.description("For older brokers, you must provide all of these arguments"); |
||||
|
||||
olderBrokerArgumentGroup.addArgument("--producer-id") |
||||
.help("producer id") |
||||
.action(store()) |
||||
.type(Long.class); |
||||
|
||||
olderBrokerArgumentGroup.addArgument("--producer-epoch") |
||||
.help("producer epoch") |
||||
.action(store()) |
||||
.type(Short.class); |
||||
|
||||
olderBrokerArgumentGroup.addArgument("--coordinator-epoch") |
||||
.help("coordinator epoch") |
||||
.action(store()) |
||||
.type(Integer.class); |
||||
} |
||||
|
||||
private AbortTransactionSpec buildAbortSpec( |
||||
Admin admin, |
||||
TopicPartition topicPartition, |
||||
long startOffset |
||||
) throws Exception { |
||||
final DescribeProducersResult.PartitionProducerState result; |
||||
try { |
||||
result = admin.describeProducers(singleton(topicPartition)) |
||||
.partitionResult(topicPartition) |
||||
.get(); |
||||
} catch (ExecutionException e) { |
||||
printErrorAndExit("Failed to validate producer state for partition " |
||||
+ topicPartition, e.getCause()); |
||||
return null; |
||||
} |
||||
|
||||
Optional<ProducerState> foundProducerState = result.activeProducers().stream() |
||||
.filter(producerState -> { |
||||
OptionalLong txnStartOffsetOpt = producerState.currentTransactionStartOffset(); |
||||
return txnStartOffsetOpt.isPresent() && txnStartOffsetOpt.getAsLong() == startOffset; |
||||
}) |
||||
.findFirst(); |
||||
|
||||
if (!foundProducerState.isPresent()) { |
||||
printErrorAndExit("Could not find any open transactions starting at offset " + |
||||
startOffset + " on partition " + topicPartition); |
||||
return null; |
||||
} |
||||
|
||||
ProducerState producerState = foundProducerState.get(); |
||||
return new AbortTransactionSpec( |
||||
topicPartition, |
||||
producerState.producerId(), |
||||
(short) producerState.producerEpoch(), |
||||
producerState.coordinatorEpoch().orElse(0) |
||||
); |
||||
} |
||||
|
||||
private void abortTransaction( |
||||
Admin admin, |
||||
AbortTransactionSpec abortSpec |
||||
) throws Exception { |
||||
try { |
||||
admin.abortTransaction(abortSpec).all().get(); |
||||
} catch (ExecutionException e) { |
||||
TransactionsCommand.printErrorAndExit("Failed to abort transaction " + abortSpec, e.getCause()); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
void execute(Admin admin, Namespace ns, PrintStream out) throws Exception { |
||||
String topicName = ns.getString("topic"); |
||||
Integer partitionId = ns.getInt("partition"); |
||||
TopicPartition topicPartition = new TopicPartition(topicName, partitionId); |
||||
|
||||
Long startOffset = ns.getLong("start_offset"); |
||||
Long producerId = ns.getLong("producer_id"); |
||||
|
||||
if (startOffset == null && producerId == null) { |
||||
printErrorAndExit("The transaction to abort must be identified either with " + |
||||
"--start-offset (for brokers on 3.0 or above) or with " + |
||||
"--producer-id, --producer-epoch, and --coordinator-epoch (for older brokers)"); |
||||
return; |
||||
} |
||||
|
||||
final AbortTransactionSpec abortSpec; |
||||
if (startOffset == null) { |
||||
Short producerEpoch = ns.getShort("producer_epoch"); |
||||
if (producerEpoch == null) { |
||||
printErrorAndExit("Missing required argument --producer-epoch"); |
||||
return; |
||||
} |
||||
|
||||
Integer coordinatorEpoch = ns.getInt("coordinator_epoch"); |
||||
if (coordinatorEpoch == null) { |
||||
printErrorAndExit("Missing required argument --coordinator-epoch"); |
||||
return; |
||||
} |
||||
|
||||
// If a transaction was started by a new producerId and became hanging
|
||||
// before the initial commit/abort, then the coordinator epoch will be -1
|
||||
// as seen in the `DescribeProducers` output. In this case, we conservatively
|
||||
// use a coordinator epoch of 0, which is less than or equal to any possible
|
||||
// leader epoch.
|
||||
if (coordinatorEpoch < 0) { |
||||
coordinatorEpoch = 0; |
||||
} |
||||
|
||||
abortSpec = new AbortTransactionSpec( |
||||
topicPartition, |
||||
producerId, |
||||
producerEpoch, |
||||
coordinatorEpoch |
||||
); |
||||
} else { |
||||
abortSpec = buildAbortSpec(admin, topicPartition, startOffset); |
||||
} |
||||
|
||||
abortTransaction(admin, abortSpec); |
||||
} |
||||
} |
||||
|
||||
static class DescribeProducersCommand extends TransactionsCommand { |
||||
static final String[] HEADERS = new String[]{ |
||||
"ProducerId", |
||||
"ProducerEpoch", |
||||
"LatestCoordinatorEpoch", |
||||
"LastSequence", |
||||
"LastTimestamp", |
||||
"CurrentTransactionStartOffset" |
||||
}; |
||||
|
||||
DescribeProducersCommand(Time time) { |
||||
super(time); |
||||
} |
||||
|
||||
@Override |
||||
public String name() { |
||||
return "describe-producers"; |
||||
} |
||||
|
||||
@Override |
||||
public void addSubparser(Subparsers subparsers) { |
||||
Subparser subparser = subparsers.addParser(name()) |
||||
.help("describe the states of active producers for a topic partition"); |
||||
|
||||
subparser.addArgument("--broker-id") |
||||
.help("optional broker id to describe the producer state on a specific replica") |
||||
.action(store()) |
||||
.type(Integer.class) |
||||
.required(false); |
||||
|
||||
subparser.addArgument("--topic") |
||||
.help("topic name") |
||||
.action(store()) |
||||
.type(String.class) |
||||
.required(true); |
||||
|
||||
subparser.addArgument("--partition") |
||||
.help("partition number") |
||||
.action(store()) |
||||
.type(Integer.class) |
||||
.required(true); |
||||
} |
||||
|
||||
@Override |
||||
public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception { |
||||
DescribeProducersOptions options = new DescribeProducersOptions(); |
||||
Optional.ofNullable(ns.getInt("broker_id")).ifPresent(options::brokerId); |
||||
|
||||
String topicName = ns.getString("topic"); |
||||
Integer partitionId = ns.getInt("partition"); |
||||
TopicPartition topicPartition = new TopicPartition(topicName, partitionId); |
||||
|
||||
final DescribeProducersResult.PartitionProducerState result; |
||||
|
||||
try { |
||||
result = admin.describeProducers(singleton(topicPartition), options) |
||||
.partitionResult(topicPartition) |
||||
.get(); |
||||
} catch (ExecutionException e) { |
||||
String brokerClause = options.brokerId().isPresent() ? |
||||
"broker " + options.brokerId().getAsInt() : |
||||
"leader"; |
||||
printErrorAndExit("Failed to describe producers for partition " + |
||||
topicPartition + " on " + brokerClause, e.getCause()); |
||||
return; |
||||
} |
||||
|
||||
List<String[]> rows = result.activeProducers().stream().map(producerState -> { |
||||
String currentTransactionStartOffsetColumnValue = |
||||
producerState.currentTransactionStartOffset().isPresent() ? |
||||
String.valueOf(producerState.currentTransactionStartOffset().getAsLong()) : |
||||
"None"; |
||||
|
||||
return new String[] { |
||||
String.valueOf(producerState.producerId()), |
||||
String.valueOf(producerState.producerEpoch()), |
||||
String.valueOf(producerState.coordinatorEpoch().orElse(-1)), |
||||
String.valueOf(producerState.lastSequence()), |
||||
String.valueOf(producerState.lastTimestamp()), |
||||
currentTransactionStartOffsetColumnValue |
||||
}; |
||||
}).collect(Collectors.toList()); |
||||
|
||||
prettyPrintTable(HEADERS, rows, out); |
||||
} |
||||
} |
||||
|
||||
static class DescribeTransactionsCommand extends TransactionsCommand { |
||||
static final String[] HEADERS = new String[]{ |
||||
"CoordinatorId", |
||||
"TransactionalId", |
||||
"ProducerId", |
||||
"ProducerEpoch", |
||||
"TransactionState", |
||||
"TransactionTimeoutMs", |
||||
"CurrentTransactionStartTimeMs", |
||||
"TransactionDurationMs", |
||||
"TopicPartitions" |
||||
}; |
||||
|
||||
DescribeTransactionsCommand(Time time) { |
||||
super(time); |
||||
} |
||||
|
||||
@Override |
||||
public String name() { |
||||
return "describe"; |
||||
} |
||||
|
||||
@Override |
||||
public void addSubparser(Subparsers subparsers) { |
||||
Subparser subparser = subparsers.addParser(name()) |
||||
.description("Describe the state of an active transactional-id.") |
||||
.help("describe the state of an active transactional-id"); |
||||
|
||||
subparser.addArgument("--transactional-id") |
||||
.help("transactional id") |
||||
.action(store()) |
||||
.type(String.class) |
||||
.required(true); |
||||
} |
||||
|
||||
@Override |
||||
public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception { |
||||
String transactionalId = ns.getString("transactional_id"); |
||||
|
||||
final TransactionDescription result; |
||||
try { |
||||
result = admin.describeTransactions(singleton(transactionalId)) |
||||
.description(transactionalId) |
||||
.get(); |
||||
} catch (ExecutionException e) { |
||||
printErrorAndExit("Failed to describe transaction state of " + |
||||
"transactional-id `" + transactionalId + "`", e.getCause()); |
||||
return; |
||||
} |
||||
|
||||
final String transactionDurationMsColumnValue; |
||||
final String transactionStartTimeMsColumnValue; |
||||
|
||||
if (result.transactionStartTimeMs().isPresent()) { |
||||
long transactionStartTimeMs = result.transactionStartTimeMs().getAsLong(); |
||||
transactionStartTimeMsColumnValue = String.valueOf(transactionStartTimeMs); |
||||
transactionDurationMsColumnValue = String.valueOf(time.milliseconds() - transactionStartTimeMs); |
||||
} else { |
||||
transactionStartTimeMsColumnValue = "None"; |
||||
transactionDurationMsColumnValue = "None"; |
||||
} |
||||
|
||||
String[] row = new String[]{ |
||||
String.valueOf(result.coordinatorId()), |
||||
transactionalId, |
||||
String.valueOf(result.producerId()), |
||||
String.valueOf(result.producerEpoch()), |
||||
result.state().toString(), |
||||
String.valueOf(result.transactionTimeoutMs()), |
||||
transactionStartTimeMsColumnValue, |
||||
transactionDurationMsColumnValue, |
||||
Utils.join(result.topicPartitions(), ",") |
||||
}; |
||||
|
||||
prettyPrintTable(HEADERS, singletonList(row), out); |
||||
} |
||||
} |
||||
|
||||
static class ListTransactionsCommand extends TransactionsCommand { |
||||
static final String[] HEADERS = new String[] { |
||||
"TransactionalId", |
||||
"Coordinator", |
||||
"ProducerId", |
||||
"TransactionState" |
||||
}; |
||||
|
||||
ListTransactionsCommand(Time time) { |
||||
super(time); |
||||
} |
||||
|
||||
@Override |
||||
public String name() { |
||||
return "list"; |
||||
} |
||||
|
||||
@Override |
||||
public void addSubparser(Subparsers subparsers) { |
||||
subparsers.addParser(name()) |
||||
.help("list transactions"); |
||||
} |
||||
|
||||
@Override |
||||
public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception { |
||||
final Map<Integer, Collection<TransactionListing>> result; |
||||
|
||||
try { |
||||
result = admin.listTransactions() |
||||
.allByBrokerId() |
||||
.get(); |
||||
} catch (ExecutionException e) { |
||||
printErrorAndExit("Failed to list transactions", e.getCause()); |
||||
return; |
||||
} |
||||
|
||||
List<String[]> rows = new ArrayList<>(); |
||||
for (Map.Entry<Integer, Collection<TransactionListing>> brokerListingsEntry : result.entrySet()) { |
||||
String coordinatorIdString = brokerListingsEntry.getKey().toString(); |
||||
Collection<TransactionListing> listings = brokerListingsEntry.getValue(); |
||||
|
||||
for (TransactionListing listing : listings) { |
||||
rows.add(new String[] { |
||||
listing.transactionalId(), |
||||
coordinatorIdString, |
||||
String.valueOf(listing.producerId()), |
||||
listing.state().toString() |
||||
}); |
||||
} |
||||
} |
||||
|
||||
prettyPrintTable(HEADERS, rows, out); |
||||
} |
||||
} |
||||
|
||||
private static void appendColumnValue( |
||||
StringBuilder rowBuilder, |
||||
String value, |
||||
int length |
||||
) { |
||||
int padLength = length - value.length(); |
||||
rowBuilder.append(value); |
||||
for (int i = 0; i < padLength; i++) |
||||
rowBuilder.append(' '); |
||||
} |
||||
|
||||
private static void printRow( |
||||
List<Integer> columnLengths, |
||||
String[] row, |
||||
PrintStream out |
||||
) { |
||||
StringBuilder rowBuilder = new StringBuilder(); |
||||
for (int i = 0; i < row.length; i++) { |
||||
Integer columnLength = columnLengths.get(i); |
||||
String columnValue = row[i]; |
||||
appendColumnValue(rowBuilder, columnValue, columnLength); |
||||
rowBuilder.append('\t'); |
||||
} |
||||
out.println(rowBuilder); |
||||
} |
||||
|
||||
private static void prettyPrintTable( |
||||
String[] headers, |
||||
List<String[]> rows, |
||||
PrintStream out |
||||
) { |
||||
List<Integer> columnLengths = Arrays.stream(headers) |
||||
.map(String::length) |
||||
.collect(Collectors.toList()); |
||||
|
||||
for (String[] row : rows) { |
||||
for (int i = 0; i < headers.length; i++) { |
||||
columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length())); |
||||
} |
||||
} |
||||
|
||||
printRow(columnLengths, headers, out); |
||||
rows.forEach(row -> printRow(columnLengths, row, out)); |
||||
} |
||||
|
||||
private static void printErrorAndExit(String message, Throwable t) { |
||||
log.debug(message, t); |
||||
|
||||
String exitMessage = message + ": " + t.getMessage() + "." + |
||||
" Enable debug logging for additional detail."; |
||||
|
||||
printErrorAndExit(exitMessage); |
||||
} |
||||
|
||||
private static void printErrorAndExit(String message) { |
||||
System.err.println(message); |
||||
Exit.exit(1, message); |
||||
} |
||||
|
||||
private static Admin buildAdminClient(Namespace ns) { |
||||
final Properties properties; |
||||
|
||||
String configFile = ns.getString("command_config"); |
||||
if (configFile == null) { |
||||
properties = new Properties(); |
||||
} else { |
||||
try { |
||||
properties = Utils.loadProps(configFile); |
||||
} catch (IOException e) { |
||||
printErrorAndExit("Failed to load admin client properties", e); |
||||
return null; |
||||
} |
||||
} |
||||
|
||||
String bootstrapServers = ns.getString("bootstrap_server"); |
||||
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
||||
|
||||
return Admin.create(properties); |
||||
} |
||||
|
||||
static ArgumentParser buildBaseParser() { |
||||
ArgumentParser parser = ArgumentParsers |
||||
.newArgumentParser("kafka-transactions.sh"); |
||||
|
||||
parser.description("This tool is used to analyze the transactional state of producers in the cluster. " + |
||||
"It can be used to detect and recover from hanging transactions."); |
||||
|
||||
parser.addArgument("-v", "--version") |
||||
.action(new PrintVersionAndExitAction()) |
||||
.help("show the version of this Kafka distribution and exit"); |
||||
|
||||
parser.addArgument("--command-config") |
||||
.help("property file containing configs to be passed to admin client") |
||||
.action(store()) |
||||
.type(String.class) |
||||
.metavar("FILE") |
||||
.required(false); |
||||
|
||||
parser.addArgument("--bootstrap-server") |
||||
.help("hostname and port for the broker to connect to, in the form `host:port` " + |
||||
"(multiple comma-separated entries can be given)") |
||||
.action(store()) |
||||
.type(String.class) |
||||
.metavar("host:port") |
||||
.required(true); |
||||
|
||||
return parser; |
||||
} |
||||
|
||||
static void execute( |
||||
String[] args, |
||||
Function<Namespace, Admin> adminSupplier, |
||||
PrintStream out, |
||||
Time time |
||||
) throws Exception { |
||||
List<TransactionsCommand> commands = Arrays.asList( |
||||
new ListTransactionsCommand(time), |
||||
new DescribeTransactionsCommand(time), |
||||
new DescribeProducersCommand(time), |
||||
new AbortTransactionCommand(time) |
||||
); |
||||
|
||||
ArgumentParser parser = buildBaseParser(); |
||||
Subparsers subparsers = parser.addSubparsers() |
||||
.dest("command") |
||||
.title("commands") |
||||
.metavar("COMMAND"); |
||||
commands.forEach(command -> command.addSubparser(subparsers)); |
||||
|
||||
final Namespace ns; |
||||
|
||||
try { |
||||
ns = parser.parseArgs(args); |
||||
} catch (ArgumentParserException e) { |
||||
parser.handleError(e); |
||||
Exit.exit(1); |
||||
return; |
||||
} |
||||
|
||||
Admin admin = adminSupplier.apply(ns); |
||||
String commandName = ns.getString("command"); |
||||
|
||||
Optional<TransactionsCommand> commandOpt = commands.stream() |
||||
.filter(cmd -> cmd.name().equals(commandName)) |
||||
.findFirst(); |
||||
|
||||
if (!commandOpt.isPresent()) { |
||||
printErrorAndExit("Unexpected command " + commandName); |
||||
} |
||||
|
||||
TransactionsCommand command = commandOpt.get(); |
||||
command.execute(admin, ns, out); |
||||
Exit.exit(0); |
||||
} |
||||
|
||||
public static void main(String[] args) throws Exception { |
||||
execute(args, TransactionsCommand::buildAdminClient, System.out, Time.SYSTEM); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,492 @@
@@ -0,0 +1,492 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.tools; |
||||
|
||||
import org.apache.kafka.clients.admin.AbortTransactionResult; |
||||
import org.apache.kafka.clients.admin.AbortTransactionSpec; |
||||
import org.apache.kafka.clients.admin.Admin; |
||||
import org.apache.kafka.clients.admin.DescribeProducersOptions; |
||||
import org.apache.kafka.clients.admin.DescribeProducersResult; |
||||
import org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState; |
||||
import org.apache.kafka.clients.admin.DescribeTransactionsResult; |
||||
import org.apache.kafka.clients.admin.ListTransactionsResult; |
||||
import org.apache.kafka.clients.admin.ProducerState; |
||||
import org.apache.kafka.clients.admin.TransactionDescription; |
||||
import org.apache.kafka.clients.admin.TransactionListing; |
||||
import org.apache.kafka.clients.admin.TransactionState; |
||||
import org.apache.kafka.common.KafkaFuture; |
||||
import org.apache.kafka.common.TopicPartition; |
||||
import org.apache.kafka.common.internals.KafkaFutureImpl; |
||||
import org.apache.kafka.common.utils.Exit; |
||||
import org.apache.kafka.common.utils.MockTime; |
||||
import org.apache.kafka.common.utils.Utils; |
||||
import org.junit.jupiter.api.AfterEach; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.junit.jupiter.params.ParameterizedTest; |
||||
import org.junit.jupiter.params.provider.ValueSource; |
||||
import org.mockito.Mockito; |
||||
|
||||
import java.io.BufferedReader; |
||||
import java.io.ByteArrayInputStream; |
||||
import java.io.ByteArrayOutputStream; |
||||
import java.io.IOException; |
||||
import java.io.InputStreamReader; |
||||
import java.io.PrintStream; |
||||
import java.util.ArrayList; |
||||
import java.util.Collection; |
||||
import java.util.HashMap; |
||||
import java.util.HashSet; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.OptionalInt; |
||||
import java.util.OptionalLong; |
||||
import java.util.Set; |
||||
|
||||
import static java.util.Arrays.asList; |
||||
import static java.util.Collections.singleton; |
||||
import static java.util.Collections.singletonList; |
||||
import static org.junit.jupiter.api.Assertions.assertEquals; |
||||
import static org.junit.jupiter.api.Assertions.assertTrue; |
||||
|
||||
public class TransactionsCommandTest { |
||||
|
||||
private final MockExitProcedure exitProcedure = new MockExitProcedure(); |
||||
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
||||
private final PrintStream out = new PrintStream(outputStream); |
||||
private final MockTime time = new MockTime(); |
||||
private final Admin admin = Mockito.mock(Admin.class); |
||||
|
||||
@BeforeEach |
||||
public void setupExitProcedure() { |
||||
Exit.setExitProcedure(exitProcedure); |
||||
} |
||||
|
||||
@AfterEach |
||||
public void resetExitProcedure() { |
||||
Exit.resetExitProcedure(); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeProducersTopicRequired() throws Exception { |
||||
assertCommandFailure(new String[]{ |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"describe-producers", |
||||
"--partition", |
||||
"0" |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeProducersPartitionRequired() throws Exception { |
||||
assertCommandFailure(new String[]{ |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"describe-producers", |
||||
"--topic", |
||||
"foo" |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeProducersLeader() throws Exception { |
||||
TopicPartition topicPartition = new TopicPartition("foo", 5); |
||||
String[] args = new String[] { |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"describe-producers", |
||||
"--topic", |
||||
topicPartition.topic(), |
||||
"--partition", |
||||
String.valueOf(topicPartition.partition()) |
||||
}; |
||||
|
||||
testDescribeProducers(topicPartition, args, new DescribeProducersOptions()); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeProducersSpecificReplica() throws Exception { |
||||
TopicPartition topicPartition = new TopicPartition("foo", 5); |
||||
int brokerId = 5; |
||||
|
||||
String[] args = new String[] { |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"describe-producers", |
||||
"--topic", |
||||
topicPartition.topic(), |
||||
"--partition", |
||||
String.valueOf(topicPartition.partition()), |
||||
"--broker-id", |
||||
String.valueOf(brokerId) |
||||
}; |
||||
|
||||
testDescribeProducers(topicPartition, args, new DescribeProducersOptions().brokerId(brokerId)); |
||||
} |
||||
|
||||
private void testDescribeProducers( |
||||
TopicPartition topicPartition, |
||||
String[] args, |
||||
DescribeProducersOptions expectedOptions |
||||
) throws Exception { |
||||
DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class); |
||||
KafkaFuture<PartitionProducerState> describeFuture = KafkaFutureImpl.completedFuture( |
||||
new PartitionProducerState(asList( |
||||
new ProducerState(12345L, 15, 1300, 1599509565L, |
||||
OptionalInt.of(20), OptionalLong.of(990)), |
||||
new ProducerState(98765L, 30, 2300, 1599509599L, |
||||
OptionalInt.empty(), OptionalLong.empty()) |
||||
))); |
||||
|
||||
|
||||
Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture); |
||||
Mockito.when(admin.describeProducers(singleton(topicPartition), expectedOptions)).thenReturn(describeResult); |
||||
|
||||
execute(args); |
||||
assertNormalExit(); |
||||
|
||||
List<List<String>> table = readOutputAsTable(); |
||||
assertEquals(3, table.size()); |
||||
|
||||
List<String> expectedHeaders = asList(TransactionsCommand.DescribeProducersCommand.HEADERS); |
||||
assertEquals(expectedHeaders, table.get(0)); |
||||
|
||||
Set<List<String>> expectedRows = Utils.mkSet( |
||||
asList("12345", "15", "20", "1300", "1599509565", "990"), |
||||
asList("98765", "30", "-1", "2300", "1599509599", "None") |
||||
); |
||||
assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size()))); |
||||
} |
||||
|
||||
@Test |
||||
public void testListTransactions() throws Exception { |
||||
String[] args = new String[] { |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"list" |
||||
}; |
||||
|
||||
ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class); |
||||
|
||||
Map<Integer, Collection<TransactionListing>> transactions = new HashMap<>(); |
||||
transactions.put(0, asList( |
||||
new TransactionListing("foo", 12345L, TransactionState.ONGOING), |
||||
new TransactionListing("bar", 98765L, TransactionState.PREPARE_ABORT) |
||||
)); |
||||
transactions.put(1, singletonList( |
||||
new TransactionListing("baz", 13579L, TransactionState.COMPLETE_COMMIT) |
||||
)); |
||||
|
||||
KafkaFuture<Map<Integer, Collection<TransactionListing>>> listTransactionsFuture = |
||||
KafkaFutureImpl.completedFuture(transactions); |
||||
|
||||
Mockito.when(admin.listTransactions()).thenReturn(listResult); |
||||
Mockito.when(listResult.allByBrokerId()).thenReturn(listTransactionsFuture); |
||||
|
||||
execute(args); |
||||
assertNormalExit(); |
||||
|
||||
List<List<String>> table = readOutputAsTable(); |
||||
assertEquals(4, table.size()); |
||||
|
||||
// Assert expected headers
|
||||
List<String> expectedHeaders = asList(TransactionsCommand.ListTransactionsCommand.HEADERS); |
||||
assertEquals(expectedHeaders, table.get(0)); |
||||
|
||||
Set<List<String>> expectedRows = Utils.mkSet( |
||||
asList("foo", "0", "12345", "Ongoing"), |
||||
asList("bar", "0", "98765", "PrepareAbort"), |
||||
asList("baz", "1", "13579", "CompleteCommit") |
||||
); |
||||
assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size()))); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeTransactionsTransactionalIdRequired() throws Exception { |
||||
assertCommandFailure(new String[]{ |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"describe" |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeTransaction() throws Exception { |
||||
String transactionalId = "foo"; |
||||
String[] args = new String[] { |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"describe", |
||||
"--transactional-id", |
||||
transactionalId |
||||
}; |
||||
|
||||
DescribeTransactionsResult describeResult = Mockito.mock(DescribeTransactionsResult.class); |
||||
|
||||
int coordinatorId = 5; |
||||
long transactionStartTime = time.milliseconds(); |
||||
|
||||
KafkaFuture<TransactionDescription> describeFuture = KafkaFutureImpl.completedFuture( |
||||
new TransactionDescription( |
||||
coordinatorId, |
||||
TransactionState.ONGOING, |
||||
12345L, |
||||
15, |
||||
10000, |
||||
OptionalLong.of(transactionStartTime), |
||||
singleton(new TopicPartition("bar", 0)) |
||||
)); |
||||
|
||||
Mockito.when(describeResult.description(transactionalId)).thenReturn(describeFuture); |
||||
Mockito.when(admin.describeTransactions(singleton(transactionalId))).thenReturn(describeResult); |
||||
|
||||
// Add a little time so that we can see a positive transaction duration in the output
|
||||
time.sleep(5000); |
||||
|
||||
execute(args); |
||||
assertNormalExit(); |
||||
|
||||
List<List<String>> table = readOutputAsTable(); |
||||
assertEquals(2, table.size()); |
||||
|
||||
List<String> expectedHeaders = asList(TransactionsCommand.DescribeTransactionsCommand.HEADERS); |
||||
assertEquals(expectedHeaders, table.get(0)); |
||||
|
||||
List<String> expectedRow = asList( |
||||
String.valueOf(coordinatorId), |
||||
transactionalId, |
||||
"12345", |
||||
"15", |
||||
"Ongoing", |
||||
"10000", |
||||
String.valueOf(transactionStartTime), |
||||
"5000", |
||||
"bar-0" |
||||
); |
||||
assertEquals(expectedRow, table.get(1)); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeTransactionsStartOffsetOrProducerIdRequired() throws Exception { |
||||
assertCommandFailure(new String[]{ |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"abort", |
||||
"--topic", |
||||
"foo", |
||||
"--partition", |
||||
"0" |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeTransactionsTopicRequired() throws Exception { |
||||
assertCommandFailure(new String[]{ |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"abort", |
||||
"--partition", |
||||
"0", |
||||
"--start-offset", |
||||
"9990" |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeTransactionsPartitionRequired() throws Exception { |
||||
assertCommandFailure(new String[]{ |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"abort", |
||||
"--topic", |
||||
"foo", |
||||
"--start-offset", |
||||
"9990" |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeTransactionsProducerEpochRequiredWithProducerId() throws Exception { |
||||
assertCommandFailure(new String[]{ |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"abort", |
||||
"--topic", |
||||
"foo", |
||||
"--partition", |
||||
"0", |
||||
"--producer-id", |
||||
"12345" |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
public void testDescribeTransactionsCoordinatorEpochRequiredWithProducerId() throws Exception { |
||||
assertCommandFailure(new String[]{ |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"abort", |
||||
"--topic", |
||||
"foo", |
||||
"--partition", |
||||
"0", |
||||
"--producer-id", |
||||
"12345", |
||||
"--producer-epoch", |
||||
"15" |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
public void testNewBrokerAbortTransaction() throws Exception { |
||||
TopicPartition topicPartition = new TopicPartition("foo", 5); |
||||
long startOffset = 9173; |
||||
long producerId = 12345L; |
||||
short producerEpoch = 15; |
||||
int coordinatorEpoch = 76; |
||||
|
||||
String[] args = new String[] { |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"abort", |
||||
"--topic", |
||||
topicPartition.topic(), |
||||
"--partition", |
||||
String.valueOf(topicPartition.partition()), |
||||
"--start-offset", |
||||
String.valueOf(startOffset) |
||||
}; |
||||
|
||||
DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class); |
||||
KafkaFuture<PartitionProducerState> describeFuture = KafkaFutureImpl.completedFuture( |
||||
new PartitionProducerState(singletonList( |
||||
new ProducerState(producerId, producerEpoch, 1300, 1599509565L, |
||||
OptionalInt.of(coordinatorEpoch), OptionalLong.of(startOffset)) |
||||
))); |
||||
|
||||
AbortTransactionResult abortTransactionResult = Mockito.mock(AbortTransactionResult.class); |
||||
KafkaFuture<Void> abortFuture = KafkaFutureImpl.completedFuture(null); |
||||
AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec( |
||||
topicPartition, producerId, producerEpoch, coordinatorEpoch); |
||||
|
||||
Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture); |
||||
Mockito.when(admin.describeProducers(singleton(topicPartition))).thenReturn(describeResult); |
||||
|
||||
Mockito.when(abortTransactionResult.all()).thenReturn(abortFuture); |
||||
Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortTransactionResult); |
||||
|
||||
execute(args); |
||||
assertNormalExit(); |
||||
} |
||||
|
||||
@ParameterizedTest |
||||
@ValueSource(ints = {29, -1}) |
||||
public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordinatorEpoch) throws Exception { |
||||
TopicPartition topicPartition = new TopicPartition("foo", 5); |
||||
long producerId = 12345L; |
||||
short producerEpoch = 15; |
||||
|
||||
String[] args = new String[] { |
||||
"--bootstrap-server", |
||||
"localhost:9092", |
||||
"abort", |
||||
"--topic", |
||||
topicPartition.topic(), |
||||
"--partition", |
||||
String.valueOf(topicPartition.partition()), |
||||
"--producer-id", |
||||
String.valueOf(producerId), |
||||
"--producer-epoch", |
||||
String.valueOf(producerEpoch), |
||||
"--coordinator-epoch", |
||||
String.valueOf(coordinatorEpoch) |
||||
}; |
||||
|
||||
AbortTransactionResult abortTransactionResult = Mockito.mock(AbortTransactionResult.class); |
||||
KafkaFuture<Void> abortFuture = KafkaFutureImpl.completedFuture(null); |
||||
|
||||
final int expectedCoordinatorEpoch; |
||||
if (coordinatorEpoch < 0) { |
||||
expectedCoordinatorEpoch = 0; |
||||
} else { |
||||
expectedCoordinatorEpoch = coordinatorEpoch; |
||||
} |
||||
|
||||
AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec( |
||||
topicPartition, producerId, producerEpoch, expectedCoordinatorEpoch); |
||||
|
||||
Mockito.when(abortTransactionResult.all()).thenReturn(abortFuture); |
||||
Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortTransactionResult); |
||||
|
||||
execute(args); |
||||
assertNormalExit(); |
||||
} |
||||
|
||||
private void execute(String[] args) throws Exception { |
||||
TransactionsCommand.execute(args, ns -> admin, out, time); |
||||
} |
||||
|
||||
private List<List<String>> readOutputAsTable() throws IOException { |
||||
List<List<String>> table = new ArrayList<>(); |
||||
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); |
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); |
||||
|
||||
while (true) { |
||||
List<String> row = readRow(reader); |
||||
if (row == null) { |
||||
break; |
||||
} |
||||
table.add(row); |
||||
} |
||||
return table; |
||||
} |
||||
|
||||
private List<String> readRow(BufferedReader reader) throws IOException { |
||||
String line = reader.readLine(); |
||||
if (line == null) { |
||||
return null; |
||||
} else { |
||||
return asList(line.split("\\s+")); |
||||
} |
||||
} |
||||
|
||||
private void assertNormalExit() { |
||||
assertTrue(exitProcedure.hasExited); |
||||
assertEquals(0, exitProcedure.statusCode); |
||||
} |
||||
|
||||
private void assertCommandFailure(String[] args) throws Exception { |
||||
execute(args); |
||||
assertTrue(exitProcedure.hasExited); |
||||
assertEquals(1, exitProcedure.statusCode); |
||||
} |
||||
|
||||
private static class MockExitProcedure implements Exit.Procedure { |
||||
private boolean hasExited = false; |
||||
private int statusCode; |
||||
|
||||
@Override |
||||
public void execute(int statusCode, String message) { |
||||
if (!this.hasExited) { |
||||
this.hasExited = true; |
||||
this.statusCode = statusCode; |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue