
MINOR: some trace logging for streams debugging

Author: Ubuntu <norwood@confluent.io>

Reviewers: Eno Thereska <eno.thereska@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1882 from norwood/streams-logging
Authored by Dan Norwood 8 years ago, committed by Ewen Cheslack-Postava
Commit 11766ad318
7 changed files:
1. streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java (6)
2. streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java (1)
3. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (2)
4. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (4)
5. streams/src/main/java/org/apache/kafka/streams/state/Stores.java (9)
6. streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java (3)
7. streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java (1)
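All of the new messages are logged at TRACE, which Kafka's stock log4j configuration suppresses. A minimal log4j.properties fragment to surface them (logger names follow the packages touched in this commit; scope to taste):

    # Enable TRACE output for all of Kafka Streams
    log4j.logger.org.apache.kafka.streams=TRACE
    # Or target a single class, e.g. only the thread-cache evictions
    log4j.logger.org.apache.kafka.streams.state.internals.ThreadCache=TRACE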

streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java (6)

@@ -28,6 +28,8 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collection;
@@ -37,6 +39,8 @@ import java.util.Map;
 import java.util.Set;
 public abstract class AbstractTask {
+    private static final Logger log = LoggerFactory.getLogger(AbstractTask.class);
     protected final TaskId id;
     protected final String applicationId;
     protected final ProcessorTopology topology;
@@ -78,6 +82,7 @@ public abstract class AbstractTask {
         initializeOffsetLimits();
         for (StateStore store : this.topology.stateStores()) {
+            log.trace("task [{}] Initializing store {}", id(), store.name());
             store.init(this.processorContext, store);
         }
     }
@@ -119,6 +124,7 @@ public abstract class AbstractTask {
      * @throws ProcessorStateException if there is an error while closing the state manager
      */
     void closeStateManager() {
+        log.trace("task [{}] Closing", id());
         try {
             stateMgr.close(recordCollectorOffsets());
         } catch (IOException e) {
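The additions here follow the standard SLF4J idiom: one static, per-class logger plus {} placeholders, so the message string is only assembled when TRACE is actually enabled. A self-contained sketch of the same idiom (the class, task id, and store name are hypothetical, not from this patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExampleTask {
        private static final Logger log = LoggerFactory.getLogger(ExampleTask.class);

        void initStore(String storeName) {
            // With placeholders, no string concatenation happens unless TRACE is on
            log.trace("task [{}] Initializing store {}", "0_1", storeName);
        }
    }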

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java (1)

@@ -326,6 +326,7 @@ public class ProcessorStateManager {
                 context.setCurrentNode(processorNode);
             }
             try {
+                log.trace("{} Flushing store={}", logPrefix, store.name());
                 store.flush();
             } catch (Exception e) {
                 throw new ProcessorStateException(String.format("%s Failed to flush state store %s", logPrefix, store.name()), e);
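Parameterized logging keeps the disabled-level cost to a few reference passes, but the arguments themselves are still evaluated; if an argument were expensive to compute, the usual SLF4J guard would apply. A generic sketch, not part of this patch (computeStoreSummary() is hypothetical):

    if (log.isTraceEnabled()) {
        // Only pay for the expensive argument when TRACE is actually on
        log.trace("{} Flushing store={}", logPrefix, computeStoreSummary());
    }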

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (2)

@@ -262,10 +262,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     public void commit() {
         log.debug("{} Committing its state", logPrefix);
         // 1) flush local state
         stateMgr.flush(processorContext);
+        log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix);
         // 2) flush produced records in the downstream and change logs of local states
         recordCollector.flush();
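The new trace line marks the boundary between the two flush phases of commit(): local state stores first, then the producer, so changelog records for flushed state are on the wire before offsets are committed. How often commits run is governed by commit.interval.ms; a hedged configuration sketch (the value shown is the 0.10.x default):

    Properties props = new Properties();
    // Commit task state every 30 seconds
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);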

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (4)

@@ -592,6 +592,7 @@ public class StreamThread extends Thread {
      * Commit the states of all its tasks
      */
     private void commitAll() {
+        log.trace("stream-thread [{}] Committing all its owned tasks", this.getName());
         for (StreamTask task : activeTasks.values()) {
             commitOne(task);
         }
@@ -604,8 +605,7 @@ public class StreamThread extends Thread {
      * Commit the state of a task
      */
     private void commitOne(AbstractTask task) {
-        log.info("{} Committing task {}", logPrefix, task.id());
+        log.info("{} Committing task {} {}", logPrefix, task.getClass().getSimpleName(), task.id());
         try {
             task.commit();
         } catch (CommitFailedException e) {
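The widened INFO line folds in the task's concrete class name, presumably so active StreamTask instances and StandbyTask instances can be told apart in the log, since both extend AbstractTask. getSimpleName() behaves as in this minimal sketch (helper name hypothetical):

    // Hypothetical helper: works for any AbstractTask subclass
    static String taskKind(AbstractTask task) {
        // Yields "StreamTask" or "StandbyTask", without the package prefix
        return task.getClass().getSimpleName();
    }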

streams/src/main/java/org/apache/kafka/streams/state/Stores.java (9)

@@ -16,13 +16,15 @@
  */
 package org.apache.kafka.streams.state;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -33,6 +35,8 @@ import java.util.Map;
  */
 public class Stores {
+    private static final Logger log = LoggerFactory.getLogger(Stores.class);
     /**
      * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
      *
@@ -83,6 +87,7 @@ public class Stores {
             @Override
             public StateStoreSupplier build() {
+                log.trace("Creating InMemory Store name={} capacity={} logged={}", name, capacity, logged);
                 if (capacity < Integer.MAX_VALUE) {
                     return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
                 }
@@ -134,6 +139,7 @@ public class Stores {
             @Override
             public StateStoreSupplier build() {
+                log.trace("Creating RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
                 if (numSegments > 0) {
                     return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, windowSize, logged, logConfig, cachingEnabled);
                 }
@@ -412,3 +418,4 @@ public class Stores {
     }
 }
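Both new trace lines sit in the build() step of the fluent Stores builder, so each fires once per store supplier at topology-construction time, not per record. A usage sketch against the 0.10.1-era API as I understand it (store name hypothetical); building the supplier would emit the "Creating InMemory Store ..." line:

    StateStoreSupplier supplier = Stores.create("word-counts")
            .withStringKeys()
            .withLongValues()
            .inMemory()
            .build(); // trace: Creating InMemory Store name=word-counts capacity=... logged=...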

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java (3)

@@ -41,8 +41,6 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.Comparator;
@@ -66,7 +64,6 @@ import java.util.Set;
  */
 public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-    private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);
     private static final int TTL_NOT_USED = -1;
     // TODO: these values should be configurable

streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java (1)

@@ -195,6 +195,7 @@ public class ThreadCache {
     private void maybeEvict(final String namespace) {
         while (sizeBytes() > maxCacheSizeBytes) {
             final NamedCache cache = getOrCreateCache(namespace);
+            log.trace("Thread {} evicting cache {}", name, namespace);
             cache.evict();
             numEvicts++;
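Evictions happen whenever the thread-level record cache crosses the budget set by cache.max.bytes.buffering, so this trace line is a direct way to watch cache pressure. A hedged configuration sketch (the size is illustrative):

    Properties props = new Properties();
    // 10 MB record cache shared across all threads of this instance
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);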
