Browse Source

Propagate context in reactive HTTP server and client

Prior to this commit, the ServerHttpObservationFilter would not add
the current observation as a key in the Reactor context, preventing
from being used or propagated during the HTTP exchange handling.

Also, the client instrumentation in `DefaultWebClient` would start
the observation once the request is fully formed and immutable,
preventing the context from being propagated through HTTP request
headers.

This commit fixes both uses cases now by:

* adding the current observation as a key in the reactor context
  on the server side
* using the `ClientRequest.Builder` as a Carrier on the client side

Closes gh-29388
pull/29167/head
Marcin Grzejszczak 2 years ago committed by Brian Clozel
parent
commit
c03ccb2e6c
  1. 8
      spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java
  2. 13
      spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java
  3. 23
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequestObservationContext.java
  4. 14
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestObservationConvention.java
  5. 22
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
  6. 16
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestObservationConventionTests.java
  7. 40
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java

8
spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java

@ -59,6 +59,11 @@ public class ServerHttpObservationFilter implements WebFilter { @@ -59,6 +59,11 @@ public class ServerHttpObservationFilter implements WebFilter {
private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS = Set.of("AbortedException",
"ClientAbortException", "EOFException", "EofException");
/**
* Aligned with ObservationThreadLocalAccessor#KEY from micrometer-core.
*/
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";
private final ObservationRegistry observationRegistry;
private final ServerRequestObservationConvention observationConvention;
@ -117,7 +122,8 @@ public class ServerHttpObservationFilter implements WebFilter { @@ -117,7 +122,8 @@ public class ServerHttpObservationFilter implements WebFilter {
.doOnCancel(() -> {
observationContext.setConnectionAborted(true);
observation.stop();
});
})
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation));
}
private void onTerminalSignal(Observation observation, ServerWebExchange exchange) {

13
spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java

@ -19,6 +19,7 @@ package org.springframework.web.filter.reactive; @@ -19,6 +19,7 @@ package org.springframework.web.filter.reactive;
import java.util.Optional;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.observation.tck.TestObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistryAssert;
import org.assertj.core.api.ThrowingConsumer;
@ -59,6 +60,18 @@ class ServerHttpObservationFilterTests { @@ -59,6 +60,18 @@ class ServerHttpObservationFilterTests {
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS");
}
@Test
void filterShouldAddNewObservationToReactorContext() {
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));
exchange.getResponse().setRawStatusCode(200);
WebFilterChain filterChain = webExchange -> Mono.deferContextual(contextView -> {
assertThat(contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent();
return Mono.empty();
});
this.filter.filter(exchange, filterChain).block();
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS");
}
@Test
void filterShouldUseThrownException() {
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));

23
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientRequestObservationContext.java

@ -27,21 +27,24 @@ import org.springframework.lang.Nullable; @@ -27,21 +27,24 @@ import org.springframework.lang.Nullable;
* @author Brian Clozel
* @since 6.0
*/
public class ClientRequestObservationContext extends RequestReplySenderContext<ClientRequest, ClientResponse> {
public class ClientRequestObservationContext extends RequestReplySenderContext<ClientRequest.Builder, ClientResponse> {
@Nullable
private String uriTemplate;
private boolean aborted;
@Nullable
private ClientRequest builtRequest;
public ClientRequestObservationContext() {
super(ClientRequestObservationContext::setRequestHeader);
}
private static void setRequestHeader(@Nullable ClientRequest request, String name, String value) {
private static void setRequestHeader(@Nullable ClientRequest.Builder request, String name, String value) {
if (request != null) {
request.headers().set(name, value);
request.header(name, value);
}
}
@ -75,4 +78,18 @@ public class ClientRequestObservationContext extends RequestReplySenderContext<C @@ -75,4 +78,18 @@ public class ClientRequestObservationContext extends RequestReplySenderContext<C
void setAborted(boolean aborted) {
this.aborted = aborted;
}
/**
* Return the built request.
*/
public ClientRequest getBuiltRequest() {
return this.builtRequest;
}
/**
* Set the built request.
*/
public void setBuiltRequest(ClientRequest builtRequest) {
this.builtRequest = builtRequest;
}
}

14
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientRequestObservationConvention.java

@ -79,7 +79,7 @@ public class DefaultClientRequestObservationConvention implements ClientRequestO @@ -79,7 +79,7 @@ public class DefaultClientRequestObservationConvention implements ClientRequestO
@Override
public String getContextualName(ClientRequestObservationContext context) {
return "http " + context.getCarrier().method().name().toLowerCase();
return "http " + context.getBuiltRequest().method().name().toLowerCase();
}
@Override
@ -95,8 +95,8 @@ public class DefaultClientRequestObservationConvention implements ClientRequestO @@ -95,8 +95,8 @@ public class DefaultClientRequestObservationConvention implements ClientRequestO
}
protected KeyValue method(ClientRequestObservationContext context) {
if (context.getCarrier() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.LowCardinalityKeyNames.METHOD, context.getCarrier().method().name());
if (context.getBuiltRequest() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.LowCardinalityKeyNames.METHOD, context.getBuiltRequest().method().name());
}
else {
return METHOD_NONE;
@ -143,15 +143,15 @@ public class DefaultClientRequestObservationConvention implements ClientRequestO @@ -143,15 +143,15 @@ public class DefaultClientRequestObservationConvention implements ClientRequestO
}
protected KeyValue httpUrl(ClientRequestObservationContext context) {
if (context.getCarrier() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.HTTP_URL, context.getCarrier().url().toASCIIString());
if (context.getBuiltRequest() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.HTTP_URL, context.getBuiltRequest().url().toASCIIString());
}
return HTTP_URL_NONE;
}
protected KeyValue clientName(ClientRequestObservationContext context) {
if (context.getCarrier() != null && context.getCarrier().url().getHost() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.CLIENT_NAME, context.getCarrier().url().getHost());
if (context.getBuiltRequest() != null && context.getBuiltRequest().url().getHost() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.CLIENT_NAME, context.getBuiltRequest().url().getHost());
}
return CLIENT_NAME_NONE;
}

22
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -75,6 +75,11 @@ class DefaultWebClient implements WebClient { @@ -75,6 +75,11 @@ class DefaultWebClient implements WebClient {
private static final DefaultClientRequestObservationConvention DEFAULT_OBSERVATION_CONVENTION = new DefaultClientRequestObservationConvention();
/**
* Aligned with ObservationThreadLocalAccessor#KEY from micrometer-core.
*/
private static final String MICROMETER_OBSERVATION = "micrometer.observation";
private final ExchangeFunction exchangeFunction;
private final UriBuilderFactory uriBuilderFactory;
@ -450,14 +455,19 @@ class DefaultWebClient implements WebClient { @@ -450,14 +455,19 @@ class DefaultWebClient implements WebClient {
@SuppressWarnings("deprecation")
public Mono<ClientResponse> exchange() {
ClientRequestObservationContext observationContext = new ClientRequestObservationContext();
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
return Mono.defer(() -> {
ClientRequest.Builder requestBuilder = this.inserter != null ?
initRequestBuilder().body(this.inserter) :
initRequestBuilder();
return Mono.deferContextual(contextView -> {
Observation observation = ClientHttpObservationDocumentation.HTTP_REQUEST.observation(observationConvention,
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry).start();
observationContext.setCarrier(request);
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
observationContext.setCarrier(requestBuilder);
observation
.parentObservation(contextView.getOrDefault(MICROMETER_OBSERVATION, null))
.start();
ClientRequest request = requestBuilder.build();
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
observationContext.setBuiltRequest(request);
Mono<ClientResponse> responseMono = exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);

16
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientRequestObservationConventionTests.java

@ -44,7 +44,8 @@ class DefaultClientRequestObservationConventionTests { @@ -44,7 +44,8 @@ class DefaultClientRequestObservationConventionTests {
@Test
void shouldHaveContextualName() {
ClientRequestObservationContext context = new ClientRequestObservationContext();
context.setCarrier(ClientRequest.create(HttpMethod.GET, URI.create("/test")).build());
context.setCarrier(ClientRequest.create(HttpMethod.GET, URI.create("/test")));
context.setBuiltRequest(context.getCarrier().build());
assertThat(this.observationConvention.getContextualName(context)).isEqualTo("http get");
}
@ -77,10 +78,11 @@ class DefaultClientRequestObservationConventionTests { @@ -77,10 +78,11 @@ class DefaultClientRequestObservationConventionTests {
@Test
void shouldAddKeyValuesForRequestWithUriTemplate() {
ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/resource/42"))
.attribute(WebClient.class.getName() + ".uriTemplate", "/resource/{id}").build();
ClientRequest.Builder request = ClientRequest.create(HttpMethod.GET, URI.create("/resource/42"))
.attribute(WebClient.class.getName() + ".uriTemplate", "/resource/{id}");
ClientRequestObservationContext context = createContext(request);
context.setUriTemplate("/resource/{id}");
context.setBuiltRequest(context.getCarrier().build());
assertThat(this.observationConvention.getLowCardinalityKeyValues(context))
.contains(KeyValue.of("exception", "none"), KeyValue.of("method", "GET"), KeyValue.of("uri", "/resource/{id}"),
KeyValue.of("status", "200"), KeyValue.of("outcome", "SUCCESS"));
@ -90,7 +92,8 @@ class DefaultClientRequestObservationConventionTests { @@ -90,7 +92,8 @@ class DefaultClientRequestObservationConventionTests {
@Test
void shouldAddKeyValuesForRequestWithoutUriTemplate() {
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("/resource/42")).build());
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("/resource/42")));
context.setBuiltRequest(context.getCarrier().build());
assertThat(this.observationConvention.getLowCardinalityKeyValues(context))
.contains(KeyValue.of("method", "GET"), KeyValue.of("uri", "none"));
assertThat(this.observationConvention.getHighCardinalityKeyValues(context)).hasSize(2).contains(KeyValue.of("http.url", "/resource/42"));
@ -98,11 +101,12 @@ class DefaultClientRequestObservationConventionTests { @@ -98,11 +101,12 @@ class DefaultClientRequestObservationConventionTests {
@Test
void shouldAddClientNameKeyValueForRequestWithHost() {
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("https://localhost:8080/resource/42")).build());
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("https://localhost:8080/resource/42")));
context.setBuiltRequest(context.getCarrier().build());
assertThat(this.observationConvention.getHighCardinalityKeyValues(context)).contains(KeyValue.of("client.name", "localhost"));
}
private ClientRequestObservationContext createContext(ClientRequest request) {
private ClientRequestObservationContext createContext(ClientRequest.Builder request) {
ClientRequestObservationContext context = new ClientRequestObservationContext();
context.setCarrier(request);
context.setResponse(ClientResponse.create(HttpStatus.OK).build());

40
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java

@ -17,7 +17,11 @@ @@ -17,7 +17,11 @@
package org.springframework.web.reactive.function.client;
import java.time.Duration;
import java.util.Collections;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationHandler;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.observation.tck.TestObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistryAssert;
import org.junit.jupiter.api.BeforeEach;
@ -28,6 +32,7 @@ import reactor.test.StepVerifier; @@ -28,6 +32,7 @@ import reactor.test.StepVerifier;
import org.springframework.http.HttpStatus;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.when;
@ -57,17 +62,35 @@ class WebClientObservationTests { @@ -57,17 +62,35 @@ class WebClientObservationTests {
when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty());
given(this.exchangeFunction.exchange(this.request.capture())).willReturn(Mono.just(mockResponse));
this.builder = WebClient.builder().baseUrl("/base").exchangeFunction(this.exchangeFunction).observationRegistry(this.observationRegistry);
this.observationRegistry.observationConfig().observationHandler(new HeaderInjectingHandler());
}
@Test
void recordsObservationForSuccessfulExchange() {
this.builder.build().get().uri("/resource/{id}", 42)
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
verifyAndGetRequest();
ClientRequest clientRequest = verifyAndGetRequest();
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS")
.hasLowCardinalityKeyValue("uri", "/resource/{id}");
assertThat(clientRequest.headers()).containsEntry("foo", Collections.singletonList("bar"));
}
@Test
void recordsObservationForSuccessfulExchangeWithParentObservationInReactorContext() {
Observation parent = Observation.start("parent", observationRegistry);
try {
this.builder.build().get().uri("/resource/{id}", 42)
.retrieve().bodyToMono(Void.class).contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, parent)).block(Duration.ofSeconds(10));
verifyAndGetRequest();
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS")
.hasParentObservationEqualTo(parent);
}
finally {
parent.stop();
}
}
@Test
@ -102,4 +125,17 @@ class WebClientObservationTests { @@ -102,4 +125,17 @@ class WebClientObservationTests {
return request.getValue();
}
static class HeaderInjectingHandler implements ObservationHandler<ClientRequestObservationContext> {
@Override
public void onStart(ClientRequestObservationContext context) {
context.getSetter().set(context.getCarrier(), "foo", "bar");
}
@Override
public boolean supportsContext(Observation.Context context) {
return context instanceof ClientRequestObservationContext;
}
}
}

Loading…
Cancel
Save