
KAFKA-7935: UNSUPPORTED_COMPRESSION_TYPE if ReplicaManager.getLogConfig returns None (#6274)

Replaced `forall` with `exists`. Added a unit test to `KafkaApisTest` that failed before the change.
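The root cause is `Option` semantics: `forall` is vacuously true when the option is empty, so a partition whose log config could not be looked up (`getLogConfig` returning `None`) was treated as ZStandard-compressed and rejected with `UNSUPPORTED_COMPRESSION_TYPE` for fetch versions below 10, whereas `exists` is false for `None` and lets the fetch proceed. A minimal sketch of the difference, pasteable into a Scala REPL (illustrative values only, not the Kafka code itself):

```scala
// Option.forall is vacuously true for None; Option.exists is false.
val missingConfig: Option[String] = None      // log config lookup found nothing
val zstdConfig: Option[String] = Some("zstd")

missingConfig.forall(_ == "zstd")  // true  -> the old check wrongly matched
missingConfig.exists(_ == "zstd")  // false -> the fixed check lets the fetch through
zstdConfig.forall(_ == "zstd")     // true  -> a real zstd partition still matches
zstdConfig.exists(_ == "zstd")     // true  -> both agree when a config is present
```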

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored by Ismael Juma 6 years ago, committed by Rajini Sivaram
commit 45a896e741
2 changed files with 66 additions and 4 deletions:
  1. core/src/main/scala/kafka/server/KafkaApis.scala (2 changes)
  2. core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (68 changes)

core/src/main/scala/kafka/server/KafkaApis.scala

@@ -567,7 +567,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                                 partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = {
       val logConfig = replicaManager.getLogConfig(tp)
 
-      if (logConfig.forall(_.compressionType == ZStdCompressionCodec.name) && versionId < 10) {
+      if (logConfig.exists(_.compressionType == ZStdCompressionCodec.name) && versionId < 10) {
         trace(s"Fetching messages is disabled for ZStandard compressed partition $tp. Sending unsupported version response to $clientId.")
         errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
       } else {

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.net.InetAddress
+import java.nio.charset.StandardCharsets
 import java.util
 import java.util.{Collections, Optional}
 import java.util.Arrays.asList
@@ -39,14 +40,15 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
-import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint}
 import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.Assert.{assertEquals, assertTrue}
+import EasyMock._
+import org.junit.Assert.{assertEquals, assertNull, assertTrue}
 import org.junit.{After, Test}
 
 import scala.collection.JavaConverters._
@@ -433,6 +435,66 @@ class KafkaApisTest {
     assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
   }
 
+  /**
+   * Verifies that sending a fetch request with version 9 works correctly when
+   * ReplicaManager.getLogConfig returns None.
+   */
+  @Test
+  def testFetchRequestV9WithNoLogConfig(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    setupBasicMetadataCache(tp.topic, numPartitions = 1)
+    val hw = 3
+    val timestamp = 1000
+
+    expect(replicaManager.getLogConfig(EasyMock.eq(tp))).andReturn(None)
+
+    replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
+      anyObject[Seq[(TopicPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota],
+      anyObject[Seq[(TopicPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel])
+    expectLastCall[Unit].andAnswer(new IAnswer[Unit] {
+      def answer: Unit = {
+        val callback = getCurrentArguments.apply(7).asInstanceOf[(Seq[(TopicPartition, FetchPartitionData)] => Unit)]
+        val records = MemoryRecords.withRecords(CompressionType.NONE,
+          new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8)))
+        callback(Seq(tp -> new FetchPartitionData(Errors.NONE, hw, 0, records,
+          None, None)))
+      }
+    })
+
+    val fetchData = Map(tp -> new FetchRequest.PartitionData(0, 0, 1000,
+      Optional.empty())).asJava
+    val fetchMetadata = new JFetchMetadata(0, 0)
+    val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
+      fetchMetadata, fetchData, false)
+    expect(fetchManager.newContext(anyObject[JFetchMetadata],
+      anyObject[util.Map[TopicPartition, FetchRequest.PartitionData]],
+      anyObject[util.List[TopicPartition]],
+      anyBoolean)).andReturn(fetchContext)
+
+    val capturedResponse = expectNoThrottling()
+    EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0)
+
+    EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, fetchManager)
+
+    val builder = new FetchRequest.Builder(9, 9, -1, 100, 0, fetchData)
+    val (fetchRequest, request) = buildRequest(builder)
+    createKafkaApis().handleFetchRequest(request)
+
+    val response = readResponse(ApiKeys.FETCH, fetchRequest, capturedResponse)
+      .asInstanceOf[FetchResponse[BaseRecords]]
+    assertTrue(response.responseData.containsKey(tp))
+
+    val partitionData = response.responseData.get(tp)
+    assertEquals(Errors.NONE, partitionData.error)
+    assertEquals(hw, partitionData.highWatermark)
+    assertEquals(-1, partitionData.lastStableOffset)
+    assertEquals(0, partitionData.logStartOffset)
+    assertEquals(timestamp,
+      partitionData.records.asInstanceOf[MemoryRecords].batches.iterator.next.maxTimestamp)
+    assertNull(partitionData.abortedTransactions)
+  }
+
   /**
    * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
    */
