|
|
@ -163,7 +163,7 @@ public class InternalTopicIntegrationTest { |
|
|
|
//
|
|
|
|
//
|
|
|
|
// Step 3: Verify the state changelog topics are compact
|
|
|
|
// Step 3: Verify the state changelog topics are compact
|
|
|
|
//
|
|
|
|
//
|
|
|
|
waitForCompletion(streams, 2, 10000); |
|
|
|
waitForCompletion(streams, 2, 30000); |
|
|
|
streams.close(); |
|
|
|
streams.close(); |
|
|
|
|
|
|
|
|
|
|
|
final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts")); |
|
|
|
final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts")); |
|
|
@ -203,7 +203,7 @@ public class InternalTopicIntegrationTest { |
|
|
|
//
|
|
|
|
//
|
|
|
|
// Step 3: Verify the state changelog topics are compact
|
|
|
|
// Step 3: Verify the state changelog topics are compact
|
|
|
|
//
|
|
|
|
//
|
|
|
|
waitForCompletion(streams, 2, 5000); |
|
|
|
waitForCompletion(streams, 2, 30000); |
|
|
|
streams.close(); |
|
|
|
streams.close(); |
|
|
|
final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows")); |
|
|
|
final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows")); |
|
|
|
final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(",")); |
|
|
|
final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(",")); |
|
|
|