From fce771579c3e20f20949c4c7e0a5e3a16c57c7f0 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 22 Jun 2021 09:47:30 -0700 Subject: [PATCH] KAFKA-12888; Add transaction tool from KIP-664 (#10814) This patch adds the transaction tool specified in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. This includes all of the logic for describing transactional state and for aborting transactions. The only thing that is left out is the `--find-hanging` implementation, which will be left for a subsequent patch. Reviewers: Boyang Chen , David Jacot --- bin/kafka-transactions.sh | 17 + bin/windows/kafka-transactions.bat | 17 + .../tools/PrintVersionAndExitAction.java | 52 ++ .../kafka/tools/TransactionsCommand.java | 623 ++++++++++++++++++ .../kafka/tools/TransactionsCommandTest.java | 492 ++++++++++++++ 5 files changed, 1201 insertions(+) create mode 100755 bin/kafka-transactions.sh create mode 100644 bin/windows/kafka-transactions.bat create mode 100644 tools/src/main/java/org/apache/kafka/tools/PrintVersionAndExitAction.java create mode 100644 tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java diff --git a/bin/kafka-transactions.sh b/bin/kafka-transactions.sh new file mode 100755 index 00000000000..6fb52338555 --- /dev/null +++ b/bin/kafka-transactions.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TransactionsCommand "$@" diff --git a/bin/windows/kafka-transactions.bat b/bin/windows/kafka-transactions.bat new file mode 100644 index 00000000000..9bb7585fca9 --- /dev/null +++ b/bin/windows/kafka-transactions.bat @@ -0,0 +1,17 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.TransactionsCommand %* diff --git a/tools/src/main/java/org/apache/kafka/tools/PrintVersionAndExitAction.java b/tools/src/main/java/org/apache/kafka/tools/PrintVersionAndExitAction.java new file mode 100644 index 00000000000..6846c7fab5b --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/PrintVersionAndExitAction.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.inf.Argument; +import net.sourceforge.argparse4j.inf.ArgumentAction; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.util.Map; + +class PrintVersionAndExitAction implements ArgumentAction { + + @Override + public void run( + ArgumentParser parser, + Argument arg, + Map attrs, + String flag, + Object value + ) { + String version = AppInfoParser.getVersion(); + String commitId = AppInfoParser.getCommitId(); + System.out.println(version + " (Commit:" + commitId + ")"); + Exit.exit(0); + } + + @Override + public void onAttach(Argument arg) { + + } + + @Override + public boolean consumeArgument() { + return false; + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java new file mode 100644 index 00000000000..241946092c5 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java @@ -0,0 +1,623 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentGroup; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import org.apache.kafka.clients.admin.AbortTransactionSpec; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DescribeProducersOptions; +import org.apache.kafka.clients.admin.DescribeProducersResult; +import org.apache.kafka.clients.admin.ProducerState; +import org.apache.kafka.clients.admin.TransactionDescription; +import org.apache.kafka.clients.admin.TransactionListing; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public abstract class TransactionsCommand { + private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class); + + protected final Time time; + + protected TransactionsCommand(Time time) { + this.time = time; + } + + /** + * Get the name of this command (e.g. `describe-producers`). + */ + abstract String name(); + + /** + * Specify the arguments needed for this command. + */ + abstract void addSubparser(Subparsers subparsers); + + /** + * Execute the command logic. + */ + abstract void execute(Admin admin, Namespace ns, PrintStream out) throws Exception; + + + static class AbortTransactionCommand extends TransactionsCommand { + + AbortTransactionCommand(Time time) { + super(time); + } + + @Override + String name() { + return "abort"; + } + + @Override + void addSubparser(Subparsers subparsers) { + Subparser subparser = subparsers.addParser(name()) + .help("abort a hanging transaction (requires administrative privileges)"); + + subparser.addArgument("--topic") + .help("topic name") + .action(store()) + .type(String.class) + .required(true); + + subparser.addArgument("--partition") + .help("partition number") + .action(store()) + .type(Integer.class) + .required(true); + + ArgumentGroup newBrokerArgumentGroup = subparser + .addArgumentGroup("Brokers on versions 3.0 and above") + .description("For newer brokers, only the start offset of the transaction " + + "to be aborted is required"); + + newBrokerArgumentGroup.addArgument("--start-offset") + .help("start offset of the transaction to abort") + .action(store()) + .type(Long.class); + + ArgumentGroup olderBrokerArgumentGroup = subparser + .addArgumentGroup("Brokers on versions older than 3.0") + .description("For older brokers, you must provide all of these arguments"); + + olderBrokerArgumentGroup.addArgument("--producer-id") + .help("producer id") + .action(store()) + .type(Long.class); + + olderBrokerArgumentGroup.addArgument("--producer-epoch") + .help("producer epoch") + .action(store()) + .type(Short.class); + + olderBrokerArgumentGroup.addArgument("--coordinator-epoch") + .help("coordinator epoch") + .action(store()) + .type(Integer.class); + } + + private AbortTransactionSpec buildAbortSpec( + Admin admin, + TopicPartition topicPartition, + long startOffset + ) throws Exception { + final DescribeProducersResult.PartitionProducerState result; + try { + result = admin.describeProducers(singleton(topicPartition)) + .partitionResult(topicPartition) + .get(); + } catch (ExecutionException e) { + printErrorAndExit("Failed to validate producer state for partition " + + topicPartition, e.getCause()); + return null; + } + + Optional foundProducerState = result.activeProducers().stream() + .filter(producerState -> { + OptionalLong txnStartOffsetOpt = producerState.currentTransactionStartOffset(); + return txnStartOffsetOpt.isPresent() && txnStartOffsetOpt.getAsLong() == startOffset; + }) + .findFirst(); + + if (!foundProducerState.isPresent()) { + printErrorAndExit("Could not find any open transactions starting at offset " + + startOffset + " on partition " + topicPartition); + return null; + } + + ProducerState producerState = foundProducerState.get(); + return new AbortTransactionSpec( + topicPartition, + producerState.producerId(), + (short) producerState.producerEpoch(), + producerState.coordinatorEpoch().orElse(0) + ); + } + + private void abortTransaction( + Admin admin, + AbortTransactionSpec abortSpec + ) throws Exception { + try { + admin.abortTransaction(abortSpec).all().get(); + } catch (ExecutionException e) { + TransactionsCommand.printErrorAndExit("Failed to abort transaction " + abortSpec, e.getCause()); + } + } + + @Override + void execute(Admin admin, Namespace ns, PrintStream out) throws Exception { + String topicName = ns.getString("topic"); + Integer partitionId = ns.getInt("partition"); + TopicPartition topicPartition = new TopicPartition(topicName, partitionId); + + Long startOffset = ns.getLong("start_offset"); + Long producerId = ns.getLong("producer_id"); + + if (startOffset == null && producerId == null) { + printErrorAndExit("The transaction to abort must be identified either with " + + "--start-offset (for brokers on 3.0 or above) or with " + + "--producer-id, --producer-epoch, and --coordinator-epoch (for older brokers)"); + return; + } + + final AbortTransactionSpec abortSpec; + if (startOffset == null) { + Short producerEpoch = ns.getShort("producer_epoch"); + if (producerEpoch == null) { + printErrorAndExit("Missing required argument --producer-epoch"); + return; + } + + Integer coordinatorEpoch = ns.getInt("coordinator_epoch"); + if (coordinatorEpoch == null) { + printErrorAndExit("Missing required argument --coordinator-epoch"); + return; + } + + // If a transaction was started by a new producerId and became hanging + // before the initial commit/abort, then the coordinator epoch will be -1 + // as seen in the `DescribeProducers` output. In this case, we conservatively + // use a coordinator epoch of 0, which is less than or equal to any possible + // leader epoch. + if (coordinatorEpoch < 0) { + coordinatorEpoch = 0; + } + + abortSpec = new AbortTransactionSpec( + topicPartition, + producerId, + producerEpoch, + coordinatorEpoch + ); + } else { + abortSpec = buildAbortSpec(admin, topicPartition, startOffset); + } + + abortTransaction(admin, abortSpec); + } + } + + static class DescribeProducersCommand extends TransactionsCommand { + static final String[] HEADERS = new String[]{ + "ProducerId", + "ProducerEpoch", + "LatestCoordinatorEpoch", + "LastSequence", + "LastTimestamp", + "CurrentTransactionStartOffset" + }; + + DescribeProducersCommand(Time time) { + super(time); + } + + @Override + public String name() { + return "describe-producers"; + } + + @Override + public void addSubparser(Subparsers subparsers) { + Subparser subparser = subparsers.addParser(name()) + .help("describe the states of active producers for a topic partition"); + + subparser.addArgument("--broker-id") + .help("optional broker id to describe the producer state on a specific replica") + .action(store()) + .type(Integer.class) + .required(false); + + subparser.addArgument("--topic") + .help("topic name") + .action(store()) + .type(String.class) + .required(true); + + subparser.addArgument("--partition") + .help("partition number") + .action(store()) + .type(Integer.class) + .required(true); + } + + @Override + public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception { + DescribeProducersOptions options = new DescribeProducersOptions(); + Optional.ofNullable(ns.getInt("broker_id")).ifPresent(options::brokerId); + + String topicName = ns.getString("topic"); + Integer partitionId = ns.getInt("partition"); + TopicPartition topicPartition = new TopicPartition(topicName, partitionId); + + final DescribeProducersResult.PartitionProducerState result; + + try { + result = admin.describeProducers(singleton(topicPartition), options) + .partitionResult(topicPartition) + .get(); + } catch (ExecutionException e) { + String brokerClause = options.brokerId().isPresent() ? + "broker " + options.brokerId().getAsInt() : + "leader"; + printErrorAndExit("Failed to describe producers for partition " + + topicPartition + " on " + brokerClause, e.getCause()); + return; + } + + List rows = result.activeProducers().stream().map(producerState -> { + String currentTransactionStartOffsetColumnValue = + producerState.currentTransactionStartOffset().isPresent() ? + String.valueOf(producerState.currentTransactionStartOffset().getAsLong()) : + "None"; + + return new String[] { + String.valueOf(producerState.producerId()), + String.valueOf(producerState.producerEpoch()), + String.valueOf(producerState.coordinatorEpoch().orElse(-1)), + String.valueOf(producerState.lastSequence()), + String.valueOf(producerState.lastTimestamp()), + currentTransactionStartOffsetColumnValue + }; + }).collect(Collectors.toList()); + + prettyPrintTable(HEADERS, rows, out); + } + } + + static class DescribeTransactionsCommand extends TransactionsCommand { + static final String[] HEADERS = new String[]{ + "CoordinatorId", + "TransactionalId", + "ProducerId", + "ProducerEpoch", + "TransactionState", + "TransactionTimeoutMs", + "CurrentTransactionStartTimeMs", + "TransactionDurationMs", + "TopicPartitions" + }; + + DescribeTransactionsCommand(Time time) { + super(time); + } + + @Override + public String name() { + return "describe"; + } + + @Override + public void addSubparser(Subparsers subparsers) { + Subparser subparser = subparsers.addParser(name()) + .description("Describe the state of an active transactional-id.") + .help("describe the state of an active transactional-id"); + + subparser.addArgument("--transactional-id") + .help("transactional id") + .action(store()) + .type(String.class) + .required(true); + } + + @Override + public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception { + String transactionalId = ns.getString("transactional_id"); + + final TransactionDescription result; + try { + result = admin.describeTransactions(singleton(transactionalId)) + .description(transactionalId) + .get(); + } catch (ExecutionException e) { + printErrorAndExit("Failed to describe transaction state of " + + "transactional-id `" + transactionalId + "`", e.getCause()); + return; + } + + final String transactionDurationMsColumnValue; + final String transactionStartTimeMsColumnValue; + + if (result.transactionStartTimeMs().isPresent()) { + long transactionStartTimeMs = result.transactionStartTimeMs().getAsLong(); + transactionStartTimeMsColumnValue = String.valueOf(transactionStartTimeMs); + transactionDurationMsColumnValue = String.valueOf(time.milliseconds() - transactionStartTimeMs); + } else { + transactionStartTimeMsColumnValue = "None"; + transactionDurationMsColumnValue = "None"; + } + + String[] row = new String[]{ + String.valueOf(result.coordinatorId()), + transactionalId, + String.valueOf(result.producerId()), + String.valueOf(result.producerEpoch()), + result.state().toString(), + String.valueOf(result.transactionTimeoutMs()), + transactionStartTimeMsColumnValue, + transactionDurationMsColumnValue, + Utils.join(result.topicPartitions(), ",") + }; + + prettyPrintTable(HEADERS, singletonList(row), out); + } + } + + static class ListTransactionsCommand extends TransactionsCommand { + static final String[] HEADERS = new String[] { + "TransactionalId", + "Coordinator", + "ProducerId", + "TransactionState" + }; + + ListTransactionsCommand(Time time) { + super(time); + } + + @Override + public String name() { + return "list"; + } + + @Override + public void addSubparser(Subparsers subparsers) { + subparsers.addParser(name()) + .help("list transactions"); + } + + @Override + public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception { + final Map> result; + + try { + result = admin.listTransactions() + .allByBrokerId() + .get(); + } catch (ExecutionException e) { + printErrorAndExit("Failed to list transactions", e.getCause()); + return; + } + + List rows = new ArrayList<>(); + for (Map.Entry> brokerListingsEntry : result.entrySet()) { + String coordinatorIdString = brokerListingsEntry.getKey().toString(); + Collection listings = brokerListingsEntry.getValue(); + + for (TransactionListing listing : listings) { + rows.add(new String[] { + listing.transactionalId(), + coordinatorIdString, + String.valueOf(listing.producerId()), + listing.state().toString() + }); + } + } + + prettyPrintTable(HEADERS, rows, out); + } + } + + private static void appendColumnValue( + StringBuilder rowBuilder, + String value, + int length + ) { + int padLength = length - value.length(); + rowBuilder.append(value); + for (int i = 0; i < padLength; i++) + rowBuilder.append(' '); + } + + private static void printRow( + List columnLengths, + String[] row, + PrintStream out + ) { + StringBuilder rowBuilder = new StringBuilder(); + for (int i = 0; i < row.length; i++) { + Integer columnLength = columnLengths.get(i); + String columnValue = row[i]; + appendColumnValue(rowBuilder, columnValue, columnLength); + rowBuilder.append('\t'); + } + out.println(rowBuilder); + } + + private static void prettyPrintTable( + String[] headers, + List rows, + PrintStream out + ) { + List columnLengths = Arrays.stream(headers) + .map(String::length) + .collect(Collectors.toList()); + + for (String[] row : rows) { + for (int i = 0; i < headers.length; i++) { + columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length())); + } + } + + printRow(columnLengths, headers, out); + rows.forEach(row -> printRow(columnLengths, row, out)); + } + + private static void printErrorAndExit(String message, Throwable t) { + log.debug(message, t); + + String exitMessage = message + ": " + t.getMessage() + "." + + " Enable debug logging for additional detail."; + + printErrorAndExit(exitMessage); + } + + private static void printErrorAndExit(String message) { + System.err.println(message); + Exit.exit(1, message); + } + + private static Admin buildAdminClient(Namespace ns) { + final Properties properties; + + String configFile = ns.getString("command_config"); + if (configFile == null) { + properties = new Properties(); + } else { + try { + properties = Utils.loadProps(configFile); + } catch (IOException e) { + printErrorAndExit("Failed to load admin client properties", e); + return null; + } + } + + String bootstrapServers = ns.getString("bootstrap_server"); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + return Admin.create(properties); + } + + static ArgumentParser buildBaseParser() { + ArgumentParser parser = ArgumentParsers + .newArgumentParser("kafka-transactions.sh"); + + parser.description("This tool is used to analyze the transactional state of producers in the cluster. " + + "It can be used to detect and recover from hanging transactions."); + + parser.addArgument("-v", "--version") + .action(new PrintVersionAndExitAction()) + .help("show the version of this Kafka distribution and exit"); + + parser.addArgument("--command-config") + .help("property file containing configs to be passed to admin client") + .action(store()) + .type(String.class) + .metavar("FILE") + .required(false); + + parser.addArgument("--bootstrap-server") + .help("hostname and port for the broker to connect to, in the form `host:port` " + + "(multiple comma-separated entries can be given)") + .action(store()) + .type(String.class) + .metavar("host:port") + .required(true); + + return parser; + } + + static void execute( + String[] args, + Function adminSupplier, + PrintStream out, + Time time + ) throws Exception { + List commands = Arrays.asList( + new ListTransactionsCommand(time), + new DescribeTransactionsCommand(time), + new DescribeProducersCommand(time), + new AbortTransactionCommand(time) + ); + + ArgumentParser parser = buildBaseParser(); + Subparsers subparsers = parser.addSubparsers() + .dest("command") + .title("commands") + .metavar("COMMAND"); + commands.forEach(command -> command.addSubparser(subparsers)); + + final Namespace ns; + + try { + ns = parser.parseArgs(args); + } catch (ArgumentParserException e) { + parser.handleError(e); + Exit.exit(1); + return; + } + + Admin admin = adminSupplier.apply(ns); + String commandName = ns.getString("command"); + + Optional commandOpt = commands.stream() + .filter(cmd -> cmd.name().equals(commandName)) + .findFirst(); + + if (!commandOpt.isPresent()) { + printErrorAndExit("Unexpected command " + commandName); + } + + TransactionsCommand command = commandOpt.get(); + command.execute(admin, ns, out); + Exit.exit(0); + } + + public static void main(String[] args) throws Exception { + execute(args, TransactionsCommand::buildAdminClient, System.out, Time.SYSTEM); + } + +} diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java new file mode 100644 index 00000000000..b5d7b9376ba --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.clients.admin.AbortTransactionResult; +import org.apache.kafka.clients.admin.AbortTransactionSpec; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeProducersOptions; +import org.apache.kafka.clients.admin.DescribeProducersResult; +import org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState; +import org.apache.kafka.clients.admin.DescribeTransactionsResult; +import org.apache.kafka.clients.admin.ListTransactionsResult; +import org.apache.kafka.clients.admin.ProducerState; +import org.apache.kafka.clients.admin.TransactionDescription; +import org.apache.kafka.clients.admin.TransactionListing; +import org.apache.kafka.clients.admin.TransactionState; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mockito; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; + +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TransactionsCommandTest { + + private final MockExitProcedure exitProcedure = new MockExitProcedure(); + private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + private final PrintStream out = new PrintStream(outputStream); + private final MockTime time = new MockTime(); + private final Admin admin = Mockito.mock(Admin.class); + + @BeforeEach + public void setupExitProcedure() { + Exit.setExitProcedure(exitProcedure); + } + + @AfterEach + public void resetExitProcedure() { + Exit.resetExitProcedure(); + } + + @Test + public void testDescribeProducersTopicRequired() throws Exception { + assertCommandFailure(new String[]{ + "--bootstrap-server", + "localhost:9092", + "describe-producers", + "--partition", + "0" + }); + } + + @Test + public void testDescribeProducersPartitionRequired() throws Exception { + assertCommandFailure(new String[]{ + "--bootstrap-server", + "localhost:9092", + "describe-producers", + "--topic", + "foo" + }); + } + + @Test + public void testDescribeProducersLeader() throws Exception { + TopicPartition topicPartition = new TopicPartition("foo", 5); + String[] args = new String[] { + "--bootstrap-server", + "localhost:9092", + "describe-producers", + "--topic", + topicPartition.topic(), + "--partition", + String.valueOf(topicPartition.partition()) + }; + + testDescribeProducers(topicPartition, args, new DescribeProducersOptions()); + } + + @Test + public void testDescribeProducersSpecificReplica() throws Exception { + TopicPartition topicPartition = new TopicPartition("foo", 5); + int brokerId = 5; + + String[] args = new String[] { + "--bootstrap-server", + "localhost:9092", + "describe-producers", + "--topic", + topicPartition.topic(), + "--partition", + String.valueOf(topicPartition.partition()), + "--broker-id", + String.valueOf(brokerId) + }; + + testDescribeProducers(topicPartition, args, new DescribeProducersOptions().brokerId(brokerId)); + } + + private void testDescribeProducers( + TopicPartition topicPartition, + String[] args, + DescribeProducersOptions expectedOptions + ) throws Exception { + DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class); + KafkaFuture describeFuture = KafkaFutureImpl.completedFuture( + new PartitionProducerState(asList( + new ProducerState(12345L, 15, 1300, 1599509565L, + OptionalInt.of(20), OptionalLong.of(990)), + new ProducerState(98765L, 30, 2300, 1599509599L, + OptionalInt.empty(), OptionalLong.empty()) + ))); + + + Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture); + Mockito.when(admin.describeProducers(singleton(topicPartition), expectedOptions)).thenReturn(describeResult); + + execute(args); + assertNormalExit(); + + List> table = readOutputAsTable(); + assertEquals(3, table.size()); + + List expectedHeaders = asList(TransactionsCommand.DescribeProducersCommand.HEADERS); + assertEquals(expectedHeaders, table.get(0)); + + Set> expectedRows = Utils.mkSet( + asList("12345", "15", "20", "1300", "1599509565", "990"), + asList("98765", "30", "-1", "2300", "1599509599", "None") + ); + assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size()))); + } + + @Test + public void testListTransactions() throws Exception { + String[] args = new String[] { + "--bootstrap-server", + "localhost:9092", + "list" + }; + + ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class); + + Map> transactions = new HashMap<>(); + transactions.put(0, asList( + new TransactionListing("foo", 12345L, TransactionState.ONGOING), + new TransactionListing("bar", 98765L, TransactionState.PREPARE_ABORT) + )); + transactions.put(1, singletonList( + new TransactionListing("baz", 13579L, TransactionState.COMPLETE_COMMIT) + )); + + KafkaFuture>> listTransactionsFuture = + KafkaFutureImpl.completedFuture(transactions); + + Mockito.when(admin.listTransactions()).thenReturn(listResult); + Mockito.when(listResult.allByBrokerId()).thenReturn(listTransactionsFuture); + + execute(args); + assertNormalExit(); + + List> table = readOutputAsTable(); + assertEquals(4, table.size()); + + // Assert expected headers + List expectedHeaders = asList(TransactionsCommand.ListTransactionsCommand.HEADERS); + assertEquals(expectedHeaders, table.get(0)); + + Set> expectedRows = Utils.mkSet( + asList("foo", "0", "12345", "Ongoing"), + asList("bar", "0", "98765", "PrepareAbort"), + asList("baz", "1", "13579", "CompleteCommit") + ); + assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size()))); + } + + @Test + public void testDescribeTransactionsTransactionalIdRequired() throws Exception { + assertCommandFailure(new String[]{ + "--bootstrap-server", + "localhost:9092", + "describe" + }); + } + + @Test + public void testDescribeTransaction() throws Exception { + String transactionalId = "foo"; + String[] args = new String[] { + "--bootstrap-server", + "localhost:9092", + "describe", + "--transactional-id", + transactionalId + }; + + DescribeTransactionsResult describeResult = Mockito.mock(DescribeTransactionsResult.class); + + int coordinatorId = 5; + long transactionStartTime = time.milliseconds(); + + KafkaFuture describeFuture = KafkaFutureImpl.completedFuture( + new TransactionDescription( + coordinatorId, + TransactionState.ONGOING, + 12345L, + 15, + 10000, + OptionalLong.of(transactionStartTime), + singleton(new TopicPartition("bar", 0)) + )); + + Mockito.when(describeResult.description(transactionalId)).thenReturn(describeFuture); + Mockito.when(admin.describeTransactions(singleton(transactionalId))).thenReturn(describeResult); + + // Add a little time so that we can see a positive transaction duration in the output + time.sleep(5000); + + execute(args); + assertNormalExit(); + + List> table = readOutputAsTable(); + assertEquals(2, table.size()); + + List expectedHeaders = asList(TransactionsCommand.DescribeTransactionsCommand.HEADERS); + assertEquals(expectedHeaders, table.get(0)); + + List expectedRow = asList( + String.valueOf(coordinatorId), + transactionalId, + "12345", + "15", + "Ongoing", + "10000", + String.valueOf(transactionStartTime), + "5000", + "bar-0" + ); + assertEquals(expectedRow, table.get(1)); + } + + @Test + public void testDescribeTransactionsStartOffsetOrProducerIdRequired() throws Exception { + assertCommandFailure(new String[]{ + "--bootstrap-server", + "localhost:9092", + "abort", + "--topic", + "foo", + "--partition", + "0" + }); + } + + @Test + public void testDescribeTransactionsTopicRequired() throws Exception { + assertCommandFailure(new String[]{ + "--bootstrap-server", + "localhost:9092", + "abort", + "--partition", + "0", + "--start-offset", + "9990" + }); + } + + @Test + public void testDescribeTransactionsPartitionRequired() throws Exception { + assertCommandFailure(new String[]{ + "--bootstrap-server", + "localhost:9092", + "abort", + "--topic", + "foo", + "--start-offset", + "9990" + }); + } + + @Test + public void testDescribeTransactionsProducerEpochRequiredWithProducerId() throws Exception { + assertCommandFailure(new String[]{ + "--bootstrap-server", + "localhost:9092", + "abort", + "--topic", + "foo", + "--partition", + "0", + "--producer-id", + "12345" + }); + } + + @Test + public void testDescribeTransactionsCoordinatorEpochRequiredWithProducerId() throws Exception { + assertCommandFailure(new String[]{ + "--bootstrap-server", + "localhost:9092", + "abort", + "--topic", + "foo", + "--partition", + "0", + "--producer-id", + "12345", + "--producer-epoch", + "15" + }); + } + + @Test + public void testNewBrokerAbortTransaction() throws Exception { + TopicPartition topicPartition = new TopicPartition("foo", 5); + long startOffset = 9173; + long producerId = 12345L; + short producerEpoch = 15; + int coordinatorEpoch = 76; + + String[] args = new String[] { + "--bootstrap-server", + "localhost:9092", + "abort", + "--topic", + topicPartition.topic(), + "--partition", + String.valueOf(topicPartition.partition()), + "--start-offset", + String.valueOf(startOffset) + }; + + DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class); + KafkaFuture describeFuture = KafkaFutureImpl.completedFuture( + new PartitionProducerState(singletonList( + new ProducerState(producerId, producerEpoch, 1300, 1599509565L, + OptionalInt.of(coordinatorEpoch), OptionalLong.of(startOffset)) + ))); + + AbortTransactionResult abortTransactionResult = Mockito.mock(AbortTransactionResult.class); + KafkaFuture abortFuture = KafkaFutureImpl.completedFuture(null); + AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec( + topicPartition, producerId, producerEpoch, coordinatorEpoch); + + Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture); + Mockito.when(admin.describeProducers(singleton(topicPartition))).thenReturn(describeResult); + + Mockito.when(abortTransactionResult.all()).thenReturn(abortFuture); + Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortTransactionResult); + + execute(args); + assertNormalExit(); + } + + @ParameterizedTest + @ValueSource(ints = {29, -1}) + public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordinatorEpoch) throws Exception { + TopicPartition topicPartition = new TopicPartition("foo", 5); + long producerId = 12345L; + short producerEpoch = 15; + + String[] args = new String[] { + "--bootstrap-server", + "localhost:9092", + "abort", + "--topic", + topicPartition.topic(), + "--partition", + String.valueOf(topicPartition.partition()), + "--producer-id", + String.valueOf(producerId), + "--producer-epoch", + String.valueOf(producerEpoch), + "--coordinator-epoch", + String.valueOf(coordinatorEpoch) + }; + + AbortTransactionResult abortTransactionResult = Mockito.mock(AbortTransactionResult.class); + KafkaFuture abortFuture = KafkaFutureImpl.completedFuture(null); + + final int expectedCoordinatorEpoch; + if (coordinatorEpoch < 0) { + expectedCoordinatorEpoch = 0; + } else { + expectedCoordinatorEpoch = coordinatorEpoch; + } + + AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec( + topicPartition, producerId, producerEpoch, expectedCoordinatorEpoch); + + Mockito.when(abortTransactionResult.all()).thenReturn(abortFuture); + Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortTransactionResult); + + execute(args); + assertNormalExit(); + } + + private void execute(String[] args) throws Exception { + TransactionsCommand.execute(args, ns -> admin, out, time); + } + + private List> readOutputAsTable() throws IOException { + List> table = new ArrayList<>(); + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + + while (true) { + List row = readRow(reader); + if (row == null) { + break; + } + table.add(row); + } + return table; + } + + private List readRow(BufferedReader reader) throws IOException { + String line = reader.readLine(); + if (line == null) { + return null; + } else { + return asList(line.split("\\s+")); + } + } + + private void assertNormalExit() { + assertTrue(exitProcedure.hasExited); + assertEquals(0, exitProcedure.statusCode); + } + + private void assertCommandFailure(String[] args) throws Exception { + execute(args); + assertTrue(exitProcedure.hasExited); + assertEquals(1, exitProcedure.statusCode); + } + + private static class MockExitProcedure implements Exit.Procedure { + private boolean hasExited = false; + private int statusCode; + + @Override + public void execute(int statusCode, String message) { + if (!this.hasExited) { + this.hasExited = true; + this.statusCode = statusCode; + } + } + } + +}