|
|
@ -32,7 +32,6 @@ import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTe |
|
|
|
import org.springframework.http.server.reactive.HttpHandler; |
|
|
|
import org.springframework.http.server.reactive.HttpHandler; |
|
|
|
import org.springframework.http.server.reactive.ServerHttpRequest; |
|
|
|
import org.springframework.http.server.reactive.ServerHttpRequest; |
|
|
|
import org.springframework.http.server.reactive.ServerHttpResponse; |
|
|
|
import org.springframework.http.server.reactive.ServerHttpResponse; |
|
|
|
import org.springframework.web.reactive.function.BodyExtractors; |
|
|
|
|
|
|
|
import org.springframework.web.reactive.function.client.WebClient; |
|
|
|
import org.springframework.web.reactive.function.client.WebClient; |
|
|
|
|
|
|
|
|
|
|
|
import static org.junit.Assert.assertTrue; |
|
|
|
import static org.junit.Assert.assertTrue; |
|
|
@ -57,8 +56,8 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest |
|
|
|
public void writeAndFlushWith() throws Exception { |
|
|
|
public void writeAndFlushWith() throws Exception { |
|
|
|
Mono<String> result = this.webClient.get() |
|
|
|
Mono<String> result = this.webClient.get() |
|
|
|
.uri("/write-and-flush") |
|
|
|
.uri("/write-and-flush") |
|
|
|
.exchange() |
|
|
|
.retrieve() |
|
|
|
.flatMapMany(response -> response.body(BodyExtractors.toFlux(String.class))) |
|
|
|
.bodyToFlux(String.class) |
|
|
|
.takeUntil(s -> s.endsWith("data1")) |
|
|
|
.takeUntil(s -> s.endsWith("data1")) |
|
|
|
.reduce((s1, s2) -> s1 + s2); |
|
|
|
.reduce((s1, s2) -> s1 + s2); |
|
|
|
|
|
|
|
|
|
|
@ -72,13 +71,13 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest |
|
|
|
public void writeAndAutoFlushOnComplete() { |
|
|
|
public void writeAndAutoFlushOnComplete() { |
|
|
|
Mono<String> result = this.webClient.get() |
|
|
|
Mono<String> result = this.webClient.get() |
|
|
|
.uri("/write-and-complete") |
|
|
|
.uri("/write-and-complete") |
|
|
|
.exchange() |
|
|
|
.retrieve() |
|
|
|
.flatMapMany(response -> response.bodyToFlux(String.class)) |
|
|
|
.bodyToFlux(String.class) |
|
|
|
.reduce((s1, s2) -> s1 + s2); |
|
|
|
.reduce((s1, s2) -> s1 + s2); |
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
try { |
|
|
|
StepVerifier.create(result) |
|
|
|
StepVerifier.create(result) |
|
|
|
.consumeNextWith(value -> assertTrue(value.length() == 200000)) |
|
|
|
.consumeNextWith(value -> assertTrue(value.length() == 20000 * "0123456789".length())) |
|
|
|
.expectComplete() |
|
|
|
.expectComplete() |
|
|
|
.verify(Duration.ofSeconds(5L)); |
|
|
|
.verify(Duration.ofSeconds(5L)); |
|
|
|
} |
|
|
|
} |
|
|
@ -95,14 +94,15 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest |
|
|
|
|
|
|
|
|
|
|
|
@Test // SPR-14992
|
|
|
|
@Test // SPR-14992
|
|
|
|
public void writeAndAutoFlushBeforeComplete() { |
|
|
|
public void writeAndAutoFlushBeforeComplete() { |
|
|
|
Flux<String> result = this.webClient.get() |
|
|
|
Mono<String> result = this.webClient.get() |
|
|
|
.uri("/write-and-never-complete") |
|
|
|
.uri("/write-and-never-complete") |
|
|
|
.exchange() |
|
|
|
.retrieve() |
|
|
|
.flatMapMany(response -> response.bodyToFlux(String.class)); |
|
|
|
.bodyToFlux(String.class) |
|
|
|
|
|
|
|
.next(); |
|
|
|
|
|
|
|
|
|
|
|
StepVerifier.create(result) |
|
|
|
StepVerifier.create(result) |
|
|
|
.expectNextMatches(s -> s.startsWith("0123456789")) |
|
|
|
.expectNextMatches(s -> s.startsWith("0123456789")) |
|
|
|
.thenCancel() |
|
|
|
.expectComplete() |
|
|
|
.verify(Duration.ofSeconds(5L)); |
|
|
|
.verify(Duration.ofSeconds(5L)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|