|
|
@ -16,17 +16,17 @@ |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
package org.apache.kafka.streams.tests; |
|
|
|
package org.apache.kafka.streams.tests; |
|
|
|
|
|
|
|
|
|
|
|
import java.util.Properties; |
|
|
|
|
|
|
|
import org.apache.kafka.common.utils.Utils; |
|
|
|
import org.apache.kafka.common.utils.Utils; |
|
|
|
import org.apache.kafka.streams.KafkaStreams; |
|
|
|
import org.apache.kafka.streams.KafkaStreams; |
|
|
|
import org.apache.kafka.streams.StreamsBuilder; |
|
|
|
import org.apache.kafka.streams.StreamsBuilder; |
|
|
|
import org.apache.kafka.streams.StreamsConfig; |
|
|
|
import org.apache.kafka.streams.StreamsConfig; |
|
|
|
import org.apache.kafka.streams.kstream.KStream; |
|
|
|
import org.apache.kafka.streams.kstream.KStream; |
|
|
|
import org.apache.kafka.streams.processor.AbstractProcessor; |
|
|
|
import org.apache.kafka.streams.processor.AbstractProcessor; |
|
|
|
import org.apache.kafka.streams.processor.Processor; |
|
|
|
|
|
|
|
import org.apache.kafka.streams.processor.ProcessorContext; |
|
|
|
import org.apache.kafka.streams.processor.ProcessorContext; |
|
|
|
import org.apache.kafka.streams.processor.ProcessorSupplier; |
|
|
|
import org.apache.kafka.streams.processor.ProcessorSupplier; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import java.util.Properties; |
|
|
|
|
|
|
|
|
|
|
|
public class StreamsUpgradeTest { |
|
|
|
public class StreamsUpgradeTest { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -59,40 +59,33 @@ public class StreamsUpgradeTest { |
|
|
|
final KafkaStreams streams = new KafkaStreams(builder.build(), config); |
|
|
|
final KafkaStreams streams = new KafkaStreams(builder.build(), config); |
|
|
|
streams.start(); |
|
|
|
streams.start(); |
|
|
|
|
|
|
|
|
|
|
|
Runtime.getRuntime().addShutdownHook(new Thread() { |
|
|
|
Runtime.getRuntime().addShutdownHook(new Thread(() -> { |
|
|
|
@Override |
|
|
|
streams.close(); |
|
|
|
public void run() { |
|
|
|
System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); |
|
|
|
streams.close(); |
|
|
|
System.out.flush(); |
|
|
|
System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); |
|
|
|
})); |
|
|
|
System.out.flush(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() { |
|
|
|
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() { |
|
|
|
return new ProcessorSupplier<K, V>() { |
|
|
|
return () -> new AbstractProcessor<K, V>() { |
|
|
|
public Processor<K, V> get() { |
|
|
|
private int numRecordsProcessed = 0; |
|
|
|
return new AbstractProcessor<K, V>() { |
|
|
|
|
|
|
|
private int numRecordsProcessed = 0; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public void init(final ProcessorContext context) { |
|
|
|
|
|
|
|
System.out.println("[2.1] initializing processor: topic=data taskId=" + context.taskId()); |
|
|
|
|
|
|
|
numRecordsProcessed = 0; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void process(final K key, final V value) { |
|
|
|
public void init(final ProcessorContext context) { |
|
|
|
numRecordsProcessed++; |
|
|
|
System.out.println("[2.1] initializing processor: topic=data taskId=" + context.taskId()); |
|
|
|
if (numRecordsProcessed % 100 == 0) { |
|
|
|
numRecordsProcessed = 0; |
|
|
|
System.out.println("processed " + numRecordsProcessed + " records from topic=data"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void close() {} |
|
|
|
public void process(final K key, final V value) { |
|
|
|
}; |
|
|
|
numRecordsProcessed++; |
|
|
|
|
|
|
|
if (numRecordsProcessed % 100 == 0) { |
|
|
|
|
|
|
|
System.out.println("processed " + numRecordsProcessed + " records from topic=data"); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public void close() {} |
|
|
|
}; |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|