Browse Source

Commit actions are (properly) deferred

Issue: SPR-16597
pull/1744/head
Rossen Stoyanchev 7 years ago
parent
commit
72bbb2619d
  1. 10
      spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java
  2. 3
      spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
  3. 16
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java
  4. 40
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java
  5. 7
      spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java
  6. 24
      spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java
  7. 2
      spring-web/src/test/java/org/springframework/web/server/handler/ResponseStatusExceptionHandlerTests.java

10
spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java

@ -90,11 +90,11 @@ public class HttpHandlerConnector implements ClientHttpConnector { @@ -90,11 +90,11 @@ public class HttpHandlerConnector implements ClientHttpConnector {
return Mono.empty();
});
mockServerResponse.setWriteHandler(responseBody -> {
log("Creating client response for ", httpMethod, uri);
result.onNext(adaptResponse(mockServerResponse, responseBody));
return Mono.empty();
});
mockServerResponse.setWriteHandler(responseBody ->
Mono.fromRunnable(() -> {
log("Creating client response for ", httpMethod, uri);
result.onNext(adaptResponse(mockServerResponse, responseBody));
}));
log("Writing client request for ", httpMethod, uri);
requestCallback.apply(mockClientRequest).subscribe(aVoid -> {}, result::onError);

3
spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java

@ -45,6 +45,9 @@ public interface ReactiveHttpOutputMessage extends HttpMessage { @@ -45,6 +45,9 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
/**
* Register an action to apply just before the HttpOutputMessage is committed.
* <p><strong>Note:</strong> the supplied action must be properly deferred,
* e.g. via {@link Mono#defer} or {@link Mono#fromRunnable}, to ensure it's
* executed in the right order, relative to other actions.
* @param action the action to apply
*/
void beforeCommit(Supplier<? extends Mono<Void>> action);

16
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

@ -209,13 +209,13 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { @@ -209,13 +209,13 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
return Mono.empty();
}
this.commitActions.add(() -> {
applyStatusCode();
applyHeaders();
applyCookies();
this.state.set(State.COMMITTED);
return Mono.empty();
});
this.commitActions.add(() ->
Mono.fromRunnable(() -> {
applyStatusCode();
applyHeaders();
applyCookies();
this.state.set(State.COMMITTED);
}));
if (writeAction != null) {
this.commitActions.add(writeAction);
@ -224,7 +224,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { @@ -224,7 +224,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
List<? extends Mono<Void>> actions = this.commitActions.stream()
.map(Supplier::get).collect(Collectors.toList());
return Flux.concat(actions).next();
return Flux.concat(actions).then();
}

40
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -111,28 +111,30 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl @@ -111,28 +111,30 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
@Override
public Mono<Void> writeWith(File file, long position, long count) {
return doCommit(() -> {
FileChannel source = null;
try {
source = FileChannel.open(file.toPath(), StandardOpenOption.READ);
StreamSinkChannel destination = this.exchange.getResponseChannel();
Channels.transferBlocking(destination, source, position, count);
return Mono.empty();
}
catch (IOException ex) {
return Mono.error(ex);
}
finally {
if (source != null) {
return doCommit(() ->
Mono.defer(() -> {
FileChannel source = null;
try {
source.close();
source = FileChannel.open(file.toPath(), StandardOpenOption.READ);
StreamSinkChannel destination = this.exchange.getResponseChannel();
Channels.transferBlocking(destination, source, position, count);
return Mono.empty();
}
catch (IOException ex) {
// ignore
return Mono.error(ex);
}
}
}
});
finally {
if (source != null) {
try {
source.close();
}
catch (IOException ex) {
// ignore
}
}
}
}));
}

7
spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -99,10 +99,7 @@ public class ServerHttpResponseTests { @@ -99,10 +99,7 @@ public class ServerHttpResponseTests {
public void beforeCommitWithComplete() throws Exception {
ResponseCookie cookie = ResponseCookie.from("ID", "123").build();
TestServerHttpResponse response = new TestServerHttpResponse();
response.beforeCommit(() -> {
response.getCookies().add(cookie.getName(), cookie);
return Mono.empty();
});
response.beforeCommit(() -> Mono.fromRunnable(() -> response.getCookies().add(cookie.getName(), cookie)));
response.writeWith(Flux.just(wrap("a"), wrap("b"), wrap("c"))).block();
assertTrue(response.statusCodeWritten);

24
spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -54,15 +54,11 @@ public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTest @@ -54,15 +54,11 @@ public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTest
// Zero-copy only does not support servlet
assumeTrue(server instanceof ReactorHttpServer || server instanceof UndertowHttpServer);
RestTemplate restTemplate = new RestTemplate();
URI url = new URI("http://localhost:" + port);
RequestEntity<?> request = RequestEntity.get(url).build();
ResponseEntity<byte[]> response = new RestTemplate().exchange(request, byte[].class);
RequestEntity<?> request =
RequestEntity.get(new URI("http://localhost:" + port)).build();
ResponseEntity<byte[]> response = restTemplate.exchange(request, byte[].class);
Resource logo =
new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class);
Resource logo = new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class);
assertTrue(response.hasBody());
assertEquals(logo.contentLength(), response.getHeaders().getContentLength());
@ -76,22 +72,16 @@ public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTest @@ -76,22 +72,16 @@ public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTest
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
try {
ZeroCopyHttpOutputMessage zeroCopyResponse =
(ZeroCopyHttpOutputMessage) response;
Resource logo = new ClassPathResource("spring.png",
ZeroCopyIntegrationTests.class);
ZeroCopyHttpOutputMessage zeroCopyResponse = (ZeroCopyHttpOutputMessage) response;
Resource logo = new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class);
File logoFile = logo.getFile();
zeroCopyResponse.getHeaders().setContentType(MediaType.IMAGE_PNG);
zeroCopyResponse.getHeaders().setContentLength(logoFile.length());
return zeroCopyResponse.writeWith(logoFile, 0, logoFile.length());
}
catch (Throwable ex) {
return Mono.error(ex);
}
}
}

2
spring-web/src/test/java/org/springframework/web/server/handler/ResponseStatusExceptionHandlerTests.java

@ -82,7 +82,7 @@ public class ResponseStatusExceptionHandlerTests { @@ -82,7 +82,7 @@ public class ResponseStatusExceptionHandlerTests {
Throwable ex = new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Oops");
this.exchange.getResponse().setStatusCode(HttpStatus.CREATED);
Mono<Void> mono = this.exchange.getResponse().setComplete()
.then(this.handler.handle(this.exchange, ex));
.then(Mono.defer(() -> this.handler.handle(this.exchange, ex)));
StepVerifier.create(mono).consumeErrorWith(actual -> assertSame(ex, actual)).verify();
}

Loading…
Cancel
Save