diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 23225e8bf2a..d1b5bb895c2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -567,7 +567,7 @@ class KafkaApis(val requestChannel: RequestChannel, partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = { val logConfig = replicaManager.getLogConfig(tp) - if (logConfig.forall(_.compressionType == ZStdCompressionCodec.name) && versionId < 10) { + if (logConfig.exists(_.compressionType == ZStdCompressionCodec.name) && versionId < 10) { trace(s"Fetching messages is disabled for ZStandard compressed partition $tp. Sending unsupported version response to $clientId.") errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE) } else { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a3ecb0773c1..f3eae1ea8bf 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -18,6 +18,7 @@ package kafka.server import java.net.InetAddress +import java.nio.charset.StandardCharsets import java.util import java.util.{Collections, Optional} import java.util.Arrays.asList @@ -39,14 +40,15 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.FileRecords.TimestampAndOffset -import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint} import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry -import org.apache.kafka.common.requests._ +import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.easymock.{Capture, EasyMock, IAnswer} -import org.junit.Assert.{assertEquals, assertTrue} +import EasyMock._ +import org.junit.Assert.{assertEquals, assertNull, assertTrue} import org.junit.{After, Test} import scala.collection.JavaConverters._ @@ -433,6 +435,66 @@ class KafkaApisTest { assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet) } + /** + * Verifies that sending a fetch request with version 9 works correctly when + * ReplicaManager.getLogConfig returns None. + */ + @Test + def testFetchRequestV9WithNoLogConfig(): Unit = { + val tp = new TopicPartition("foo", 0) + setupBasicMetadataCache(tp.topic, numPartitions = 1) + val hw = 3 + val timestamp = 1000 + + expect(replicaManager.getLogConfig(EasyMock.eq(tp))).andReturn(None) + + replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean, + anyObject[Seq[(TopicPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota], + anyObject[Seq[(TopicPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel]) + expectLastCall[Unit].andAnswer(new IAnswer[Unit] { + def answer: Unit = { + val callback = getCurrentArguments.apply(7).asInstanceOf[(Seq[(TopicPartition, FetchPartitionData)] => Unit)] + val records = MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8))) + callback(Seq(tp -> new FetchPartitionData(Errors.NONE, hw, 0, records, + None, None))) + } + }) + + val fetchData = Map(tp -> new FetchRequest.PartitionData(0, 0, 1000, + Optional.empty())).asJava + val fetchMetadata = new JFetchMetadata(0, 0) + val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100), + fetchMetadata, fetchData, false) + expect(fetchManager.newContext(anyObject[JFetchMetadata], + anyObject[util.Map[TopicPartition, FetchRequest.PartitionData]], + anyObject[util.List[TopicPartition]], + anyBoolean)).andReturn(fetchContext) + + val capturedResponse = expectNoThrottling() + EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0) + + EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, fetchManager) + + val builder = new FetchRequest.Builder(9, 9, -1, 100, 0, fetchData) + val (fetchRequest, request) = buildRequest(builder) + createKafkaApis().handleFetchRequest(request) + + val response = readResponse(ApiKeys.FETCH, fetchRequest, capturedResponse) + .asInstanceOf[FetchResponse[BaseRecords]] + assertTrue(response.responseData.containsKey(tp)) + + val partitionData = response.responseData.get(tp) + assertEquals(Errors.NONE, partitionData.error) + assertEquals(hw, partitionData.highWatermark) + assertEquals(-1, partitionData.lastStableOffset) + assertEquals(0, partitionData.logStartOffset) + assertEquals(timestamp, + partitionData.records.asInstanceOf[MemoryRecords].batches.iterator.next.maxTimestamp) + assertNull(partitionData.abortedTransactions) + } + /** * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively. */