@ -16,8 +16,21 @@
@@ -16,8 +16,21 @@
* /
package org.apache.kafka.tools ;
import static org.junit.jupiter.api.Assertions.assertEquals ;
import static org.junit.jupiter.api.Assertions.assertThrows ;
import static org.junit.jupiter.api.Assertions.assertTrue ;
import static org.mockito.ArgumentMatchers.any ;
import static org.mockito.ArgumentMatchers.anyLong ;
import static org.mockito.ArgumentMatchers.anyString ;
import static org.mockito.ArgumentMatchers.eq ;
import static org.mockito.Mockito.inOrder ;
import static org.mockito.Mockito.mock ;
import static org.mockito.Mockito.verify ;
import static org.mockito.Mockito.when ;
import com.fasterxml.jackson.databind.JsonNode ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import java.io.IOException ;
import java.util.List ;
import org.apache.kafka.common.MetricName ;
import org.apache.kafka.common.config.ConfigException ;
@ -26,15 +39,6 @@ import org.apache.kafka.common.metrics.KafkaMetric;
@@ -26,15 +39,6 @@ import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig ;
import org.apache.kafka.common.utils.MockTime ;
import org.apache.kafka.common.utils.Time ;
import org.easymock.Capture ;
import org.easymock.EasyMock ;
import org.junit.Before ;
import org.junit.Test ;
import org.junit.runner.RunWith ;
import org.powermock.api.easymock.PowerMock ;
import org.powermock.api.easymock.annotation.MockStrict ;
import org.powermock.core.classloader.annotations.PrepareForTest ;
import org.powermock.modules.junit4.PowerMockRunner ;
import java.io.InputStream ;
import java.io.OutputStream ;
@ -49,14 +53,14 @@ import java.util.HashMap;
@@ -49,14 +53,14 @@ import java.util.HashMap;
import java.util.Map ;
import java.util.concurrent.ScheduledExecutorService ;
import java.util.concurrent.TimeUnit ;
import org.junit.jupiter.api.AfterEach ;
import org.junit.jupiter.api.BeforeEach ;
import org.junit.jupiter.api.Test ;
import org.mockito.ArgumentCaptor ;
import org.mockito.InOrder ;
import org.mockito.MockedStatic ;
import org.mockito.Mockito ;
import static org.junit.Assert.assertEquals ;
import static org.junit.Assert.assertTrue ;
import static org.powermock.api.easymock.PowerMock.replayAll ;
import static org.powermock.api.easymock.PowerMock.verifyAll ;
@RunWith ( PowerMockRunner . class )
@PrepareForTest ( PushHttpMetricsReporter . class )
public class PushHttpMetricsReporterTest {
private static final URL URL ;
@ -67,64 +71,67 @@ public class PushHttpMetricsReporterTest {
@@ -67,64 +71,67 @@ public class PushHttpMetricsReporterTest {
throw new RuntimeException ( e ) ;
}
}
private final Time time = new MockTime ( ) ;
private final ScheduledExecutorService executor = mock ( ScheduledExecutorService . class ) ;
private final HttpURLConnection httpReq = mock ( HttpURLConnection . class ) ;
private final OutputStream httpOut = mock ( OutputStream . class ) ;
private final InputStream httpErr = mock ( InputStream . class ) ;
private final ArgumentCaptor < Runnable > reportRunnableCaptor = ArgumentCaptor . forClass ( Runnable . class ) ;
private final ArgumentCaptor < byte [ ] > httpPayloadCaptor = ArgumentCaptor . forClass ( byte [ ] . class ) ;
private PushHttpMetricsReporter reporter ;
private Time time = new MockTime ( ) ;
@MockStrict
private ScheduledExecutorService executor ;
private Capture < Runnable > reportRunnable = EasyMock . newCapture ( ) ;
@MockStrict
private HttpURLConnection httpReq ;
@MockStrict
private OutputStream httpOut ;
private Capture < byte [ ] > httpPayload = EasyMock . newCapture ( ) ;
@MockStrict
private InputStream httpErr ;
@Before
private MockedStatic < PushHttpMetricsReporter > mockedStaticReporter ;
@BeforeEach
public void setUp ( ) {
reporter = new PushHttpMetricsReporter ( time , executor ) ;
PowerMock . mockStatic ( PushHttpMetricsReporter . class ) ;
mockedStaticReporter = Mockito . mockStatic ( PushHttpMetricsReporter . class ) ;
}
@AfterEach
public void tearDown ( ) {
mockedStaticReporter . close ( ) ;
}
@Test
public void testConfigureClose ( ) throws Exception {
expectConfigure ( ) ;
expectClose ( ) ;
replayAll ( ) ;
whenClose ( ) ;
configure ( ) ;
reporter . close ( ) ;
verifyAll ( ) ;
verifyConfigure ( ) ;
verifyClose ( ) ;
}
@Test ( expected = ConfigException . class )
public void testConfigureBadUrl ( ) throws Exception {
@Test
public void testConfigureBadUrl ( ) {
Map < String , String > config = new HashMap < > ( ) ;
config . put ( PushHttpMetricsReporter . METRICS_URL_CONFIG , "malformed;url" ) ;
config . put ( PushHttpMetricsReporter . METRICS_PERIOD_CONFIG , "5" ) ;
reporter . configure ( config ) ;
assertThrows ( ConfigException . class , ( ) - > reporter . configure ( config ) ) ;
}
@Test ( expected = ConfigException . class )
public void testConfigureMissingPeriod ( ) throws Exception {
@Test
public void testConfigureMissingPeriod ( ) {
Map < String , String > config = new HashMap < > ( ) ;
config . put ( PushHttpMetricsReporter . METRICS_URL_CONFIG , URL . toString ( ) ) ;
reporter . configure ( config ) ;
assertThrows ( ConfigException . class , ( ) - > reporter . configure ( config ) ) ;
}
@Test
public void testNoMetrics ( ) throws Exception {
expectConfigure ( ) ;
expectRequest ( 200 ) ;
expectClose ( ) ;
replayAll ( ) ;
whenRequest ( 200 ) ;
configure ( ) ;
reportRunnable . getValue ( ) . run ( ) ;
JsonNode payload = new ObjectMapper ( ) . readTree ( httpPayload . getValue ( ) ) ;
verifyConfigure ( ) ;
reportRunnableCaptor . getValue ( ) . run ( ) ;
verifyResponse ( ) ;
JsonNode payload = new ObjectMapper ( ) . readTree ( httpPayloadCaptor . getValue ( ) ) ;
assertTrue ( payload . isObject ( ) ) ;
assertPayloadHasClientInfo ( payload ) ;
@ -136,52 +143,44 @@ public class PushHttpMetricsReporterTest {
@@ -136,52 +143,44 @@ public class PushHttpMetricsReporterTest {
assertEquals ( 0 , metrics . size ( ) ) ;
reporter . close ( ) ;
verifyAll ( ) ;
verifyClose ( ) ;
}
// For error conditions, we expect them to come with a response body that we can read & log
@Test
public void testClientError ( ) throws Exception {
expectConfigure ( ) ;
expectRequest ( 400 , true ) ;
expectClose ( ) ;
replayAll ( ) ;
whenRequest ( 400 , true ) ;
configure ( ) ;
reportRunnable . getValue ( ) . run ( ) ;
verifyConfigure ( ) ;
reporter . close ( ) ;
reportRunnableCaptor . getValue ( ) . run ( ) ;
verifyResponse ( ) ;
verifyAll ( ) ;
reporter . close ( ) ;
verifyClose ( ) ;
}
@Test
public void testServerError ( ) throws Exception {
expectConfigure ( ) ;
expectRequest ( 500 , true ) ;
expectClose ( ) ;
replayAll ( ) ;
whenRequest ( 500 , true ) ;
configure ( ) ;
reportRunnable . getValue ( ) . run ( ) ;
verifyConfigure ( ) ;
reporter . close ( ) ;
reportRunnableCaptor . getValue ( ) . run ( ) ;
verifyResponse ( ) ;
verifyAll ( ) ;
reporter . close ( ) ;
verifyClose ( ) ;
}
@Test
public void testMetricValues ( ) throws Exception {
expectConfigure ( ) ;
expectRequest ( 200 ) ;
expectClose ( ) ;
replayAll ( ) ;
whenRequest ( 200 ) ;
configure ( ) ;
verifyConfigure ( ) ;
KafkaMetric metric1 = new KafkaMetric (
new Object ( ) ,
new MetricName ( "name1" , "group1" , "desc1" , Collections . singletonMap ( "key1" , "value1" ) ) ,
@ -223,8 +222,10 @@ public class PushHttpMetricsReporterTest {
@@ -223,8 +222,10 @@ public class PushHttpMetricsReporterTest {
reporter . metricChange ( metric3 ) ; // added by change
reporter . metricRemoval ( metric2 ) ; // added in init, deleted by removal
reportRunnable . getValue ( ) . run ( ) ;
JsonNode payload = new ObjectMapper ( ) . readTree ( httpPayload . getValue ( ) ) ;
reportRunnableCaptor . getValue ( ) . run ( ) ;
verifyResponse ( ) ;
JsonNode payload = new ObjectMapper ( ) . readTree ( httpPayloadCaptor . getValue ( ) ) ;
assertTrue ( payload . isObject ( ) ) ;
assertPayloadHasClientInfo ( payload ) ;
@ -264,14 +265,7 @@ public class PushHttpMetricsReporterTest {
@@ -264,14 +265,7 @@ public class PushHttpMetricsReporterTest {
assertEquals ( "value4" , m4 . get ( "value" ) . textValue ( ) ) ;
reporter . close ( ) ;
verifyAll ( ) ;
}
private void expectConfigure ( ) {
EasyMock . expect (
executor . scheduleAtFixedRate ( EasyMock . capture ( reportRunnable ) , EasyMock . eq ( 5L ) , EasyMock . eq ( 5L ) , EasyMock . eq ( TimeUnit . SECONDS ) )
) . andReturn ( null ) ; // return value not expected to be used
verifyClose ( ) ;
}
private void configure ( ) {
@ -281,42 +275,17 @@ public class PushHttpMetricsReporterTest {
@@ -281,42 +275,17 @@ public class PushHttpMetricsReporterTest {
reporter . configure ( config ) ;
}
private void expect Request( int returnStatus ) throws Exception {
expect Request( returnStatus , false ) ;
private void when Request( int returnStatus ) throws Exception {
when Request( returnStatus , false ) ;
}
// Expect that a request is made with the given response code
private void expectRequest ( int returnStatus , boolean readResponse ) throws Exception {
EasyMock . expect ( PushHttpMetricsReporter . newHttpConnection ( URL ) ) . andReturn ( httpReq ) ;
httpReq . setRequestMethod ( "POST" ) ;
EasyMock . expectLastCall ( ) ;
httpReq . setDoInput ( true ) ;
EasyMock . expectLastCall ( ) ;
httpReq . setRequestProperty ( "Content-Type" , "application/json" ) ;
EasyMock . expectLastCall ( ) ;
httpReq . setRequestProperty ( EasyMock . eq ( "Content-Length" ) , EasyMock . anyString ( ) ) ;
EasyMock . expectLastCall ( ) ;
httpReq . setRequestProperty ( "Accept" , "*/*" ) ;
EasyMock . expectLastCall ( ) ;
httpReq . setUseCaches ( false ) ;
EasyMock . expectLastCall ( ) ;
httpReq . setDoOutput ( true ) ;
EasyMock . expectLastCall ( ) ;
EasyMock . expect ( httpReq . getOutputStream ( ) ) . andReturn ( httpOut ) ;
httpOut . write ( EasyMock . capture ( httpPayload ) ) ;
EasyMock . expectLastCall ( ) ;
httpOut . flush ( ) ;
EasyMock . expectLastCall ( ) ;
httpOut . close ( ) ;
EasyMock . expectLastCall ( ) ;
EasyMock . expect ( httpReq . getResponseCode ( ) ) . andReturn ( returnStatus ) ;
private void whenRequest ( int returnStatus , boolean readResponse ) throws Exception {
when ( PushHttpMetricsReporter . newHttpConnection ( URL ) ) . thenReturn ( httpReq ) ;
when ( httpReq . getOutputStream ( ) ) . thenReturn ( httpOut ) ;
when ( httpReq . getResponseCode ( ) ) . thenReturn ( returnStatus ) ;
if ( readResponse )
expectReadResponse ( ) ;
httpReq . disconnect ( ) ;
EasyMock . expectLastCall ( ) ;
whenReadResponse ( ) ;
}
private void assertPayloadHasClientInfo ( JsonNode payload ) throws UnknownHostException {
@ -328,15 +297,39 @@ public class PushHttpMetricsReporterTest {
@@ -328,15 +297,39 @@ public class PushHttpMetricsReporterTest {
assertEquals ( time . milliseconds ( ) , client . get ( "time" ) . longValue ( ) ) ;
}
private void expectReadResponse ( ) throws Exception {
EasyMock . expect ( httpReq . getErrorStream ( ) ) . andReturn ( httpErr ) ;
EasyMock . expect ( PushHttpMetricsReporter . readResponse ( httpErr ) ) . andReturn ( "error response message" ) ;
EasyMock . expectLastCall ( ) ;
private void whenReadResponse ( ) {
when ( httpReq . getErrorStream ( ) ) . thenReturn ( httpErr ) ;
when ( PushHttpMetricsReporter . readResponse ( httpErr ) ) . thenReturn ( "error response message" ) ;
}
private void whenClose ( ) throws Exception {
when ( executor . awaitTermination ( anyLong ( ) , any ( ) ) ) . thenReturn ( true ) ;
}
private void verifyClose ( ) throws InterruptedException {
InOrder inOrder = inOrder ( executor ) ;
inOrder . verify ( executor ) . shutdown ( ) ;
inOrder . verify ( executor ) . awaitTermination ( 30L , TimeUnit . SECONDS ) ;
}
private void verifyConfigure ( ) {
verify ( executor ) . scheduleAtFixedRate ( reportRunnableCaptor . capture ( ) ,
eq ( 5L ) , eq ( 5L ) , eq ( TimeUnit . SECONDS ) ) ;
}
private void expectClose ( ) throws Exception {
executor . shutdown ( ) ;
EasyMock . expect ( executor . awaitTermination ( EasyMock . anyLong ( ) , EasyMock . anyObject ( TimeUnit . class ) ) ) . andReturn ( true ) ;
private void verifyResponse ( ) throws IOException {
verify ( httpReq ) . setRequestMethod ( "POST" ) ;
verify ( httpReq ) . setDoInput ( true ) ;
verify ( httpReq ) . setRequestProperty ( "Content-Type" , "application/json" ) ;
verify ( httpReq ) . setRequestProperty ( eq ( "Content-Length" ) , anyString ( ) ) ;
verify ( httpReq ) . setRequestProperty ( "Accept" , "*/*" ) ;
verify ( httpReq ) . setUseCaches ( false ) ;
verify ( httpReq ) . setDoOutput ( true ) ;
verify ( httpReq ) . disconnect ( ) ;
verify ( httpOut ) . write ( httpPayloadCaptor . capture ( ) ) ;
verify ( httpOut ) . flush ( ) ;
verify ( httpOut ) . close ( ) ;
}
static class ImmutableValue < T > implements Gauge < T > {