Browse Source

Prevent duplicate HTTP server observations

Prior to this commit, HTTP server observations for Spring WebFlux could
be recorded twice for a single request in some cases. The "COMPLETE" and
"CANCEL" signals would race in the reactive pipeline and would trigger
both the `doOnComplete()` and ` `doOnCancel()` operators, each calling
`observation.stop()` on the current observation.
This would in fact publish two different observations for the same
request.

This commit ensures that the instrumentation uses the `Mono#tap`
operator to guard against this case and only call `Observation#stop`
once for each request.

Fixes gh-31417
pull/31491/head
Brian Clozel 1 year ago
parent
commit
da95542d8f
  1. 95
      spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java

95
spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,12 +18,14 @@ package org.springframework.web.filter.reactive; @@ -18,12 +18,14 @@ package org.springframework.web.filter.reactive;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import org.reactivestreams.Publisher;
import reactor.core.observability.DefaultSignalListener;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.observation.DefaultServerRequestObservationConvention;
@ -99,40 +101,69 @@ public class ServerHttpObservationFilter implements WebFilter { @@ -99,40 +101,69 @@ public class ServerHttpObservationFilter implements WebFilter {
ServerRequestObservationContext observationContext = new ServerRequestObservationContext(exchange.getRequest(),
exchange.getResponse(), exchange.getAttributes());
exchange.getAttributes().put(CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE, observationContext);
return chain.filter(exchange).transformDeferred(call -> filter(exchange, observationContext, call));
return chain.filter(exchange).tap(() -> new ObservationSignalListener(observationContext));
}
private Publisher<Void> filter(ServerWebExchange exchange, ServerRequestObservationContext observationContext, Mono<Void> call) {
Observation observation = ServerHttpObservationDocumentation.HTTP_REACTIVE_SERVER_REQUESTS.observation(this.observationConvention,
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, this.observationRegistry);
observation.start();
return call.doOnEach(signal -> {
Throwable throwable = signal.getThrowable();
if (throwable != null) {
if (DISCONNECTED_CLIENT_EXCEPTIONS.contains(throwable.getClass().getSimpleName())) {
observationContext.setConnectionAborted(true);
}
observationContext.setError(throwable);
}
onTerminalSignal(observation, exchange);
})
.doOnCancel(() -> {
observationContext.setConnectionAborted(true);
observation.stop();
})
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation));
}
private final class ObservationSignalListener extends DefaultSignalListener<Void> {
private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS = Set.of("AbortedException",
"ClientAbortException", "EOFException", "EofException");
private final ServerRequestObservationContext observationContext;
private final Observation observation;
private AtomicBoolean observationRecorded = new AtomicBoolean();
public ObservationSignalListener(ServerRequestObservationContext observationContext) {
this.observationContext = observationContext;
this.observation = ServerHttpObservationDocumentation.HTTP_REACTIVE_SERVER_REQUESTS.observation(observationConvention,
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
}
private void onTerminalSignal(Observation observation, ServerWebExchange exchange) {
ServerHttpResponse response = exchange.getResponse();
if (response.isCommitted()) {
observation.stop();
@Override
public void doOnSubscription() throws Throwable {
this.observation.start();
}
else {
response.beforeCommit(() -> {
observation.stop();
return Mono.empty();
});
@Override
public Context addToContext(Context originalContext) {
return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation);
}
@Override
public void doOnCancel() throws Throwable {
if (this.observationRecorded.compareAndSet(false, true)) {
this.observationContext.setConnectionAborted(true);
this.observation.stop();
}
}
@Override
public void doOnComplete() throws Throwable {
if (this.observationRecorded.compareAndSet(false, true)) {
ServerHttpResponse response = this.observationContext.getResponse();
if (response.isCommitted()) {
this.observation.stop();
}
else {
response.beforeCommit(() -> {
this.observation.stop();
return Mono.empty();
});
}
}
}
@Override
public void doOnError(Throwable error) throws Throwable {
if (this.observationRecorded.compareAndSet(false, true)) {
if (DISCONNECTED_CLIENT_EXCEPTIONS.contains(error.getClass().getSimpleName())) {
this.observationContext.setConnectionAborted(true);
}
this.observationContext.setError(error);
this.observation.stop();
}
}
}

Loading…
Cancel
Save