Browse Source

KAFKA-5515; Remove date formatting from Segments

Remove date formatting from `Segments` and use the `segmentId` instead.
Add tests to make sure we can load old segments.
Rename old segment dirs to new formatting at load time.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: tedyu <yuzhihong@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3783 from dguy/kafka-5515
pull/3783/merge
Damian Guy 7 years ago
parent
commit
ed0e692147
  1. 1
      streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
  2. 46
      streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
  3. 39
      streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
  4. 29
      streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java

1
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java

@@ -138,4 +138,5 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
public boolean isOpen() {
return open;
}
}

46
streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java

@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -61,12 +62,14 @@ class Segments {
this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
}
long segmentId(long timestamp) {
long segmentId(final long timestamp) {
return timestamp / segmentInterval;
}
String segmentName(long segmentId) {
return name + "-" + formatter.format(new Date(segmentId * segmentInterval));
String segmentName(final long segmentId) {
// previous format used - as a separator so if this changes in the future
// then we should use something different.
return name + ":" + segmentId * segmentInterval;
}
Segment getSegmentForTimestamp(final long timestamp) {
@@ -101,7 +104,7 @@ class Segments {
if (list != null) {
long[] segmentIds = new long[list.length];
for (int i = 0; i < list.length; i++)
segmentIds[i] = segmentIdFromSegmentName(list[i]);
segmentIds[i] = segmentIdFromSegmentName(list[i], dir);
// open segments in the id order
Arrays.sort(segmentIds);
@@ -185,12 +188,35 @@ class Segments {
}
}
private long segmentIdFromSegmentName(String segmentName) {
try {
Date date = formatter.parse(segmentName.substring(name.length() + 1));
return date.getTime() / segmentInterval;
} catch (Exception ex) {
return -1L;
private long segmentIdFromSegmentName(final String segmentName,
final File parent) {
// old style segment name with date
if (segmentName.charAt(name.length()) == '-') {
final String datePart = segmentName.substring(name.length() + 1);
final Date date;
try {
date = formatter.parse(datePart);
final long segmentId = date.getTime() / segmentInterval;
final File newName = new File(parent, segmentName(segmentId));
final File oldName = new File(parent, segmentName);
if (!oldName.renameTo(newName)) {
throw new ProcessorStateException("Unable to rename old style segment from: "
+ oldName
+ " to new name: "
+ newName);
}
return segmentId;
} catch (ParseException e) {
log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName);
return -1L;
}
} else {
try {
return Long.parseLong(segmentName.substring(name.length() + 1)) / segmentInterval;
} catch (NumberFormatException e) {
throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName);
}
}
}
}

39
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java

@@ -35,15 +35,22 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SimpleTimeZone;
import static org.apache.kafka.streams.state.internals.Segments.segmentInterval;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class RocksDBSegmentedBytesStoreTest {
@@ -53,10 +60,10 @@ public class RocksDBSegmentedBytesStoreTest {
private final String storeName = "bytes-store";
private RocksDBSegmentedBytesStore bytesStore;
private File stateDir;
private final SessionKeySchema schema = new SessionKeySchema();
@Before
public void before() {
final SessionKeySchema schema = new SessionKeySchema();
schema.init("topic");
bytesStore = new RocksDBSegmentedBytesStore(storeName,
retention,
@@ -148,6 +155,36 @@ public class RocksDBSegmentedBytesStoreTest {
}
@Test
public void shouldLoadSegementsWithOldStyleDateFormattedName() {
final Segments segments = new Segments(storeName, retention, numSegments);
final String key = "a";
bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
bytesStore.close();
final String firstSegmentName = segments.segmentName(0);
final String[] nameParts = firstSegmentName.split(":");
final Long segmentId = Long.parseLong(nameParts[1]);
final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
final String formatted = formatter.format(new Date(segmentId * segmentInterval(retention, numSegments)));
final File parent = new File(stateDir, storeName);
final File oldStyleName = new File(parent, nameParts[0] + "-" + formatted);
assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
bytesStore = new RocksDBSegmentedBytesStore(storeName,
retention,
numSegments,
schema);
bytesStore.init(context, bytesStore);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
}
private Set<String> segmentDirs() {
File windowDir = new File(stateDir, storeName);

29
streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java

@@ -40,6 +40,7 @@ public class SegmentsTest {
private static final int NUM_SEGMENTS = 5;
private MockProcessorContext context;
private Segments segments;
private long segmentInterval;
@Before
public void createContext() {
@@ -48,7 +49,9 @@ public class SegmentsTest {
Serdes.Long(),
new NoOpRecordCollector(),
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
segments = new Segments("test", 4 * 60 * 1000, NUM_SEGMENTS);
int retentionPeriod = 4 * 60 * 1000;
segments = new Segments("test", retentionPeriod, NUM_SEGMENTS);
segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
}
@After
@@ -74,10 +77,10 @@ public class SegmentsTest {
}
@Test
public void shouldGetSegmentNameFromId() {
assertEquals("test-197001010000", segments.segmentName(0));
assertEquals("test-197001010001", segments.segmentName(1));
assertEquals("test-197001010002", segments.segmentName(2));
public void shouldGetSegmentNameFromId() throws Exception {
assertEquals("test:0", segments.segmentName(0));
assertEquals("test:" + segmentInterval, segments.segmentName(1));
assertEquals("test:" + 2 * segmentInterval, segments.segmentName(2));
}
@Test
@@ -85,9 +88,9 @@ public class SegmentsTest {
final Segment segment1 = segments.getOrCreateSegment(0, context);
final Segment segment2 = segments.getOrCreateSegment(1, context);
final Segment segment3 = segments.getOrCreateSegment(2, context);
assertTrue(new File(context.stateDir(), "test/test-197001010000").isDirectory());
assertTrue(new File(context.stateDir(), "test/test-197001010001").isDirectory());
assertTrue(new File(context.stateDir(), "test/test-197001010002").isDirectory());
assertTrue(new File(context.stateDir(), "test/test:0").isDirectory());
assertTrue(new File(context.stateDir(), "test/test:" + segmentInterval).isDirectory());
assertTrue(new File(context.stateDir(), "test/test:" + 2 * segmentInterval).isDirectory());
assertEquals(true, segment1.isOpen());
assertEquals(true, segment2.isOpen());
assertEquals(true, segment3.isOpen());
@@ -97,20 +100,20 @@ public class SegmentsTest {
public void shouldNotCreateSegmentThatIsAlreadyExpired() {
segments.getOrCreateSegment(7, context);
assertNull(segments.getOrCreateSegment(0, context));
assertFalse(new File(context.stateDir(), "test/test-197001010000").exists());
assertFalse(new File(context.stateDir(), "test/test:0").exists());
}
@Test
public void shouldCleanupSegmentsThatHaveExpired() {
final Segment segment1 = segments.getOrCreateSegment(0, context);
final Segment segment2 = segments.getOrCreateSegment(0, context);
final Segment segment2 = segments.getOrCreateSegment(1, context);
final Segment segment3 = segments.getOrCreateSegment(7, context);
assertFalse(segment1.isOpen());
assertFalse(segment2.isOpen());
assertTrue(segment3.isOpen());
assertFalse(new File(context.stateDir(), "test/test-197001010000").exists());
assertFalse(new File(context.stateDir(), "test/test-197001010001").exists());
assertTrue(new File(context.stateDir(), "test/test-197001010007").exists());
assertFalse(new File(context.stateDir(), "test/test:0").exists());
assertFalse(new File(context.stateDir(), "test/test:" + segmentInterval).exists());
assertTrue(new File(context.stateDir(), "test/test:" + 7 * segmentInterval).exists());
}
@Test

Loading…
Cancel
Save