Browse Source

MINOR: TopicIdPartition improvements (#11374)

1. It should not require a TopicPartition during construction and normal
usage.
2. Simplify `equals` since `topicId` and `topicPartition` are never
null.
3. Inline `Objects.hash` to avoid array allocation.
4. Make `toString` more concise using a similar approach as
`TopicPartition` since this `TopicIdPartition` will replace
`TopicPartition` in many places in the future.
5. Add unit tests for `TopicIdPartition`, it seems like we had none.
6. Minor clean-up in calling/called classes.

Reviewers: David Jacot <djacot@confluent.io>, Satish Duggana <satishd@apache.org>
pull/11384/head
Ismael Juma 3 years ago committed by GitHub
parent
commit
1a3e23a579
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 35
      clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java
  2. 3
      clients/src/main/java/org/apache/kafka/common/TopicPartition.java
  3. 63
      clients/src/test/java/org/apache/kafka/common/TopicIdPartitionTest.java
  4. 2
      storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java
  5. 4
      storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
  6. 4
      storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
  7. 4
      storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java

35
clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java

@ -32,6 +32,13 @@ public class TopicIdPartition {
this.topicPartition = Objects.requireNonNull(topicPartition, "topicPartition can not be null"); this.topicPartition = Objects.requireNonNull(topicPartition, "topicPartition can not be null");
} }
public TopicIdPartition(String topic, Uuid topicId, int partition) {
this.topicId = Objects.requireNonNull(topicId, "topicId can not be null");
this.topicPartition = new TopicPartition(
Objects.requireNonNull(topic, "topic can not be null"),
partition);
}
/** /**
* @return Universally unique id representing this topic partition. * @return Universally unique id representing this topic partition.
*/ */
@ -39,6 +46,20 @@ public class TopicIdPartition {
return topicId; return topicId;
} }
/**
* @return the topic name.
*/
public String topic() {
return topicPartition.topic();
}
/**
* @return the partition id.
*/
public int partition() {
return topicPartition.partition();
}
/** /**
* @return Topic partition representing this instance. * @return Topic partition representing this instance.
*/ */
@ -55,20 +76,20 @@ public class TopicIdPartition {
return false; return false;
} }
TopicIdPartition that = (TopicIdPartition) o; TopicIdPartition that = (TopicIdPartition) o;
return Objects.equals(topicId, that.topicId) && return topicId.equals(that.topicId) &&
Objects.equals(topicPartition, that.topicPartition); topicPartition.equals(that.topicPartition);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(topicId, topicPartition); final int prime = 31;
int result = prime + topicId.hashCode();
result = prime * result + topicPartition.hashCode();
return result;
} }
@Override @Override
public String toString() { public String toString() {
return "TopicIdPartition{" + return topicId + ":" + topic() + "-" + partition();
"topicId=" + topicId +
", topicPartition=" + topicPartition +
'}';
} }
} }

3
clients/src/main/java/org/apache/kafka/common/TopicPartition.java

@ -47,8 +47,7 @@ public final class TopicPartition implements Serializable {
if (hash != 0) if (hash != 0)
return hash; return hash;
final int prime = 31; final int prime = 31;
int result = 1; int result = prime + partition;
result = prime * result + partition;
result = prime * result + Objects.hashCode(topic); result = prime * result + Objects.hashCode(topic);
this.hash = result; this.hash = result;
return result; return result;

63
clients/src/test/java/org/apache/kafka/common/TopicIdPartitionTest.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import java.util.Objects;
import org.junit.jupiter.api.Test;
class TopicIdPartitionTest {
private final Uuid topicId0 = new Uuid(-4883993789924556279L, -5960309683534398572L);
private final String topicName0 = "a_topic_name";
private final int partition1 = 1;
private final TopicPartition topicPartition0 = new TopicPartition(topicName0, partition1);
private final TopicIdPartition topicIdPartition0 = new TopicIdPartition(topicId0, topicPartition0);
private final TopicIdPartition topicIdPartition1 = new TopicIdPartition(topicName0, topicId0,
partition1);
private final Uuid topicId1 = new Uuid(7759286116672424028L, -5081215629859775948L);
private final String topicName1 = "another_topic_name";
private final TopicIdPartition topicIdPartition2 = new TopicIdPartition(topicName1, topicId1,
partition1);
@Test
public void testEquals() {
assertEquals(topicIdPartition0, topicIdPartition1);
assertEquals(topicIdPartition1, topicIdPartition0);
assertNotEquals(topicIdPartition0, topicIdPartition2);
assertNotEquals(topicIdPartition2, topicIdPartition0);
}
@Test
public void testHashCode() {
assertEquals(Objects.hash(topicIdPartition0.topicId(), topicIdPartition0.topicPartition()),
topicIdPartition0.hashCode());
assertEquals(topicIdPartition0.hashCode(), topicIdPartition1.hashCode());
assertNotEquals(topicIdPartition0.hashCode(), topicIdPartition2.hashCode());
}
@Test
public void testToString() {
assertEquals("vDiRhkpVQgmtSLnsAZx7lA:a_topic_name-1", topicIdPartition0.toString());
}
}

2
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java

@ -43,7 +43,7 @@ public class RemoteLogMetadataTopicPartitioner {
// We do not want to depend upon hash code generation of Uuid as that may change. // We do not want to depend upon hash code generation of Uuid as that may change.
int hash = Objects.hash(topicIdPartition.topicId().getLeastSignificantBits(), int hash = Objects.hash(topicIdPartition.topicId().getLeastSignificantBits(),
topicIdPartition.topicId().getMostSignificantBits(), topicIdPartition.topicId().getMostSignificantBits(),
topicIdPartition.topicPartition().partition()); topicIdPartition.partition());
return toBytes(hash); return toBytes(hash);
} }

4
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java

@ -60,8 +60,8 @@ public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTrans
.setTopicIdPartition( .setTopicIdPartition(
new RemoteLogSegmentMetadataRecord.TopicIdPartitionEntry() new RemoteLogSegmentMetadataRecord.TopicIdPartitionEntry()
.setId(data.remoteLogSegmentId().topicIdPartition().topicId()) .setId(data.remoteLogSegmentId().topicIdPartition().topicId())
.setName(data.remoteLogSegmentId().topicIdPartition().topicPartition().topic()) .setName(data.remoteLogSegmentId().topicIdPartition().topic())
.setPartition(data.remoteLogSegmentId().topicIdPartition().topicPartition().partition())) .setPartition(data.remoteLogSegmentId().topicIdPartition().partition()))
.setId(data.remoteLogSegmentId().id()); .setId(data.remoteLogSegmentId().id());
} }

4
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java

@ -51,8 +51,8 @@ public class RemoteLogSegmentMetadataUpdateTransform implements RemoteLogMetadat
.setId(data.remoteLogSegmentId().id()) .setId(data.remoteLogSegmentId().id())
.setTopicIdPartition( .setTopicIdPartition(
new RemoteLogSegmentMetadataUpdateRecord.TopicIdPartitionEntry() new RemoteLogSegmentMetadataUpdateRecord.TopicIdPartitionEntry()
.setName(data.remoteLogSegmentId().topicIdPartition().topicPartition().topic()) .setName(data.remoteLogSegmentId().topicIdPartition().topic())
.setPartition(data.remoteLogSegmentId().topicIdPartition().topicPartition().partition()) .setPartition(data.remoteLogSegmentId().topicIdPartition().partition())
.setId(data.remoteLogSegmentId().topicIdPartition().topicId())); .setId(data.remoteLogSegmentId().topicIdPartition().topicId()));
} }

4
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java

@ -37,8 +37,8 @@ public final class RemotePartitionDeleteMetadataTransform implements RemoteLogMe
private RemotePartitionDeleteMetadataRecord.TopicIdPartitionEntry createTopicIdPartitionEntry(TopicIdPartition topicIdPartition) { private RemotePartitionDeleteMetadataRecord.TopicIdPartitionEntry createTopicIdPartitionEntry(TopicIdPartition topicIdPartition) {
return new RemotePartitionDeleteMetadataRecord.TopicIdPartitionEntry() return new RemotePartitionDeleteMetadataRecord.TopicIdPartitionEntry()
.setName(topicIdPartition.topicPartition().topic()) .setName(topicIdPartition.topic())
.setPartition(topicIdPartition.topicPartition().partition()) .setPartition(topicIdPartition.partition())
.setId(topicIdPartition.topicId()); .setId(topicIdPartition.topicId());
} }

Loading…
Cancel
Save