From 4e35a2bfb7f3b0437a27bb58b8cb39339f750064 Mon Sep 17 00:00:00 2001 From: huxi Date: Fri, 13 Apr 2018 00:35:37 +0800 Subject: [PATCH] KAFKA-6592: ConsoleConsumer should support WindowedSerdes (#4797) Have Console consumer support TimeWindowedDeserializer/SessionWindowedDeserializer. Reviewers: Guozhang Wang --- .../scala/kafka/tools/ConsoleConsumer.scala | 47 +++- .../kafka/tools/ConsoleConsumerTest.scala | 19 ++ .../KStreamAggregationIntegrationTest.java | 232 ++++++++++++------ 3 files changed, 221 insertions(+), 77 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 24fa583f1bb..9df4fb42070 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer} import org.apache.kafka.common.utils.Utils +import scala.collection.JavaConversions import scala.collection.JavaConverters._ /** @@ -45,6 +46,11 @@ import scala.collection.JavaConverters._ object ConsoleConsumer extends Logging { var messageCount = 0 + // Keep same names with StreamConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + // and StreamConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + // visible for testing + private[tools] val innerKeySerdeName = "default.windowed.key.serde.inner" + private[tools] val innerValueSerdeName = "default.windowed.value.serde.inner" private val shutdownLatch = new CountDownLatch(1) @@ -291,7 +297,17 @@ object ConsoleConsumer extends Logging { .describedAs("class") .ofType(classOf[String]) .defaultsTo(classOf[DefaultMessageFormatter].getName) - val messageFormatterArgOpt = parser.accepts("property", "The properties to initialize the message formatter.") + val messageFormatterArgOpt = parser.accepts("property", + "The properties to initialize the message formatter. Default properties include:\n" + + "\tprint.timestamp=true|false\n" + + "\tprint.key=true|false\n" + + "\tprint.value=true|false\n" + + "\tkey.separator=\n" + + "\tline.separator=\n" + + "\tkey.deserializer=\n" + + "\tvalue.deserializer=\n" + + "\tdefault.windowed.key.serde.inner=\n" + + "\tdefault.windowed.value.serde.inner=") .withRequiredArg .describedAs("prop") .ofType(classOf[String]) @@ -328,6 +344,18 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("deserializer for values") .ofType(classOf[String]) + val innerKeyDeserializerOpt = parser.accepts(innerKeySerdeName, + "inner serde for key when windowed deserialzier is used; would be ignored otherwise. " + + "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde") + .withRequiredArg + .describedAs("inner serde for key") + .ofType(classOf[String]) + val innerValueDeserializerOpt = parser.accepts(innerValueSerdeName, + "inner serde for value when windowed deserialzier is used; would be ignored otherwise. " + + "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde") + .withRequiredArg + .describedAs("inner serde for values") + .ofType(classOf[String]) val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events", "Log lifecycle events of the consumer in addition to logging consumed " + "messages. (This is specific for system tests.)") @@ -372,6 +400,8 @@ object ConsoleConsumer extends Logging { val bootstrapServer = options.valueOf(bootstrapServerOpt) val keyDeserializer = options.valueOf(keyDeserializerOpt) val valueDeserializer = options.valueOf(valueDeserializerOpt) + val innerKeyDeserializer = options.valueOf(innerKeyDeserializerOpt) + val innerValueDeserializer = options.valueOf(innerValueDeserializerOpt) val isolationLevel = options.valueOf(isolationLevelOpt).toString val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] @@ -381,6 +411,13 @@ object ConsoleConsumer extends Logging { if (valueDeserializer != null && !valueDeserializer.isEmpty) { formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer) } + if (innerKeyDeserializer != null && !innerKeyDeserializer.isEmpty) { + formatterArgs.setProperty(innerKeySerdeName, innerKeyDeserializer) + } + if (innerValueDeserializer != null && !innerValueDeserializer.isEmpty) { + formatterArgs.setProperty(innerValueSerdeName, innerValueDeserializer) + } + formatter.init(formatterArgs) if (useOldConsumer) { @@ -521,11 +558,15 @@ class DefaultMessageFormatter extends MessageFormatter { if (props.containsKey("line.separator")) lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8) // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` - if (props.containsKey("key.deserializer")) + if (props.containsKey("key.deserializer")) { keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]]) + keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, true) + } // Note that `toString` will be called on the instance returned by `Deserializer.deserialize` - if (props.containsKey("value.deserializer")) + if (props.containsKey("value.deserializer")) { valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]]) + valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, false) + } } def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) { diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 9ae8b966ac0..f5195c3cf76 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -25,6 +25,7 @@ import kafka.utils.{Exit, TestUtils} import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy} import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.easymock.EasyMock import org.junit.Assert._ import org.junit.{Before, Test} @@ -537,4 +538,22 @@ class ConsoleConsumerTest { Exit.resetExitProcedure() } + + @Test + def testCustomPropertyShouldBePassedToConfigureMethod(): Unit = { + val args = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--property", "print.key=true", + "--property", "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer", + "--" + ConsoleConsumer.innerKeySerdeName, "org.apache.kafka.common.serialization.Serdes$StringSerde", + "--property", "my-test1=abc" + ) + val config = new ConsoleConsumer.ConsumerConfig(args) + assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter]) + val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter] + assertTrue(formatter.keyDeserializer.get.isInstanceOf[ByteArrayDeserializer]) + assertTrue(config.formatterArgs.containsKey("my-test1")) + assertTrue(config.formatterArgs.containsKey(ConsoleConsumer.innerKeySerdeName)) + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 4527c19b471..fc673d0258e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.streams.integration; +import kafka.tools.ConsoleConsumer; import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -43,10 +45,14 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlySessionStore; @@ -60,14 +66,18 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PrintStream; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -75,6 +85,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; @Category({IntegrationTest.class}) public class KStreamAggregationIntegrationTest { @@ -205,32 +216,36 @@ public class KStreamAggregationIntegrationTest { produceMessages(secondBatchTimestamp); produceMessages(secondBatchTimestamp); + Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); groupedStream .windowedBy(TimeWindows.of(500L)) .reduce(reducer) - .toStream(new KeyValueMapper, String, String>() { - @Override - public String apply(final Windowed windowedKey, final String value) { - return windowedKey.key() + "@" + windowedKey.window().start(); - } - }) - .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); - + .toStream() + .to(outputTopic, Produced.with(windowedSerde, Serdes.String())); startStreams(); - final List> windowedOutput = receiveMessages( - new StringDeserializer(), + final List, String>> windowedOutput = receiveMessages( + new TimeWindowedDeserializer(), new StringDeserializer(), + String.class, 15); - final Comparator> + // read from ConsoleConsumer + String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer( + new TimeWindowedDeserializer(), + new StringDeserializer(), + String.class, + 15); + + final Comparator, String>> comparator = - new Comparator>() { + new Comparator, String>>() { @Override - public int compare(final KeyValue o1, - final KeyValue o2) { - return KStreamAggregationIntegrationTest.compare(o1, o2); + public int compare(final KeyValue, String> o1, + final KeyValue, String> o2) { + final int keyComparison = o1.key.key().compareTo(o2.key.key()); + return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison; } }; @@ -238,25 +253,36 @@ public class KStreamAggregationIntegrationTest { final long firstBatchWindow = firstBatchTimestamp / 500 * 500; final long secondBatchWindow = secondBatchTimestamp / 500 * 500; - assertThat(windowedOutput, is( - Arrays.asList( - new KeyValue<>("A@" + firstBatchWindow, "A"), - new KeyValue<>("A@" + secondBatchWindow, "A"), - new KeyValue<>("A@" + secondBatchWindow, "A:A"), - new KeyValue<>("B@" + firstBatchWindow, "B"), - new KeyValue<>("B@" + secondBatchWindow, "B"), - new KeyValue<>("B@" + secondBatchWindow, "B:B"), - new KeyValue<>("C@" + firstBatchWindow, "C"), - new KeyValue<>("C@" + secondBatchWindow, "C"), - new KeyValue<>("C@" + secondBatchWindow, "C:C"), - new KeyValue<>("D@" + firstBatchWindow, "D"), - new KeyValue<>("D@" + secondBatchWindow, "D"), - new KeyValue<>("D@" + secondBatchWindow, "D:D"), - new KeyValue<>("E@" + firstBatchWindow, "E"), - new KeyValue<>("E@" + secondBatchWindow, "E"), - new KeyValue<>("E@" + secondBatchWindow, "E:E") - ) - )); + List, String>> expectResult = Arrays.asList( + new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"), + new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"), + new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"), + new KeyValue<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B"), + new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B"), + new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B"), + new KeyValue<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C"), + new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C"), + new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C"), + new KeyValue<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D"), + new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D"), + new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D"), + new KeyValue<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E"), + new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E"), + new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E") + ); + assertThat(windowedOutput, is(expectResult)); + + Set expectResultString = new HashSet<>(expectResult.size()); + for (KeyValue, String> eachRecord: expectResult) { + expectResultString.add(eachRecord.toString()); + } + + // check every message is contained in the expect result + String[] allRecords = resultFromConsoleConsumer.split("\n"); + for (String record: allRecords) { + record = "KeyValue(" + record + ")"; + assertTrue(expectResultString.contains(record)); + } } @SuppressWarnings("deprecation") @@ -309,34 +335,39 @@ public class KStreamAggregationIntegrationTest { produceMessages(secondTimestamp); produceMessages(secondTimestamp); + Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); groupedStream.windowedBy(TimeWindows.of(500L)) .aggregate( initializer, aggregator, Materialized.>with(null, Serdes.Integer()) ) - .toStream(new KeyValueMapper, Integer, String>() { - @Override - public String apply(final Windowed windowedKey, final Integer value) { - return windowedKey.key() + "@" + windowedKey.window().start(); - } - }) - .to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer())); + .toStream() + .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer())); startStreams(); - final List> windowedMessages = receiveMessages( - new StringDeserializer(), + final List, Integer>> windowedMessages = receiveMessages( + new TimeWindowedDeserializer(), new IntegerDeserializer(), + String.class, 15); - final Comparator> + // read from ConsoleConsumer + String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer( + new TimeWindowedDeserializer(), + new IntegerDeserializer(), + String.class, + 15); + + final Comparator, Integer>> comparator = - new Comparator>() { + new Comparator, Integer>>() { @Override - public int compare(final KeyValue o1, - final KeyValue o2) { - return KStreamAggregationIntegrationTest.compare(o1, o2); + public int compare(final KeyValue, Integer> o1, + final KeyValue, Integer> o2) { + final int keyComparison = o1.key.key().compareTo(o2.key.key()); + return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison; } }; @@ -345,24 +376,37 @@ public class KStreamAggregationIntegrationTest { final long firstWindow = firstTimestamp / 500 * 500; final long secondWindow = secondTimestamp / 500 * 500; - assertThat(windowedMessages, is( - Arrays.asList( - new KeyValue<>("A@" + firstWindow, 1), - new KeyValue<>("A@" + secondWindow, 1), - new KeyValue<>("A@" + secondWindow, 2), - new KeyValue<>("B@" + firstWindow, 1), - new KeyValue<>("B@" + secondWindow, 1), - new KeyValue<>("B@" + secondWindow, 2), - new KeyValue<>("C@" + firstWindow, 1), - new KeyValue<>("C@" + secondWindow, 1), - new KeyValue<>("C@" + secondWindow, 2), - new KeyValue<>("D@" + firstWindow, 1), - new KeyValue<>("D@" + secondWindow, 1), - new KeyValue<>("D@" + secondWindow, 2), - new KeyValue<>("E@" + firstWindow, 1), - new KeyValue<>("E@" + secondWindow, 1), - new KeyValue<>("E@" + secondWindow, 2) - ))); + List, Integer>> expectResult = Arrays.asList( + new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1), + new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1), + new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2), + new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1), + new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1), + new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2), + new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1), + new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1), + new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2), + new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1), + new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1), + new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2), + new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1), + new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1), + new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2)); + + assertThat(windowedMessages, is(expectResult)); + + Set expectResultString = new HashSet<>(expectResult.size()); + for (KeyValue, Integer> eachRecord: expectResult) { + expectResultString.add(eachRecord.toString()); + } + + // check every message is contained in the expect result + String[] allRecords = resultFromConsoleConsumer.split("\n"); + for (String record: allRecords) { + record = "KeyValue(" + record + ")"; + assertTrue(expectResultString.contains(record)); + } + } private void shouldCountHelper() throws Exception { @@ -685,26 +729,66 @@ public class KStreamAggregationIntegrationTest { kafkaStreams.start(); } - private List> receiveMessages(final Deserializer keyDeserializer, final Deserializer valueDeserializer, final int numMessages) throws InterruptedException { + return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages); + } + + private List> receiveMessages(final Deserializer + keyDeserializer, + final Deserializer + valueDeserializer, + final Class innerClass, + final int numMessages) throws InterruptedException { final Properties consumerProperties = new Properties(); - consumerProperties - .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo); consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); + if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) { + consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, + Serdes.serdeFrom(innerClass).getClass().getName()); + } return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - consumerProperties, - outputTopic, - numMessages, - 60 * 1000); - + consumerProperties, + outputTopic, + numMessages, + 60 * 1000); } + private String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Class innerClass, + final int numMessages) { + ByteArrayOutputStream newConsole = new ByteArrayOutputStream(); + PrintStream originalStream = System.out; + try (PrintStream newStream = new PrintStream(newConsole)) { + System.setOut(newStream); + + String keySeparator = ", "; + // manually construct the console consumer argument array + String[] args = new String[] { + "--bootstrap-server", CLUSTER.bootstrapServers(), + "--from-beginning", + "--property", "print.key=true", + "--topic", outputTopic, + "--max-messages", String.valueOf(numMessages), + "--property", "key.deserializer=" + keyDeserializer.getClass().getName(), + "--property", "value.deserializer=" + valueDeserializer.getClass().getName(), + "--property", "key.separator=" + keySeparator, + "--" + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName() + }; + + ConsoleConsumer.messageCount_$eq(0); //reset the message count + ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args)); + newStream.flush(); + System.setOut(originalStream); + return newConsole.toString(); + } + } }