|
|
|
@ -13,7 +13,6 @@
@@ -13,7 +13,6 @@
|
|
|
|
|
# See the License for the specific language governing permissions and |
|
|
|
|
# limitations under the License. |
|
|
|
|
|
|
|
|
|
from ducktape.mark import ignore |
|
|
|
|
from ducktape.mark import parametrize |
|
|
|
|
from ducktape.tests.test import Test |
|
|
|
|
from ducktape.utils.util import wait_until |
|
|
|
@ -21,7 +20,7 @@ from kafkatest.services.kafka import KafkaService
@@ -21,7 +20,7 @@ from kafkatest.services.kafka import KafkaService
|
|
|
|
|
from kafkatest.services.streams import StreamsBrokerCompatibilityService |
|
|
|
|
from kafkatest.services.verifiable_consumer import VerifiableConsumer |
|
|
|
|
from kafkatest.services.zookeeper import ZookeeperService |
|
|
|
|
from kafkatest.version import DEV_BRANCH, LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion |
|
|
|
|
from kafkatest.version import LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, KafkaVersion |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StreamsBrokerCompatibility(Test): |
|
|
|
@ -30,7 +29,6 @@ class StreamsBrokerCompatibility(Test):
@@ -30,7 +29,6 @@ class StreamsBrokerCompatibility(Test):
|
|
|
|
|
- Streams 0.11+ w/ EOS fails fast for older brokers 0.10.2 and 0.10.1 |
|
|
|
|
- Streams 0.11+ w/o EOS works for older brokers 0.10.2 and 0.10.1 |
|
|
|
|
- Streams fails fast for 0.10.0 brokers |
|
|
|
|
- Streams times-out for pre-0.10.0 brokers |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
input = "brokerCompatibilitySourceTopic" |
|
|
|
@ -72,6 +70,9 @@ class StreamsBrokerCompatibility(Test):
@@ -72,6 +70,9 @@ class StreamsBrokerCompatibility(Test):
|
|
|
|
|
|
|
|
|
|
self.kafka.stop() |
|
|
|
|
|
|
|
|
|
@parametrize(broker_version=str(LATEST_2_0)) |
|
|
|
|
@parametrize(broker_version=str(LATEST_1_1)) |
|
|
|
|
@parametrize(broker_version=str(LATEST_1_0)) |
|
|
|
|
@parametrize(broker_version=str(LATEST_0_11_0)) |
|
|
|
|
@parametrize(broker_version=str(LATEST_0_10_2)) |
|
|
|
|
@parametrize(broker_version=str(LATEST_0_10_1)) |
|
|
|
@ -106,19 +107,3 @@ class StreamsBrokerCompatibility(Test):
@@ -106,19 +107,3 @@ class StreamsBrokerCompatibility(Test):
|
|
|
|
|
|
|
|
|
|
self.kafka.stop() |
|
|
|
|
|
|
|
|
|
@ignore |
|
|
|
|
@parametrize(broker_version=str(LATEST_0_9)) |
|
|
|
|
@parametrize(broker_version=str(LATEST_0_8_2)) |
|
|
|
|
def test_timeout_on_pre_010_brokers(self, broker_version): |
|
|
|
|
self.kafka.set_version(KafkaVersion(broker_version)) |
|
|
|
|
self.kafka.start() |
|
|
|
|
|
|
|
|
|
processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False) |
|
|
|
|
|
|
|
|
|
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor: |
|
|
|
|
processor.start() |
|
|
|
|
monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.BrokerNotFoundException: Could not find any available broker.', |
|
|
|
|
timeout_sec=60, |
|
|
|
|
err_msg="Never saw 'no available brokers' error message " + str(processor.node.account)) |
|
|
|
|
|
|
|
|
|
self.kafka.stop() |
|
|
|
|