Browse Source

MINOR: JoinGroupRequest V0 invalid rebalance timeout

A JoinGroupRequest V0 built with the Builder had
a rebalance timeout  = -1 rather than equal to session timeout
as it would have been if coming from the wire and deserialized
from a V0 Struct

fix developed with mimaison

Author: Edoardo Comar <ecomar@uk.ibm.com>

Reviewers: Rajini Sivaram

Closes #2936 from edoardocomar/MINOR-JoinGroupRequestV0
pull/3010/merge
Edoardo Comar 8 years ago committed by Rajini Sivaram
parent
commit
27107ee34d
  1. 3
      clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
  2. 11
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

3
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java

@ -90,7 +90,8 @@ public class JoinGroupRequest extends AbstractRequest { @@ -90,7 +90,8 @@ public class JoinGroupRequest extends AbstractRequest {
@Override
public JoinGroupRequest build(short version) {
if (version < 1) {
rebalanceTimeout = -1;
// v0 had no rebalance timeout but used session timeout implicitly
rebalanceTimeout = sessionTimeout;
}
return new JoinGroupRequest(version, groupId, sessionTimeout,
rebalanceTimeout, memberId, protocolType, groupProtocols);

11
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -511,7 +511,15 @@ public class RequestResponseTest { @@ -511,7 +511,15 @@ public class RequestResponseTest {
deserialized = (FetchRequest) deserialize(request, struct, request.version());
assertEquals(request.isolationLevel(), deserialized.isolationLevel());
}
@Test
public void testJoinGroupRequestVersion0RebalanceTimeout() throws Exception {
final short version = 0;
JoinGroupRequest jgr = createJoinGroupRequest(version);
JoinGroupRequest jgr2 = new JoinGroupRequest(jgr.toStruct(), version);
assertEquals(jgr2.rebalanceTimeout(), jgr.rebalanceTimeout());
}
private RequestHeader createRequestHeader() {
return new RequestHeader((short) 10, (short) 1, "", 10);
}
@ -565,7 +573,6 @@ public class RequestResponseTest { @@ -565,7 +573,6 @@ public class RequestResponseTest {
return new HeartbeatResponse(Errors.NONE);
}
@SuppressWarnings("deprecation")
private JoinGroupRequest createJoinGroupRequest(int version) {
ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();

Loading…
Cancel
Save