From da95542d8fdbf9db1add28edbeccb3f9a2e0ccbf Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Fri, 13 Oct 2023 11:35:42 +0200 Subject: [PATCH] Prevent duplicate HTTP server observations Prior to this commit, HTTP server observations for Spring WebFlux could be recorded twice for a single request in some cases. The "COMPLETE" and "CANCEL" signals would race in the reactive pipeline and would trigger both the `doOnComplete()` and ` `doOnCancel()` operators, each calling `observation.stop()` on the current observation. This would in fact publish two different observations for the same request. This commit ensures that the instrumentation uses the `Mono#tap` operator to guard against this case and only call `Observation#stop` once for each request. Fixes gh-31417 --- .../reactive/ServerHttpObservationFilter.java | 95 ++++++++++++------- 1 file changed, 63 insertions(+), 32 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java b/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java index ca75df7e7a..c290926307 100644 --- a/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java +++ b/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +18,14 @@ package org.springframework.web.filter.reactive; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; -import org.reactivestreams.Publisher; +import reactor.core.observability.DefaultSignalListener; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.observation.DefaultServerRequestObservationConvention; @@ -99,40 +101,69 @@ public class ServerHttpObservationFilter implements WebFilter { ServerRequestObservationContext observationContext = new ServerRequestObservationContext(exchange.getRequest(), exchange.getResponse(), exchange.getAttributes()); exchange.getAttributes().put(CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE, observationContext); - return chain.filter(exchange).transformDeferred(call -> filter(exchange, observationContext, call)); + return chain.filter(exchange).tap(() -> new ObservationSignalListener(observationContext)); } - private Publisher filter(ServerWebExchange exchange, ServerRequestObservationContext observationContext, Mono call) { - Observation observation = ServerHttpObservationDocumentation.HTTP_REACTIVE_SERVER_REQUESTS.observation(this.observationConvention, - DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, this.observationRegistry); - observation.start(); - return call.doOnEach(signal -> { - Throwable throwable = signal.getThrowable(); - if (throwable != null) { - if (DISCONNECTED_CLIENT_EXCEPTIONS.contains(throwable.getClass().getSimpleName())) { - observationContext.setConnectionAborted(true); - } - observationContext.setError(throwable); - } - onTerminalSignal(observation, exchange); - }) - .doOnCancel(() -> { - observationContext.setConnectionAborted(true); - observation.stop(); - }) - .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation)); - } + private final class ObservationSignalListener extends DefaultSignalListener { + + private static final Set DISCONNECTED_CLIENT_EXCEPTIONS = Set.of("AbortedException", + "ClientAbortException", "EOFException", "EofException"); + + private final ServerRequestObservationContext observationContext; + + private final Observation observation; + + private AtomicBoolean observationRecorded = new AtomicBoolean(); + + public ObservationSignalListener(ServerRequestObservationContext observationContext) { + this.observationContext = observationContext; + this.observation = ServerHttpObservationDocumentation.HTTP_REACTIVE_SERVER_REQUESTS.observation(observationConvention, + DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry); + } - private void onTerminalSignal(Observation observation, ServerWebExchange exchange) { - ServerHttpResponse response = exchange.getResponse(); - if (response.isCommitted()) { - observation.stop(); + @Override + public void doOnSubscription() throws Throwable { + this.observation.start(); } - else { - response.beforeCommit(() -> { - observation.stop(); - return Mono.empty(); - }); + + @Override + public Context addToContext(Context originalContext) { + return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation); + } + + @Override + public void doOnCancel() throws Throwable { + if (this.observationRecorded.compareAndSet(false, true)) { + this.observationContext.setConnectionAborted(true); + this.observation.stop(); + } + } + + @Override + public void doOnComplete() throws Throwable { + if (this.observationRecorded.compareAndSet(false, true)) { + ServerHttpResponse response = this.observationContext.getResponse(); + if (response.isCommitted()) { + this.observation.stop(); + } + else { + response.beforeCommit(() -> { + this.observation.stop(); + return Mono.empty(); + }); + } + } + } + + @Override + public void doOnError(Throwable error) throws Throwable { + if (this.observationRecorded.compareAndSet(false, true)) { + if (DISCONNECTED_CLIENT_EXCEPTIONS.contains(error.getClass().getSimpleName())) { + this.observationContext.setConnectionAborted(true); + } + this.observationContext.setError(error); + this.observation.stop(); + } } }