Browse Source

Common DataWithMediaType class and common synchronization for ResponseBodyEmitter/SseEmitter

Issue: SPR-13223
Issue: SPR-13224
pull/835/merge
Juergen Hoeller 9 years ago
parent
commit
bdb63483df
  1. 157
      spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java
  2. 59
      spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java
  3. 15
      spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java

157
spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java

@ -17,8 +17,8 @@ @@ -17,8 +17,8 @@
package org.springframework.web.servlet.mvc.method.annotation;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.LinkedHashSet;
import java.util.Set;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
@ -54,18 +54,18 @@ import org.springframework.util.Assert; @@ -54,18 +54,18 @@ import org.springframework.util.Assert;
* </pre>
*
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @since 4.2
*/
public class ResponseBodyEmitter {
private final Long timeout;
private volatile Handler handler;
private final Set<DataWithMediaType> earlySendAttempts = new LinkedHashSet<DataWithMediaType>(8);
/* Cache for objects sent before handler is set. */
private final Map<Object, MediaType> initHandlerCache = new LinkedHashMap<Object, MediaType>(10);
private Handler handler;
private volatile boolean complete;
private boolean complete;
private Throwable failure;
@ -110,32 +110,29 @@ public class ResponseBodyEmitter { @@ -110,32 +110,29 @@ public class ResponseBodyEmitter {
protected void extendResponse(ServerHttpResponse outputMessage) {
}
void initialize(Handler handler) throws IOException {
synchronized (this) {
this.handler = handler;
for (Map.Entry<Object, MediaType> entry : this.initHandlerCache.entrySet()) {
try {
sendInternal(entry.getKey(), entry.getValue());
}
catch (Throwable ex) {
return;
}
}
if (this.complete) {
if (this.failure != null) {
this.handler.completeWithError(this.failure);
}
else {
this.handler.complete();
}
}
if (this.timeoutCallback != null) {
this.handler.onTimeout(this.timeoutCallback);
synchronized void initialize(Handler handler) throws IOException {
this.handler = handler;
for (DataWithMediaType sendAttempt : this.earlySendAttempts) {
sendInternal(sendAttempt.getData(), sendAttempt.getMediaType());
}
this.earlySendAttempts.clear();
if (this.complete) {
if (this.failure != null) {
this.handler.completeWithError(this.failure);
}
if (this.completionCallback != null) {
this.handler.onCompletion(this.completionCallback);
else {
this.handler.complete();
}
}
if (this.timeoutCallback != null) {
this.handler.onTimeout(this.timeoutCallback);
}
if (this.completionCallback != null) {
this.handler.onCompletion(this.completionCallback);
}
}
/**
@ -159,33 +156,29 @@ public class ResponseBodyEmitter { @@ -159,33 +156,29 @@ public class ResponseBodyEmitter {
* @throws IOException raised when an I/O error occurs
* @throws java.lang.IllegalStateException wraps any other errors
*/
public void send(Object object, MediaType mediaType) throws IOException {
public synchronized void send(Object object, MediaType mediaType) throws IOException {
Assert.state(!this.complete, "ResponseBodyEmitter is already set complete");
sendInternal(object, mediaType);
}
private void sendInternal(Object object, MediaType mediaType) throws IOException {
if (object == null) {
return;
}
if (this.handler == null) {
synchronized (this) {
if (this.handler == null) {
this.initHandlerCache.put(object, mediaType);
return;
if (object != null) {
if (this.handler != null) {
try {
this.handler.send(object, mediaType);
}
catch (IOException ex) {
this.handler.completeWithError(ex);
throw ex;
}
catch (Throwable ex) {
this.handler.completeWithError(ex);
throw new IllegalStateException("Failed to send " + object, ex);
}
}
}
try {
this.handler.send(object, mediaType);
}
catch (IOException ex){
this.handler.completeWithError(ex);
throw ex;
}
catch (Throwable ex){
this.handler.completeWithError(ex);
throw new IllegalStateException("Failed to send " + object, ex);
else {
this.earlySendAttempts.add(new DataWithMediaType(object, mediaType));
}
}
}
@ -194,12 +187,10 @@ public class ResponseBodyEmitter { @@ -194,12 +187,10 @@ public class ResponseBodyEmitter {
* <p>A dispatch is made into the app server where Spring MVC completes
* asynchronous request processing.
*/
public void complete() {
synchronized (this) {
this.complete = true;
if (this.handler != null) {
this.handler.complete();
}
public synchronized void complete() {
this.complete = true;
if (this.handler != null) {
this.handler.complete();
}
}
@ -208,13 +199,11 @@ public class ResponseBodyEmitter { @@ -208,13 +199,11 @@ public class ResponseBodyEmitter {
* <p>A dispatch is made into the app server where Spring MVC will pass the
* exception through its exception handling mechanism.
*/
public void completeWithError(Throwable ex) {
synchronized (this) {
this.complete = true;
this.failure = ex;
if (this.handler != null) {
this.handler.completeWithError(ex);
}
public synchronized void completeWithError(Throwable ex) {
this.complete = true;
this.failure = ex;
if (this.handler != null) {
this.handler.completeWithError(ex);
}
}
@ -222,12 +211,10 @@ public class ResponseBodyEmitter { @@ -222,12 +211,10 @@ public class ResponseBodyEmitter {
* Register code to invoke when the async request times out. This method is
* called from a container thread when an async request times out.
*/
public void onTimeout(Runnable callback) {
synchronized (this) {
this.timeoutCallback = callback;
if (this.handler != null) {
this.handler.onTimeout(callback);
}
public synchronized void onTimeout(Runnable callback) {
this.timeoutCallback = callback;
if (this.handler != null) {
this.handler.onTimeout(callback);
}
}
@ -237,12 +224,10 @@ public class ResponseBodyEmitter { @@ -237,12 +224,10 @@ public class ResponseBodyEmitter {
* reason including timeout and network error. This method is useful for
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
*/
public void onCompletion(Runnable callback) {
synchronized (this) {
this.completionCallback = callback;
if (this.handler != null) {
this.handler.onCompletion(callback);
}
public synchronized void onCompletion(Runnable callback) {
this.completionCallback = callback;
if (this.handler != null) {
this.handler.onCompletion(callback);
}
}
@ -263,4 +248,28 @@ public class ResponseBodyEmitter { @@ -263,4 +248,28 @@ public class ResponseBodyEmitter {
void onCompletion(Runnable callback);
}
/**
* Simple struct for a data entry.
*/
static class DataWithMediaType {
private final Object data;
private final MediaType mediaType;
public DataWithMediaType(Object data, MediaType mediaType) {
this.data = data;
this.mediaType = mediaType;
}
public Object getData() {
return this.data;
}
public MediaType getMediaType() {
return this.mediaType;
}
}
}

59
spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java

@ -19,8 +19,8 @@ package org.springframework.web.servlet.mvc.method.annotation; @@ -19,8 +19,8 @@ package org.springframework.web.servlet.mvc.method.annotation;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.LinkedHashSet;
import java.util.Set;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
@ -31,6 +31,7 @@ import org.springframework.http.server.ServerHttpResponse; @@ -31,6 +31,7 @@ import org.springframework.http.server.ServerHttpResponse;
* <a href="http://www.w3.org/TR/eventsource/">Server-Sent Events</a>.
*
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @since 4.2
*/
public class SseEmitter extends ResponseBodyEmitter {
@ -41,6 +42,7 @@ public class SseEmitter extends ResponseBodyEmitter { @@ -41,6 +42,7 @@ public class SseEmitter extends ResponseBodyEmitter {
@Override
protected void extendResponse(ServerHttpResponse outputMessage) {
super.extendResponse(outputMessage);
HttpHeaders headers = outputMessage.getHeaders();
if (headers.getContentType() == null) {
headers.setContentType(new MediaType("text", "event-stream"));
@ -75,14 +77,12 @@ public class SseEmitter extends ResponseBodyEmitter { @@ -75,14 +77,12 @@ public class SseEmitter extends ResponseBodyEmitter {
* @param object the object to write
* @param mediaType a MediaType hint for selecting an HttpMessageConverter
* @throws IOException raised when an I/O error occurs
* @throws java.lang.IllegalStateException wraps any other errors
*/
@Override
public void send(Object object, MediaType mediaType) throws IOException {
if (object == null) {
return;
if (object != null) {
send(event().data(object, mediaType));
}
send(event().data(object, mediaType));
}
/**
@ -95,18 +95,19 @@ public class SseEmitter extends ResponseBodyEmitter { @@ -95,18 +95,19 @@ public class SseEmitter extends ResponseBodyEmitter {
* </pre>
* @param builder a builder for an SSE formatted event.
* @throws IOException raised when an I/O error occurs
* @throws java.lang.IllegalStateException wraps any other errors
*/
public void send(SseEventBuilder builder) throws IOException {
Map<Object, MediaType> map = builder.build();
for (Map.Entry<Object, MediaType> entry : map.entrySet()) {
super.send(entry.getKey(), entry.getValue());
Set<DataWithMediaType> dataToSend = ((SseEventBuilderImpl) builder).build();
synchronized (this) {
for (DataWithMediaType entry : dataToSend) {
super.send(entry.getData(), entry.getMediaType());
}
}
}
public static SseEventBuilder event() {
return new DefaultSseEventBuilder();
return new SseEventBuilderImpl();
}
@ -144,22 +145,15 @@ public class SseEmitter extends ResponseBodyEmitter { @@ -144,22 +145,15 @@ public class SseEmitter extends ResponseBodyEmitter {
* Add an SSE "data" line.
*/
SseEventBuilder data(Object object, MediaType mediaType);
/**
* Return a map with objects that represent the data to be written to
* the response as well as the required SSE text formatting that
* surrounds it.
*/
Map<Object, MediaType> build();
}
/**
* Default implementation of SseEventBuilder.
*/
private static class DefaultSseEventBuilder implements SseEventBuilder {
private static class SseEventBuilderImpl implements SseEventBuilder {
private final Map<Object, MediaType> map = new LinkedHashMap<Object, MediaType>(4);
private final Set<DataWithMediaType> dataToSend = new LinkedHashSet<DataWithMediaType>(4);
private StringBuilder sb;
@ -196,12 +190,12 @@ public class SseEmitter extends ResponseBodyEmitter { @@ -196,12 +190,12 @@ public class SseEmitter extends ResponseBodyEmitter {
public SseEventBuilder data(Object object, MediaType mediaType) {
append("data:");
saveAppendedText();
this.map.put(object, mediaType);
this.dataToSend.add(new DataWithMediaType(object, mediaType));
append("\n");
return this;
}
DefaultSseEventBuilder append(String text) {
SseEventBuilderImpl append(String text) {
if (this.sb == null) {
this.sb = new StringBuilder();
}
@ -209,21 +203,20 @@ public class SseEmitter extends ResponseBodyEmitter { @@ -209,21 +203,20 @@ public class SseEmitter extends ResponseBodyEmitter {
return this;
}
private void saveAppendedText() {
if (this.sb != null) {
this.map.put(this.sb.toString(), TEXT_PLAIN);
this.sb = null;
Set<DataWithMediaType> build() {
if ((this.sb == null || this.sb.length() == 0) && this.dataToSend.isEmpty()) {
return Collections.<DataWithMediaType>emptySet();
}
append("\n");
saveAppendedText();
return this.dataToSend;
}
@Override
public Map<Object, MediaType> build() {
if (this.sb == null || this.sb.length() == 0 && this.map.isEmpty()) {
return Collections.<Object, MediaType>emptyMap();
private void saveAppendedText() {
if (this.sb != null) {
this.dataToSend.add(new DataWithMediaType(this.sb.toString(), TEXT_PLAIN));
this.sb = null;
}
append("\n");
saveAppendedText();
return this.map;
}
}

15
spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterTests.java

@ -31,7 +31,9 @@ import static org.junit.Assert.fail; @@ -31,7 +31,9 @@ import static org.junit.Assert.fail;
/**
* Unit tests for {@link ResponseBodyEmitter}.
*
* @author Rossen Stoyanchev
* @author Tomasz Nurkiewicz
*/
public class ResponseBodyEmitterTests {
@ -62,6 +64,19 @@ public class ResponseBodyEmitterTests { @@ -62,6 +64,19 @@ public class ResponseBodyEmitterTests {
verifyNoMoreInteractions(this.handler);
}
@Test
public void sendDuplicateBeforeHandlerInitialized() throws Exception {
this.emitter.send("foo", MediaType.TEXT_PLAIN);
this.emitter.send("foo", MediaType.TEXT_PLAIN);
this.emitter.complete();
verifyNoMoreInteractions(this.handler);
this.emitter.initialize(this.handler);
verify(this.handler, times(2)).send("foo", MediaType.TEXT_PLAIN);
verify(this.handler).complete();
verifyNoMoreInteractions(this.handler);
}
@Test
public void sendBeforeHandlerInitializedWithError() throws Exception {
IllegalStateException ex = new IllegalStateException();

Loading…
Cancel
Save