Browse Source
See KIP-632: https://cwiki.apache.org/confluence/display/KAFKA/KIP-632%3A+Add+DirectoryConfigProvider Reviewers: Mickael Maison <mickael.maison@gmail.com>, David Jacot <david.jacot@gmail.com>pull/9134/head
Tom Bentley
4 years ago
committed by
GitHub
2 changed files with 255 additions and 0 deletions
@ -0,0 +1,106 @@
@@ -0,0 +1,106 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.common.config.provider; |
||||
|
||||
import org.apache.kafka.common.config.ConfigData; |
||||
import org.apache.kafka.common.config.ConfigException; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.io.File; |
||||
import java.io.IOException; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.Path; |
||||
import java.util.Map; |
||||
import java.util.Set; |
||||
import java.util.function.Predicate; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import static java.util.Collections.emptyMap; |
||||
|
||||
/** |
||||
* An implementation of {@link ConfigProvider} based on a directory of files. |
||||
* Property keys correspond to the names of the regular (i.e. non-directory) |
||||
* files in a directory given by the path parameter. |
||||
* Property values are taken from the file contents corresponding to each key. |
||||
*/ |
||||
public class DirectoryConfigProvider implements ConfigProvider { |
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DirectoryConfigProvider.class); |
||||
|
||||
@Override |
||||
public void configure(Map<String, ?> configs) { } |
||||
|
||||
@Override |
||||
public void close() throws IOException { } |
||||
|
||||
/** |
||||
* Retrieves the data contained in regular files in the directory given by {@code path}. |
||||
* Non-regular files (such as directories) in the given directory are silently ignored. |
||||
* @param path the directory where data files reside. |
||||
* @return the configuration data. |
||||
*/ |
||||
@Override |
||||
public ConfigData get(String path) { |
||||
return get(path, Files::isRegularFile); |
||||
} |
||||
|
||||
/** |
||||
* Retrieves the data contained in the regular files named by {@code keys} in the directory given by {@code path}. |
||||
* Non-regular files (such as directories) in the given directory are silently ignored. |
||||
* @param path the directory where data files reside. |
||||
* @param keys the keys whose values will be retrieved. |
||||
* @return the configuration data. |
||||
*/ |
||||
@Override |
||||
public ConfigData get(String path, Set<String> keys) { |
||||
return get(path, pathname -> |
||||
Files.isRegularFile(pathname) |
||||
&& keys.contains(pathname.getFileName().toString())); |
||||
} |
||||
|
||||
private static ConfigData get(String path, Predicate<Path> fileFilter) { |
||||
Map<String, String> map = emptyMap(); |
||||
if (path != null && !path.isEmpty()) { |
||||
Path dir = new File(path).toPath(); |
||||
if (!Files.isDirectory(dir)) { |
||||
log.warn("The path {} is not a directory", path); |
||||
} else { |
||||
try { |
||||
map = Files.list(dir) |
||||
.filter(fileFilter) |
||||
.collect(Collectors.toMap( |
||||
p -> p.getFileName().toString(), |
||||
p -> read(p))); |
||||
} catch (IOException e) { |
||||
throw new ConfigException("Could not list directory " + dir, e); |
||||
} |
||||
} |
||||
} |
||||
return new ConfigData(map); |
||||
} |
||||
|
||||
private static String read(Path path) { |
||||
try { |
||||
return new String(Files.readAllBytes(path), StandardCharsets.UTF_8); |
||||
} catch (IOException e) { |
||||
throw new ConfigException("Could not read file " + path + " for property " + path.getFileName(), e); |
||||
} |
||||
} |
||||
|
||||
} |
@ -0,0 +1,149 @@
@@ -0,0 +1,149 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.common.config.provider; |
||||
|
||||
import org.apache.kafka.common.config.ConfigData; |
||||
import org.apache.kafka.common.utils.Utils; |
||||
import org.apache.kafka.test.TestUtils; |
||||
import org.junit.After; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
|
||||
import java.io.File; |
||||
import java.io.IOException; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.nio.file.Files; |
||||
import java.util.Collections; |
||||
import java.util.Locale; |
||||
import java.util.Set; |
||||
|
||||
import static java.util.Arrays.asList; |
||||
import static org.apache.kafka.test.TestUtils.toSet; |
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertNull; |
||||
import static org.junit.Assert.assertTrue; |
||||
|
||||
public class DirectoryConfigProviderTest { |
||||
|
||||
private DirectoryConfigProvider provider; |
||||
private File parent; |
||||
private File dir; |
||||
private File bar; |
||||
private File foo; |
||||
private File subdir; |
||||
private File subdirFile; |
||||
private File siblingDir; |
||||
private File siblingDirFile; |
||||
private File siblingFile; |
||||
|
||||
private static File writeFile(File file) throws IOException { |
||||
Files.write(file.toPath(), file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8)); |
||||
return file; |
||||
} |
||||
|
||||
@Before |
||||
public void setup() throws IOException { |
||||
provider = new DirectoryConfigProvider(); |
||||
provider.configure(Collections.emptyMap()); |
||||
parent = TestUtils.tempDirectory(); |
||||
dir = new File(parent, "dir"); |
||||
dir.mkdir(); |
||||
foo = writeFile(new File(dir, "foo")); |
||||
bar = writeFile(new File(dir, "bar")); |
||||
subdir = new File(dir, "subdir"); |
||||
subdir.mkdir(); |
||||
subdirFile = writeFile(new File(subdir, "subdirFile")); |
||||
siblingDir = new File(parent, "siblingdir"); |
||||
siblingDir.mkdir(); |
||||
siblingDirFile = writeFile(new File(siblingDir, "siblingdirFile")); |
||||
siblingFile = writeFile(new File(parent, "siblingFile")); |
||||
} |
||||
|
||||
@After |
||||
public void close() throws IOException { |
||||
provider.close(); |
||||
Utils.delete(parent); |
||||
} |
||||
|
||||
@Test |
||||
public void testGetAllKeysAtPath() throws IOException { |
||||
ConfigData configData = provider.get(dir.getAbsolutePath()); |
||||
assertEquals(toSet(asList(foo.getName(), bar.getName())), configData.data().keySet()); |
||||
assertEquals("FOO", configData.data().get(foo.getName())); |
||||
assertEquals("BAR", configData.data().get(bar.getName())); |
||||
assertNull(configData.ttl()); |
||||
} |
||||
|
||||
@Test |
||||
public void testGetSetOfKeysAtPath() { |
||||
Set<String> keys = toSet(asList(foo.getName(), "baz")); |
||||
ConfigData configData = provider.get(dir.getAbsolutePath(), keys); |
||||
assertEquals(Collections.singleton(foo.getName()), configData.data().keySet()); |
||||
assertEquals("FOO", configData.data().get(foo.getName())); |
||||
assertNull(configData.ttl()); |
||||
} |
||||
|
||||
@Test |
||||
public void testNoSubdirs() { |
||||
// Only regular files directly in the path directory are allowed, not in subdirs
|
||||
Set<String> keys = toSet(asList(subdir.getName(), String.join(File.separator, subdir.getName(), subdirFile.getName()))); |
||||
ConfigData configData = provider.get(dir.getAbsolutePath(), keys); |
||||
assertTrue(configData.data().isEmpty()); |
||||
assertNull(configData.ttl()); |
||||
} |
||||
|
||||
@Test |
||||
public void testNoTraversal() { |
||||
// Check we can't escape outside the path directory
|
||||
Set<String> keys = toSet(asList( |
||||
String.join(File.separator, "..", siblingFile.getName()), |
||||
String.join(File.separator, "..", siblingDir.getName()), |
||||
String.join(File.separator, "..", siblingDir.getName(), siblingDirFile.getName()))); |
||||
ConfigData configData = provider.get(dir.getAbsolutePath(), keys); |
||||
assertTrue(configData.data().isEmpty()); |
||||
assertNull(configData.ttl()); |
||||
} |
||||
|
||||
@Test |
||||
public void testEmptyPath() { |
||||
ConfigData configData = provider.get(""); |
||||
assertTrue(configData.data().isEmpty()); |
||||
assertNull(configData.ttl()); |
||||
} |
||||
|
||||
@Test |
||||
public void testEmptyPathWithKey() { |
||||
ConfigData configData = provider.get("", Collections.singleton("foo")); |
||||
assertTrue(configData.data().isEmpty()); |
||||
assertEquals(null, configData.ttl()); |
||||
} |
||||
|
||||
@Test |
||||
public void testNullPath() { |
||||
ConfigData configData = provider.get(null); |
||||
assertTrue(configData.data().isEmpty()); |
||||
assertEquals(null, configData.ttl()); |
||||
} |
||||
|
||||
@Test |
||||
public void testNullPathWithKey() { |
||||
ConfigData configData = provider.get(null, Collections.singleton("foo")); |
||||
assertTrue(configData.data().isEmpty()); |
||||
assertEquals(null, configData.ttl()); |
||||
} |
||||
} |
||||
|
Loading…
Reference in new issue