Browse Source

KAFKA-6167: Timestamp on streams directory contains a colon, which is an illegal character

- change segment delimiter to .
 - added upgrade path
 - added test for old and new upgrade path

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #4210 from mjsax/kafka-6167-windows-issue
pull/4132/merge
Matthias J. Sax 7 years ago committed by Guozhang Wang
parent
commit
539c4d53f8
  1. 53
      streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
  2. 26
      streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
  3. 73
      streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java

53
streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java

@ -28,7 +28,6 @@ import java.text.ParseException; @@ -28,7 +28,6 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SimpleTimeZone;
@ -67,9 +66,11 @@ class Segments { @@ -67,9 +66,11 @@ class Segments {
}
String segmentName(final long segmentId) {
// previous format used - as a separator so if this changes in the future
// (1) previous format used - as a separator so if this changes in the future
// then we should use something different.
return name + ":" + segmentId * segmentInterval;
// (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
// so if this changes in the future then we should use something different.
return name + "." + segmentId * segmentInterval;
}
Segment getSegmentForTimestamp(final long timestamp) {
@ -190,33 +191,49 @@ class Segments { @@ -190,33 +191,49 @@ class Segments {
private long segmentIdFromSegmentName(final String segmentName,
final File parent) {
final int segmentSeparatorIndex = name.length();
final char segmentSeparator = segmentName.charAt(segmentSeparatorIndex);
final String segmentIdString = segmentName.substring(segmentSeparatorIndex + 1);
final long segmentId;
// old style segment name with date
if (segmentName.charAt(name.length()) == '-') {
final String datePart = segmentName.substring(name.length() + 1);
final Date date;
if (segmentSeparator == '-') {
try {
date = formatter.parse(datePart);
final long segmentId = date.getTime() / segmentInterval;
final File newName = new File(parent, segmentName(segmentId));
final File oldName = new File(parent, segmentName);
if (!oldName.renameTo(newName)) {
throw new ProcessorStateException("Unable to rename old style segment from: "
+ oldName
+ " to new name: "
+ newName);
}
return segmentId;
segmentId = formatter.parse(segmentIdString).getTime() / segmentInterval;
} catch (ParseException e) {
log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName);
return -1L;
}
renameSegmentFile(parent, segmentName, segmentId);
} else {
// for both new formats (with : or .) parse segment ID identically
try {
return Long.parseLong(segmentName.substring(name.length() + 1)) / segmentInterval;
segmentId = Long.parseLong(segmentIdString) / segmentInterval;
} catch (NumberFormatException e) {
throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName);
}
// intermediate segment name with : breaks KafkaStreams on Windows OS -> rename segment file to new name with .
if (segmentSeparator == ':') {
renameSegmentFile(parent, segmentName, segmentId);
}
}
return segmentId;
}
private void renameSegmentFile(final File parent,
final String segmentName,
final long segmentId) {
final File newName = new File(parent, segmentName(segmentId));
final File oldName = new File(parent, segmentName);
if (!oldName.renameTo(newName)) {
throw new ProcessorStateException("Unable to rename old style segment from: "
+ oldName
+ " to new name: "
+ newName);
}
}
}

26
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java

@ -164,7 +164,7 @@ public class RocksDBSegmentedBytesStoreTest { @@ -164,7 +164,7 @@ public class RocksDBSegmentedBytesStoreTest {
bytesStore.close();
final String firstSegmentName = segments.segmentName(0);
final String[] nameParts = firstSegmentName.split(":");
final String[] nameParts = firstSegmentName.split("\\.");
final Long segmentId = Long.parseLong(nameParts[1]);
final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
@ -184,6 +184,30 @@ public class RocksDBSegmentedBytesStoreTest { @@ -184,6 +184,30 @@ public class RocksDBSegmentedBytesStoreTest {
KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
}
@Test
public void shouldLoadSegementsWithOldStyleColonFormattedName() {
final Segments segments = new Segments(storeName, retention, numSegments);
final String key = "a";
bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
bytesStore.close();
final String firstSegmentName = segments.segmentName(0);
final String[] nameParts = firstSegmentName.split("\\.");
final File parent = new File(stateDir, storeName);
final File oldStyleName = new File(parent, nameParts[0] + ":" + Long.parseLong(nameParts[1]));
assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
bytesStore = new RocksDBSegmentedBytesStore(storeName,
retention,
numSegments,
schema);
bytesStore.init(context, bytesStore);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
}
private Set<String> segmentDirs() {
File windowDir = new File(stateDir, storeName);

73
streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java

@ -28,7 +28,10 @@ import org.junit.Before; @@ -28,7 +28,10 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.SimpleTimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -41,16 +44,19 @@ public class SegmentsTest { @@ -41,16 +44,19 @@ public class SegmentsTest {
private MockProcessorContext context;
private Segments segments;
private long segmentInterval;
private File stateDirectory;
private String storeName = "test";
private final int retentionPeriod = 4 * 60 * 1000;
@Before
public void createContext() {
context = new MockProcessorContext(TestUtils.tempDirectory(),
stateDirectory = TestUtils.tempDirectory();
context = new MockProcessorContext(stateDirectory,
Serdes.String(),
Serdes.Long(),
new NoOpRecordCollector(),
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
int retentionPeriod = 4 * 60 * 1000;
segments = new Segments("test", retentionPeriod, NUM_SEGMENTS);
segments = new Segments(storeName, retentionPeriod, NUM_SEGMENTS);
segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
}
@ -78,9 +84,9 @@ public class SegmentsTest { @@ -78,9 +84,9 @@ public class SegmentsTest {
@Test
public void shouldGetSegmentNameFromId() throws Exception {
assertEquals("test:0", segments.segmentName(0));
assertEquals("test:" + segmentInterval, segments.segmentName(1));
assertEquals("test:" + 2 * segmentInterval, segments.segmentName(2));
assertEquals("test.0", segments.segmentName(0));
assertEquals("test." + segmentInterval, segments.segmentName(1));
assertEquals("test." + 2 * segmentInterval, segments.segmentName(2));
}
@Test
@ -88,9 +94,9 @@ public class SegmentsTest { @@ -88,9 +94,9 @@ public class SegmentsTest {
final Segment segment1 = segments.getOrCreateSegment(0, context);
final Segment segment2 = segments.getOrCreateSegment(1, context);
final Segment segment3 = segments.getOrCreateSegment(2, context);
assertTrue(new File(context.stateDir(), "test/test:0").isDirectory());
assertTrue(new File(context.stateDir(), "test/test:" + segmentInterval).isDirectory());
assertTrue(new File(context.stateDir(), "test/test:" + 2 * segmentInterval).isDirectory());
assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
assertTrue(new File(context.stateDir(), "test/test." + segmentInterval).isDirectory());
assertTrue(new File(context.stateDir(), "test/test." + 2 * segmentInterval).isDirectory());
assertEquals(true, segment1.isOpen());
assertEquals(true, segment2.isOpen());
assertEquals(true, segment3.isOpen());
@ -100,7 +106,7 @@ public class SegmentsTest { @@ -100,7 +106,7 @@ public class SegmentsTest {
public void shouldNotCreateSegmentThatIsAlreadyExpired() {
segments.getOrCreateSegment(7, context);
assertNull(segments.getOrCreateSegment(0, context));
assertFalse(new File(context.stateDir(), "test/test:0").exists());
assertFalse(new File(context.stateDir(), "test/test.0").exists());
}
@Test
@ -111,9 +117,9 @@ public class SegmentsTest { @@ -111,9 +117,9 @@ public class SegmentsTest {
assertFalse(segment1.isOpen());
assertFalse(segment2.isOpen());
assertTrue(segment3.isOpen());
assertFalse(new File(context.stateDir(), "test/test:0").exists());
assertFalse(new File(context.stateDir(), "test/test:" + segmentInterval).exists());
assertTrue(new File(context.stateDir(), "test/test:" + 7 * segmentInterval).exists());
assertFalse(new File(context.stateDir(), "test/test.0").exists());
assertFalse(new File(context.stateDir(), "test/test." + segmentInterval).exists());
assertTrue(new File(context.stateDir(), "test/test." + 7 * segmentInterval).exists());
}
@Test
@ -203,6 +209,47 @@ public class SegmentsTest { @@ -203,6 +209,47 @@ public class SegmentsTest {
verifyCorrectSegments(2, 5);
}
@Test
public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
final File storeDirectory = new File(storeDirectoryPath);
storeDirectory.mkdirs();
final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval)));
oldSegment.createNewFile();
}
segments.openExisting(context);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
assertTrue(newSegment.exists());
}
}
@Test
public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Exception {
final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
final File storeDirectory = new File(storeDirectoryPath);
storeDirectory.mkdirs();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
oldSegment.createNewFile();
}
segments.openExisting(context);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
assertTrue(newSegment.exists());
}
}
private void verifyCorrectSegments(final long first, final int numSegments) {
final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
assertEquals(numSegments, result.size());

Loading…
Cancel
Save