From a0727191e14d8458a4d932371b4b8baf68202cab Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 16 Oct 2020 18:10:05 +0100 Subject: [PATCH 1/2] Polishing --- .../annotation/ResponseBodyEmitter.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java index 23c5e57ed8..9d4da7fe08 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java @@ -72,18 +72,21 @@ public class ResponseBodyEmitter { /** Store send data before handler is initialized. */ private final Set earlySendAttempts = new LinkedHashSet<>(8); - /** Store complete invocation before handler is initialized. */ + /** Store successful completion before the handler is initialized. */ private boolean complete; - /** Store completeWithError invocation before handler is initialized. */ + /** Store an error before the handler is initialized. */ @Nullable private Throwable failure; /** - * After an IOException on send, the servlet container will provide an onError - * callback that we'll handle as completeWithError (on container thread). - * We use this flag to ignore competing attempts to completeWithError by - * the application via try-catch. */ + * After an I/O error, we don't call {@link #completeWithError} directly but + * wait for the Servlet container to call us via {@code AsyncListener#onError} + * on a container thread at which point we call completeWithError. + * This flag is used to ignore further calls to complete or completeWithError + * that may come for example from an application try-catch block on the + * thread of the I/O error. + */ private boolean sendFailed; private final DefaultCallback timeoutCallback = new DefaultCallback(); @@ -280,7 +283,10 @@ public class ResponseBodyEmitter { /** - * Handle sent objects and complete request processing. + * Contract to handle the sending of event data, the completion of event + * sending, and the registration of callbacks to be invoked in case of + * timeout, error, and completion for any reason (including from the + * container side). */ interface Handler { From db3d537e72ed07828efc456d112523ae0377d84b Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 16 Oct 2020 19:07:36 +0100 Subject: [PATCH 2/2] Protect SseEmitter against early I/O errors Closes gh-25442 --- .../annotation/ResponseBodyEmitter.java | 23 +++++++++++++++---- ...ResponseBodyEmitterReturnValueHandler.java | 13 ++++++++--- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java index 9d4da7fe08..732a677933 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -127,10 +127,14 @@ public class ResponseBodyEmitter { synchronized void initialize(Handler handler) throws IOException { this.handler = handler; - for (DataWithMediaType sendAttempt : this.earlySendAttempts) { - sendInternal(sendAttempt.getData(), sendAttempt.getMediaType()); + try { + for (DataWithMediaType sendAttempt : this.earlySendAttempts) { + sendInternal(sendAttempt.getData(), sendAttempt.getMediaType()); + } + } + finally { + this.earlySendAttempts.clear(); } - this.earlySendAttempts.clear(); if (this.complete) { if (this.failure != null) { @@ -147,6 +151,13 @@ public class ResponseBodyEmitter { } } + synchronized void initializeWithError(Throwable ex) { + this.complete = true; + this.failure = ex; + this.earlySendAttempts.clear(); + this.errorCallback.accept(ex); + } + /** * Invoked after the response is updated with the status code and headers, * if the ResponseBodyEmitter is wrapped in a ResponseEntity, but before the @@ -182,7 +193,9 @@ public class ResponseBodyEmitter { * @throws java.lang.IllegalStateException wraps any other errors */ public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException { - Assert.state(!this.complete, "ResponseBodyEmitter is already set complete"); + Assert.state(!this.complete, + "ResponseBodyEmitter has already completed" + + (this.failure != null ? " with error: " + this.failure : "")); sendInternal(object, mediaType); } diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java index ee02ad1700..835116d425 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java @@ -176,10 +176,17 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur // Headers will be flushed at the first write outputMessage = new StreamingServletServerHttpResponse(outputMessage); - DeferredResult deferredResult = new DeferredResult<>(emitter.getTimeout()); - WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer); + HttpMessageConvertingHandler handler; + try { + DeferredResult deferredResult = new DeferredResult<>(emitter.getTimeout()); + WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer); + handler = new HttpMessageConvertingHandler(outputMessage, deferredResult); + } + catch (Throwable ex) { + emitter.initializeWithError(ex); + throw ex; + } - HttpMessageConvertingHandler handler = new HttpMessageConvertingHandler(outputMessage, deferredResult); emitter.initialize(handler); }