From a9bf9487dd84c8a874f5a71b2557b57801308249 Mon Sep 17 00:00:00 2001
From: Sergey Prokofiev You can enable or disable fault tolerance for a state store by enabling or disabling the change logging
of the store through Enable or Disable Fault Tolerance of State Stores (Store Changelogs)
enableLogging()
and disableLogging()
.
- You can also fine-tune the associated topic’s configuration if needed.
Example for disabling fault-tolerance:
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -326,14 +326,14 @@
If the changelog is disabled then the attached state store is no longer fault tolerant and it can’t have any standby replicas.
Here is an example for enabling fault tolerance, with additional changelog-topic configuration: - You can add any log config from kafka.log.LogConfig. + You can add any log config from kafka.log.LogConfig. Unrecognized configs will be ignored.
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
Map<String, String> changelogConfig = new HashMap();
// override min.insync.replicas
-changelogConfig.put("min.insyc.replicas", "1")
+changelogConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")
StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("Counts"),