|
|
|
@ -93,7 +93,7 @@ public class KStreamKStreamJoinTest {
@@ -93,7 +93,7 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
// push two items to the primary stream. the other window is empty
|
|
|
|
|
// w1 = {}
|
|
|
|
|
// w2 = {}
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = {}
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
@ -105,7 +105,7 @@ public class KStreamKStreamJoinTest {
@@ -105,7 +105,7 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
// push two items to the other stream. this should produce two items.
|
|
|
|
|
// w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = {}
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
@ -117,7 +117,7 @@ public class KStreamKStreamJoinTest {
@@ -117,7 +117,7 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
// push all four items to the primary stream. this should produce two items.
|
|
|
|
|
// w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < expectedKeys.length; i++) { |
|
|
|
@ -127,9 +127,9 @@ public class KStreamKStreamJoinTest {
@@ -127,9 +127,9 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); |
|
|
|
|
|
|
|
|
|
// push all items to the other stream. this should produce six items.
|
|
|
|
|
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < expectedKeys.length; i++) { |
|
|
|
@ -139,9 +139,9 @@ public class KStreamKStreamJoinTest {
@@ -139,9 +139,9 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); |
|
|
|
|
|
|
|
|
|
// push all four items to the primary stream. this should produce six items.
|
|
|
|
|
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < expectedKeys.length; i++) { |
|
|
|
@ -151,9 +151,9 @@ public class KStreamKStreamJoinTest {
@@ -151,9 +151,9 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); |
|
|
|
|
|
|
|
|
|
// push two items to the other stream. this should produce six item.
|
|
|
|
|
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
@ -191,7 +191,7 @@ public class KStreamKStreamJoinTest {
@@ -191,7 +191,7 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
// push two items to the primary stream. the other window is empty.this should produce two items
|
|
|
|
|
// w1 = {}
|
|
|
|
|
// w2 = {}
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = {}
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
@ -203,7 +203,7 @@ public class KStreamKStreamJoinTest {
@@ -203,7 +203,7 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
// push two items to the other stream. this should produce two items.
|
|
|
|
|
// w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = {}
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
@ -215,7 +215,7 @@ public class KStreamKStreamJoinTest {
@@ -215,7 +215,7 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
// push all four items to the primary stream. this should produce four items.
|
|
|
|
|
// w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < expectedKeys.length; i++) { |
|
|
|
@ -225,9 +225,9 @@ public class KStreamKStreamJoinTest {
@@ -225,9 +225,9 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); |
|
|
|
|
|
|
|
|
|
// push all items to the other stream. this should produce six items.
|
|
|
|
|
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < expectedKeys.length; i++) { |
|
|
|
@ -237,9 +237,9 @@ public class KStreamKStreamJoinTest {
@@ -237,9 +237,9 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); |
|
|
|
|
|
|
|
|
|
// push all four items to the primary stream. this should produce six items.
|
|
|
|
|
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < expectedKeys.length; i++) { |
|
|
|
@ -249,9 +249,9 @@ public class KStreamKStreamJoinTest {
@@ -249,9 +249,9 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); |
|
|
|
|
|
|
|
|
|
// push two items to the other stream. this should produce six item.
|
|
|
|
|
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
@ -292,7 +292,7 @@ public class KStreamKStreamJoinTest {
@@ -292,7 +292,7 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
// push two items to the primary stream. the other window is empty. this should produce no items.
|
|
|
|
|
// w1 = {}
|
|
|
|
|
// w2 = {}
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = {}
|
|
|
|
|
setRecordContext(time, topic1); |
|
|
|
|
for (int i = 0; i < 2; i++) { |
|
|
|
@ -304,7 +304,7 @@ public class KStreamKStreamJoinTest {
@@ -304,7 +304,7 @@ public class KStreamKStreamJoinTest {
|
|
|
|
|
// push two items to the other stream. this should produce two items.
|
|
|
|
|
// w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = {}
|
|
|
|
|
// --> w1 = { 0:X1, 1:X1 }
|
|
|
|
|
// --> w1 = { 0:X0, 1:X1 }
|
|
|
|
|
// w2 = { 0:Y0, 1:Y1 }
|
|
|
|
|
|
|
|
|
|
setRecordContext(time, topic2); |
|
|
|
|