Browse Source

KAFKA-9053: AssignmentInfo#encode hardcodes the LATEST_SUPPORTED_VERSION (#7537)

Also put in some additional logging that makes sense to add, and proved helpful in debugging this particular issue.

Unit tests verifying the encoded supported version were added.

This should get cherry-picked back to 2.1

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
pull/7543/head
A. Sophie Blee-Goldman 5 years ago committed by Guozhang Wang
parent
commit
78f5da914e
  1. 22
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
  2. 22
      streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
  3. 4
      streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
  4. 9
      streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
  5. 17
      streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java

22
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

@ -383,10 +383,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -383,10 +383,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
if (minReceivedMetadataVersion < LATEST_SUPPORTED_VERSION) {
log.info("Downgrading metadata to version {}. Latest supported version is {}.",
log.info("Downgrade metadata to version {}. Latest supported version is {}.",
minReceivedMetadataVersion,
LATEST_SUPPORTED_VERSION);
}
if (minSupportedMetadataVersion < LATEST_SUPPORTED_VERSION) {
log.info("Downgrade latest supported metadata to version {}. Latest supported version is {}.",
minSupportedMetadataVersion,
LATEST_SUPPORTED_VERSION);
}
log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
@ -1055,9 +1060,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1055,9 +1060,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
log.info(
"Sent a version {} subscription and got version {} assignment back (successful version probing). "
+
"Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
"Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.",
usedSubscriptionMetadataVersion,
receivedAssignmentMetadataVersion
receivedAssignmentMetadataVersion,
latestCommonlySupportedVersion
);
usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
return true;
@ -1237,7 +1243,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1237,7 +1243,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
private int updateMinSupportedVersion(final int supportedVersion, final int minSupportedMetadataVersion) {
return supportedVersion < minSupportedMetadataVersion ? supportedVersion : minSupportedMetadataVersion;
if (supportedVersion < minSupportedMetadataVersion) {
log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}",
minSupportedMetadataVersion, supportedVersion);
return supportedVersion;
} else {
log.debug("Current minimum supported version remains at {}, last seen supported version was {}",
minSupportedMetadataVersion, supportedVersion);
return minSupportedMetadataVersion;
}
}
protected void setAssignmentErrorCode(final Integer errorCode) {

22
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java

@ -145,7 +145,7 @@ public class AssignmentInfo { @@ -145,7 +145,7 @@ public class AssignmentInfo {
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + LATEST_SUPPORTED_VERSION);
+ "; latest commonly supported version: " + commonlySupportedVersion);
}
out.flush();
@ -245,14 +245,14 @@ public class AssignmentInfo { @@ -245,14 +245,14 @@ public class AssignmentInfo {
private void encodeVersionThree(final DataOutputStream out) throws IOException {
out.writeInt(3);
out.writeInt(LATEST_SUPPORTED_VERSION);
out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
encodePartitionsByHost(out);
}
private void encodeVersionFour(final DataOutputStream out) throws IOException {
out.writeInt(4);
out.writeInt(LATEST_SUPPORTED_VERSION);
out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
encodePartitionsByHost(out);
out.writeInt(errCode);
@ -260,7 +260,7 @@ public class AssignmentInfo { @@ -260,7 +260,7 @@ public class AssignmentInfo {
private void encodeVersionFive(final DataOutputStream out) throws IOException {
out.writeInt(5);
out.writeInt(LATEST_SUPPORTED_VERSION);
out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
encodePartitionsByHostAsDictionary(out);
out.writeInt(errCode);
@ -277,7 +277,7 @@ public class AssignmentInfo { @@ -277,7 +277,7 @@ public class AssignmentInfo {
final AssignmentInfo assignmentInfo;
final int usedVersion = in.readInt();
final int latestSupportedVersion;
final int commonlySupportedVersion;
switch (usedVersion) {
case 1:
assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
@ -288,18 +288,18 @@ public class AssignmentInfo { @@ -288,18 +288,18 @@ public class AssignmentInfo {
decodeVersionTwoData(assignmentInfo, in);
break;
case 3:
latestSupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
commonlySupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
decodeVersionThreeData(assignmentInfo, in);
break;
case 4:
latestSupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
commonlySupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
decodeVersionFourData(assignmentInfo, in);
break;
case 5:
latestSupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
commonlySupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
decodeVersionFiveData(assignmentInfo, in);
break;
default:

4
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java

@ -212,7 +212,7 @@ public class SubscriptionInfo { @@ -212,7 +212,7 @@ public class SubscriptionInfo {
final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeFourAndFiveByteLength(endPointBytes));
buf.putInt(usedVersion); // used version
buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
buf.putInt(latestSupportedVersion); // supported version
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
encodeTasks(buf, standbyTasks);
@ -272,7 +272,7 @@ public class SubscriptionInfo { @@ -272,7 +272,7 @@ public class SubscriptionInfo {
default:
latestSupportedVersion = data.getInt();
subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, latestSupportedVersion);
}
return subscriptionInfo;

9
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java

@ -103,4 +103,13 @@ public class AssignmentInfoTest { @@ -103,4 +103,13 @@ public class AssignmentInfoTest {
final AssignmentInfo expectedInfo = new AssignmentInfo(5, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2);
assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
}
@Test
public void shouldEncodeAndDecodeSmallerCommonlySupportedVersion() {
final int usedVersion = LATEST_SUPPORTED_VERSION - 1;
final int commonlySupportedVersion = LATEST_SUPPORTED_VERSION - 1;
final AssignmentInfo info = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 2);
final AssignmentInfo expectedInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 2);
assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
}
}

17
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java

@ -85,6 +85,13 @@ public class SubscriptionInfoTest { @@ -85,6 +85,13 @@ public class SubscriptionInfoTest {
assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
}
@Test
public void shouldEncodeAndDecodeVersion5() {
final SubscriptionInfo info = new SubscriptionInfo(5, processId, activeTasks, standbyTasks, "localhost:80");
final SubscriptionInfo expectedInfo = new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, processId, activeTasks, standbyTasks, "localhost:80");
assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
}
@Test
public void shouldAllowToDecodeFutureSupportedVersion() {
final SubscriptionInfo info = SubscriptionInfo.decode(encodeFutureVersion());
@ -92,6 +99,16 @@ public class SubscriptionInfoTest { @@ -92,6 +99,16 @@ public class SubscriptionInfoTest {
assertEquals(LATEST_SUPPORTED_VERSION + 1, info.latestSupportedVersion());
}
@Test
public void shouldEncodeAndDecodeSmallerLatestSupportedVersion() {
final int usedVersion = LATEST_SUPPORTED_VERSION - 1;
final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
final SubscriptionInfo info = new SubscriptionInfo(usedVersion, latestSupportedVersion, processId, activeTasks, standbyTasks, "localhost:80");
final SubscriptionInfo expectedInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion, processId, activeTasks, standbyTasks, "localhost:80");
assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
}
private ByteBuffer encodeFutureVersion() {
final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
+ 4 /* supported version */);

Loading…
Cancel
Save