Browse Source

MINOR: log4j improvements on assigned tasks and store changelog reader

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Xavier Léauté <xavier@confluent.io>, Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>

Closes #4031 from guozhangwang/KMinor-assigned-task-log4j
pull/4031/merge
Guozhang Wang 7 years ago
parent
commit
2427a44768
  1. 8
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
  2. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java

8
streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java

@ -122,7 +122,7 @@ class AssignedTasks implements RestoringTasks { @@ -122,7 +122,7 @@ class AssignedTasks implements RestoringTasks {
final Map.Entry<TaskId, Task> entry = it.next();
try {
if (!entry.getValue().initialize()) {
log.debug("transitioning {} {} to restoring", taskTypeName, entry.getKey());
log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
addToRestoring(entry.getValue());
} else {
transitionToRunning(entry.getValue(), readyPartitions);
@ -140,7 +140,7 @@ class AssignedTasks implements RestoringTasks { @@ -140,7 +140,7 @@ class AssignedTasks implements RestoringTasks {
if (restored.isEmpty()) {
return Collections.emptySet();
}
log.trace("{} partitions restored for {}", taskTypeName, restored);
log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
final Set<TopicPartition> resume = new HashSet<>();
restoredPartitions.addAll(restored);
for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
@ -153,10 +153,10 @@ class AssignedTasks implements RestoringTasks { @@ -153,10 +153,10 @@ class AssignedTasks implements RestoringTasks {
if (log.isTraceEnabled()) {
final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
outstandingPartitions.removeAll(restoredPartitions);
log.trace("partition restoration not complete for {} {} partitions: {}",
log.trace("{} {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}",
taskTypeName,
task.id(),
task.changelogPartitions());
outstandingPartitions);
}
}
}

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java

@ -194,7 +194,7 @@ public class StoreChangelogReader implements ChangelogReader { @@ -194,7 +194,7 @@ public class StoreChangelogReader implements ChangelogReader {
private Collection<TopicPartition> completed() {
final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
completed.removeAll(needsRestoring.keySet());
log.debug("completed partitions {}", completed);
log.trace("The set of restoration completed partitions so far: {}", completed);
return completed;
}

Loading…
Cancel
Save