|
|
@ -80,55 +80,55 @@ public class KStreamKStreamLeftJoinTest { |
|
|
|
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { |
|
|
|
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { |
|
|
|
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); |
|
|
|
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); |
|
|
|
|
|
|
|
|
|
|
|
// push two items to the primary stream. the other window is empty
|
|
|
|
// push two items to the primary stream; the other window is empty
|
|
|
|
// w1 {}
|
|
|
|
// w1 {}
|
|
|
|
// w2 {}
|
|
|
|
// w2 {}
|
|
|
|
// --> w1 = { 0:X0, 1:X1 }
|
|
|
|
// --> w1 = { 0:A0, 1:A1 }
|
|
|
|
// --> w2 = {}
|
|
|
|
// --> w2 = {}
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i])); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i])); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 0)"); |
|
|
|
processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)"); |
|
|
|
|
|
|
|
|
|
|
|
// push two items to the other stream. this should produce two items.
|
|
|
|
// push two items to the other stream; this should produce two items
|
|
|
|
// w1 = { 0:X0, 1:X1 }
|
|
|
|
// w1 = { 0:A0, 1:A1 }
|
|
|
|
// w2 {}
|
|
|
|
// w2 {}
|
|
|
|
// --> w1 = { 0:X0, 1:X1 }
|
|
|
|
// --> w1 = { 0:A0, 1:A1 }
|
|
|
|
// --> w2 = { 0:Y0, 1:Y1 }
|
|
|
|
// --> w2 = { 0:a0, 1:a1 }
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i])); |
|
|
|
driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "a" + expectedKeys[i])); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)"); |
|
|
|
processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)"); |
|
|
|
|
|
|
|
|
|
|
|
// push three items to the primary stream. this should produce four items.
|
|
|
|
// push three items to the primary stream; this should produce four items
|
|
|
|
// w1 = { 0:X0, 1:X1 }
|
|
|
|
// w1 = { 0:A0, 1:A1 }
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
// w2 = { 0:a0, 1:a1 }
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
|
|
|
|
// --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
|
|
|
|
// --> w2 = { 0:Y0, 1:Y1 }
|
|
|
|
// --> w2 = { 0:a0, 1:a1 }
|
|
|
|
for (int i = 0; i < 3; i++) { |
|
|
|
for (int i = 0; i < 3; i++) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i])); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "B" + expectedKeys[i])); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)", "2:X2+null (ts: 0)"); |
|
|
|
processor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)", "2:B2+null (ts: 0)"); |
|
|
|
|
|
|
|
|
|
|
|
// push all items to the other stream. this should produce 5 items
|
|
|
|
// push all items to the other stream; this should produce five items
|
|
|
|
// w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
|
|
|
|
// w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
// w2 = { 0:a0, 1:a1 }
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
|
|
|
|
// --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
|
|
|
|
// --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
|
|
|
|
// --> w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
|
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey)); |
|
|
|
driver.pipeInput(recordFactory.create(topic2, expectedKey, "b" + expectedKey)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:X0+YY0 (ts: 0)", "0:X0+YY0 (ts: 0)", "1:X1+YY1 (ts: 0)", "1:X1+YY1 (ts: 0)", "2:X2+YY2 (ts: 0)"); |
|
|
|
processor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)"); |
|
|
|
|
|
|
|
|
|
|
|
// push all four items to the primary stream. this should produce six items.
|
|
|
|
// push all four items to the primary stream; this should produce six items
|
|
|
|
// w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
|
|
|
|
// w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2 }
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
|
|
|
|
// w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
// --> w1 = { 0:A0, 1:A1, 0:B0, 1:B1, 2:B2, 0:C0, 1:C1, 2:C2, 3:C3 }
|
|
|
|
// --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
|
|
|
|
// --> w2 = { 0:a0, 1:a1, 0:b0, 1:b1, 2:b2, 3:b3 }
|
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 0)", "0:XX0+YY0 (ts: 0)", "1:XX1+Y1 (ts: 0)", "1:XX1+YY1 (ts: 0)", "2:XX2+YY2 (ts: 0)", "3:XX3+YY3 (ts: 0)"); |
|
|
|
processor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)"); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -159,107 +159,284 @@ public class KStreamKStreamLeftJoinTest { |
|
|
|
|
|
|
|
|
|
|
|
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { |
|
|
|
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { |
|
|
|
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); |
|
|
|
final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); |
|
|
|
long time = 0L; |
|
|
|
final long time = 0L; |
|
|
|
|
|
|
|
|
|
|
|
// push two items to the primary stream. the other window is empty. this should produce two items
|
|
|
|
// push two items to the primary stream; the other window is empty; this should produce two left-join items
|
|
|
|
// w1 = {}
|
|
|
|
// w1 = {}
|
|
|
|
// w2 = {}
|
|
|
|
// w2 = {}
|
|
|
|
// --> w1 = { 0:X0, 1:X1 }
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
|
|
|
|
// --> w2 = {}
|
|
|
|
// --> w2 = {}
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i], time)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 0)"); |
|
|
|
processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)"); |
|
|
|
|
|
|
|
|
|
|
|
// push two items to the other stream. this should produce no items.
|
|
|
|
// push four items to the other stream; this should produce two full-join items
|
|
|
|
// w1 = { 0:X0, 1:X1 }
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
|
|
|
|
// w2 = {}
|
|
|
|
// w2 = {}
|
|
|
|
// --> w1 = { 0:X0, 1:X1 }
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
|
|
|
|
// --> w2 = { 0:Y0, 1:Y1 }
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0) }
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic2, expectedKey, "a" + expectedKey, time)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)"); |
|
|
|
processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)"); |
|
|
|
|
|
|
|
|
|
|
|
// clear logically
|
|
|
|
testUpperWindowBound(expectedKeys, driver, processor); |
|
|
|
time = 1000L; |
|
|
|
testLowerWindowBound(expectedKeys, driver, processor); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// push all items to the other stream. this should produce no items.
|
|
|
|
private void testUpperWindowBound(final int[] expectedKeys, |
|
|
|
// w1 = {}
|
|
|
|
final TopologyTestDriver driver, |
|
|
|
// w2 = {}
|
|
|
|
final MockProcessor<Integer, String> processor) { |
|
|
|
// --> w1 = {}
|
|
|
|
long time; |
|
|
|
// --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
|
|
|
|
|
|
|
|
|
|
|
|
// push four items with larger and increasing timestamp (out of window) to the other stream; this should produce no items
|
|
|
|
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
|
|
|
|
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0) }
|
|
|
|
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
|
|
|
|
time = 1000L; |
|
|
|
for (int i = 0; i < expectedKeys.length; i++) { |
|
|
|
for (int i = 0; i < expectedKeys.length; i++) { |
|
|
|
driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i)); |
|
|
|
driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "b" + expectedKeys[i], time + i)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult(); |
|
|
|
processor.checkAndClearProcessResult(); |
|
|
|
|
|
|
|
|
|
|
|
// gradually expire items in window 2.
|
|
|
|
// push four items with larger timestamp to the primary stream; this should produce four full-join items
|
|
|
|
// w1 = {}
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
|
|
|
|
// w2 = {}
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
// --> w1 = {}
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
// --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
time = 1000L + 100L; |
|
|
|
time = 1000L + 100L; |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "B" + expectedKey, time)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 1100)", "1:XX1+Y1 (ts: 1100)", "2:XX2+Y2 (ts: 1100)", "3:XX3+Y3 (ts: 1100)"); |
|
|
|
processor.checkAndClearProcessResult("0:B0+b0 (ts: 1100)", "1:B1+b1 (ts: 1100)", "2:B2+b2 (ts: 1100)", "3:B3+b3 (ts: 1100)"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// push four items with increased timestamp to the primary stream; this should produce one left-join and three full-join items
|
|
|
|
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100) }
|
|
|
|
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
time += 1L; |
|
|
|
time += 1L; |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "C" + expectedKey, time)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:XX0+null (ts: 1101)", "1:XX1+Y1 (ts: 1101)", "2:XX2+Y2 (ts: 1101)", "3:XX3+Y3 (ts: 1101)"); |
|
|
|
processor.checkAndClearProcessResult("0:C0+null (ts: 1101)", "1:C1+b1 (ts: 1101)", "2:C2+b2 (ts: 1101)", "3:C3+b3 (ts: 1101)"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// push four items with increased timestamp to the primary stream; this should produce two left-join and two full-join items
|
|
|
|
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101) }
|
|
|
|
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
time += 1L; |
|
|
|
time += 1L; |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "D" + expectedKey, time)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:XX0+null (ts: 1102)", "1:XX1+null (ts: 1102)", "2:XX2+Y2 (ts: 1102)", "3:XX3+Y3 (ts: 1102)"); |
|
|
|
processor.checkAndClearProcessResult("0:D0+null (ts: 1102)", "1:D1+null (ts: 1102)", "2:D2+b2 (ts: 1102)", "3:D3+b3 (ts: 1102)"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// push four items with increased timestamp to the primary stream; this should produce three left-join and one full-join items
|
|
|
|
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102) }
|
|
|
|
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
time += 1L; |
|
|
|
time += 1L; |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "E" + expectedKey, time)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:XX0+null (ts: 1103)", "1:XX1+null (ts: 1103)", "2:XX2+null (ts: 1103)", "3:XX3+Y3 (ts: 1103)"); |
|
|
|
processor.checkAndClearProcessResult("0:E0+null (ts: 1103)", "1:E1+null (ts: 1103)", "2:E2+null (ts: 1103)", "3:E3+b3 (ts: 1103)"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// push four items with increased timestamp to the primary stream; this should produce four left-join and no full-join items
|
|
|
|
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103) }
|
|
|
|
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
time += 1L; |
|
|
|
time += 1L; |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "F" + expectedKey, time)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
processor.checkAndClearProcessResult("0:F0+null (ts: 1104)", "1:F1+null (ts: 1104)", "2:F2+null (ts: 1104)", "3:F3+null (ts: 1104)"); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:XX0+null (ts: 1104)", "1:XX1+null (ts: 1104)", "2:XX2+null (ts: 1104)", "3:XX3+null (ts: 1104)"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// go back to the time before expiration
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void testLowerWindowBound(final int[] expectedKeys, |
|
|
|
|
|
|
|
final TopologyTestDriver driver, |
|
|
|
|
|
|
|
final MockProcessor<Integer, String> processor) { |
|
|
|
|
|
|
|
long time; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// push four items with smaller timestamp (before the window) to the primary stream; this should produce four left-join and no full-join items
|
|
|
|
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104) }
|
|
|
|
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
|
|
|
|
|
|
|
|
// 0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
time = 1000L - 100L - 1L; |
|
|
|
time = 1000L - 100L - 1L; |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "G" + expectedKey, time)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:XX0+null (ts: 899)", "1:XX1+null (ts: 899)", "2:XX2+null (ts: 899)", "3:XX3+null (ts: 899)"); |
|
|
|
processor.checkAndClearProcessResult("0:G0+null (ts: 899)", "1:G1+null (ts: 899)", "2:G2+null (ts: 899)", "3:G3+null (ts: 899)"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// push four items with increase timestamp to the primary stream; this should produce three left-join and one full-join items
|
|
|
|
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
|
|
|
|
|
|
|
|
// 0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899) }
|
|
|
|
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
|
|
|
|
|
|
|
|
// 0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
|
|
|
|
|
|
|
|
// 0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
time += 1L; |
|
|
|
time += 1L; |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "H" + expectedKey, time)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 900)", "1:XX1+null (ts: 900)", "2:XX2+null (ts: 900)", "3:XX3+null (ts: 900)"); |
|
|
|
processor.checkAndClearProcessResult("0:H0+b0 (ts: 1000)", "1:H1+null (ts: 900)", "2:H2+null (ts: 900)", "3:H3+null (ts: 900)"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// push four items with increase timestamp to the primary stream; this should produce two left-join and two full-join items
|
|
|
|
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
|
|
|
|
|
|
|
|
// 0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
|
|
|
|
|
|
|
|
// 0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900) }
|
|
|
|
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
|
|
|
|
|
|
|
|
// 0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
|
|
|
|
|
|
|
|
// 0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900),
|
|
|
|
|
|
|
|
// 0:I0 (ts: 901), 1:I1 (ts: 901), 2:I2 (ts: 901), 3:I3 (ts: 901) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
time += 1L; |
|
|
|
time += 1L; |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "I" + expectedKey, time)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 901)", "1:XX1+Y1 (ts: 901)", "2:XX2+null (ts: 901)", "3:XX3+null (ts: 901)"); |
|
|
|
processor.checkAndClearProcessResult("0:I0+b0 (ts: 1000)", "1:I1+b1 (ts: 1001)", "2:I2+null (ts: 901)", "3:I3+null (ts: 901)"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// push four items with increase timestamp to the primary stream; this should produce one left-join and three full-join items
|
|
|
|
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
|
|
|
|
|
|
|
|
// 0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
|
|
|
|
|
|
|
|
// 0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900),
|
|
|
|
|
|
|
|
// 0:I0 (ts: 901), 1:I1 (ts: 901), 2:I2 (ts: 901), 3:I3 (ts: 901) }
|
|
|
|
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
|
|
|
|
|
|
|
|
// 0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
|
|
|
|
|
|
|
|
// 0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900),
|
|
|
|
|
|
|
|
// 0:I0 (ts: 901), 1:I1 (ts: 901), 2:I2 (ts: 901), 3:I3 (ts: 901),
|
|
|
|
|
|
|
|
// 0:J0 (ts: 902), 1:J1 (ts: 902), 2:J2 (ts: 902), 3:J3 (ts: 902) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
time += 1L; |
|
|
|
time += 1L; |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "J" + expectedKey, time)); |
|
|
|
} |
|
|
|
} |
|
|
|
processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 902)", "1:XX1+Y1 (ts: 902)", "2:XX2+Y2 (ts: 902)", "3:XX3+null (ts: 902)"); |
|
|
|
processor.checkAndClearProcessResult("0:J0+b0 (ts: 1000)", "1:J1+b1 (ts: 1001)", "2:J2+b2 (ts: 1002)", "3:J3+null (ts: 902)"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// push four items with increase timestamp to the primary stream; this should produce one left-join and three full-join items
|
|
|
|
|
|
|
|
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
|
|
|
|
|
|
|
|
// 0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
|
|
|
|
|
|
|
|
// 0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900),
|
|
|
|
|
|
|
|
// 0:I0 (ts: 901), 1:I1 (ts: 901), 2:I2 (ts: 901), 3:I3 (ts: 901),
|
|
|
|
|
|
|
|
// 0:J0 (ts: 902), 1:J1 (ts: 902), 2:J2 (ts: 902), 3:J3 (ts: 902) }
|
|
|
|
|
|
|
|
// w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
|
|
|
|
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
|
|
|
|
|
|
|
|
// 0:B0 (ts: 1100), 1:B1 (ts: 1100), 2:B2 (ts: 1100), 3:B3 (ts: 1100),
|
|
|
|
|
|
|
|
// 0:C0 (ts: 1101), 1:C1 (ts: 1101), 2:C2 (ts: 1101), 3:C3 (ts: 1101),
|
|
|
|
|
|
|
|
// 0:D0 (ts: 1102), 1:D1 (ts: 1102), 2:D2 (ts: 1102), 3:D3 (ts: 1102),
|
|
|
|
|
|
|
|
// 0:E0 (ts: 1103), 1:E1 (ts: 1103), 2:E2 (ts: 1103), 3:E3 (ts: 1103),
|
|
|
|
|
|
|
|
// 0:F0 (ts: 1104), 1:F1 (ts: 1104), 2:F2 (ts: 1104), 3:F3 (ts: 1104),
|
|
|
|
|
|
|
|
// 0:G0 (ts: 899), 1:G1 (ts: 899), 2:G2 (ts: 899), 3:G3 (ts: 899),
|
|
|
|
|
|
|
|
// 0:H0 (ts: 900), 1:H1 (ts: 900), 2:H2 (ts: 900), 3:H3 (ts: 900),
|
|
|
|
|
|
|
|
// 0:I0 (ts: 901), 1:I1 (ts: 901), 2:I2 (ts: 901), 3:I3 (ts: 901),
|
|
|
|
|
|
|
|
// 0:J0 (ts: 902), 1:J1 (ts: 902), 2:J2 (ts: 902), 3:J3 (ts: 902),
|
|
|
|
|
|
|
|
// 0:K0 (ts: 903), 1:K1 (ts: 903), 2:K2 (ts: 903), 3:K3 (ts: 903) }
|
|
|
|
|
|
|
|
// --> w2 = { 0:a0 (ts: 0), 1:a1 (ts: 0), 2:a2 (ts: 0), 3:a3 (ts: 0),
|
|
|
|
|
|
|
|
// 0:b0 (ts: 1000), 1:b1 (ts: 1001), 2:b2 (ts: 1002), 3:b3 (ts: 1003) }
|
|
|
|
time += 1L; |
|
|
|
time += 1L; |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
for (final int expectedKey : expectedKeys) { |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time)); |
|
|
|
driver.pipeInput(recordFactory.create(topic1, expectedKey, "K" + expectedKey, time)); |
|
|
|
} |
|
|
|
|
|
|
|
processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 903)", "1:XX1+Y1 (ts: 903)", "2:XX2+Y2 (ts: 903)", "3:XX3+Y3 (ts: 903)"); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
processor.checkAndClearProcessResult("0:K0+b0 (ts: 1000)", "1:K1+b1 (ts: 1001)", "2:K2+b2 (ts: 1002)", "3:K3+b3 (ts: 1003)"); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|