diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java new file mode 100644 index 00000000000..7dbb2703947 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry; + +import org.apache.kafka.common.utils.Utils; + +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +/** + * State that helps determine where client exists in the telemetry state i.e. subscribe->wait->push loop. + */ +public enum ClientTelemetryState { + + /** + * Client needs subscription from the broker. + */ + SUBSCRIPTION_NEEDED, + + /** + * Network I/O is in progress to retrieve subscription. + */ + SUBSCRIPTION_IN_PROGRESS, + + /** + * Awaiting telemetry interval for pushing metrics to broker. + */ + PUSH_NEEDED, + + /** + * Network I/O in progress for pushing metrics payload. + */ + PUSH_IN_PROGRESS, + + /** + * Need to push the terminal metrics payload. + */ + TERMINATING_PUSH_NEEDED, + + /** + * Network I/O in progress for pushing terminal metrics payload. + */ + TERMINATING_PUSH_IN_PROGRESS, + + /** + * No more work should be performed, telemetry client terminated. + */ + TERMINATED; + + private final static Map> VALID_NEXT_STATES = new EnumMap<>(ClientTelemetryState.class); + + static { + /* + If clients needs a subscription, then issue telemetry API to fetch subscription from broker. + + However, it's still possible that client doesn't get very far before terminating. + */ + VALID_NEXT_STATES.put( + SUBSCRIPTION_NEEDED, Arrays.asList(SUBSCRIPTION_IN_PROGRESS, TERMINATED)); + + /* + If client is finished waiting for subscription, then client is ready to push the telemetry. + But, it's possible that no telemetry metrics are requested, hence client should go back to + subscription needed state i.e. requesting the next updated subscription. + + However, it's still possible that client doesn't get very far before terminating. + */ + VALID_NEXT_STATES.put(SUBSCRIPTION_IN_PROGRESS, Arrays.asList(PUSH_NEEDED, + SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED, TERMINATED)); + + /* + If client transitions out of this state, then client should proceed to push the metrics. + But, if the push fails (network issues, the subscription changed, etc.) then client should + go back to subscription needed state and request the next subscription. + + However, it's still possible that client doesn't get very far before terminating. + */ + VALID_NEXT_STATES.put(PUSH_NEEDED, Arrays.asList(PUSH_IN_PROGRESS, SUBSCRIPTION_NEEDED, + TERMINATING_PUSH_NEEDED, TERMINATED)); + + /* + A successful push should transition client to push needed which sends the next telemetry + metrics after the elapsed wait interval. But, if the push fails (network issues, the + subscription changed, etc.) then client should go back to subscription needed state and + request the next subscription. + + However, it's still possible that client doesn't get very far before terminating. + */ + VALID_NEXT_STATES.put( + PUSH_IN_PROGRESS, Arrays.asList(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED, + TERMINATED)); + + /* + If client is moving out of this state, then try to send last metrics push. + + However, it's still possible that client doesn't get very far before terminating. + */ + VALID_NEXT_STATES.put( + TERMINATING_PUSH_NEEDED, Arrays.asList(TERMINATING_PUSH_IN_PROGRESS, TERMINATED)); + + /* + Client should only be transited to terminated state. + */ + VALID_NEXT_STATES.put(TERMINATING_PUSH_IN_PROGRESS, Collections.singletonList(TERMINATED)); + + /* + Client should never be able to transition out of terminated state. + */ + VALID_NEXT_STATES.put(TERMINATED, Collections.emptyList()); + } + + /** + * Validates that the newState is one of the valid transition from the current + * {@code TelemetryState}. + * + * @param newState State into which the telemetry client requesting to transition; must be + * non-null + * @return {@code TelemetryState} newState if validation succeeds. Returning + * newState helps state assignment chaining. + * @throws IllegalStateException if the state transition validation fails. + */ + + public ClientTelemetryState validateTransition(ClientTelemetryState newState) { + List allowableStates = VALID_NEXT_STATES.get(this); + + if (allowableStates != null && allowableStates.contains(newState)) { + return newState; + } + + // State transition validation failed, construct error message and throw exception. + String validStatesClause; + if (allowableStates != null && !allowableStates.isEmpty()) { + validStatesClause = String.format("the valid telemetry state transitions from %s are: %s", + this, + Utils.join(allowableStates, ", ")); + } else { + validStatesClause = String.format("there are no valid telemetry state transitions from %s", this); + } + + String message = String.format("Invalid telemetry state transition from %s to %s; %s", + this, + newState, + validStatesClause); + + throw new IllegalStateException(message); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/ClientTelemetryStateTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/ClientTelemetryStateTest.java new file mode 100644 index 00000000000..9e6831032b4 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/telemetry/ClientTelemetryStateTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ClientTelemetryStateTest { + + @Test + public void testValidateTransitionForSubscriptionNeeded() { + List validStates = new ArrayList<>(); + validStates.add(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS); + validStates.add(ClientTelemetryState.TERMINATED); + + testValidateTransition(ClientTelemetryState.SUBSCRIPTION_NEEDED, validStates); + } + + @Test + public void testValidateTransitionForSubscriptionInProgress() { + List validStates = new ArrayList<>(); + validStates.add(ClientTelemetryState.PUSH_NEEDED); + validStates.add(ClientTelemetryState.SUBSCRIPTION_NEEDED); + validStates.add(ClientTelemetryState.TERMINATING_PUSH_NEEDED); + validStates.add(ClientTelemetryState.TERMINATED); + + testValidateTransition(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, validStates); + } + + @Test + public void testValidateTransitionForPushNeeded() { + List validStates = new ArrayList<>(); + validStates.add(ClientTelemetryState.PUSH_IN_PROGRESS); + validStates.add(ClientTelemetryState.SUBSCRIPTION_NEEDED); + validStates.add(ClientTelemetryState.TERMINATING_PUSH_NEEDED); + validStates.add(ClientTelemetryState.TERMINATED); + + testValidateTransition(ClientTelemetryState.PUSH_NEEDED, validStates); + } + + @Test + public void testValidateTransitionForPushInProgress() { + List validStates = new ArrayList<>(); + validStates.add(ClientTelemetryState.PUSH_NEEDED); + validStates.add(ClientTelemetryState.SUBSCRIPTION_NEEDED); + validStates.add(ClientTelemetryState.TERMINATING_PUSH_NEEDED); + validStates.add(ClientTelemetryState.TERMINATED); + + testValidateTransition(ClientTelemetryState.PUSH_IN_PROGRESS, validStates); + } + + @Test + public void testValidateTransitionForTerminating() { + List validStates = new ArrayList<>(); + validStates.add(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS); + validStates.add(ClientTelemetryState.TERMINATED); + + testValidateTransition(ClientTelemetryState.TERMINATING_PUSH_NEEDED, validStates); + } + + @Test + public void testValidateTransitionForTerminatingPushInProgress() { + testValidateTransition(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS, + Collections.singletonList(ClientTelemetryState.TERMINATED)); + } + + @Test + public void testValidateTransitionForTerminated() { + // There's no transitioning out of the terminated state + testValidateTransition(ClientTelemetryState.TERMINATED, Collections.emptyList()); + } + + private void testValidateTransition(ClientTelemetryState oldState, List validStates) { + for (ClientTelemetryState newState : validStates) { + oldState.validateTransition(newState); + } + + // Copy value to a new list for modification. + List invalidStates = new ArrayList<>(Arrays.asList(ClientTelemetryState.values())); + // Remove the valid states from the list of all states, leaving only the invalid. + invalidStates.removeAll(validStates); + + for (ClientTelemetryState newState : invalidStates) { + Executable e = () -> oldState.validateTransition(newState); + String unexpectedSuccessMessage = "Should have thrown an IllegalTelemetryStateException for transitioning from " + oldState + " to " + newState; + assertThrows(IllegalStateException.class, e, unexpectedSuccessMessage); + } + } + +}