|
|
|
@ -104,7 +104,7 @@ public class StreamsBrokerDownResilienceTest {
@@ -104,7 +104,7 @@ public class StreamsBrokerDownResilienceTest {
|
|
|
|
|
|
|
|
|
|
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties); |
|
|
|
|
|
|
|
|
|
streams.setUncaughtExceptionHandler( (t,e) -> { |
|
|
|
|
streams.setUncaughtExceptionHandler((t, e) -> { |
|
|
|
|
System.err.println("FATAL: An unexpected exception " + e); |
|
|
|
|
System.err.flush(); |
|
|
|
|
streams.close(Duration.ofSeconds(30)); |
|
|
|
@ -114,11 +114,11 @@ public class StreamsBrokerDownResilienceTest {
@@ -114,11 +114,11 @@ public class StreamsBrokerDownResilienceTest {
|
|
|
|
|
System.out.println("Start Kafka Streams"); |
|
|
|
|
streams.start(); |
|
|
|
|
|
|
|
|
|
Runtime.getRuntime().addShutdownHook(new Thread( () -> { |
|
|
|
|
streams.close(Duration.ofSeconds(30)); |
|
|
|
|
System.out.println("Complete shutdown of streams resilience test app now"); |
|
|
|
|
System.out.flush(); |
|
|
|
|
} |
|
|
|
|
Runtime.getRuntime().addShutdownHook(new Thread(() -> { |
|
|
|
|
streams.close(Duration.ofSeconds(30)); |
|
|
|
|
System.out.println("Complete shutdown of streams resilience test app now"); |
|
|
|
|
System.out.flush(); |
|
|
|
|
} |
|
|
|
|
)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|