
KAFKA-1910; Refactor new consumer and fixed a bunch of corner cases / unit tests; reviewed by Onur Karaman and Jay Kreps

pull/51/head
Guozhang Wang 10 years ago
parent commit 0b92cec1e0
  1. clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java (2)
  2. clients/src/main/java/org/apache/kafka/clients/KafkaClient.java (11)
  3. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (14)
  4. clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java (28)
  5. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (903)
  6. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (595)
  7. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (459)
  8. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java (16)
  9. clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java (21)
  10. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (26)
  11. clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (26)
  12. clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (5)
  13. clients/src/main/java/org/apache/kafka/common/network/Selector.java (5)
  14. clients/src/main/java/org/apache/kafka/common/protocol/Errors.java (29)
  15. clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java (12)
  16. clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java (11)
  17. clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java (6)
  18. clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java (2)
  19. clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java (7)
  20. clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java (7)
  21. clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java (14)
  22. clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java (6)
  23. clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java (9)
  24. clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java (7)
  25. clients/src/test/java/org/apache/kafka/clients/MockClient.java (59)
  26. clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java (284)
  27. clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java (177)
  28. clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java (45)
  29. clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java (3)
  30. clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (3)
  31. clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (24)
  32. clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java (2)
  33. core/src/main/scala/kafka/common/ErrorMapping.scala (6)
  34. core/src/main/scala/kafka/common/NoOffsetsCommittedException.scala (27)
  35. core/src/main/scala/kafka/common/OffsetMetadataAndError.scala (2)
  36. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (38)
  37. core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala (8)
  38. core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala (2)
  39. core/src/main/scala/kafka/coordinator/GroupRegistry.scala (7)
  40. core/src/main/scala/kafka/server/KafkaServer.scala (2)
  41. core/src/test/scala/integration/kafka/api/ConsumerTest.scala (301)
  42. core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala (12)
  43. core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala (2)
  44. core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (2)
  45. core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala (2)
  46. core/src/test/scala/unit/kafka/utils/TestUtils.scala (22)

clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java (2)

@@ -54,5 +54,5 @@ public class CommonClientConfigs {
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
}

clients/src/main/java/org/apache/kafka/clients/KafkaClient.java (11)

@@ -53,11 +53,20 @@ public interface KafkaClient {
*/
public long connectionDelay(Node node, long now);
/**
* Check if the connection of the node has failed, based on the connection state. Such connection failures are
* usually transient and can be resumed in the next {@link #ready(org.apache.kafka.common.Node, long)}
* call, but there are cases where transient failures need to be caught and acted upon.
*
* @param node the node to check
* @return true iff the connection has failed and the node is disconnected
*/
public boolean connectionFailed(Node node);
/**
* Queue up the given request for sending. Requests can only be sent on ready connections.
*
* @param request The request
* @param now The current time
*/
public void send(ClientRequest request);
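
The new connectionFailed method lets a caller notice that a node's connection has died and react, as the consumer's Coordinator does when re-discovering its coordinator. A minimal sketch of that usage pattern, assuming client, node, time and retryBackoffMs are already initialized:

    // Poll until the node is ready, but bail out if the connection fails,
    // so the caller can rediscover the node before retrying.
    while (!client.ready(node, time.milliseconds())) {
        client.poll(retryBackoffMs, time.milliseconds());
        if (client.connectionFailed(node))
            break;
    }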

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (14)

@@ -132,6 +132,19 @@ public class NetworkClient implements KafkaClient {
return connectionStates.connectionDelay(node.id(), now);
}
/**
* Check if the connection of the node has failed, based on the connection state. Such connection failures are
* usually transient and can be resumed in the next {@link #ready(org.apache.kafka.common.Node, long)}
* call, but there are cases where transient failures need to be caught and acted upon.
*
* @param node the node to check
* @return true iff the connection has failed and the node is disconnected
*/
@Override
public boolean connectionFailed(Node node) {
return connectionStates.connectionState(node.id()).equals(ConnectionState.DISCONNECTED);
}
/**
* Check if the node with the given id is ready to send more requests.
*
@@ -174,7 +187,6 @@ public class NetworkClient implements KafkaClient {
* Queue up the given request for sending. Requests can only be sent out to ready nodes.
*
* @param request The request
* @param now The current time
*/
@Override
public void send(ClientRequest request) {

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java (28)

@@ -15,7 +15,9 @@ package org.apache.kafka.clients.consumer;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceCallback;
@@ -23,6 +25,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.Deserializer;
/**
* The consumer configuration keys
@@ -153,6 +156,7 @@ public class ConsumerConfig extends AbstractConfig {
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@@ -276,6 +280,30 @@ public class ConsumerConfig extends AbstractConfig {
VALUE_DESERIALIZER_CLASS_DOC);
}
public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer) {
Map<String, Object> newConfigs = new HashMap<String, Object>();
newConfigs.putAll(configs);
if (keyDeserializer != null)
newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
if (valueDeserializer != null)
newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
return newConfigs;
}
public static Properties addDeserializerToConfig(Properties properties,
Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer) {
Properties newProperties = new Properties();
newProperties.putAll(properties);
if (keyDeserializer != null)
newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
if (valueDeserializer != null)
newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
return newProperties;
}
ConsumerConfig(Map<? extends Object, ? extends Object> props) {
super(CONFIG, props);
}
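
The new addDeserializerToConfig helpers let KafkaConsumer fold directly supplied Deserializer instances into the user's configuration, mirroring the producer-side refactoring further down. A usage sketch with illustrative values (ByteArrayDeserializer is the stock no-op deserializer in this codebase):

    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    // the caller's map is copied, not mutated
    Map<String, Object> merged = ConsumerConfig.addDeserializerToConfig(
            configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());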

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (903)

File diff suppressed because it is too large.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (595)

@@ -0,0 +1,595 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.ConsumerMetadataRequest;
import org.apache.kafka.common.requests.ConsumerMetadataResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* This class manages the coordination process with the consumer coordinator.
*/
public final class Coordinator {
private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
private final KafkaClient client;
private final Time time;
private final String groupId;
private final Metadata metadata;
private final Heartbeat heartbeat;
private final long sessionTimeoutMs;
private final String assignmentStrategy;
private final SubscriptionState subscriptions;
private final CoordinatorMetrics sensors;
private final long retryBackoffMs;
private Node consumerCoordinator;
private String consumerId;
private int generation;
/**
* Initialize the coordination manager.
*/
public Coordinator(KafkaClient client,
String groupId,
long retryBackoffMs,
long sessionTimeoutMs,
String assignmentStrategy,
Metadata metadata,
SubscriptionState subscriptions,
Metrics metrics,
String metricGrpPrefix,
Map<String, String> metricTags,
Time time) {
this.time = time;
this.client = client;
this.generation = -1;
this.consumerId = "";
this.groupId = groupId;
this.metadata = metadata;
this.consumerCoordinator = null;
this.subscriptions = subscriptions;
this.retryBackoffMs = retryBackoffMs;
this.sessionTimeoutMs = sessionTimeoutMs;
this.assignmentStrategy = assignmentStrategy;
this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
}
/**
* Assign partitions for the subscribed topics.
*
* @param subscribedTopics The subscribed topics list
* @param now The current time
* @return The assigned partition info
*/
public List<TopicPartition> assignPartitions(List<String> subscribedTopics, long now) {
// send a join group request to the coordinator
log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
JoinGroupRequest request = new JoinGroupRequest(groupId,
(int) this.sessionTimeoutMs,
subscribedTopics,
this.consumerId,
this.assignmentStrategy);
ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now);
// process the response
JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
// TODO: needs to handle disconnects and errors
Errors.forCode(response.errorCode()).maybeThrow();
this.consumerId = response.consumerId();
// set the flag to refresh last committed offsets
this.subscriptions.needRefreshCommits();
log.debug("Joined group: {}", response);
// record re-assignment time
this.sensors.partitionReassignments.record(time.milliseconds() - now);
// return assigned partitions
return response.assignedPartitions();
}
/**
* Commit offsets for the specified list of topics and partitions.
*
* A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails.
* A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
* the commit succeeds.
*
* @param offsets The list of offsets per partition that should be committed.
* @param blocking Control whether the commit is blocking
* @param now The current time
*/
public void commitOffsets(final Map<TopicPartition, Long> offsets, boolean blocking, long now) {
if (!offsets.isEmpty()) {
// create the offset commit request
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), now, ""));
OffsetCommitRequest req = new OffsetCommitRequest(this.groupId, this.generation, this.consumerId, offsetData);
// send request and possibly wait for response if it is blocking
RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets);
if (blocking) {
boolean done;
do {
ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
// check for errors
done = true;
OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody());
for (short errorCode : commitResponse.responseData().values()) {
if (errorCode != Errors.NONE.code())
done = false;
}
if (!done) {
log.debug("Error in offset commit, backing off for {} ms before retrying again.",
this.retryBackoffMs);
Utils.sleep(this.retryBackoffMs);
}
} while (!done);
} else {
this.client.send(initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now));
}
}
}
/**
* Fetch the committed offsets of the given set of partitions.
*
* @param partitions The set of partitions to fetch committed offsets for
* @param now The current time
* @return The fetched offset values
*/
public Map<TopicPartition, Long> fetchOffsets(Set<TopicPartition> partitions, long now) {
log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", "));
while (true) {
// construct the request
OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
// send the request and block on waiting for response
ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.OFFSET_FETCH, request.toStruct(), null, now);
// parse the response to get the offsets
boolean offsetsReady = true;
OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
// TODO: needs to handle disconnects
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
OffsetFetchResponse.PartitionData data = entry.getValue();
if (data.hasError()) {
log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
.exception()
.getMessage());
if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
// just retry
offsetsReady = false;
Utils.sleep(this.retryBackoffMs);
} else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
// re-discover the coordinator and retry
coordinatorDead();
offsetsReady = false;
Utils.sleep(this.retryBackoffMs);
} else if (data.errorCode == Errors.NO_OFFSETS_FETCHABLE.code()
|| data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
// just ignore this partition
log.debug("No committed offset for partition " + tp);
} else {
throw new IllegalStateException("Unexpected error code " + data.errorCode + " while fetching offset");
}
} else if (data.offset >= 0) {
// record the position with the offset (-1 seems to indicate no
// such offset known)
offsets.put(tp, data.offset);
} else {
log.debug("No committed offset for partition " + tp);
}
}
if (offsetsReady)
return offsets;
}
}
/**
* Attempt to heartbeat the consumer coordinator if necessary, and check if the coordinator is still alive.
*
* @param now The current time
*/
public void maybeHeartbeat(long now) {
if (heartbeat.shouldHeartbeat(now)) {
HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
this.client.send(initiateCoordinatorRequest(ApiKeys.HEARTBEAT, req.toStruct(), new HeartbeatCompletionHandler(), now));
this.heartbeat.sentHeartbeat(now);
}
}
public boolean coordinatorUnknown() {
return this.consumerCoordinator == null;
}
/**
* Repeatedly attempt to send a request to the coordinator until a response is received (retry if we are
* disconnected). Note that this means any requests sent this way must be idempotent.
*
* @return The response
*/
private ClientResponse blockingCoordinatorRequest(ApiKeys api,
Struct request,
RequestCompletionHandler handler,
long now) {
while (true) {
ClientRequest coordinatorRequest = initiateCoordinatorRequest(api, request, handler, now);
ClientResponse coordinatorResponse = sendAndReceive(coordinatorRequest, now);
if (coordinatorResponse.wasDisconnected()) {
handleCoordinatorDisconnect(coordinatorResponse);
Utils.sleep(this.retryBackoffMs);
} else {
return coordinatorResponse;
}
}
}
/**
* Ensure the consumer coordinator is known and we have a ready connection to it.
*/
private void ensureCoordinatorReady() {
while (true) {
if (this.consumerCoordinator == null)
discoverCoordinator();
while (true) {
boolean ready = this.client.ready(this.consumerCoordinator, time.milliseconds());
if (ready) {
return;
} else {
log.debug("No connection to coordinator, attempting to connect.");
this.client.poll(this.retryBackoffMs, time.milliseconds());
// if the coordinator connection has failed, we need to
// break the inner loop to re-discover the coordinator
if (this.client.connectionFailed(this.consumerCoordinator)) {
log.debug("Coordinator connection failed. Attempting to re-discover.");
coordinatorDead();
break;
}
}
}
}
}
/**
* Mark the current coordinator as dead.
*/
private void coordinatorDead() {
if (this.consumerCoordinator != null) {
log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
this.consumerCoordinator = null;
}
}
/**
* Keep discovering the consumer coordinator until it is found.
*/
private void discoverCoordinator() {
while (this.consumerCoordinator == null) {
log.debug("No coordinator known, attempting to discover one.");
Node coordinator = fetchConsumerCoordinator();
if (coordinator == null) {
log.debug("No coordinator found, backing off.");
Utils.sleep(this.retryBackoffMs);
} else {
log.debug("Found coordinator: " + coordinator);
this.consumerCoordinator = coordinator;
}
}
}
/**
* Get the current consumer coordinator information via a consumer metadata request.
*
* @return the consumer coordinator node
*/
private Node fetchConsumerCoordinator() {
// initiate the consumer metadata request
ClientRequest request = initiateConsumerMetadataRequest();
// send the request and wait for its response
ClientResponse response = sendAndReceive(request, request.createdTime());
// parse the response to get the coordinator info if it is not disconnected,
// otherwise we need to request metadata update
if (!response.wasDisconnected()) {
ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(response.responseBody());
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
if (consumerMetadataResponse.errorCode() == Errors.NONE.code())
return new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
consumerMetadataResponse.node().host(),
consumerMetadataResponse.node().port());
} else {
this.metadata.requestUpdate();
}
return null;
}
/**
* Handle the case when the request gets cancelled due to coordinator disconnection.
*/
private void handleCoordinatorDisconnect(ClientResponse response) {
int correlation = response.request().request().header().correlationId();
log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
response.request(),
correlation,
response.request().request().destination());
// mark the coordinator as dead
coordinatorDead();
}
/**
* Initiate a consumer metadata request to the least loaded node.
*
* @return The created request
*/
private ClientRequest initiateConsumerMetadataRequest() {
// find a node to ask about the coordinator
Node node = this.client.leastLoadedNode(time.milliseconds());
while (node == null || !this.client.ready(node, time.milliseconds())) {
long now = time.milliseconds();
this.client.poll(this.retryBackoffMs, now);
node = this.client.leastLoadedNode(now);
// if there is no ready node, backoff before retry
if (node == null)
Utils.sleep(this.retryBackoffMs);
}
// create a consumer metadata request
log.debug("Issuing consumer metadata request to broker {}", node.id());
ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.groupId);
RequestSend send = new RequestSend(node.id(),
this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA),
request.toStruct());
long now = time.milliseconds();
return new ClientRequest(now, true, send, null);
}
/**
* Initiate a request to the coordinator.
*/
private ClientRequest initiateCoordinatorRequest(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
// first make sure the coordinator is known and ready
ensureCoordinatorReady();
// create the request for the coordinator
log.debug("Issuing request ({}: {}) to coordinator {}", api, request, this.consumerCoordinator.id());
RequestHeader header = this.client.nextRequestHeader(api);
RequestSend send = new RequestSend(this.consumerCoordinator.id(), header, request);
return new ClientRequest(now, true, send, handler);
}
/**
* Attempt to send a request and receive its response.
*
* @return The response
*/
private ClientResponse sendAndReceive(ClientRequest clientRequest, long now) {
// send the request
this.client.send(clientRequest);
// drain all responses from the destination node
List<ClientResponse> responses = this.client.completeAll(clientRequest.request().destination(), now);
if (responses.isEmpty()) {
throw new IllegalStateException("This should not happen.");
} else {
// other requests should be handled by the callback, and
// we only care about the response of the last request
return responses.get(responses.size() - 1);
}
}
private class HeartbeatCompletionHandler implements RequestCompletionHandler {
@Override
public void onComplete(ClientResponse resp) {
if (resp.wasDisconnected()) {
handleCoordinatorDisconnect(resp);
} else {
HeartbeatResponse response = new HeartbeatResponse(resp.responseBody());
if (response.errorCode() == Errors.NONE.code()) {
log.debug("Received successful heartbeat response.");
} else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
|| response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
coordinatorDead();
} else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) {
subscriptions.needReassignment();
} else {
throw new KafkaException("Unexpected error in heartbeat response: "
+ Errors.forCode(response.errorCode()).exception().getMessage());
}
}
sensors.heartbeatLatency.record(resp.requestLatencyMs());
}
}
private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
private final Map<TopicPartition, Long> offsets;
public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets) {
this.offsets = offsets;
}
@Override
public void onComplete(ClientResponse resp) {
if (resp.wasDisconnected()) {
handleCoordinatorDisconnect(resp);
} else {
OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
for (Map.Entry<TopicPartition, Short> entry : response.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
short errorCode = entry.getValue();
long offset = this.offsets.get(tp);
if (errorCode == Errors.NONE.code()) {
log.debug("Committed offset {} for partition {}", offset, tp);
subscriptions.committed(tp, offset);
} else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
|| errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
coordinatorDead();
} else {
log.error("Error committing partition {} at offset {}: {}",
tp,
offset,
Errors.forCode(errorCode).exception().getMessage());
}
}
}
sensors.commitLatency.record(resp.requestLatencyMs());
}
}
private class CoordinatorMetrics {
public final Metrics metrics;
public final String metricGrpName;
public final Sensor commitLatency;
public final Sensor heartbeatLatency;
public final Sensor partitionReassignments;
public CoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.commitLatency = metrics.sensor("commit-latency");
this.commitLatency.add(new MetricName("commit-latency-avg",
this.metricGrpName,
"The average time taken for a commit request",
tags), new Avg());
this.commitLatency.add(new MetricName("commit-latency-max",
this.metricGrpName,
"The max time taken for a commit request",
tags), new Max());
this.commitLatency.add(new MetricName("commit-rate",
this.metricGrpName,
"The number of commit calls per second",
tags), new Rate(new Count()));
this.heartbeatLatency = metrics.sensor("heartbeat-latency");
this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
this.metricGrpName,
"The max time taken to receive a response to a hearbeat request",
tags), new Max());
this.heartbeatLatency.add(new MetricName("heartbeat-rate",
this.metricGrpName,
"The average number of heartbeats per second",
tags), new Rate(new Count()));
this.partitionReassignments = metrics.sensor("reassignment-latency");
this.partitionReassignments.add(new MetricName("reassignment-time-avg",
this.metricGrpName,
"The average time taken for a partition reassignment",
tags), new Avg());
this.partitionReassignments.add(new MetricName("reassignment-time-max",
this.metricGrpName,
"The max time taken for a partition reassignment",
tags), new Max());
this.partitionReassignments.add(new MetricName("reassignment-rate",
this.metricGrpName,
"The number of partition reassignments per second",
tags), new Rate(new Count()));
Measurable numParts =
new Measurable() {
public double measure(MetricConfig config, long now) {
return subscriptions.assignedPartitions().size();
}
};
metrics.addMetric(new MetricName("assigned-partitions",
this.metricGrpName,
"The number of partitions currently assigned to this consumer",
tags),
numParts);
Measurable lastHeartbeat =
new Measurable() {
public double measure(MetricConfig config, long now) {
return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
}
};
metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
this.metricGrpName,
"The number of seconds since the last controller heartbeat",
tags),
lastHeartbeat);
}
}
}
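
blockingCoordinatorRequest above captures the commit/fetch retry discipline: resend an idempotent request until a response arrives that was not cut short by a disconnect. The same shape in isolation, with hypothetical Transport and Response types standing in for the Kafka networking layer:

    // Hypothetical types; only the retry shape mirrors the Coordinator code.
    interface Response { boolean wasDisconnected(); }
    interface Transport { Response sendAndReceive(Object request); }

    static Response callWithRetry(Transport transport, Object request, long backoffMs)
            throws InterruptedException {
        while (true) {
            Response response = transport.sendAndReceive(request);
            if (!response.wasDisconnected())
                return response;         // done: hand back the last response
            Thread.sleep(backoffMs);     // disconnected: back off and resend
        }
    }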

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (459)

@@ -0,0 +1,459 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* This class manages the fetching process with the brokers.
*/
public class Fetcher<K, V> {
private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
private static final long LATEST_OFFSET_TIMESTAMP = -1L;
private final KafkaClient client;
private final Time time;
private final int minBytes;
private final int maxWaitMs;
private final int fetchSize;
private final boolean checkCrcs;
private final long retryBackoffMs;
private final Metadata metadata;
private final FetchManagerMetrics sensors;
private final SubscriptionState subscriptions;
private final List<PartitionRecords<K, V>> records;
private final AutoOffsetResetStrategy offsetResetStrategy;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
public Fetcher(KafkaClient client,
long retryBackoffMs,
int minBytes,
int maxWaitMs,
int fetchSize,
boolean checkCrcs,
String offsetReset,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Metadata metadata,
SubscriptionState subscriptions,
Metrics metrics,
String metricGrpPrefix,
Map<String, String> metricTags,
Time time) {
this.time = time;
this.client = client;
this.metadata = metadata;
this.subscriptions = subscriptions;
this.retryBackoffMs = retryBackoffMs;
this.minBytes = minBytes;
this.maxWaitMs = maxWaitMs;
this.fetchSize = fetchSize;
this.checkCrcs = checkCrcs;
this.offsetResetStrategy = AutoOffsetResetStrategy.valueOf(offsetReset);
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.records = new LinkedList<PartitionRecords<K, V>>();
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
}
/**
* Set up a fetch request for any node that we have assigned partitions for and that has no fetch in flight.
*
* @param cluster The current cluster metadata
* @param now The current time
*/
public void initFetches(Cluster cluster, long now) {
for (ClientRequest request : createFetchRequests(cluster)) {
Node node = cluster.nodeById(request.request().destination());
if (client.ready(node, now)) {
log.trace("Initiating fetch to node {}: {}", node.id(), request);
client.send(request);
}
}
}
/**
* Return the fetched records, empty the record buffer and update the consumed position.
*
* @return The fetched records per partition
*/
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
if (this.subscriptions.partitionAssignmentNeeded()) {
return Collections.emptyMap();
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
for (PartitionRecords<K, V> part : this.records) {
Long consumed = subscriptions.consumed(part.partition);
if (this.subscriptions.assignedPartitions().contains(part.partition)
&& (consumed == null || part.fetchOffset == consumed)) {
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
if (records == null) {
records = part.records;
drained.put(part.partition, records);
} else {
records.addAll(part.records);
}
subscriptions.consumed(part.partition, part.records.get(part.records.size() - 1).offset() + 1);
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
}
}
this.records.clear();
return drained;
}
}
/**
* Reset offsets for the given partition using the offset reset strategy.
*
* @param partition The partition whose offset needs to be reset
* @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
*/
public void resetOffset(TopicPartition partition) {
long timestamp;
if (this.offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST)
timestamp = EARLIEST_OFFSET_TIMESTAMP;
else if (this.offsetResetStrategy == AutoOffsetResetStrategy.LATEST)
timestamp = LATEST_OFFSET_TIMESTAMP;
else
throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
log.debug("Resetting offset for partition {} to {} offset.", partition, this.offsetResetStrategy.name()
.toLowerCase());
this.subscriptions.seek(partition, offsetBefore(partition, timestamp));
}
/**
* Fetch a single offset before the given timestamp for the partition.
*
* @param topicPartition The partition to fetch the offset for.
* @param timestamp The target timestamp for the offset lookup.
* @return The offset of the last message published before the given timestamp
*/
public long offsetBefore(TopicPartition topicPartition, long timestamp) {
log.debug("Fetching offsets for partition {}.", topicPartition);
Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
while (true) {
long now = time.milliseconds();
PartitionInfo info = metadata.fetch().partition(topicPartition);
if (info == null) {
metadata.add(topicPartition.topic());
log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
awaitMetadataUpdate();
} else if (info.leader() == null) {
log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
awaitMetadataUpdate();
} else if (this.client.ready(info.leader(), now)) {
Node node = info.leader();
ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
RequestSend send = new RequestSend(node.id(),
this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
request.toStruct());
ClientRequest clientRequest = new ClientRequest(now, true, send, null);
this.client.send(clientRequest);
List<ClientResponse> responses = this.client.completeAll(node.id(), now);
if (responses.isEmpty())
throw new IllegalStateException("This should not happen.");
ClientResponse response = responses.get(responses.size() - 1);
if (response.wasDisconnected()) {
awaitMetadataUpdate();
} else {
ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
short errorCode = lor.responseData().get(topicPartition).errorCode;
if (errorCode == Errors.NONE.code()) {
List<Long> offsets = lor.responseData().get(topicPartition).offsets;
if (offsets.size() != 1)
throw new IllegalStateException("This should not happen.");
long offset = offsets.get(0);
log.debug("Fetched offset {} for partition {}", offset, topicPartition);
return offset;
} else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
|| errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
topicPartition);
awaitMetadataUpdate();
} else {
Errors.forCode(errorCode).maybeThrow();
}
}
} else {
log.debug("Leader for partition {} is not ready, retry fetching offsets", topicPartition);
client.poll(this.retryBackoffMs, now);
}
}
}
/**
* Create fetch requests for all nodes for which we have assigned partitions
* that have no existing requests in flight.
*/
private List<ClientRequest> createFetchRequests(Cluster cluster) {
// create the fetch info
Map<Integer, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, FetchRequest.PartitionData>>();
for (TopicPartition partition : subscriptions.assignedPartitions()) {
Node node = cluster.leaderFor(partition);
// if there is a leader and no in-flight requests, issue a new fetch
if (node != null && this.client.inFlightRequestCount(node.id()) == 0) {
Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
if (fetch == null) {
fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
fetchable.put(node.id(), fetch);
}
long offset = this.subscriptions.fetched(partition);
fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
}
}
// create the requests
List<ClientRequest> requests = new ArrayList<ClientRequest>(fetchable.size());
for (Map.Entry<Integer, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
int nodeId = entry.getKey();
final FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
RequestSend send = new RequestSend(nodeId, this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct());
RequestCompletionHandler handler = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleFetchResponse(response, fetch);
}
};
requests.add(new ClientRequest(time.milliseconds(), true, send, handler));
}
return requests;
}
/**
* The callback for fetch completion
*/
private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
if (resp.wasDisconnected()) {
int correlation = resp.request().request().header().correlationId();
log.debug("Cancelled fetch request {} with correlation id {} due to node {} being disconnected",
resp.request(), correlation, resp.request().request().destination());
} else {
int totalBytes = 0;
int totalCount = 0;
FetchResponse response = new FetchResponse(resp.responseBody());
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
FetchResponse.PartitionData partition = entry.getValue();
if (!subscriptions.assignedPartitions().contains(tp)) {
log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
} else if (partition.errorCode == Errors.NONE.code()) {
int bytes = 0;
ByteBuffer buffer = partition.recordSet;
MemoryRecords records = MemoryRecords.readableRecords(buffer);
long fetchOffset = request.fetchData().get(tp).offset;
List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
for (LogEntry logEntry : records) {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}
if (parsed.size() > 0) {
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.subscriptions.fetched(tp, record.offset() + 1);
this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
}
this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
totalBytes += bytes;
totalCount += parsed.size();
} else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
|| partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
this.metadata.requestUpdate();
} else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
// TODO: this could be optimized by grouping all out-of-range partitions
log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
resetOffset(tp);
} else if (partition.errorCode == Errors.UNKNOWN.code()) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
}
}
this.sensors.bytesFetched.record(totalBytes);
this.sensors.recordsFetched.record(totalCount);
}
this.sensors.fetchLatency.record(resp.requestLatencyMs());
}
/**
* Parse the record entry, deserializing the key / value fields if necessary
*/
private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
if (this.checkCrcs)
logEntry.record().ensureValid();
long offset = logEntry.offset();
ByteBuffer keyBytes = logEntry.record().key();
K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
ByteBuffer valueBytes = logEntry.record().value();
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
}
/*
* Request a metadata update and wait until it has occurred
*/
private void awaitMetadataUpdate() {
int version = this.metadata.requestUpdate();
do {
long now = time.milliseconds();
this.client.poll(this.retryBackoffMs, now);
} while (this.metadata.version() == version);
}
private static class PartitionRecords<K, V> {
public long fetchOffset;
public TopicPartition partition;
public List<ConsumerRecord<K, V>> records;
public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
this.fetchOffset = fetchOffset;
this.partition = partition;
this.records = records;
}
}
private static enum AutoOffsetResetStrategy {
LATEST, EARLIEST, NONE
}
private class FetchManagerMetrics {
public final Metrics metrics;
public final String metricGrpName;
public final Sensor bytesFetched;
public final Sensor recordsFetched;
public final Sensor fetchLatency;
public final Sensor recordsFetchLag;
public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
this.bytesFetched = metrics.sensor("bytes-fetched");
this.bytesFetched.add(new MetricName("fetch-size-avg",
this.metricGrpName,
"The average number of bytes fetched per request",
tags), new Avg());
this.bytesFetched.add(new MetricName("fetch-size-max",
this.metricGrpName,
"The maximum number of bytes fetched per request",
tags), new Max());
this.bytesFetched.add(new MetricName("bytes-consumed-rate",
this.metricGrpName,
"The average number of bytes consumed per second",
tags), new Rate());
this.recordsFetched = metrics.sensor("records-fetched");
this.recordsFetched.add(new MetricName("records-per-request-avg",
this.metricGrpName,
"The average number of records in each request",
tags), new Avg());
this.recordsFetched.add(new MetricName("records-consumed-rate",
this.metricGrpName,
"The average number of records consumed per second",
tags), new Rate());
this.fetchLatency = metrics.sensor("fetch-latency");
this.fetchLatency.add(new MetricName("fetch-latency-avg",
this.metricGrpName,
"The average time taken for a fetch request.",
tags), new Avg());
this.fetchLatency.add(new MetricName("fetch-latency-max",
this.metricGrpName,
"The max time taken for any fetch request.",
tags), new Max());
this.fetchLatency.add(new MetricName("fetch-rate",
this.metricGrpName,
"The number of fetch requests per second.",
tags), new Rate(new Count()));
this.recordsFetchLag = metrics.sensor("records-lag");
this.recordsFetchLag.add(new MetricName("records-lag-max",
this.metricGrpName,
"The maximum lag in terms of number of records for any partition in this window",
tags), new Max());
}
public void recordTopicFetchMetrics(String topic, int bytes, int records) {
// record bytes fetched
String name = "topic." + topic + ".bytes-fetched";
Sensor bytesFetched = this.metrics.getSensor(name);
if (bytesFetched == null)
bytesFetched = this.metrics.sensor(name);
bytesFetched.record(bytes);
// record records fetched
name = "topic." + topic + ".records-fetched";
Sensor recordsFetched = this.metrics.getSensor(name);
if (recordsFetched == null)
recordsFetched = this.metrics.sensor(name);
recordsFetched.record(records);
}
}
}
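
resetOffset above maps the two reset strategies onto the sentinel timestamps understood by the list-offsets API (-2 for earliest, -1 for latest). Since Fetcher is internal, the following is illustrative only; fetcher and tp are assumed to exist:

    long earliest = fetcher.offsetBefore(tp, -2L); // EARLIEST_OFFSET_TIMESTAMP
    long latest = fetcher.offsetBefore(tp, -1L);   // LATEST_OFFSET_TIMESTAMP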

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java (16)

@@ -21,34 +21,20 @@ public final class Heartbeat {
* so, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
* once per second.
*/
private final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
public final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
private final long timeout;
private long lastHeartbeatSend;
private long lastHeartbeatResponse;
public Heartbeat(long timeout, long now) {
this.timeout = timeout;
this.lastHeartbeatSend = now;
this.lastHeartbeatResponse = now;
}
public void sentHeartbeat(long now) {
this.lastHeartbeatSend = now;
}
public void receivedResponse(long now) {
this.lastHeartbeatResponse = now;
}
public void markDead() {
this.lastHeartbeatResponse = -1;
}
public boolean isAlive(long now) {
return now - lastHeartbeatResponse <= timeout;
}
public boolean shouldHeartbeat(long now) {
return now - lastHeartbeatSend > (1.0 / HEARTBEATS_PER_SESSION_INTERVAL) * this.timeout;
}
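
With HEARTBEATS_PER_SESSION_INTERVAL = 3, a heartbeat becomes due once a third of the session timeout has elapsed since the last send. A worked example against the class above, with times in milliseconds:

    Heartbeat hb = new Heartbeat(3000L, 0L); // 3 s session timeout, created at t=0
    hb.sentHeartbeat(0L);
    boolean dueEarly = hb.shouldHeartbeat(500L);  // false: 500 <= 3000 / 3
    boolean dueLater = hb.shouldHeartbeat(1500L); // true: 1500 > 1000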

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java (21)

@@ -43,9 +43,12 @@ public class SubscriptionState {
/* the last committed offset for each partition */
private final Map<TopicPartition, Long> committed;
/* do we need to request a partition assignment from the co-ordinator? */
/* do we need to request a partition assignment from the coordinator? */
private boolean needsPartitionAssignment;
/* do we need to request the latest committed offsets from the coordinator? */
private boolean needsFetchCommittedOffsets;
public SubscriptionState() {
this.subscribedTopics = new HashSet<String>();
this.subscribedPartitions = new HashSet<TopicPartition>();
@@ -54,6 +57,7 @@ public class SubscriptionState {
this.fetched = new HashMap<TopicPartition, Long>();
this.committed = new HashMap<TopicPartition, Long>();
this.needsPartitionAssignment = false;
this.needsFetchCommittedOffsets = true; // starts true so the consumer fetches committed offsets on startup
}
public void subscribe(String topic) {
@@ -75,6 +79,10 @@ public class SubscriptionState {
clearPartition(tp);
}
public void needReassignment() {
this.needsPartitionAssignment = true;
}
public void subscribe(TopicPartition tp) {
if (this.subscribedTopics.size() > 0)
throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
@@ -119,11 +127,20 @@ public class SubscriptionState {
public void committed(TopicPartition tp, long offset) {
this.committed.put(tp, offset);
this.needsFetchCommittedOffsets = false;
}
public Long committed(TopicPartition tp) {
return this.committed.get(tp);
}
public void needRefreshCommits() {
this.needsFetchCommittedOffsets = true;
}
public boolean refreshCommitsNeeded() {
return this.needsFetchCommittedOffsets;
}
public void seek(TopicPartition tp, long offset) {
fetched(tp, offset);
@@ -162,7 +179,7 @@ public class SubscriptionState {
return copy;
}
public boolean needsPartitionAssignment() {
public boolean partitionAssignmentNeeded() {
return this.needsPartitionAssignment;
}

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (26)

@@ -162,21 +162,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(addSerializerToConfig(configs, keySerializer, valueSerializer)),
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
keySerializer, valueSerializer);
}
private static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
Serializer<?> keySerializer, Serializer<?> valueSerializer) {
Map<String, Object> newConfigs = new HashMap<String, Object>();
newConfigs.putAll(configs);
if (keySerializer != null)
newConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
if (valueSerializer != null)
newConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
return newConfigs;
}
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
@@ -196,21 +185,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(addSerializerToConfig(properties, keySerializer, valueSerializer)),
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
keySerializer, valueSerializer);
}
private static Properties addSerializerToConfig(Properties properties,
Serializer<?> keySerializer, Serializer<?> valueSerializer) {
Properties newProperties = new Properties();
newProperties.putAll(properties);
if (keySerializer != null)
newProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName());
if (valueSerializer != null)
newProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
return newProperties;
}
@SuppressWarnings("unchecked")
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
log.trace("Starting the Kafka producer");

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (26)

@@ -16,13 +16,16 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.Serializer;
/**
* Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a
@@ -166,6 +169,7 @@ public class ProducerConfig extends AbstractConfig {
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -217,6 +221,28 @@ public class ProducerConfig extends AbstractConfig {
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
}
public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
Serializer<?> keySerializer, Serializer<?> valueSerializer) {
Map<String, Object> newConfigs = new HashMap<String, Object>();
newConfigs.putAll(configs);
if (keySerializer != null)
newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
if (valueSerializer != null)
newConfigs.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
return newConfigs;
}
public static Properties addSerializerToConfig(Properties properties,
Serializer<?> keySerializer, Serializer<?> valueSerializer) {
Properties newProperties = new Properties();
newProperties.putAll(properties);
if (keySerializer != null)
newProperties.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName());
if (valueSerializer != null)
newProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
return newProperties;
}
ProducerConfig(Map<? extends Object, ? extends Object> props) {
super(CONFIG, props);
}

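Note the asymmetry between the two helpers: the Map overload stores the Class object itself, while the Properties overload stores the class name, since Properties is string-typed; ConfigDef's Type.CLASS parsing accepts both forms. A sketch of the Map-based path (the serializer choice is illustrative):

Map<String, Object> configs = new HashMap<String, Object>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// after this call, configs maps "key.serializer" -> ByteArraySerializer.class
configs = ProducerConfig.addSerializerToConfig(
        configs, new ByteArraySerializer(), new ByteArraySerializer());
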
5
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

@@ -298,9 +298,8 @@ public class Sender implements Runnable {
final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
ByteBuffer recordsBuffer = batch.records.buffer();
recordsBuffer.flip();
produceRecordsByPartition.put(tp, recordsBuffer);
batch.records.rewind();
produceRecordsByPartition.put(tp, batch.records.buffer());
recordsByPartition.put(tp, batch);
}
ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);

5
clients/src/main/java/org/apache/kafka/common/network/Selector.java

@@ -14,6 +14,7 @@ package org.apache.kafka.common.network;
import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
@@ -90,7 +91,7 @@ public class Selector implements Selectable {
/**
* Create a new selector
*/
public Selector(Metrics metrics, Time time , String metricGrpPrefix , Map<String, String> metricTags) {
public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
try {
this.selector = java.nio.channels.Selector.open();
} catch (IOException e) {
@@ -274,7 +275,7 @@ public class Selector implements Selectable {
}
} catch (IOException e) {
String desc = socketDescription(channel);
if (e instanceof EOFException)
if (e instanceof EOFException || e instanceof ConnectException)
log.info("Connection {} disconnected", desc);
else
log.warn("Error in I/O with connection to {}", desc, e);

29
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

@@ -41,14 +41,23 @@ public enum Errors {
new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
NOT_LEADER_FOR_PARTITION(6,
new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
REQUEST_TIMED_OUT(7,
new TimeoutException("The request timed out.")),
// TODO: errorCode 8 for BrokerNotAvailable
REPLICA_NOT_AVAILABLE(9,
new ApiException("The replica is not available for the requested topic-partition")),
MESSAGE_TOO_LARGE(10,
new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
OFFSET_LOAD_IN_PROGRESS(14, new ApiException("The coordinator is loading offsets and can't process requests.")),
CONSUMER_COORDINATOR_NOT_AVAILABLE(15, new ApiException("The coordinator is not available.")),
NOT_COORDINATOR_FOR_CONSUMER(16, new ApiException("This is not the correct co-ordinator for this consumer.")),
OFFSET_METADATA_TOO_LARGE(12,
new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13,
new NetworkException("The server disconnected before a response was received.")),
OFFSET_LOAD_IN_PROGRESS(14,
new ApiException("The coordinator is loading offsets and can't process requests.")),
CONSUMER_COORDINATOR_NOT_AVAILABLE(15,
new ApiException("The coordinator is not available.")),
NOT_COORDINATOR_FOR_CONSUMER(16,
new ApiException("This is not the correct co-ordinator for this consumer.")),
INVALID_TOPIC_EXCEPTION(17,
new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
RECORD_LIST_TOO_LARGE(18,
@@ -57,7 +66,12 @@ public enum Errors {
new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request specified an invalid value for required acks."));
INVALID_REQUIRED_ACKS(21,
new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
ILLEGAL_GENERATION(22,
new ApiException("Specified consumer generation id is not valid.")),
NO_OFFSETS_FETCHABLE(23,
new ApiException("No offsets have been committed so far."));
private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
@@ -68,7 +82,6 @@ public enum Errors {
if (error.exception != null)
classToError.put(error.exception.getClass(), error);
}
}
private final short code;

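The codeToError/classToError maps back the usual lookup helpers on this enum; assuming the existing forCode/forException accessors, the new codes are used roughly like this (errorCode() on a response is a stand-in for wherever the short comes from):

// wire code -> enum constant -> exception to surface to the caller
Errors error = Errors.forCode((short) 22);   // ILLEGAL_GENERATION
if (error != Errors.NONE)
    throw error.exception();

// exception -> wire code, for the server-side response path
short code = Errors.forException(new NetworkException("disconnected")).code();
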
12
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java

@@ -120,6 +120,16 @@ public class MemoryRecords implements Records {
buffer = compressor.buffer();
}
/**
* Rewind the records to read mode; the records must first be closed for writes
*/
public void rewind() {
if (writable)
throw new IllegalStateException("The memory records need to be closed for write before rewinding for read");
buffer.flip();
}
/** Write the records in this set to the given channel */
public int writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffer);
@@ -158,7 +168,7 @@ public class MemoryRecords implements Records {
@Override
public Iterator<LogEntry> iterator() {
ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip();
ByteBuffer copy = this.buffer.duplicate();
return new RecordsIterator(copy, CompressionType.NONE, false);
}

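rewind() replaces the ad-hoc buffer.flip() calls that Sender and the tests previously performed on the records' underlying buffer; the intended write-then-read round trip now looks roughly like this (the capacity and payloads are illustrative):

MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
records.append(0L, "key".getBytes(), "value".getBytes());
records.close();   // seals the set for writing
records.rewind();  // flips the buffer so iteration reads from the start
for (LogEntry entry : records)
    System.out.println(entry.offset() + ": " + entry.record());
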
11
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java

@@ -41,6 +41,17 @@ public class FetchResponse extends AbstractRequestResponse {
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
* Possible error codes:
*
* OFFSET_OUT_OF_RANGE (1)
* UNKNOWN_TOPIC_OR_PARTITION (3)
* NOT_LEADER_FOR_PARTITION (6)
* REPLICA_NOT_AVAILABLE (9)
* UNKNOWN (-1)
*/
private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
private static final String RECORD_SET_KEY_NAME = "record_set";

6
clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java

@@ -24,6 +24,12 @@ public class HeartbeatResponse extends AbstractRequestResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
* Possible error code:
*
* TODO
*/
private final short errorCode;
public HeartbeatResponse(short errorCode) {
super(new Struct(CURRENT_SCHEMA));

2
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java

@@ -30,6 +30,8 @@ public class JoinGroupRequest extends AbstractRequestResponse {
private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
public static final String UNKNOWN_CONSUMER_ID = "";
private final String groupId;
private final int sessionTimeout;
private final List<String> topics;

7
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java

@@ -26,6 +26,13 @@ public class JoinGroupResponse extends AbstractRequestResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
* Possible error code:
*
* TODO
*/
private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";

7
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java

@@ -41,6 +41,13 @@ public class ListOffsetResponse extends AbstractRequestResponse {
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
* Possible error code:
*
* TODO
*/
private static final String OFFSETS_KEY_NAME = "offsets";
private final Map<TopicPartition, PartitionData> responseData;

14
clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java

@@ -40,11 +40,25 @@ public class MetadataResponse extends AbstractRequestResponse {
// topic level field names
private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
/**
* Possible error code:
*
* TODO
*/
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
// partition level field names
private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
/**
* Possible error code:
*
* TODO
*/
private static final String PARTITION_KEY_NAME = "partition_id";
private static final String LEADER_KEY_NAME = "leader";
private static final String REPLICAS_KEY_NAME = "replicas";

6
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java

@@ -38,6 +38,12 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
* Possible error code:
*
* TODO
*/
private final Map<TopicPartition, Short> responseData;
public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {

9
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java

@@ -41,6 +41,15 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
private static final String METADATA_KEY_NAME = "metadata";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
* Possible error codes:
*
* UNKNOWN_TOPIC_OR_PARTITION (3)
* OFFSET_LOAD_IN_PROGRESS (14)
* NOT_COORDINATOR_FOR_CONSUMER (16)
* NO_OFFSETS_FETCHABLE (23)
*/
private final Map<TopicPartition, PartitionData> responseData;
public static final class PartitionData {

7
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java

@@ -37,6 +37,13 @@ public class ProduceResponse extends AbstractRequestResponse {
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
* Possible error code:
*
* TODO
*/
private static final String BASE_OFFSET_KEY_NAME = "base_offset";
private final Map<TopicPartition, PartitionResponse> responses;

59
clients/src/test/java/org/apache/kafka/clients/MockClient.java

@@ -35,11 +35,23 @@ import org.apache.kafka.common.utils.Time;
*/
public class MockClient implements KafkaClient {
private class FutureResponse {
public final Struct responseBody;
public final boolean disconnected;
public FutureResponse(Struct responseBody, boolean disconnected) {
this.responseBody = responseBody;
this.disconnected = disconnected;
}
}
private final Time time;
private int correlation = 0;
private Node node = null;
private final Set<Integer> ready = new HashSet<Integer>();
private final Queue<ClientRequest> requests = new ArrayDeque<ClientRequest>();
private final Queue<ClientResponse> responses = new ArrayDeque<ClientResponse>();
private final Queue<FutureResponse> futureResponses = new ArrayDeque<FutureResponse>();
public MockClient(Time time) {
this.time = time;
@@ -52,9 +64,8 @@ public class MockClient implements KafkaClient {
@Override
public boolean ready(Node node, long now) {
boolean found = isReady(node, now);
ready.add(node.id());
return found;
return true;
}
@Override
@@ -62,6 +73,11 @@ public class MockClient implements KafkaClient {
return 0;
}
@Override
public boolean connectionFailed(Node node) {
return false;
}
public void disconnect(Integer node) {
Iterator<ClientRequest> iter = requests.iterator();
while (iter.hasNext()) {
@@ -76,16 +92,25 @@ public class MockClient implements KafkaClient {
@Override
public void send(ClientRequest request) {
this.requests.add(request);
if (!futureResponses.isEmpty()) {
FutureResponse futureResp = futureResponses.poll();
ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
responses.add(resp);
} else {
this.requests.add(request);
}
}
@Override
public List<ClientResponse> poll(long timeoutMs, long now) {
for (ClientResponse response: this.responses)
if (response.request().hasCallback())
List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses);
while (!this.responses.isEmpty()) {
ClientResponse response = this.responses.poll();
if (response.request().hasCallback())
response.request().callback().onComplete(response);
List<ClientResponse> copy = new ArrayList<ClientResponse>();
this.responses.clear();
}
return copy;
}
@@ -107,8 +132,24 @@
}
public void respond(Struct body) {
respond(body, false);
}
public void respond(Struct body, boolean disconnected) {
ClientRequest request = requests.remove();
responses.add(new ClientResponse(request, time.milliseconds(), false, body));
responses.add(new ClientResponse(request, time.milliseconds(), disconnected, body));
}
public void prepareResponse(Struct body) {
prepareResponse(body, false);
}
public void prepareResponse(Struct body, boolean disconnected) {
futureResponses.add(new FutureResponse(body, disconnected));
}
public void setNode(Node node) {
this.node = node;
}
@Override
@@ -136,7 +177,7 @@
@Override
public Node leastLoadedNode(long now) {
return null;
return this.node;
}
}

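The difference between the two response hooks: respond() answers a request that has already been sent, while prepareResponse() queues a response that a later send() will consume immediately, which is what lets the new Coordinator tests below script multi-step exchanges up front. A minimal sketch (node, request, and responseStruct are stand-ins):

MockTime time = new MockTime();
MockClient client = new MockClient(time);
client.setNode(node);                    // leastLoadedNode() will now return it
client.prepareResponse(responseStruct);  // pre-scripted answer
client.send(request);                    // matched against the queued response
client.poll(0, time.milliseconds());     // fires the request's callback
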
284
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java

@@ -0,0 +1,284 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.ConsumerMetadataResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
public class CoordinatorTest {
private String topicName = "test";
private String groupId = "test-group";
private TopicPartition tp = new TopicPartition(topicName, 0);
private long retryBackoffMs = 0L;
private long sessionTimeoutMs = 10L;
private String rebalanceStrategy = "not-matter";
private MockTime time = new MockTime();
private MockClient client = new MockClient(time);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
private SubscriptionState subscriptions = new SubscriptionState();
private Metrics metrics = new Metrics(time);
private Map<String, String> metricTags = new LinkedHashMap<String, String>();
private Coordinator coordinator = new Coordinator(client,
groupId,
retryBackoffMs,
sessionTimeoutMs,
rebalanceStrategy,
metadata,
subscriptions,
metrics,
"consumer" + groupId,
metricTags,
time);
@Before
public void setup() {
metadata.update(cluster, time.milliseconds());
client.setNode(node);
}
@Test
public void testNormalHeartbeat() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// normal heartbeat
time.sleep(sessionTimeoutMs);
coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
assertEquals(1, client.inFlightRequestCount());
client.respond(heartbeatResponse(Errors.NONE.code()));
assertEquals(1, client.poll(0, time.milliseconds()).size());
}
@Test
public void testCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// consumer_coordinator_not_available will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
assertEquals(1, client.inFlightRequestCount());
client.respond(heartbeatResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()));
time.sleep(sessionTimeoutMs);
assertEquals(1, client.poll(0, time.milliseconds()).size());
assertTrue(coordinator.coordinatorUnknown());
}
@Test
public void testNotCoordinator() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// not_coordinator will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
assertEquals(1, client.inFlightRequestCount());
client.respond(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_CONSUMER.code()));
time.sleep(sessionTimeoutMs);
assertEquals(1, client.poll(0, time.milliseconds()).size());
assertTrue(coordinator.coordinatorUnknown());
}
@Test
public void testIllegalGeneration() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// illegal_generation will trigger a partition reassignment
subscriptions.subscribe(topicName);
subscriptions.changePartitionAssignment(Collections.singletonList(tp));
time.sleep(sessionTimeoutMs);
coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
assertEquals(1, client.inFlightRequestCount());
client.respond(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
time.sleep(sessionTimeoutMs);
assertEquals(1, client.poll(0, time.milliseconds()).size());
assertTrue(subscriptions.partitionAssignmentNeeded());
}
@Test
public void testCoordinatorDisconnect() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// coordinator disconnect will mark coordinator as unknown
time.sleep(sessionTimeoutMs);
coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
assertEquals(1, client.inFlightRequestCount());
client.respond(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
time.sleep(sessionTimeoutMs);
assertEquals(1, client.poll(0, time.milliseconds()).size());
assertTrue(coordinator.coordinatorUnknown());
}
@Test
public void testNormalJoinGroup() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// normal join group
client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
assertEquals(Collections.singletonList(tp),
coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds()));
assertEquals(0, client.inFlightRequestCount());
}
@Test
public void testReJoinGroup() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// a disconnect from the original coordinator will cause re-discovery and another join
client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
assertEquals(Collections.singletonList(tp),
coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds()));
assertEquals(0, client.inFlightRequestCount());
}
@Test
public void testCommitOffsetNormal() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// sync commit
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
// async commit
coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
assertEquals(1, client.poll(0, time.milliseconds()).size());
}
@Test
public void testCommitOffsetError() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// async commit with coordinator not available
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
assertEquals(1, client.poll(0, time.milliseconds()).size());
assertTrue(coordinator.coordinatorUnknown());
// resume
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// async commit with not coordinator
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
assertEquals(1, client.poll(0, time.milliseconds()).size());
assertTrue(coordinator.coordinatorUnknown());
// resume
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// sync commit with not_coordinator
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
// sync commit with coordinator disconnected
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
}
@Test
public void testFetchOffset() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
// normal fetch
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
// fetch with loading in progress
client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
// fetch with not coordinator
client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
// fetch with no fetchable offsets
client.prepareResponse(offsetFetchResponse(tp, Errors.NO_OFFSETS_FETCHABLE.code(), "", 100L));
assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
// fetch with offset topic unknown
client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L));
assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
// fetch with offset -1
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
}
private Struct consumerMetadataResponse(Node node, short error) {
ConsumerMetadataResponse response = new ConsumerMetadataResponse(error, node);
return response.toStruct();
}
private Struct heartbeatResponse(short error) {
HeartbeatResponse response = new HeartbeatResponse(error);
return response.toStruct();
}
private Struct joinGroupResponse(int generationId, String consumerId, List<TopicPartition> assignedPartitions, short error) {
JoinGroupResponse response = new JoinGroupResponse(error, generationId, consumerId, assignedPartitions);
return response.toStruct();
}
private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
OffsetCommitResponse response = new OffsetCommitResponse(responseData);
return response.toStruct();
}
private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error);
OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
return response.toStruct();
}
}

177
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

@@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import static org.junit.Assert.assertEquals;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
public class FetcherTest {
private String topicName = "test";
private String groupId = "test-group";
private TopicPartition tp = new TopicPartition(topicName, 0);
private long retryBackoffMs = 0L;
private int minBytes = 1;
private int maxWaitMs = 0;
private int fetchSize = 1000;
private String offsetReset = "EARLIEST";
private MockTime time = new MockTime();
private MockClient client = new MockClient(time);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
private SubscriptionState subscriptions = new SubscriptionState();
private Metrics metrics = new Metrics(time);
private Map<String, String> metricTags = new LinkedHashMap<String, String>();
private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(client,
retryBackoffMs,
minBytes,
maxWaitMs,
fetchSize,
true, // check crc
offsetReset,
new ByteArrayDeserializer(),
new ByteArrayDeserializer(),
metadata,
subscriptions,
metrics,
"consumer" + groupId,
metricTags,
time);
@Before
public void setup() throws Exception {
metadata.update(cluster, time.milliseconds());
client.setNode(node);
records.append(1L, "key".getBytes(), "value-1".getBytes());
records.append(2L, "key".getBytes(), "value-2".getBytes());
records.append(3L, "key".getBytes(), "value-3".getBytes());
records.close();
records.rewind();
}
@Test
public void testFetchNormal() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);
// normal fetch
fetcher.initFetches(cluster, time.milliseconds());
client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
client.poll(0, time.milliseconds());
records = fetcher.fetchedRecords().get(tp);
assertEquals(3, records.size());
assertEquals(4L, (long) subscriptions.fetched(tp)); // this is the next fetching position
assertEquals(4L, (long) subscriptions.consumed(tp));
long offset = 1;
for (ConsumerRecord<byte[], byte[]> record : records) {
assertEquals(offset, record.offset());
offset += 1;
}
}
@Test
public void testFetchFailed() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);
// fetch with not leader
fetcher.initFetches(cluster, time.milliseconds());
client.respond(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
client.poll(0, time.milliseconds());
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
// fetch with unknown topic partition
fetcher.initFetches(cluster, time.milliseconds());
client.respond(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
client.poll(0, time.milliseconds());
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
// fetch with out of range
subscriptions.fetched(tp, 5);
fetcher.initFetches(cluster, time.milliseconds());
client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
client.poll(0, time.milliseconds());
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, (long) subscriptions.fetched(tp));
assertEquals(0L, (long) subscriptions.consumed(tp));
}
@Test
public void testFetchOutOfRange() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 5);
subscriptions.consumed(tp, 5);
// fetch with out of range
fetcher.initFetches(cluster, time.milliseconds());
client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
client.poll(0, time.milliseconds());
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, (long) subscriptions.fetched(tp));
assertEquals(0L, (long) subscriptions.consumed(tp));
}
private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
return response.toStruct();
}
private Struct listOffsetResponse(List<Long> offsets, short error) {
ListOffsetResponse response = new ListOffsetResponse(Collections.singletonMap(tp, new ListOffsetResponse.PartitionData(error, offsets)));
return response.toStruct();
}
}

45
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java

@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.utils.MockTime;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class HeartbeatTest {
private long timeout = 300L;
private MockTime time = new MockTime();
private Heartbeat heartbeat = new Heartbeat(timeout, -1L);
@Test
public void testShouldHeartbeat() {
heartbeat.sentHeartbeat(time.milliseconds());
time.sleep((long) ((float) timeout / Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL * 1.1));
assertTrue(heartbeat.shouldHeartbeat(time.milliseconds()));
}
@Test
public void testShouldNotHeartbeat() {
heartbeat.sentHeartbeat(time.milliseconds());
time.sleep(timeout / (2 * Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL));
assertFalse(heartbeat.shouldHeartbeat(time.milliseconds()));
}
}

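These tests pin down the implied contract: a heartbeat becomes due once 1/HEARTBEATS_PER_SESSION_INTERVAL of the session timeout has elapsed since the last send, so the consumer heartbeats several times per session window. A sketch of that predicate (field names assumed, not the actual Heartbeat source):

// due once timeout / HEARTBEATS_PER_SESSION_INTERVAL ms have passed since the last send
public boolean shouldHeartbeat(long now) {
    return (now - lastHeartbeatSend) >= (1.0 / HEARTBEATS_PER_SESSION_INTERVAL) * timeout;
}
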
3
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java

@@ -16,7 +16,8 @@
*/
package org.apache.kafka.clients.consumer.internals;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static java.util.Arrays.asList;
import java.util.Collections;

3
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java

@@ -76,6 +76,7 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
batch.records.rewind();
Iterator<LogEntry> iter = batch.records.iterator();
for (int i = 0; i < appends; i++) {
LogEntry entry = iter.next();
@@ -104,6 +105,7 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
batch.records.rewind();
Iterator<LogEntry> iter = batch.records.iterator();
LogEntry entry = iter.next();
assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -156,6 +158,7 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
if (batches != null) {
for (RecordBatch batch : batches) {
batch.records.rewind();
for (LogEntry entry : batch.records)
read++;
accum.deallocate(batch);

24
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

@@ -16,6 +16,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -27,11 +28,10 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
@@ -76,7 +76,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
client.respond(produceResponse(tp, offset, Errors.NONE.code()));
sender.run(time.milliseconds());
assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
sender.run(time.milliseconds());
@@ -110,7 +110,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // resend
assertEquals(1, client.inFlightRequestCount());
long offset = 0;
client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
client.respond(produceResponse(tp, offset, Errors.NONE.code()));
sender.run(time.milliseconds());
assertTrue("Request should have retried and completed", future.isDone());
assertEquals(offset, future.get().offset());
@@ -138,17 +138,11 @@
}
}
private Struct produceResponse(String topic, int part, long offset, int error) {
Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
Struct response = struct.instance("responses");
response.set("topic", topic);
Struct partResp = response.instance("partition_responses");
partResp.set("partition", part);
partResp.set("error_code", (short) error);
partResp.set("base_offset", offset);
response.set("partition_responses", new Object[] {partResp});
struct.set("responses", new Object[] {response});
return struct;
private Struct produceResponse(TopicPartition tp, long offset, int error) {
ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset);
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
ProduceResponse response = new ProduceResponse(partResp);
return response.toStruct();
}
}

2
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java

@@ -50,7 +50,9 @@ public class MemoryRecordsTest {
recs2.append(i, toArray(r.key()), toArray(r.value()));
}
recs1.close();
recs1.rewind();
recs2.close();
recs2.rewind();
for (int iteration = 0; iteration < 2; iteration++) {
for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {

6
core/src/main/scala/kafka/common/ErrorMapping.scala

@@ -49,6 +49,9 @@ object ErrorMapping {
val MessageSetSizeTooLargeCode: Short = 18
val NotEnoughReplicasCode : Short = 19
val NotEnoughReplicasAfterAppendCode: Short = 20
// 21: InvalidRequiredAcks
// 22: IllegalConsumerGeneration
val NoOffsetsCommittedCode: Short = 23
private val exceptionToCode =
Map[Class[Throwable], Short](
@@ -70,7 +73,8 @@ object ErrorMapping {
classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode
classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
classOf[NoOffsetsCommittedException].asInstanceOf[Class[Throwable]] -> NoOffsetsCommittedCode
).withDefaultValue(UnknownCode)
/* invert the mapping */

27
core/src/main/scala/kafka/common/NoOffsetsCommittedException.scala

@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
/**
* Indicates that no offsets have been committed yet for the requested
* consumer group and partition(s). Corresponds to error code 23
* (NO_OFFSETS_FETCHABLE) on the clients side.
*/
class NoOffsetsCommittedException(message: String) extends RuntimeException(message) {
def this() = this(null)
}

2
core/src/main/scala/kafka/common/OffsetMetadataAndError.scala

@@ -48,7 +48,7 @@ case class OffsetMetadataAndError(offset: Long,
}
object OffsetMetadataAndError {
val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError)
val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoOffsetsCommittedCode)
val OffsetsLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.OffsetsLoadInProgressCode)
val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NotCoordinatorForConsumerCode)
val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode)

38
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala

@@ -25,6 +25,7 @@ import kafka.utils._
import scala.collection.mutable.HashMap
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
import org.apache.kafka.common.requests.JoinGroupRequest
/**
@@ -114,9 +115,15 @@ class ConsumerCoordinator(val config: KafkaConfig,
if (!consumerGroupRegistries.contains(groupId))
createNewGroup(groupId, partitionAssignmentStrategy)
val groupRegistry = consumerGroupRegistries(groupId)
// if the consumer id is unknown or does not yet exist in
// the group, register this consumer with the group
// TODO
if (consumerId.equals(JoinGroupRequest.UNKNOWN_CONSUMER_ID)) {
createNewConsumer(groupId, groupRegistry.generateNextConsumerId, topics, sessionTimeoutMs)
} else if (!groupRegistry.memberRegistries.contains(consumerId)) {
createNewConsumer(groupId, consumerId, topics, sessionTimeoutMs)
}
// add a delayed join-group operation to the purgatory
// TODO
@@ -146,9 +153,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
* Process a heartbeat request from a consumer
*/
def consumerHeartbeat(groupId: String,
consumerId: String,
generationId: Int,
responseCallback: Short => Unit) {
consumerId: String,
generationId: Int,
responseCallback: Short => Unit) {
// check that the group already exists
// TODO
@@ -171,21 +178,28 @@ class ConsumerCoordinator(val config: KafkaConfig,
// TODO: this is just a stub for new consumer testing,
// TODO: needs to be replaced with the logic above
// TODO --------------------------------------------------------------
// always return OK for heartbeat immediately
responseCallback(Errors.NONE.code)
// check if the consumer already exists; if yes return OK,
// otherwise return illegal generation error
if (consumerGroupRegistries.contains(groupId)
&& consumerGroupRegistries(groupId).memberRegistries.contains(consumerId))
responseCallback(Errors.NONE.code)
else
responseCallback(Errors.ILLEGAL_GENERATION.code)
}
/**
* Create a new consumer
*/
private def createNewConsumer(consumerId: String,
private def createNewConsumer(groupId: String,
consumerId: String,
topics: List[String],
sessionTimeoutMs: Int,
groupRegistry: GroupRegistry) {
debug("Registering consumer " + consumerId + " for group " + groupRegistry.groupId)
sessionTimeoutMs: Int) {
debug("Registering consumer " + consumerId + " for group " + groupId)
// create the new consumer registry entry
// TODO: specify consumerId as unknown and update at the end of the prepare-rebalance phase
val consumerRegistry = new ConsumerRegistry(groupId, consumerId, topics, sessionTimeoutMs)
consumerGroupRegistries(groupId).memberRegistries.put(consumerId, consumerRegistry)
// check if the partition assignment strategy is consistent with the group
// TODO
@@ -202,7 +216,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
// start preparing group partition rebalance
// TODO
info("Registered consumer " + consumerId + " for group " + groupRegistry.groupId)
info("Registered consumer " + consumerId + " for group " + groupId)
}
/**

8
core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala

@@ -32,10 +32,10 @@ import java.util.HashMap
* 1. subscribed topic list
* 2. assigned partitions for the subscribed topics.
*/
class ConsumerRegistry(val consumerId: String,
val subscribedTopics: List[String],
val sessionTimeoutMs: Int,
val groupRegistry: GroupRegistry) {
class ConsumerRegistry(val groupId: String,
val consumerId: String,
val topics: List[String],
val sessionTimeoutMs: Int) {
/* number of expired heartbeat recorded */
val numExpiredHeartbeat = new AtomicInteger(0)

2
core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala

@@ -43,6 +43,6 @@ class DelayedHeartbeat(sessionTimeout: Long,
/* mark all consumers within the heartbeat as heartbeat timed out */
override def onComplete() {
for (registry <- bucket.consumerRegistryList)
expireCallback(registry.groupRegistry.groupId, registry.consumerId)
expireCallback(registry.groupId, registry.consumerId)
}
}

7
core/src/main/scala/kafka/coordinator/GroupRegistry.scala

@@ -18,6 +18,7 @@
package kafka.coordinator
import scala.collection.mutable
import java.util.concurrent.atomic.AtomicInteger
sealed trait GroupStates { def state: Byte }
@@ -69,6 +70,10 @@ class GroupRegistry(val groupId: String,
val state: GroupState = new GroupState()
var generationId: Int = 1
val generationId = new AtomicInteger(1)
val nextConsumerId = new AtomicInteger(1)
def generateNextConsumerId = groupId + "-" + nextConsumerId.getAndIncrement
}

2
core/src/main/scala/kafka/server/KafkaServer.scala

@@ -311,7 +311,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
val canShutdown = isShuttingDown.compareAndSet(false, true)
if (canShutdown) {
if (canShutdown && shutdownLatch.getCount > 0) {
Utils.swallow(controlledShutdown())
brokerState.newState(BrokerShuttingDown)
if(socketServer != null)

301
core/src/test/scala/integration/kafka/api/ConsumerTest.scala

@@ -0,0 +1,301 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package kafka.api
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.CommitType
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
import kafka.utils.{ShutdownableThread, TestUtils, Logging}
import kafka.server.OffsetManager
import java.util.ArrayList
import org.junit.Assert._
import scala.collection.JavaConversions._
/**
* Integration tests for the new consumer that cover basic usage as well as server failures
*/
class ConsumerTest extends IntegrationTestHarness with Logging {
val producerCount = 1
val consumerCount = 2
val serverCount = 3
val topic = "topic"
val part = 0
val tp = new TopicPartition(topic, part)
// configure the servers and clients
this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
override def setUp() {
super.setUp()
// create the test topic with all the brokers as replicas
TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
}
def testSimpleConsumption() {
val numRecords = 10000
sendRecords(numRecords)
assertEquals(0, this.consumers(0).subscriptions.size)
this.consumers(0).subscribe(tp)
assertEquals(1, this.consumers(0).subscriptions.size)
this.consumers(0).seek(tp, 0)
consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
}
def testAutoOffsetReset() {
sendRecords(1)
this.consumers(0).subscribe(tp)
consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
}
def testSeek() {
val consumer = this.consumers(0)
val totalRecords = 50L
sendRecords(totalRecords.toInt)
consumer.subscribe(tp)
consumer.seekToEnd(tp)
assertEquals(totalRecords, consumer.position(tp))
assertFalse(consumer.poll(totalRecords).iterator().hasNext)
consumer.seekToBeginning(tp)
assertEquals(0, consumer.position(tp), 0)
consumeRecords(consumer, numRecords = 1, startingOffset = 0)
val mid = totalRecords / 2
consumer.seek(tp, mid)
assertEquals(mid, consumer.position(tp))
consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
}
def testGroupConsumption() {
sendRecords(10)
this.consumers(0).subscribe(topic)
consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
}
def testPositionAndCommit() {
sendRecords(5)
// committed() on a partition with no committed offset throws an exception
intercept[NoOffsetForPartitionException] {
this.consumers(0).committed(new TopicPartition(topic, 15))
}
// position() on a partition that we aren't subscribed to throws an exception
intercept[IllegalArgumentException] {
this.consumers(0).position(new TopicPartition(topic, 15))
}
this.consumers(0).subscribe(tp)
assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
this.consumers(0).commit(CommitType.SYNC)
assertEquals(0L, this.consumers(0).committed(tp))
consumeRecords(this.consumers(0), 5, 0)
assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
this.consumers(0).commit(CommitType.SYNC)
assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp))
sendRecords(1)
// another consumer in the same group should get the same position
this.consumers(1).subscribe(tp)
consumeRecords(this.consumers(1), 1, 5)
}
def testPartitionsFor() {
val numParts = 2
TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers)
val parts = this.consumers(0).partitionsFor("part-test")
assertNotNull(parts)
assertEquals(2, parts.length)
assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
}
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5)
/*
* 1. Produce a bunch of messages
* 2. Then consume the messages while killing and restarting brokers at random
*/
def consumeWithBrokerFailures(numIters: Int) {
val numRecords = 1000
sendRecords(numRecords)
this.producers.map(_.close)
var consumed = 0
val consumer = this.consumers(0)
consumer.subscribe(topic)
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
while (scheduler.isRunning.get()) {
for (record <- consumer.poll(100)) {
assertEquals(consumed.toLong, record.offset())
consumed += 1
}
consumer.commit(CommitType.SYNC)
if (consumed == numRecords) {
consumer.seekToBeginning()
consumed = 0
}
}
scheduler.shutdown()
}
def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5)
def seekAndCommitWithBrokerFailures(numIters: Int) {
val numRecords = 1000
sendRecords(numRecords)
this.producers.map(_.close)
val consumer = this.consumers(0)
consumer.subscribe(tp)
consumer.seek(tp, 0)
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
while(scheduler.isRunning.get()) {
val coin = TestUtils.random.nextInt(3)
if (coin == 0) {
info("Seeking to end of log")
consumer.seekToEnd()
assertEquals(numRecords.toLong, consumer.position(tp))
} else if (coin == 1) {
val pos = TestUtils.random.nextInt(numRecords).toLong
info("Seeking to " + pos)
consumer.seek(tp, pos)
assertEquals(pos, consumer.position(tp))
} else if (coin == 2) {
info("Committing offset.")
consumer.commit(CommitType.SYNC)
assertEquals(consumer.position(tp), consumer.committed(tp))
}
}
}
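// Shutting down the broker acting as coordinator should force a rebalance, producing a second revoke/assign callback cycle.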
def testPartitionReassignmentCallback() {
val callback = new TestConsumerReassignmentCallback()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200") // timeout quickly to avoid slow test
val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumer0.subscribe(topic)
// the initial subscription should cause a callback execution
while (callback.callsToAssigned == 0)
consumer0.poll(50)
// get metadata for the topic
var parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
while (parts == null)
parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
assertEquals(1, parts.size)
assertNotNull(parts(0).leader())
// shutdown the coordinator
val coordinator = parts(0).leader().id()
this.servers(coordinator).shutdown()
// this should cause another callback execution
while (callback.callsToAssigned < 2)
consumer0.poll(50)
assertEquals(2, callback.callsToAssigned)
assertEquals(2, callback.callsToRevoked)
consumer0.close()
}
private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
var callsToAssigned = 0
var callsToRevoked = 0
def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
info("onPartitionsAssigned called.")
callsToAssigned += 1
}
def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
info("onPartitionsRevoked called.")
callsToRevoked += 1
}
}
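// Kills a random broker and restarts any dead ones on each iteration, pausing 500 ms between rounds until numIters is reached.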
private class BounceBrokerScheduler(val numIters: Int) extends ShutdownableThread("daemon-bounce-broker", false) {
var iter: Int = 0
override def doWork(): Unit = {
killRandomBroker()
restartDeadBrokers()
iter += 1
if (iter == numIters)
initiateShutdown()
else
Thread.sleep(500)
}
}
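// Sends numRecords records to the test partition, using the record index as both key and value, then waits for every ack.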
private def sendRecords(numRecords: Int) {
val futures = (0 until numRecords).map { i =>
this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
}
futures.foreach(_.get)
}
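// Polls until numRecords records have arrived (bounded by a retry budget), then asserts topic, partition, and sequential offsets.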
private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int) {
val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
val maxIters = numRecords * 300
var iters = 0
while (records.size < numRecords) {
for (record <- consumer.poll(50))
records.add(record)
if (iters > maxIters)
throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
iters += 1
}
for (i <- 0 until numRecords) {
val record = records.get(i)
val offset = startingOffset + i
assertEquals(topic, record.topic())
assertEquals(part, record.partition())
assertEquals(offset.toLong, record.offset())
}
}
}

12
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala

@@ -19,14 +19,11 @@ package kafka.api
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.scalatest.junit.JUnit3Suite
import collection._
import kafka.utils.TestUtils
import java.util.Properties
import java.util.Arrays
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import kafka.server.KafkaConfig
import kafka.server.{OffsetManager, KafkaConfig}
import kafka.integration.KafkaServerTestHarness
import scala.collection.mutable.Buffer
@@ -62,6 +59,13 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
producers += new KafkaProducer(producerConfig)
for(i <- 0 until consumerCount)
consumers += new KafkaConsumer(consumerConfig)
// create the consumer offset topic
TestUtils.createTopic(zkClient, OffsetManager.OffsetsTopicName,
serverConfig.getProperty("offsets.topic.num.partitions").toInt,
serverConfig.getProperty("offsets.topic.replication.factor").toInt,
servers,
servers(0).offsetManager.offsetsTopicConfig)
}
override def tearDown() {

2
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala

@@ -15,7 +15,7 @@
* limitations under the License.
*/
package kafka.api.test
package kafka.api
import org.junit.Test
import org.junit.Assert._

2
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala

@@ -75,7 +75,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
*/
def restartDeadBrokers() {
for(i <- 0 until servers.length if !alive(i)) {
servers(i) = TestUtils.createServer(configs(i))
servers(i).startup()
alive(i) = true
}
}

2
core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala

@@ -159,7 +159,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
assertEquals(ErrorMapping.NoOffsetsCommittedCode, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get)

22
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@@ -48,6 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
import collection.Iterable
import scala.collection.Map
import org.apache.kafka.clients.consumer.KafkaConsumer
/**
* Utility functions to help with testing
@@ -406,6 +407,27 @@ object TestUtils extends Logging {
return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
}
/**
* Create a new consumer with a few pre-configured properties.
*/
def createNewConsumer(brokerList: String,
groupId: String,
autoOffsetReset: String = "earliest",
partitionFetchSize: Long = 4096L): KafkaConsumer[Array[Byte],Array[Byte]] = {
import org.apache.kafka.clients.consumer.ConsumerConfig
val consumerProps = new Properties()
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, partitionFetchSize.toString)
consumerProps.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
return new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
}
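// A minimal usage sketch for the helper above (hypothetical snippet, not part of this patch;
// brokerList, "test-group", and "test-topic" are placeholder values):
//
// val consumer = TestUtils.createNewConsumer(brokerList, groupId = "test-group")
// try {
//   consumer.subscribe("test-topic")
//   val records = consumer.poll(100) // 100 ms timeout
// } finally {
//   consumer.close()
// }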
/**
* Create a default producer config properties map with the given metadata broker list
*/
