From 3aa909575d3f978a5045200986eb692eebd47ab8 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 19 Apr 2019 18:44:27 -0700 Subject: [PATCH] MINOR: Java8 cleanup (#6598) Reviewers: Bill Bejeck , Guozhang Wang --- .../streams/tests/StreamsUpgradeTest.java | 53 ++++++++----------- 1 file changed, 23 insertions(+), 30 deletions(-) diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 2e108b2e989..797c1e83f96 100644 --- a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,17 +16,17 @@ */ package org.apache.kafka.streams.tests; -import java.util.Properties; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; +import java.util.Properties; + public class StreamsUpgradeTest { @@ -59,40 +59,33 @@ public class StreamsUpgradeTest { final KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - streams.close(); - System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); - System.out.flush(); - } - }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + })); } private static ProcessorSupplier printProcessorSupplier() { - return new ProcessorSupplier() { - public Processor get() { - return new AbstractProcessor() { - private int numRecordsProcessed = 0; - - @Override - public void init(final ProcessorContext context) { - System.out.println("[2.1] initializing processor: topic=data taskId=" + context.taskId()); - numRecordsProcessed = 0; - } + return () -> new AbstractProcessor() { + private int numRecordsProcessed = 0; - @Override - public void process(final K key, final V value) { - numRecordsProcessed++; - if (numRecordsProcessed % 100 == 0) { - System.out.println("processed " + numRecordsProcessed + " records from topic=data"); - } - } + @Override + public void init(final ProcessorContext context) { + System.out.println("[2.1] initializing processor: topic=data taskId=" + context.taskId()); + numRecordsProcessed = 0; + } - @Override - public void close() {} - }; + @Override + public void process(final K key, final V value) { + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + } } + + @Override + public void close() {} }; } }