diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 0981291efd1..127a64e80e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -28,6 +28,8 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; @@ -37,6 +39,8 @@ import java.util.Map; import java.util.Set; public abstract class AbstractTask { + private static final Logger log = LoggerFactory.getLogger(AbstractTask.class); + protected final TaskId id; protected final String applicationId; protected final ProcessorTopology topology; @@ -78,6 +82,7 @@ public abstract class AbstractTask { initializeOffsetLimits(); for (StateStore store : this.topology.stateStores()) { + log.trace("task [{}] Initializing store {}", id(), store.name()); store.init(this.processorContext, store); } } @@ -119,6 +124,7 @@ public abstract class AbstractTask { * @throws ProcessorStateException if there is an error while closing the state manager */ void closeStateManager() { + log.trace("task [{}] Closing", id()); try { stateMgr.close(recordCollectorOffsets()); } catch (IOException e) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 52a47d3cdb4..795949f8627 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -326,6 +326,7 @@ public class ProcessorStateManager { context.setCurrentNode(processorNode); } try { + log.trace("{} Flushing store={}", logPrefix, store.name()); store.flush(); } catch (Exception e) { throw new ProcessorStateException(String.format("%s Failed to flush state store %s", logPrefix, store.name()), e); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 0733e5a2b10..72b0b992698 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -262,10 +262,10 @@ public class StreamTask extends AbstractTask implements Punctuator { */ public void commit() { log.debug("{} Committing its state", logPrefix); - // 1) flush local state stateMgr.flush(processorContext); + log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix); // 2) flush produced records in the downstream and change logs of local states recordCollector.flush(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 05d712671f3..fa5b5f11a54 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -592,6 +592,7 @@ public class StreamThread extends Thread { * Commit the states of all its tasks */ private void commitAll() { + log.trace("stream-thread [{}] Committing all its owned tasks", this.getName()); for (StreamTask task : activeTasks.values()) { commitOne(task); } @@ -604,8 +605,7 @@ public class StreamThread extends Thread { * Commit the state of a task */ private void commitOne(AbstractTask task) { - log.info("{} Committing task {}", logPrefix, task.id()); - + log.info("{} Committing task {} {}", logPrefix, task.getClass().getSimpleName(), task.id()); try { task.commit(); } catch (CommitFailedException e) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 107f95014d5..922f4bc1009 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -16,13 +16,15 @@ */ package org.apache.kafka.streams.state; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier; import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.HashMap; @@ -33,6 +35,8 @@ import java.util.Map; */ public class Stores { + private static final Logger log = LoggerFactory.getLogger(Stores.class); + /** * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance. * @@ -83,6 +87,7 @@ public class Stores { @Override public StateStoreSupplier build() { + log.trace("Creating InMemory Store name={} capacity={} logged={}", name, capacity, logged); if (capacity < Integer.MAX_VALUE) { return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig); } @@ -134,6 +139,7 @@ public class Stores { @Override public StateStoreSupplier build() { + log.trace("Creating RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged); if (numSegments > 0) { return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, windowSize, logged, logConfig, cachingEnabled); } @@ -412,3 +418,4 @@ public class Stores { } } + diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index e27ffd8338b..b0f5ee4c316 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -41,8 +41,6 @@ import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.util.Comparator; @@ -66,7 +64,6 @@ import java.util.Set; */ public class RocksDBStore implements KeyValueStore { - private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class); private static final int TTL_NOT_USED = -1; // TODO: these values should be configurable diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index f7355d80202..57ebfc757f8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -195,6 +195,7 @@ public class ThreadCache { private void maybeEvict(final String namespace) { while (sizeBytes() > maxCacheSizeBytes) { final NamedCache cache = getOrCreateCache(namespace); + log.trace("Thread {} evicting cache {}", name, namespace); cache.evict(); numEvicts++;