diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java new file mode 100644 index 00000000000..adf2774f093 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyMap; + +/** + * An implementation of {@link ConfigProvider} based on a directory of files. + * Property keys correspond to the names of the regular (i.e. non-directory) + * files in a directory given by the path parameter. + * Property values are taken from the file contents corresponding to each key. + */ +public class DirectoryConfigProvider implements ConfigProvider { + + private static final Logger log = LoggerFactory.getLogger(DirectoryConfigProvider.class); + + @Override + public void configure(Map configs) { } + + @Override + public void close() throws IOException { } + + /** + * Retrieves the data contained in regular files in the directory given by {@code path}. + * Non-regular files (such as directories) in the given directory are silently ignored. + * @param path the directory where data files reside. + * @return the configuration data. + */ + @Override + public ConfigData get(String path) { + return get(path, Files::isRegularFile); + } + + /** + * Retrieves the data contained in the regular files named by {@code keys} in the directory given by {@code path}. + * Non-regular files (such as directories) in the given directory are silently ignored. + * @param path the directory where data files reside. + * @param keys the keys whose values will be retrieved. + * @return the configuration data. + */ + @Override + public ConfigData get(String path, Set keys) { + return get(path, pathname -> + Files.isRegularFile(pathname) + && keys.contains(pathname.getFileName().toString())); + } + + private static ConfigData get(String path, Predicate fileFilter) { + Map map = emptyMap(); + if (path != null && !path.isEmpty()) { + Path dir = new File(path).toPath(); + if (!Files.isDirectory(dir)) { + log.warn("The path {} is not a directory", path); + } else { + try { + map = Files.list(dir) + .filter(fileFilter) + .collect(Collectors.toMap( + p -> p.getFileName().toString(), + p -> read(p))); + } catch (IOException e) { + throw new ConfigException("Could not list directory " + dir, e); + } + } + } + return new ConfigData(map); + } + + private static String read(Path path) { + try { + return new String(Files.readAllBytes(path), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new ConfigException("Could not read file " + path + " for property " + path.getFileName(), e); + } + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java b/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java new file mode 100644 index 00000000000..9d7139bca6f --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Collections; +import java.util.Locale; +import java.util.Set; + +import static java.util.Arrays.asList; +import static org.apache.kafka.test.TestUtils.toSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class DirectoryConfigProviderTest { + + private DirectoryConfigProvider provider; + private File parent; + private File dir; + private File bar; + private File foo; + private File subdir; + private File subdirFile; + private File siblingDir; + private File siblingDirFile; + private File siblingFile; + + private static File writeFile(File file) throws IOException { + Files.write(file.toPath(), file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8)); + return file; + } + + @Before + public void setup() throws IOException { + provider = new DirectoryConfigProvider(); + provider.configure(Collections.emptyMap()); + parent = TestUtils.tempDirectory(); + dir = new File(parent, "dir"); + dir.mkdir(); + foo = writeFile(new File(dir, "foo")); + bar = writeFile(new File(dir, "bar")); + subdir = new File(dir, "subdir"); + subdir.mkdir(); + subdirFile = writeFile(new File(subdir, "subdirFile")); + siblingDir = new File(parent, "siblingdir"); + siblingDir.mkdir(); + siblingDirFile = writeFile(new File(siblingDir, "siblingdirFile")); + siblingFile = writeFile(new File(parent, "siblingFile")); + } + + @After + public void close() throws IOException { + provider.close(); + Utils.delete(parent); + } + + @Test + public void testGetAllKeysAtPath() throws IOException { + ConfigData configData = provider.get(dir.getAbsolutePath()); + assertEquals(toSet(asList(foo.getName(), bar.getName())), configData.data().keySet()); + assertEquals("FOO", configData.data().get(foo.getName())); + assertEquals("BAR", configData.data().get(bar.getName())); + assertNull(configData.ttl()); + } + + @Test + public void testGetSetOfKeysAtPath() { + Set keys = toSet(asList(foo.getName(), "baz")); + ConfigData configData = provider.get(dir.getAbsolutePath(), keys); + assertEquals(Collections.singleton(foo.getName()), configData.data().keySet()); + assertEquals("FOO", configData.data().get(foo.getName())); + assertNull(configData.ttl()); + } + + @Test + public void testNoSubdirs() { + // Only regular files directly in the path directory are allowed, not in subdirs + Set keys = toSet(asList(subdir.getName(), String.join(File.separator, subdir.getName(), subdirFile.getName()))); + ConfigData configData = provider.get(dir.getAbsolutePath(), keys); + assertTrue(configData.data().isEmpty()); + assertNull(configData.ttl()); + } + + @Test + public void testNoTraversal() { + // Check we can't escape outside the path directory + Set keys = toSet(asList( + String.join(File.separator, "..", siblingFile.getName()), + String.join(File.separator, "..", siblingDir.getName()), + String.join(File.separator, "..", siblingDir.getName(), siblingDirFile.getName()))); + ConfigData configData = provider.get(dir.getAbsolutePath(), keys); + assertTrue(configData.data().isEmpty()); + assertNull(configData.ttl()); + } + + @Test + public void testEmptyPath() { + ConfigData configData = provider.get(""); + assertTrue(configData.data().isEmpty()); + assertNull(configData.ttl()); + } + + @Test + public void testEmptyPathWithKey() { + ConfigData configData = provider.get("", Collections.singleton("foo")); + assertTrue(configData.data().isEmpty()); + assertEquals(null, configData.ttl()); + } + + @Test + public void testNullPath() { + ConfigData configData = provider.get(null); + assertTrue(configData.data().isEmpty()); + assertEquals(null, configData.ttl()); + } + + @Test + public void testNullPathWithKey() { + ConfigData configData = provider.get(null, Collections.singleton("foo")); + assertTrue(configData.data().isEmpty()); + assertEquals(null, configData.ttl()); + } +} +