diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/support/RxJava1ClientWebRequestBuilder.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/support/RxJava1ClientWebRequestBuilder.java
new file mode 100644
index 0000000000..d7f6ed4828
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/support/RxJava1ClientWebRequestBuilder.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.web.client.reactive.support;
+
+import java.net.URI;
+
+import reactor.core.converter.RxJava1ObservableConverter;
+import reactor.core.converter.RxJava1SingleConverter;
+import reactor.core.publisher.Mono;
+import rx.Observable;
+import rx.Single;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.http.HttpCookie;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.client.reactive.ClientHttpRequest;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.reactive.ClientWebRequest;
+import org.springframework.web.client.reactive.ClientWebRequestBuilder;
+import org.springframework.web.client.reactive.ClientWebRequestPostProcessor;
+import org.springframework.web.client.reactive.DefaultClientWebRequestBuilder;
+
+/**
+ * Builds a {@link ClientHttpRequest} using a {@code Observable}
+ * or {@code Single} as request body.
+ *
+ *
See static factory methods in {@link RxJava1ClientWebRequestBuilders}
+ *
+ * @author Brian Clozel
+ * @see RxJava1ClientWebRequestBuilders
+ */
+public class RxJava1ClientWebRequestBuilder implements ClientWebRequestBuilder {
+
+ private final DefaultClientWebRequestBuilder delegate;
+
+ public RxJava1ClientWebRequestBuilder(HttpMethod httpMethod, String urlTemplate,
+ Object... urlVariables) throws RestClientException {
+ this.delegate = new DefaultClientWebRequestBuilder(httpMethod, urlTemplate, urlVariables);
+ }
+
+ public RxJava1ClientWebRequestBuilder(HttpMethod httpMethod, URI url) {
+ this.delegate = new DefaultClientWebRequestBuilder(httpMethod, url);
+ }
+
+ /**
+ * Add an HTTP request header
+ */
+ public RxJava1ClientWebRequestBuilder header(String name, String... values) {
+ this.delegate.header(name, values);
+ return this;
+ }
+
+ /**
+ * Add all provided HTTP request headers
+ */
+ public RxJava1ClientWebRequestBuilder headers(HttpHeaders httpHeaders) {
+ this.delegate.headers(httpHeaders);
+ return this;
+ }
+
+ /**
+ * Set the Content-Type request header to the given {@link MediaType}
+ */
+ public RxJava1ClientWebRequestBuilder contentType(MediaType contentType) {
+ this.delegate.contentType(contentType);
+ return this;
+ }
+
+ /**
+ * Set the Content-Type request header to the given media type
+ */
+ public RxJava1ClientWebRequestBuilder contentType(String contentType) {
+ this.delegate.contentType(contentType);
+ return this;
+ }
+
+ /**
+ * Set the Accept request header to the given {@link MediaType}s
+ */
+ public RxJava1ClientWebRequestBuilder accept(MediaType... mediaTypes) {
+ this.delegate.accept(mediaTypes);
+ return this;
+ }
+
+ /**
+ * Set the Accept request header to the given media types
+ */
+ public RxJava1ClientWebRequestBuilder accept(String... mediaTypes) {
+ this.delegate.accept(mediaTypes);
+ return this;
+ }
+
+ /**
+ * Add a Cookie to the HTTP request
+ */
+ public RxJava1ClientWebRequestBuilder cookie(String name, String value) {
+ this.delegate.cookie(name, value);
+ return this;
+ }
+
+ /**
+ * Add a Cookie to the HTTP request
+ */
+ public RxJava1ClientWebRequestBuilder cookie(HttpCookie cookie) {
+ this.delegate.cookie(cookie);
+ return this;
+ }
+
+ /**
+ * Allows performing more complex operations with a strategy. For example, a
+ * {@link ClientWebRequestPostProcessor} implementation might accept the arguments of username
+ * and password and set an HTTP Basic authentication header.
+ *
+ * @param postProcessor the {@link ClientWebRequestPostProcessor} to use. Cannot be null.
+ *
+ * @return this instance for further modifications.
+ */
+ public RxJava1ClientWebRequestBuilder apply(ClientWebRequestPostProcessor postProcessor) {
+ this.delegate.apply(postProcessor);
+ return this;
+ }
+
+ /**
+ * Use the given object as the request body
+ */
+ public RxJava1ClientWebRequestBuilder body(Object content) {
+ this.delegate.body(Mono.just(content), ResolvableType.forInstance(content));
+ return this;
+ }
+
+ /**
+ * Use the given {@link Single} as the request body and use its {@link ResolvableType}
+ * as type information for the element published by this reactive stream
+ */
+ public RxJava1ClientWebRequestBuilder body(Single> content, ResolvableType elementType) {
+ this.delegate.body(RxJava1SingleConverter.toPublisher(content), elementType);
+ return this;
+ }
+
+ /**
+ * Use the given {@link Observable} as the request body and use its {@link ResolvableType}
+ * as type information for the elements published by this reactive stream
+ */
+ public RxJava1ClientWebRequestBuilder body(Observable> content, ResolvableType elementType) {
+ this.delegate.body(RxJava1ObservableConverter.toPublisher(content), elementType);
+ return this;
+ }
+
+ @Override
+ public ClientWebRequest build() {
+ return this.delegate.build();
+ }
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/support/RxJava1ClientWebRequestBuilders.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/support/RxJava1ClientWebRequestBuilders.java
new file mode 100644
index 0000000000..4a6a8281b8
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/support/RxJava1ClientWebRequestBuilders.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.web.client.reactive.support;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Static factory methods for {@link RxJava1ClientWebRequestBuilder ClientWebRequestBuilders}
+ * using the {@link rx.Observable} and {@link rx.Single} API.
+ *
+ * @author Brian Clozel
+ */
+public abstract class RxJava1ClientWebRequestBuilders {
+
+ /**
+ * Create a {@link RxJava1ClientWebRequestBuilder} for a GET request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static RxJava1ClientWebRequestBuilder get(String urlTemplate, Object... urlVariables) {
+ return new RxJava1ClientWebRequestBuilder(HttpMethod.GET, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link RxJava1ClientWebRequestBuilder} for a POST request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static RxJava1ClientWebRequestBuilder post(String urlTemplate, Object... urlVariables) {
+ return new RxJava1ClientWebRequestBuilder(HttpMethod.POST, urlTemplate, urlVariables);
+ }
+
+
+ /**
+ * Create a {@link RxJava1ClientWebRequestBuilder} for a PUT request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static RxJava1ClientWebRequestBuilder put(String urlTemplate, Object... urlVariables) {
+ return new RxJava1ClientWebRequestBuilder(HttpMethod.PUT, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link RxJava1ClientWebRequestBuilder} for a PATCH request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static RxJava1ClientWebRequestBuilder patch(String urlTemplate, Object... urlVariables) {
+ return new RxJava1ClientWebRequestBuilder(HttpMethod.PATCH, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link RxJava1ClientWebRequestBuilder} for a DELETE request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static RxJava1ClientWebRequestBuilder delete(String urlTemplate, Object... urlVariables) {
+ return new RxJava1ClientWebRequestBuilder(HttpMethod.DELETE, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link RxJava1ClientWebRequestBuilder} for an OPTIONS request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static RxJava1ClientWebRequestBuilder options(String urlTemplate, Object... urlVariables) {
+ return new RxJava1ClientWebRequestBuilder(HttpMethod.OPTIONS, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link RxJava1ClientWebRequestBuilder} for a HEAD request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static RxJava1ClientWebRequestBuilder head(String urlTemplate, Object... urlVariables) {
+ return new RxJava1ClientWebRequestBuilder(HttpMethod.HEAD, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link RxJava1ClientWebRequestBuilder} for a request with the given HTTP method.
+ *
+ * @param httpMethod the HTTP method
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static RxJava1ClientWebRequestBuilder request(HttpMethod httpMethod, String urlTemplate, Object... urlVariables) {
+ return new RxJava1ClientWebRequestBuilder(httpMethod, urlTemplate, urlVariables);
+ }
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/support/RxJava1ResponseExtractors.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/support/RxJava1ResponseExtractors.java
new file mode 100644
index 0000000000..96de49f9a0
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/support/RxJava1ResponseExtractors.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.web.client.reactive.support;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.client.reactive.ClientHttpResponse;
+import org.springframework.http.converter.reactive.HttpMessageConverter;
+import org.springframework.web.client.reactive.ResponseExtractor;
+
+import reactor.core.converter.RxJava1ObservableConverter;
+import reactor.core.converter.RxJava1SingleConverter;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import rx.Observable;
+import rx.Single;
+
+/**
+ * Static factory methods for {@link ResponseExtractor}
+ * based on the {@link Observable} and {@link Single} API.
+ *
+ * @author Brian Clozel
+ */
+public class RxJava1ResponseExtractors {
+
+ /**
+ * Extract the response body and decode it, returning it as a {@code Single}
+ */
+ public static ResponseExtractor> body(Class sourceClass) {
+
+ ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
+ //noinspection unchecked
+ return (clientResponse, messageConverters) -> (Single) RxJava1SingleConverter
+ .fromPublisher(clientResponse
+ .flatMap(resp -> decodeResponseBody(resp, resolvableType, messageConverters)).next());
+ }
+
+ /**
+ * Extract the response body and decode it, returning it as an {@code Observable}
+ */
+ public static ResponseExtractor> bodyStream(Class sourceClass) {
+
+ ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
+ return (clientResponse, messageConverters) -> RxJava1ObservableConverter
+ .fromPublisher(clientResponse
+ .flatMap(resp -> decodeResponseBody(resp, resolvableType, messageConverters)));
+ }
+
+ /**
+ * Extract the full response body as a {@code ResponseEntity}
+ * with its body decoded as a single type {@code T}
+ */
+ public static ResponseExtractor>> response(Class sourceClass) {
+
+ ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
+ return (clientResponse, messageConverters) -> (Single>)
+ RxJava1SingleConverter.fromPublisher(clientResponse
+ .then(response ->
+ Mono.when(
+ decodeResponseBody(response, resolvableType, messageConverters).next(),
+ Mono.just(response.getHeaders()),
+ Mono.just(response.getStatusCode())))
+ .map(tuple -> {
+ //noinspection unchecked
+ return new ResponseEntity<>((T) tuple.getT1(), tuple.getT2(), tuple.getT3());
+ }));
+ }
+
+ /**
+ * Extract the full response body as a {@code ResponseEntity}
+ * with its body decoded as an {@code Observable}
+ */
+ public static ResponseExtractor>>> responseStream(Class sourceClass) {
+ ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
+ return (clientResponse, messageConverters) -> RxJava1SingleConverter.fromPublisher(clientResponse
+ .map(response -> new ResponseEntity<>(
+ RxJava1ObservableConverter
+ .fromPublisher(decodeResponseBody(response, resolvableType, messageConverters)),
+ response.getHeaders(),
+ response.getStatusCode())));
+ }
+
+ /**
+ * Extract the response headers as an {@code HttpHeaders} instance
+ */
+ public static ResponseExtractor> headers() {
+ return (clientResponse, messageConverters) -> RxJava1SingleConverter
+ .fromPublisher(clientResponse.map(resp -> resp.getHeaders()));
+ }
+
+ protected static Flux decodeResponseBody(ClientHttpResponse response, ResolvableType responseType,
+ List> messageConverters) {
+
+ MediaType contentType = response.getHeaders().getContentType();
+ Optional> converter = resolveConverter(messageConverters, responseType, contentType);
+ if (!converter.isPresent()) {
+ return Flux.error(new IllegalStateException("Could not decode response body of type '" + contentType +
+ "' with target type '" + responseType.toString() + "'"));
+ }
+ //noinspection unchecked
+ return (Flux) converter.get().read(responseType, response);
+ }
+
+
+ protected static Optional> resolveConverter(List> messageConverters,
+ ResolvableType type, MediaType mediaType) {
+ return messageConverters.stream().filter(e -> e.canRead(type, mediaType)).findFirst();
+ }
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/RxJava1WebClientIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/RxJava1WebClientIntegrationTests.java
new file mode 100644
index 0000000000..97ebe3eed0
--- /dev/null
+++ b/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/RxJava1WebClientIntegrationTests.java
@@ -0,0 +1,340 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.web.client.reactive;
+
+import static org.junit.Assert.*;
+import static org.springframework.web.client.reactive.support.RxJava1ClientWebRequestBuilders.*;
+import static org.springframework.web.client.reactive.support.RxJava1ResponseExtractors.*;
+
+import java.util.concurrent.TimeUnit;
+
+import okhttp3.HttpUrl;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import rx.Observable;
+import rx.Single;
+import rx.observers.TestSubscriber;
+
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.http.codec.Pojo;
+
+/**
+ * {@link WebClient} integration tests with the {@code Obserable} and {@code Single} API.
+ *
+ * @author Brian Clozel
+ */
+public class RxJava1WebClientIntegrationTests {
+
+ private MockWebServer server;
+
+ private WebClient webClient;
+
+ @Before
+ public void setup() {
+ this.server = new MockWebServer();
+ this.webClient = new WebClient(new ReactorClientHttpConnector());
+ }
+
+ @Test
+ public void shouldGetHeaders() throws Exception {
+
+ HttpUrl baseUrl = server.url("/greeting?name=Spring");
+ this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!"));
+
+ Single result = this.webClient
+ .perform(get(baseUrl.toString()))
+ .extract(headers());
+
+ TestSubscriber ts = new TestSubscriber();
+ result.subscribe(ts);
+ ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
+
+ HttpHeaders httpHeaders = ts.getOnNextEvents().get(0);
+ assertEquals(MediaType.TEXT_PLAIN, httpHeaders.getContentType());
+ assertEquals(13L, httpHeaders.getContentLength());
+ ts.assertValueCount(1);
+ ts.assertCompleted();
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
+ assertEquals("/greeting?name=Spring", request.getPath());
+ }
+
+ @Test
+ public void shouldGetPlainTextResponseAsObject() throws Exception {
+
+ HttpUrl baseUrl = server.url("/greeting?name=Spring");
+ this.server.enqueue(new MockResponse().setBody("Hello Spring!"));
+
+ Single result = this.webClient
+ .perform(get(baseUrl.toString())
+ .header("X-Test-Header", "testvalue"))
+ .extract(body(String.class));
+
+ TestSubscriber ts = new TestSubscriber();
+ result.subscribe(ts);
+ ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
+
+ String response = ts.getOnNextEvents().get(0);
+ assertEquals("Hello Spring!", response);
+ ts.assertValueCount(1);
+ ts.assertCompleted();
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("testvalue", request.getHeader("X-Test-Header"));
+ assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
+ assertEquals("/greeting?name=Spring", request.getPath());
+ }
+
+ @Test
+ public void shouldGetPlainTextResponse() throws Exception {
+
+ HttpUrl baseUrl = server.url("/greeting?name=Spring");
+ this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!"));
+
+ Single> result = this.webClient
+ .perform(get(baseUrl.toString())
+ .accept(MediaType.TEXT_PLAIN))
+ .extract(response(String.class));
+
+ TestSubscriber> ts = new TestSubscriber>();
+ result.subscribe(ts);
+ ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
+
+ ResponseEntity response = ts.getOnNextEvents().get(0);
+ assertEquals(200, response.getStatusCode().value());
+ assertEquals(MediaType.TEXT_PLAIN, response.getHeaders().getContentType());
+ assertEquals("Hello Spring!", response.getBody());
+ ts.assertValueCount(1);
+ ts.assertCompleted();
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("/greeting?name=Spring", request.getPath());
+ assertEquals("text/plain", request.getHeader(HttpHeaders.ACCEPT));
+ }
+
+ @Test
+ public void shouldGetJsonAsMonoOfString() throws Exception {
+
+ HttpUrl baseUrl = server.url("/json");
+ String content = "{\"bar\":\"barbar\",\"foo\":\"foofoo\"}";
+ this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
+ .setBody(content));
+
+ Single result = this.webClient
+ .perform(get(baseUrl.toString())
+ .accept(MediaType.APPLICATION_JSON))
+ .extract(body(String.class));
+
+ TestSubscriber ts = new TestSubscriber();
+ result.subscribe(ts);
+ ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
+
+ String response = ts.getOnNextEvents().get(0);
+ assertEquals(content, response);
+ ts.assertValueCount(1);
+ ts.assertCompleted();
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("/json", request.getPath());
+ assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
+ }
+
+ @Test
+ public void shouldGetJsonAsMonoOfPojo() throws Exception {
+
+ HttpUrl baseUrl = server.url("/pojo");
+ this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
+ .setBody("{\"bar\":\"barbar\",\"foo\":\"foofoo\"}"));
+
+ Single result = this.webClient
+ .perform(get(baseUrl.toString())
+ .accept(MediaType.APPLICATION_JSON))
+ .extract(body(Pojo.class));
+
+ TestSubscriber ts = new TestSubscriber();
+ result.subscribe(ts);
+ ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
+
+ Pojo response = ts.getOnNextEvents().get(0);
+ assertEquals("barbar", response.getBar());
+ ts.assertValueCount(1);
+ ts.assertCompleted();
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("/pojo", request.getPath());
+ assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
+ }
+
+ @Test
+ public void shouldGetJsonAsFluxOfPojos() throws Exception {
+
+ HttpUrl baseUrl = server.url("/pojos");
+ this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
+ .setBody("[{\"bar\":\"bar1\",\"foo\":\"foo1\"},{\"bar\":\"bar2\",\"foo\":\"foo2\"}]"));
+
+ Observable result = this.webClient
+ .perform(get(baseUrl.toString())
+ .accept(MediaType.APPLICATION_JSON))
+ .extract(bodyStream(Pojo.class));
+
+ TestSubscriber ts = new TestSubscriber();
+ result.subscribe(ts);
+ ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
+
+ assertThat(ts.getOnNextEvents().get(0).getBar(), Matchers.is("bar1"));
+ assertThat(ts.getOnNextEvents().get(1).getBar(), Matchers.is("bar2"));
+ ts.assertValueCount(2);
+ ts.assertCompleted();
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("/pojos", request.getPath());
+ assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
+ }
+
+ @Test
+ public void shouldGetJsonAsResponseOfPojosStream() throws Exception {
+
+ HttpUrl baseUrl = server.url("/pojos");
+ this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
+ .setBody("[{\"bar\":\"bar1\",\"foo\":\"foo1\"},{\"bar\":\"bar2\",\"foo\":\"foo2\"}]"));
+
+ Single>> result = this.webClient
+ .perform(get(baseUrl.toString())
+ .accept(MediaType.APPLICATION_JSON))
+ .extract(responseStream(Pojo.class));
+
+ TestSubscriber>> ts = new TestSubscriber>>();
+ result.subscribe(ts);
+ ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
+
+ ResponseEntity> response = ts.getOnNextEvents().get(0);
+ assertEquals(200, response.getStatusCode().value());
+ assertEquals(MediaType.APPLICATION_JSON, response.getHeaders().getContentType());
+ ts.assertValueCount(1);
+ ts.assertCompleted();
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("/pojos", request.getPath());
+ assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
+ }
+
+ @Test
+ public void shouldPostPojoAsJson() throws Exception {
+
+ HttpUrl baseUrl = server.url("/pojo/capitalize");
+ this.server.enqueue(new MockResponse()
+ .setHeader("Content-Type", "application/json")
+ .setBody("{\"bar\":\"BARBAR\",\"foo\":\"FOOFOO\"}"));
+
+ Pojo spring = new Pojo("foofoo", "barbar");
+ Single result = this.webClient
+ .perform(post(baseUrl.toString())
+ .body(spring)
+ .contentType(MediaType.APPLICATION_JSON)
+ .accept(MediaType.APPLICATION_JSON))
+ .extract(body(Pojo.class));
+
+ TestSubscriber ts = new TestSubscriber();
+ result.subscribe(ts);
+ ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
+
+ assertThat(ts.getOnNextEvents().get(0).getBar(), Matchers.is("BARBAR"));
+ ts.assertValueCount(1);
+ ts.assertCompleted();
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("/pojo/capitalize", request.getPath());
+ assertEquals("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", request.getBody().readUtf8());
+ assertEquals("chunked", request.getHeader(HttpHeaders.TRANSFER_ENCODING));
+ assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
+ assertEquals("application/json", request.getHeader(HttpHeaders.CONTENT_TYPE));
+ }
+
+ @Test
+ public void shouldSendCookieHeader() throws Exception {
+ HttpUrl baseUrl = server.url("/test");
+ this.server.enqueue(new MockResponse()
+ .setHeader("Content-Type", "text/plain").setBody("test"));
+
+ Single result = this.webClient
+ .perform(get(baseUrl.toString())
+ .cookie("testkey", "testvalue"))
+ .extract(body(String.class));
+
+ TestSubscriber ts = new TestSubscriber();
+ result.subscribe(ts);
+ ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
+
+ String response = ts.getOnNextEvents().get(0);
+ assertEquals("test", response);
+ ts.assertValueCount(1);
+ ts.assertCompleted();
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("/test", request.getPath());
+ assertEquals("testkey=testvalue", request.getHeader(HttpHeaders.COOKIE));
+ }
+
+ @Test
+ @Ignore
+ public void shouldGetErrorWhen404() throws Exception {
+
+ HttpUrl baseUrl = server.url("/greeting?name=Spring");
+ this.server.enqueue(new MockResponse().setResponseCode(404));
+
+ Single result = this.webClient
+ .perform(get(baseUrl.toString()))
+ .extract(body(String.class));
+
+ // TODO: error message should be converted to a ClientException
+ TestSubscriber ts = new TestSubscriber();
+ result.subscribe(ts);
+ ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
+
+ ts.assertError(WebClientException.class);
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
+ assertEquals("/greeting?name=Spring", request.getPath());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ this.server.shutdown();
+ }
+
+}