diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 25d995b5782..b0c3952ff90 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -93,6 +93,8 @@ class ProducerSendThread[K,V](val threadName: String, events = new ListBuffer[ProducerData[K,V]] } } + // send the last batch of events + tryToHandle(events) if(queue.size > 0) throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue" .format(queue.size))