From 449aea577c0c746a87950b0d806e6f861a1cd063 Mon Sep 17 00:00:00 2001 From: adriancole Date: Wed, 11 Sep 2013 19:38:50 +0200 Subject: [PATCH] Remove support for Observable methods. --- CHANGES.md | 3 + core/src/main/java/feign/Contract.java | 12 -- core/src/main/java/feign/Feign.java | 51 +---- core/src/main/java/feign/MethodHandler.java | 197 +++--------------- core/src/main/java/feign/MethodMetadata.java | 15 -- core/src/main/java/feign/Observable.java | 39 ---- core/src/main/java/feign/Observer.java | 68 ------ core/src/main/java/feign/ReflectiveFeign.java | 55 +---- core/src/main/java/feign/Subscription.java | 32 --- .../java/feign/codec/IncrementalDecoder.java | 117 ----------- .../feign/codec/StringIncrementalDecoder.java | 33 --- .../test/java/feign/DefaultContractTest.java | 37 ---- core/src/test/java/feign/FeignTest.java | 169 --------------- .../java/feign/examples/GitHubExample.java | 62 +----- gson/src/main/java/feign/gson/GsonModule.java | 18 +- .../test/java/feign/gson/GsonModuleTest.java | 46 ---- .../java/feign/jaxrs/JAXRSContractTest.java | 37 ---- .../feign/jaxrs/examples/GitHubExample.java | 46 ---- 18 files changed, 52 insertions(+), 985 deletions(-) delete mode 100644 core/src/main/java/feign/Observable.java delete mode 100644 core/src/main/java/feign/Observer.java delete mode 100644 core/src/main/java/feign/Subscription.java delete mode 100644 core/src/main/java/feign/codec/IncrementalDecoder.java delete mode 100644 core/src/main/java/feign/codec/StringIncrementalDecoder.java diff --git a/CHANGES.md b/CHANGES.md index c8cb7106..3cf46c2c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,6 @@ +### Version 5.0 +* Remove support for Observable methods. + ### Version 4.4.1 * Fix NullPointerException on calling equals and hashCode. diff --git a/core/src/main/java/feign/Contract.java b/core/src/main/java/feign/Contract.java index eed9b7bd..81324740 100644 --- a/core/src/main/java/feign/Contract.java +++ b/core/src/main/java/feign/Contract.java @@ -18,7 +18,6 @@ package feign; import javax.inject.Named; import java.lang.annotation.Annotation; import java.lang.reflect.Method; -import java.lang.reflect.Type; import java.net.URI; import java.util.ArrayList; import java.util.Collection; @@ -26,7 +25,6 @@ import java.util.List; import static feign.Util.checkState; import static feign.Util.emptyToNull; -import static feign.Util.resolveLastTypeParameter; /** * Defines what annotations and values are valid on interfaces. @@ -58,14 +56,6 @@ public interface Contract { data.returnType(method.getGenericReturnType()); data.configKey(Feign.configKey(method)); - if (Observable.class.isAssignableFrom(method.getReturnType())) { - Type context = method.getGenericReturnType(); - Type observableType = resolveLastTypeParameter(method.getGenericReturnType(), Observable.class); - checkState(observableType != null, "Expected param %s to be Observable or Observable or a subtype", - context, observableType); - data.incrementalType(observableType); - } - for (Annotation methodAnnotation : method.getAnnotations()) { processAnnotationOnMethod(data, methodAnnotation, method); } @@ -83,8 +73,6 @@ public interface Contract { if (parameterTypes[i] == URI.class) { data.urlIndex(i); } else if (!isHttpAnnotation) { - checkState(!Observer.class.isAssignableFrom(parameterTypes[i]), - "Please return Observer as opposed to passing an Observable arg: %s", method); checkState(data.formParams().isEmpty(), "Body parameters cannot be used with form parameters."); checkState(data.bodyIndex() == null, "Method has too many Body parameters: %s", method); data.bodyIndex(i); diff --git a/core/src/main/java/feign/Feign.java b/core/src/main/java/feign/Feign.java index f92841e9..f4e8c1f4 100644 --- a/core/src/main/java/feign/Feign.java +++ b/core/src/main/java/feign/Feign.java @@ -16,34 +16,19 @@ package feign; -import dagger.Lazy; import dagger.ObjectGraph; import dagger.Provides; import feign.Logger.NoOpLogger; import feign.Request.Options; import feign.Target.HardCodedTarget; -import feign.codec.Decoder; -import feign.codec.Encoder; import feign.codec.ErrorDecoder; -import feign.codec.IncrementalDecoder; -import javax.inject.Named; -import javax.inject.Singleton; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; -import java.io.Closeable; import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; - -import static java.lang.Thread.MIN_PRIORITY; /** * Feign's purpose is to ease development against http apis that feign @@ -52,7 +37,7 @@ import static java.lang.Thread.MIN_PRIORITY; * In implementation, Feign is a {@link Feign#newInstance factory} for * generating {@link Target targeted} http apis. */ -public abstract class Feign implements Closeable { +public abstract class Feign { /** * Returns a new instance of an HTTP API, defined by annotations in the @@ -106,9 +91,8 @@ public abstract class Feign implements Closeable { return SSLSocketFactory.class.cast(SSLSocketFactory.getDefault()); } - @Provides - HostnameVerifier hostnameVerifier() { - return HttpsURLConnection.getDefaultHostnameVerifier(); + @Provides HostnameVerifier hostnameVerifier() { + return HttpsURLConnection.getDefaultHostnameVerifier(); } @Provides Client httpClient(Client.Default client) { @@ -130,22 +114,6 @@ public abstract class Feign implements Closeable { @Provides Options options() { return new Options(); } - - /** - * Used for both http invocation and decoding when observers are used. - */ - @Provides @Singleton @Named("http") Executor httpExecutor() { - return Executors.newCachedThreadPool(new ThreadFactory() { - @Override public Thread newThread(final Runnable r) { - return new Thread(new Runnable() { - @Override public void run() { - Thread.currentThread().setPriority(MIN_PRIORITY); - r.run(); - } - }, MethodHandler.IDLE_THREAD_NAME); - } - }); - } } /** @@ -188,17 +156,4 @@ public abstract class Feign implements Closeable { modulesForGraph.add(module); return modulesForGraph; } - - private final Lazy httpExecutor; - - Feign(Lazy httpExecutor) { - this.httpExecutor = httpExecutor; - } - - @Override public void close() { - Executor e = httpExecutor.get(); - if (e instanceof ExecutorService) { - ExecutorService.class.cast(e).shutdownNow(); - } - } } diff --git a/core/src/main/java/feign/MethodHandler.java b/core/src/main/java/feign/MethodHandler.java index 173285da..767f1e4b 100644 --- a/core/src/main/java/feign/MethodHandler.java +++ b/core/src/main/java/feign/MethodHandler.java @@ -15,22 +15,16 @@ */ package feign; -import dagger.Lazy; import feign.Request.Options; import feign.codec.DecodeException; import feign.codec.Decoder; import feign.codec.ErrorDecoder; -import feign.codec.IncrementalDecoder; import javax.inject.Inject; -import javax.inject.Named; import javax.inject.Provider; import java.io.IOException; -import java.io.Reader; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import static feign.FeignException.errorExecuting; import static feign.FeignException.errorReading; @@ -43,16 +37,14 @@ interface MethodHandler { static class Factory { private final Client client; - private final Lazy httpExecutor; private final Provider retryer; private final Set requestInterceptors; private final Logger logger; private final Provider logLevel; - @Inject Factory(Client client, @Named("http") Lazy httpExecutor, Provider retryer, - Set requestInterceptors, Logger logger, Provider logLevel) { + @Inject Factory(Client client, Provider retryer, Set requestInterceptors, + Logger logger, Provider logLevel) { this.client = checkNotNull(client, "client"); - this.httpExecutor = checkNotNull(httpExecutor, "httpExecutor"); this.retryer = checkNotNull(retryer, "retryer"); this.requestInterceptors = checkNotNull(requestInterceptors, "requestInterceptors"); this.logger = checkNotNull(logger, "logger"); @@ -64,14 +56,6 @@ interface MethodHandler { return new SynchronousMethodHandler(target, client, retryer, requestInterceptors, logger, logLevel, md, buildTemplateFromArgs, options, decoder, errorDecoder); } - - public MethodHandler create(Target target, MethodMetadata md, BuildTemplateFromArgs buildTemplateFromArgs, - Options options, IncrementalDecoder.TextStream incrementalDecoder, - ErrorDecoder errorDecoder) { - ObserverHandler observerHandler = new ObserverHandler(target, client, retryer, requestInterceptors, logger, - logLevel, md, buildTemplateFromArgs, options, incrementalDecoder, errorDecoder, httpExecutor); - return new ObservableMethodHandler(observerHandler); - } } /** @@ -81,158 +65,31 @@ interface MethodHandler { public RequestTemplate apply(Object[] argv); } - static class ObservableMethodHandler implements MethodHandler { - private final ObserverHandler observerHandler; - - private ObservableMethodHandler(ObserverHandler observerHandler) { - this.observerHandler = observerHandler; - } - - @Override public Object invoke(Object[] argv) { - final Object[] argvCopy = new Object[argv != null ? argv.length : 0]; - if (argv != null) - System.arraycopy(argv, 0, argvCopy, 0, argv.length); - - return new Observable() { - - @Override public Subscription subscribe(Observer observer) { - final Object[] oneMoreArg = new Object[argvCopy.length + 1]; - System.arraycopy(argvCopy, 0, oneMoreArg, 0, argvCopy.length); - oneMoreArg[argvCopy.length] = observer; - return observerHandler.invoke(oneMoreArg); - } - }; - } - } - - static class ObserverHandler extends BaseMethodHandler { - private final Lazy httpExecutor; - private final IncrementalDecoder.TextStream incrementalDecoder; - - private ObserverHandler(Target target, Client client, Provider retryer, - Set requestInterceptors, Logger logger, - Provider logLevel, MethodMetadata metadata, - BuildTemplateFromArgs buildTemplateFromArgs, Options options, - IncrementalDecoder.TextStream incrementalDecoder, ErrorDecoder errorDecoder, - Lazy httpExecutor) { - super(target, client, retryer, requestInterceptors, logger, logLevel, metadata, buildTemplateFromArgs, options, - errorDecoder); - this.httpExecutor = checkNotNull(httpExecutor, "httpExecutor for %s", target); - this.incrementalDecoder = checkNotNull(incrementalDecoder, "incrementalDecoder for %s", target); - } - - @Override public Subscription invoke(Object[] argv) { - final AtomicBoolean subscribed = new AtomicBoolean(true); - final Object[] oneMoreArg = new Object[argv.length + 1]; - System.arraycopy(argv, 0, oneMoreArg, 0, argv.length); - oneMoreArg[argv.length] = subscribed; - httpExecutor.get().execute(new Runnable() { - @Override public void run() { - Error error = null; - Object arg = oneMoreArg[oneMoreArg.length - 2]; - Observer observer = Observer.class.cast(arg); - try { - ObserverHandler.super.invoke(oneMoreArg); - observer.onSuccess(); - } catch (Error cause) { - // assign to a variable in case .onFailure throws a RTE - error = cause; - observer.onFailure(cause); - } catch (Throwable cause) { - observer.onFailure(cause); - } finally { - Thread.currentThread().setName(IDLE_THREAD_NAME); - if (error != null) - throw error; - } - } - }); - return new Subscription() { - @Override public void unsubscribe() { - subscribed.set(false); - } - }; - } - - @Override protected Void decode(Object[] oneMoreArg, Response response) throws IOException { - Object arg = oneMoreArg[oneMoreArg.length - 2]; - Observer observer = Observer.class.cast(arg); - AtomicBoolean subscribed = AtomicBoolean.class.cast(oneMoreArg[oneMoreArg.length - 1]); - if (metadata.incrementalType().equals(Response.class)) { - observer.onNext(response); - } else if (metadata.incrementalType() != Void.class) { - Response.Body body = response.body(); - if (body == null) - return null; - Reader reader = body.asReader(); - try { - incrementalDecoder.decode(reader, metadata.incrementalType(), observer, subscribed); - } finally { - ensureClosed(body); - } - } - return null; - } - - @Override protected Request targetRequest(RequestTemplate template) { - Request request = super.targetRequest(template); - Thread.currentThread().setName(THREAD_PREFIX + metadata.configKey()); - return request; - } - } - /** * same approach as retrofit: temporarily rename threads */ static String THREAD_PREFIX = "Feign-"; static String IDLE_THREAD_NAME = THREAD_PREFIX + "Idle"; - static class SynchronousMethodHandler extends BaseMethodHandler { + static final class SynchronousMethodHandler implements MethodHandler { + + private final MethodMetadata metadata; + private final Target target; + private final Client client; + private final Provider retryer; + private final Set requestInterceptors; + private final Logger logger; + private final Provider logLevel; + private final BuildTemplateFromArgs buildTemplateFromArgs; + private final Options options; private final Decoder.TextStream decoder; + private final ErrorDecoder errorDecoder; private SynchronousMethodHandler(Target target, Client client, Provider retryer, Set requestInterceptors, Logger logger, Provider logLevel, MethodMetadata metadata, BuildTemplateFromArgs buildTemplateFromArgs, Options options, Decoder.TextStream decoder, ErrorDecoder errorDecoder) { - super(target, client, retryer, requestInterceptors, logger, logLevel, metadata, buildTemplateFromArgs, options, - errorDecoder); - this.decoder = checkNotNull(decoder, "decoder for %s", target); - } - - @Override protected Object decode(Object[] argv, Response response) throws Throwable { - if (metadata.returnType().equals(Response.class)) { - return response; - } else if (metadata.returnType() == void.class || response.body() == null) { - return null; - } - try { - return decoder.decode(response.body().asReader(), metadata.returnType()); - } catch (FeignException e) { - throw e; - } catch (RuntimeException e) { - throw new DecodeException(e.getMessage(), e); - } - } - } - - static abstract class BaseMethodHandler implements MethodHandler { - - protected final MethodMetadata metadata; - protected final Target target; - protected final Client client; - protected final Provider retryer; - protected final Set requestInterceptors; - protected final Logger logger; - protected final Provider logLevel; - protected final BuildTemplateFromArgs buildTemplateFromArgs; - protected final Options options; - protected final ErrorDecoder errorDecoder; - - private BaseMethodHandler(Target target, Client client, Provider retryer, - Set requestInterceptors, Logger logger, - Provider logLevel, MethodMetadata metadata, - BuildTemplateFromArgs buildTemplateFromArgs, Options options, ErrorDecoder errorDecoder) { this.target = checkNotNull(target, "target"); this.client = checkNotNull(client, "client for %s", target); this.retryer = checkNotNull(retryer, "retryer for %s", target); @@ -243,6 +100,7 @@ interface MethodHandler { this.buildTemplateFromArgs = checkNotNull(buildTemplateFromArgs, "metadata for %s", target); this.options = checkNotNull(options, "options for %s", target); this.errorDecoder = checkNotNull(errorDecoder, "errorDecoder for %s", target); + this.decoder = checkNotNull(decoder, "decoder for %s", target); } @Override public Object invoke(Object[] argv) throws Throwable { @@ -250,7 +108,7 @@ interface MethodHandler { Retryer retryer = this.retryer.get(); while (true) { try { - return executeAndDecode(argv, template); + return executeAndDecode(template); } catch (RetryableException e) { retryer.continueOrPropagate(e); if (logLevel.get() != Logger.Level.NONE) { @@ -261,7 +119,7 @@ interface MethodHandler { } } - public Object executeAndDecode(Object[] argv, RequestTemplate template) throws Throwable { + Object executeAndDecode(RequestTemplate template) throws Throwable { Request request = targetRequest(template); if (logLevel.get() != Logger.Level.NONE) { @@ -285,7 +143,7 @@ interface MethodHandler { response = logger.logAndRebufferResponse(metadata.configKey(), logLevel.get(), response, elapsedTime); } if (response.status() >= 200 && response.status() < 300) { - return decode(argv, response); + return decode(response); } else { throw errorDecoder.decode(metadata.configKey(), response); } @@ -299,17 +157,30 @@ interface MethodHandler { } } - protected long elapsedTime(long start) { + long elapsedTime(long start) { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); } - protected Request targetRequest(RequestTemplate template) { + Request targetRequest(RequestTemplate template) { for (RequestInterceptor interceptor : requestInterceptors) { interceptor.apply(template); } return target.apply(new RequestTemplate(template)); } - protected abstract Object decode(Object[] argv, Response response) throws Throwable; + Object decode(Response response) throws Throwable { + if (metadata.returnType().equals(Response.class)) { + return response; + } else if (metadata.returnType() == void.class || response.body() == null) { + return null; + } + try { + return decoder.decode(response.body().asReader(), metadata.returnType()); + } catch (FeignException e) { + throw e; + } catch (RuntimeException e) { + throw new DecodeException(e.getMessage(), e); + } + } } } diff --git a/core/src/main/java/feign/MethodMetadata.java b/core/src/main/java/feign/MethodMetadata.java index 14ca1f1a..d2c8f3a5 100644 --- a/core/src/main/java/feign/MethodMetadata.java +++ b/core/src/main/java/feign/MethodMetadata.java @@ -30,9 +30,7 @@ public final class MethodMetadata implements Serializable { private String configKey; private transient Type returnType; - private transient Type incrementalType; private Integer urlIndex; - private Integer observerIndex; private Integer bodyIndex; private transient Type bodyType; private RequestTemplate template = new RequestTemplate(); @@ -63,19 +61,6 @@ public final class MethodMetadata implements Serializable { return this; } - /** - * Type that {@link feign.codec.IncrementalDecoder} must process. If null, - * {@link feign.codec.Decoder} will be used against the {@link #returnType()}; - */ - public Type incrementalType() { - return incrementalType; - } - - MethodMetadata incrementalType(Type incrementalType) { - this.incrementalType = incrementalType; - return this; - } - public Integer urlIndex() { return urlIndex; } diff --git a/core/src/main/java/feign/Observable.java b/core/src/main/java/feign/Observable.java deleted file mode 100644 index 0ea6112e..00000000 --- a/core/src/main/java/feign/Observable.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feign; - -/** - * An {@code Observer} is asynchronous equivalent to an {@code Iterable}. - *
- * Each call to {@link #subscribe(Observer)} implies a new - * {@link Request HTTP request}. - * - * @param expected value to decode incrementally from the http response. - */ -public interface Observable { - - /** - * Calling subscribe will initiate a new HTTP request which will be - * {@link feign.codec.IncrementalDecoder incrementally decoded} into the - * {@code observer} until it is finished or - * {@link feign.Subscription#unsubscribe()} is called. - * - * @param observer - * @return a {@link Subscription} with which you can stop the streaming of - * events to the {@code observer}. - */ - public Subscription subscribe(Observer observer); -} diff --git a/core/src/main/java/feign/Observer.java b/core/src/main/java/feign/Observer.java deleted file mode 100644 index d0aa6c78..00000000 --- a/core/src/main/java/feign/Observer.java +++ /dev/null @@ -1,68 +0,0 @@ -package feign; - -/** - * An {@code Observer} is asynchronous equivalent to an {@code Iterator}. - *

- * Observers receive results as they are - * {@link feign.codec.IncrementalDecoder decoded} from an - * {@link Response.Body http response body}. {@link #onNext(Object) onNext} - * will be called for each incremental value of type {@code T} until - * {@link feign.Subscription#unsubscribe()} is called or the response is finished. - *
- * {@link #onSuccess() onSuccess} or {@link #onFailure(Throwable)} onFailure} - * will be called when the response is finished, but not both. - *
- * {@code Observer} can be used as an asynchronous alternative to a - * {@code Collection}, or any other use where iterative response parsing is - * worth the additional effort to implement this interface. - *
- *
- * Here's an example of implementing {@code Observer}: - *
- *

- * Observer counter = new Observer() {
- *
- *   public int count;
- *
- *   @Override public void onNext(Contributor element) {
- *     count++;
- *   }
- *
- *   @Override public void onSuccess() {
- *     System.out.println("found " + count + " contributors");
- *   }
- *
- *   @Override public void onFailure(Throwable cause) {
- *     System.err.println("sad face after contributor " + count);
- *   }
- * };
- * subscription = github.contributors("netflix", "feign", counter);
- * 
- * - * @param expected value to decode incrementally from the http response. - */ -public interface Observer { - /** - * Invoked as soon as new data is available. Could be invoked many times or - * not at all. - * - * @param element next decoded element. - */ - void onNext(T element); - - /** - * Called when response processing completed successfully. - */ - void onSuccess(); - - /** - * Called when response processing failed for any reason. - *
- * Common failure cases include {@link FeignException}, - * {@link java.io.IOException}, and {@link feign.codec.DecodeException}. - * However, the cause could be a {@code Throwable} of any kind. - * - * @param cause the reason for the failure - */ - void onFailure(Throwable cause); -} diff --git a/core/src/main/java/feign/ReflectiveFeign.java b/core/src/main/java/feign/ReflectiveFeign.java index 0cb2490c..81029285 100644 --- a/core/src/main/java/feign/ReflectiveFeign.java +++ b/core/src/main/java/feign/ReflectiveFeign.java @@ -15,19 +15,15 @@ */ package feign; -import dagger.Lazy; import dagger.Provides; import feign.Request.Options; import feign.codec.Decoder; import feign.codec.EncodeException; import feign.codec.Encoder; import feign.codec.ErrorDecoder; -import feign.codec.IncrementalDecoder; import feign.codec.StringDecoder; -import feign.codec.StringIncrementalDecoder; import javax.inject.Inject; -import javax.inject.Named; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; @@ -40,7 +36,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.Executor; import static feign.Util.checkArgument; import static feign.Util.checkNotNull; @@ -53,8 +48,7 @@ public class ReflectiveFeign extends Feign { private final ParseHandlersByName targetToHandlersByName; - @Inject ReflectiveFeign(@Named("http") Lazy httpExecutor, ParseHandlersByName targetToHandlersByName) { - super(httpExecutor); + @Inject ReflectiveFeign(ParseHandlersByName targetToHandlersByName) { this.targetToHandlersByName = targetToHandlersByName; } @@ -136,10 +130,6 @@ public class ReflectiveFeign extends Feign { return Collections.emptySet(); } - @Provides(type = Provides.Type.SET_VALUES) Set noIncrementalDecoders() { - return Collections.emptySet(); - } - @Provides Feign provideFeign(ReflectiveFeign in) { return in; } @@ -151,15 +141,12 @@ public class ReflectiveFeign extends Feign { private final Map> encoders = new HashMap>(); private final Encoder.Text> formEncoder; private final Map> decoders = new HashMap>(); - private final Map> incrementalDecoders = - new HashMap>(); private final ErrorDecoder errorDecoder; private final MethodHandler.Factory factory; @SuppressWarnings("unchecked") @Inject ParseHandlersByName(Contract contract, Options options, Set encoders, Set decoders, - Set incrementalDecoders, ErrorDecoder errorDecoder, - MethodHandler.Factory factory) { + ErrorDecoder errorDecoder, MethodHandler.Factory factory) { this.contract = contract; this.options = options; this.factory = factory; @@ -191,16 +178,6 @@ public class ReflectiveFeign extends Feign { Type type = resolveLastTypeParameter(decoder.getClass(), Decoder.class); this.decoders.put(type, Decoder.TextStream.class.cast(decoder)); } - StringIncrementalDecoder stringIncrementalDecoder = new StringIncrementalDecoder(); - this.incrementalDecoders.put(Void.class, stringIncrementalDecoder); - this.incrementalDecoders.put(Response.class, stringIncrementalDecoder); - this.incrementalDecoders.put(String.class, stringIncrementalDecoder); - for (IncrementalDecoder incrementalDecoder : incrementalDecoders) { - checkState(incrementalDecoder instanceof IncrementalDecoder.TextStream, - "Currently, only IncrementalDecoder.TextStream is supported. Found: ", incrementalDecoder); - Type type = resolveLastTypeParameter(incrementalDecoder.getClass(), IncrementalDecoder.class); - this.incrementalDecoders.put(type, IncrementalDecoder.TextStream.class.cast(incrementalDecoder)); - } } public Map apply(Target key) { @@ -227,27 +204,15 @@ public class ReflectiveFeign extends Feign { } else { buildTemplate = new BuildTemplateByResolvingArgs(md); } - if (md.incrementalType() != null) { - IncrementalDecoder.TextStream incrementalDecoder = incrementalDecoders.get(md.incrementalType()); - if (incrementalDecoder == null) { - incrementalDecoder = incrementalDecoders.get(Object.class); - } - if (incrementalDecoder == null) { - throw new IllegalStateException(format("%s needs @Provides(type = Set) IncrementalDecoder incrementalDecoder()" + - "{ // IncrementalDecoder.TextStream<%s> or IncrementalDecoder.TextStream}", md.configKey(), md.incrementalType())); - } - result.put(md.configKey(), factory.create(key, md, buildTemplate, options, incrementalDecoder, errorDecoder)); - } else { - Decoder.TextStream decoder = decoders.get(md.returnType()); - if (decoder == null) { - decoder = decoders.get(Object.class); - } - if (decoder == null) { - throw new IllegalStateException(format("%s needs @Provides(type = Set) Decoder decoder()" + - "{ // Decoder.TextStream<%s> or Decoder.TextStream}", md.configKey(), md.returnType())); - } - result.put(md.configKey(), factory.create(key, md, buildTemplate, options, decoder, errorDecoder)); + Decoder.TextStream decoder = decoders.get(md.returnType()); + if (decoder == null) { + decoder = decoders.get(Object.class); + } + if (decoder == null) { + throw new IllegalStateException(format("%s needs @Provides(type = Set) Decoder decoder()" + + "{ // Decoder.TextStream<%s> or Decoder.TextStream}", md.configKey(), md.returnType())); } + result.put(md.configKey(), factory.create(key, md, buildTemplate, options, decoder, errorDecoder)); } return result; } diff --git a/core/src/main/java/feign/Subscription.java b/core/src/main/java/feign/Subscription.java deleted file mode 100644 index 1b327f74..00000000 --- a/core/src/main/java/feign/Subscription.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feign; - -/** - * Subscription returns from {@link Observable#subscribe(Observer)} to allow - * unsubscribing. - */ -public interface Subscription { - - /** - * Stop receiving notifications on the {@link Observer} that was registered - * when this Subscription was received. - *
- * This allows unregistering an {@link Observer} before it has finished - * receiving all events (ie. before onCompleted is called). - */ - void unsubscribe(); -} diff --git a/core/src/main/java/feign/codec/IncrementalDecoder.java b/core/src/main/java/feign/codec/IncrementalDecoder.java deleted file mode 100644 index 00e11b4b..00000000 --- a/core/src/main/java/feign/codec/IncrementalDecoder.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feign.codec; - -import feign.FeignException; -import feign.Observer; - -import java.io.IOException; -import java.io.Reader; -import java.lang.reflect.Type; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Decodes an HTTP response incrementally into an {@link feign.Observer} - * via a series of {@link feign.Observer#onNext(Object) onNext} calls. - *

- * Invoked when {@link feign.Response#status()} is in the 2xx range. - * - * @param input that can be derived from {@link feign.Response.Body}. - * @param widest type an instance of this can decode. - */ -public interface IncrementalDecoder { - /** - * Implement this to decode a resource to an object into a single object. - * If you need to wrap exceptions, please do so via {@link feign.codec.DecodeException}. - *
- * Do not call {@link feign.Observer#onSuccess() onSuccess} or - * {@link feign.Observer#onFailure onFailure}. - * - * @param input if {@code Closeable}, no need to close this, as the caller - * manages resources. - * @param type type parameter of {@link feign.Observer#onNext}. - * @param observer call {@link feign.Observer#onNext onNext} - * each time an object of {@code type} is decoded - * from the response. - * @param subscribed false indicates the observer should no longer receive - * {@link Observer#onNext(Object)} calls. - * @throws java.io.IOException will be propagated safely to the caller. - * @throws feign.codec.DecodeException when decoding failed due to a checked exception - * besides IOException. - * @throws feign.FeignException when decoding succeeds, but conveys the operation - * failed. - */ - void decode(I input, Type type, Observer observer, AtomicBoolean subscribed) - throws IOException, DecodeException, FeignException; - - /** - * Used for text-based apis, follows - * {@link feign.codec.IncrementalDecoder#decode(Object, java.lang.reflect.Type, feign.Observer, AtomicBoolean)} - * semantics, applied to inputs of type {@link java.io.Reader}.
- * Ex.
- *

- *

-   * public class GsonDecoder implements Decoder.TextStream<Object> {
-   *   private final Gson gson;
-   *
-   *   public GsonDecoder(Gson gson) {
-   *     this.gson = gson;
-   *   }
-   *
-   *   @Override
-   *   public Object decode(Reader reader, Type type) throws IOException {
-   *     try {
-   *       return gson.fromJson(reader, type);
-   *     } catch (JsonIOException e) {
-   *       if (e.getCause() != null &&
-   *           e.getCause() instanceof IOException) {
-   *         throw IOException.class.cast(e.getCause());
-   *       }
-   *       throw e;
-   *     }
-   *   }
-   * }
-   * 
- *
-   * public class GsonIncrementalDecoder implements IncrementalDecoder {
-   *   private final Gson gson;
-   *
-   *   public GsonIncrementalDecoder(Gson gson) {
-   *     this.gson = gson;
-   *   }
-   *
-   *   @Override public void decode(Reader reader, Type type, Observer observer) throws Exception {
-   *     JsonReader jsonReader = new JsonReader(reader);
-   *     jsonReader.beginArray();
-   *     while (jsonReader.hasNext()) {
-   *       try {
-   *          observer.onNext(gson.fromJson(jsonReader, type));
-   *       } catch (JsonIOException e) {
-   *         if (e.getCause() != null &&
-   *             e.getCause() instanceof IOException) {
-   *           throw IOException.class.cast(e.getCause());
-   *         }
-   *         throw e;
-   *       }
-   *     }
-   *     jsonReader.endArray();
-   *   }
-   * }
-   * 
-   */
-  public interface TextStream extends IncrementalDecoder {
-  }
-}
diff --git a/core/src/main/java/feign/codec/StringIncrementalDecoder.java b/core/src/main/java/feign/codec/StringIncrementalDecoder.java
deleted file mode 100644
index a3fa77ba..00000000
--- a/core/src/main/java/feign/codec/StringIncrementalDecoder.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2013 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package feign.codec;
-
-import feign.Observer;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.lang.reflect.Type;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class StringIncrementalDecoder implements IncrementalDecoder.TextStream {
-  private static final StringDecoder STRING_DECODER = new StringDecoder();
-
-  @Override
-  public void decode(Reader reader, Type type, Observer observer, AtomicBoolean subscribed)
-      throws IOException {
-    observer.onNext(STRING_DECODER.decode(reader, type));
-  }
-}
diff --git a/core/src/test/java/feign/DefaultContractTest.java b/core/src/test/java/feign/DefaultContractTest.java
index aaaaf7eb..7dae4758 100644
--- a/core/src/test/java/feign/DefaultContractTest.java
+++ b/core/src/test/java/feign/DefaultContractTest.java
@@ -21,7 +21,6 @@ import com.google.gson.reflect.TypeToken;
 import org.testng.annotations.Test;
 
 import javax.inject.Named;
-import java.lang.reflect.Type;
 import java.net.URI;
 import java.util.List;
 
@@ -238,40 +237,4 @@ public class DefaultContractTest {
     assertEquals(md.template().headers().get("Auth-Token"), ImmutableSet.of("{Auth-Token}"));
     assertEquals(md.indexToName().get(0), ImmutableSet.of("Auth-Token"));
   }
-
-  interface WithObservable {
-    @RequestLine("GET /") Observable> valid();
-
-    @RequestLine("GET /") Observable> wildcardExtends();
-
-    @RequestLine("GET /") ParameterizedObservable> subtype();
-
-    @RequestLine("GET /") Response returnType(Observable> one);
-
-    @RequestLine("GET /") Observable> alsoObserver(Observer> observer);
-  }
-
-  interface ParameterizedObservable> extends Observable {
-  }
-
-  static final List listString = null;
-
-  @Test public void methodCanHaveObservableReturn() throws Exception {
-    contract.parseAndValidatateMetadata(WithObservable.class.getDeclaredMethod("valid"));
-  }
-
-  @Test public void methodMetadataReturnTypeOnObservableMethodIsItsTypeParameter() throws Exception {
-    Type listStringType = getClass().getDeclaredField("listString").getGenericType();
-    MethodMetadata md = contract.parseAndValidatateMetadata(WithObservable.class.getDeclaredMethod("valid"));
-    assertEquals(md.incrementalType(), listStringType);
-    md = contract.parseAndValidatateMetadata(WithObservable.class.getDeclaredMethod("wildcardExtends"));
-    assertEquals(md.incrementalType(), listStringType);
-    md = contract.parseAndValidatateMetadata(WithObservable.class.getDeclaredMethod("subtype"));
-    assertEquals(md.incrementalType(), listStringType);
-  }
-
-  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Please return Observer as opposed to passing an Observable arg.*")
-  public void noObserverArgs() throws Exception {
-    contract.parseAndValidatateMetadata(WithObservable.class.getDeclaredMethod("alsoObserver", Observer.class));
-  }
 }
diff --git a/core/src/test/java/feign/FeignTest.java b/core/src/test/java/feign/FeignTest.java
index e604d5dc..2c2b7b90 100644
--- a/core/src/test/java/feign/FeignTest.java
+++ b/core/src/test/java/feign/FeignTest.java
@@ -22,7 +22,6 @@ import com.google.mockwebserver.MockResponse;
 import com.google.mockwebserver.MockWebServer;
 import com.google.mockwebserver.RecordedRequest;
 import com.google.mockwebserver.SocketPolicy;
-import dagger.Lazy;
 import dagger.Module;
 import dagger.Provides;
 import feign.codec.Decoder;
@@ -42,11 +41,7 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static dagger.Provides.Type.SET;
 import static feign.Util.UTF_8;
@@ -54,27 +49,12 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
 
 @Test
 // unbound wildcards are not currently injectable in dagger.
 @SuppressWarnings("rawtypes")
 public class FeignTest {
 
-  @Test public void closeShutsdownExecutorService() throws IOException, InterruptedException {
-    final ExecutorService service = Executors.newCachedThreadPool();
-    new Feign(new Lazy() {
-      @Override public Executor get() {
-        return service;
-      }
-    }) {
-      @Override public  T newInstance(Target target) {
-        return null;
-      }
-    }.close();
-    assertTrue(service.isShutdown());
-  }
-
   interface TestInterface {
     @RequestLine("POST /") String post();
 
@@ -94,12 +74,6 @@ public class FeignTest {
 
     @RequestLine("GET /?1={1}&2={2}") Response queryParams(@Named("1") String one, @Named("2") Iterable twos);
 
-    @RequestLine("POST /") Observable observableVoid();
-
-    @RequestLine("POST /") Observable observableString();
-
-    @RequestLine("POST /") Observable observableResponse();
-
     @dagger.Module(library = true)
     static class Module {
       @Provides(type = SET) Encoder defaultEncoder() {
@@ -140,76 +114,6 @@ public class FeignTest {
     @RequestLine("POST /") String post();
   }
 
-  @Test
-  public void observableVoid() throws IOException, InterruptedException {
-    final MockWebServer server = new MockWebServer();
-    server.enqueue(new MockResponse().setBody("foo"));
-    server.play();
-
-    try {
-      TestInterface api = Feign.create(TestInterface.class, "http://localhost:" + server.getPort(),
-          new TestInterface.Module(), new RunSynchronous());
-
-      final AtomicBoolean success = new AtomicBoolean();
-
-      Observer observer = new Observer() {
-
-        @Override public void onNext(Void element) {
-          fail("on next isn't valid for void");
-        }
-
-        @Override public void onSuccess() {
-          success.set(true);
-        }
-
-        @Override public void onFailure(Throwable cause) {
-          fail(cause.getMessage());
-        }
-      };
-      api.observableVoid().subscribe(observer);
-
-      assertTrue(success.get());
-      assertEquals(server.getRequestCount(), 1);
-    } finally {
-      server.shutdown();
-    }
-  }
-
-  @Test
-  public void observableResponse() throws IOException, InterruptedException {
-    final MockWebServer server = new MockWebServer();
-    server.enqueue(new MockResponse().setBody("foo"));
-    server.play();
-
-    try {
-      TestInterface api = Feign.create(TestInterface.class, "http://localhost:" + server.getPort(),
-          new TestInterface.Module(), new RunSynchronous());
-
-      final AtomicBoolean success = new AtomicBoolean();
-
-      Observer observer = new Observer() {
-
-        @Override public void onNext(Response element) {
-          assertEquals(element.status(), 200);
-        }
-
-        @Override public void onSuccess() {
-          success.set(true);
-        }
-
-        @Override public void onFailure(Throwable cause) {
-          fail(cause.getMessage());
-        }
-      };
-      api.observableResponse().subscribe(observer);
-
-      assertTrue(success.get());
-      assertEquals(server.getRequestCount(), 1);
-    } finally {
-      server.shutdown();
-    }
-  }
-
   @Module(library = true, overrides = true)
   static class RunSynchronous {
     @Provides @Singleton @Named("http") Executor httpExecutor() {
@@ -221,79 +125,6 @@ public class FeignTest {
     }
   }
 
-  @Test
-  public void incrementString() throws IOException, InterruptedException {
-    final MockWebServer server = new MockWebServer();
-    server.enqueue(new MockResponse().setBody("foo"));
-    server.play();
-
-    try {
-      TestInterface api = Feign.create(TestInterface.class, "http://localhost:" + server.getPort(),
-          new TestInterface.Module(), new RunSynchronous());
-
-      final AtomicBoolean success = new AtomicBoolean();
-
-      Observer observer = new Observer() {
-
-        @Override public void onNext(String element) {
-          assertEquals(element, "foo");
-        }
-
-        @Override public void onSuccess() {
-          success.set(true);
-        }
-
-        @Override public void onFailure(Throwable cause) {
-          fail(cause.getMessage());
-        }
-      };
-      api.observableString().subscribe(observer);
-
-      assertTrue(success.get());
-      assertEquals(server.getRequestCount(), 1);
-    } finally {
-      server.shutdown();
-    }
-  }
-
-  @Test
-  public void multipleObservers() throws IOException, InterruptedException {
-    final MockWebServer server = new MockWebServer();
-    server.enqueue(new MockResponse().setBody("foo"));
-    server.enqueue(new MockResponse().setBody("foo"));
-    server.play();
-
-    try {
-      TestInterface api = Feign.create(TestInterface.class, "http://localhost:" + server.getPort(), new TestInterface.Module());
-
-      final CountDownLatch latch = new CountDownLatch(2);
-
-      Observer observer = new Observer() {
-
-        @Override public void onNext(String element) {
-          assertEquals(element, "foo");
-        }
-
-        @Override public void onSuccess() {
-          latch.countDown();
-        }
-
-        @Override public void onFailure(Throwable cause) {
-          fail(cause.getMessage());
-        }
-      };
-
-      Observable observable = api.observableString();
-      observable.subscribe(observer);
-      observable.subscribe(observer);
-      latch.await();
-
-      assertEquals(server.getRequestCount(), 2);
-    } finally {
-      server.shutdown();
-    }
-  }
-
   @Test
   public void postTemplateParamsResolve() throws IOException, InterruptedException {
     final MockWebServer server = new MockWebServer();
diff --git a/core/src/test/java/feign/examples/GitHubExample.java b/core/src/test/java/feign/examples/GitHubExample.java
index 428f9685..aaac3752 100644
--- a/core/src/test/java/feign/examples/GitHubExample.java
+++ b/core/src/test/java/feign/examples/GitHubExample.java
@@ -22,11 +22,8 @@ import dagger.Module;
 import dagger.Provides;
 import feign.Feign;
 import feign.Logger;
-import feign.Observable;
-import feign.Observer;
 import feign.RequestLine;
 import feign.codec.Decoder;
-import feign.codec.IncrementalDecoder;
 
 import javax.inject.Inject;
 import javax.inject.Named;
@@ -35,8 +32,6 @@ import java.io.IOException;
 import java.io.Reader;
 import java.lang.reflect.Type;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static dagger.Provides.Type.SET;
 
@@ -48,9 +43,6 @@ public class GitHubExample {
   interface GitHub {
     @RequestLine("GET /repos/{owner}/{repo}/contributors")
     List contributors(@Named("owner") String owner, @Named("repo") String repo);
-
-    @RequestLine("GET /repos/{owner}/{repo}/contributors")
-    Observable observable(@Named("owner") String owner, @Named("repo") String repo);
   }
 
   static class Contributor {
@@ -66,20 +58,6 @@ public class GitHubExample {
     for (Contributor contributor : contributors) {
       System.out.println(contributor.login + " (" + contributor.contributions + ")");
     }
-
-    System.out.println("Let's treat our contributors as an observable.");
-    Observable observable = github.observable("netflix", "feign");
-
-    CountDownLatch latch = new CountDownLatch(2);
-
-    System.out.println("Let's add 2 subscribers.");
-    observable.subscribe(new ContributorObserver(latch));
-    observable.subscribe(new ContributorObserver(latch));
-
-    // wait for the task to complete.
-    latch.await();
-
-    System.exit(0);
   }
 
   @Module(overrides = true, library = true, includes = GsonModule.class)
@@ -107,13 +85,9 @@ public class GitHubExample {
     @Provides(type = SET) Decoder decoder(GsonDecoder gsonDecoder) {
       return gsonDecoder;
     }
-
-    @Provides(type = SET) IncrementalDecoder incrementalDecoder(GsonDecoder gsonDecoder) {
-      return gsonDecoder;
-    }
   }
 
-  static class GsonDecoder implements Decoder.TextStream, IncrementalDecoder.TextStream {
+  static class GsonDecoder implements Decoder.TextStream {
     private final Gson gson;
 
     @Inject GsonDecoder(Gson gson) {
@@ -124,15 +98,6 @@ public class GitHubExample {
       return fromJson(new JsonReader(reader), type);
     }
 
-    @Override
-    public void decode(Reader reader, Type type, Observer observer, AtomicBoolean subscribed) throws IOException {
-      JsonReader jsonReader = new JsonReader(reader);
-      jsonReader.beginArray();
-      while (jsonReader.hasNext() && subscribed.get()) {
-        observer.onNext(fromJson(jsonReader, type));
-      }
-    }
-
     private Object fromJson(JsonReader jsonReader, Type type) throws IOException {
       try {
         return gson.fromJson(jsonReader, type);
@@ -144,29 +109,4 @@ public class GitHubExample {
       }
     }
   }
-
-  static class ContributorObserver implements Observer {
-
-    private final CountDownLatch latch;
-    public int count;
-
-    public ContributorObserver(CountDownLatch latch) {
-      this.latch = latch;
-    }
-
-    // parsed directly from the text stream without an intermediate collection.
-    @Override public void onNext(Contributor contributor) {
-      count++;
-    }
-
-    @Override public void onSuccess() {
-      System.out.println("found " + count + " contributors");
-      latch.countDown();
-    }
-
-    @Override public void onFailure(Throwable cause) {
-      cause.printStackTrace();
-      latch.countDown();
-    }
-  }
 }
diff --git a/gson/src/main/java/feign/gson/GsonModule.java b/gson/src/main/java/feign/gson/GsonModule.java
index aab32687..52fd8077 100644
--- a/gson/src/main/java/feign/gson/GsonModule.java
+++ b/gson/src/main/java/feign/gson/GsonModule.java
@@ -26,11 +26,9 @@ import com.google.gson.reflect.TypeToken;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonWriter;
 import dagger.Provides;
-import feign.Observer;
 import feign.codec.Decoder;
 import feign.codec.EncodeException;
 import feign.codec.Encoder;
-import feign.codec.IncrementalDecoder;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -39,7 +37,6 @@ import java.io.Reader;
 import java.lang.reflect.Type;
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static dagger.Provides.Type.SET;
 
@@ -54,11 +51,7 @@ public final class GsonModule {
     return codec;
   }
 
-  @Provides(type = SET) IncrementalDecoder incrementalDecoder(GsonCodec codec) {
-    return codec;
-  }
-
-  static class GsonCodec implements Encoder.Text, Decoder.TextStream, IncrementalDecoder.TextStream {
+  static class GsonCodec implements Encoder.Text, Decoder.TextStream {
     private final Gson gson;
 
     @Inject GsonCodec(Gson gson) {
@@ -73,15 +66,6 @@ public final class GsonModule {
       return fromJson(new JsonReader(reader), type);
     }
 
-    @Override
-    public void decode(Reader reader, Type type, Observer observer, AtomicBoolean subscribed) throws IOException {
-      JsonReader jsonReader = new JsonReader(reader);
-      jsonReader.beginArray();
-      while (subscribed.get() && jsonReader.hasNext()) {
-        observer.onNext(fromJson(jsonReader, type));
-      }
-    }
-
     private Object fromJson(JsonReader jsonReader, Type type) throws IOException {
       try {
         return gson.fromJson(jsonReader, type);
diff --git a/gson/src/test/java/feign/gson/GsonModuleTest.java b/gson/src/test/java/feign/gson/GsonModuleTest.java
index 983c58ff..bde0f8d7 100644
--- a/gson/src/test/java/feign/gson/GsonModuleTest.java
+++ b/gson/src/test/java/feign/gson/GsonModuleTest.java
@@ -18,10 +18,8 @@ package feign.gson;
 import com.google.gson.reflect.TypeToken;
 import dagger.Module;
 import dagger.ObjectGraph;
-import feign.Observer;
 import feign.codec.Decoder;
 import feign.codec.Encoder;
-import feign.codec.IncrementalDecoder;
 import org.testng.annotations.Test;
 
 import javax.inject.Inject;
@@ -32,11 +30,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
 
 @Test
 public class GsonModuleTest {
@@ -44,7 +39,6 @@ public class GsonModuleTest {
   static class EncodersAndDecoders {
     @Inject Set encoders;
     @Inject Set decoders;
-    @Inject Set incrementalDecoders;
   }
 
   @Test public void providesEncoderDecoderAndIncrementalDecoder() throws Exception {
@@ -55,8 +49,6 @@ public class GsonModuleTest {
     assertEquals(bindings.encoders.iterator().next().getClass(), GsonModule.GsonCodec.class);
     assertEquals(bindings.decoders.size(), 1);
     assertEquals(bindings.decoders.iterator().next().getClass(), GsonModule.GsonCodec.class);
-    assertEquals(bindings.incrementalDecoders.size(), 1);
-    assertEquals(bindings.incrementalDecoders.iterator().next().getClass(), GsonModule.GsonCodec.class);
   }
 
   @Module(includes = GsonModule.class, library = true, injects = Encoders.class)
@@ -132,44 +124,6 @@ public class GsonModuleTest {
         }.getType()), zones);
   }
 
-  @Module(includes = GsonModule.class, library = true, injects = IncrementalDecoders.class)
-  static class IncrementalDecoders {
-    @Inject Set decoders;
-  }
-
-  @Test public void decodesIncrementally() throws Exception {
-    IncrementalDecoders bindings = new IncrementalDecoders();
-    ObjectGraph.create(bindings).inject(bindings);
-
-    final List zones = new LinkedList();
-    zones.add(new Zone("denominator.io."));
-    zones.add(new Zone("denominator.io.", "ABCD"));
-
-    final AtomicInteger index = new AtomicInteger(0);
-
-    Observer zoneCallback = new Observer() {
-
-      @Override public void onNext(Zone element) {
-        assertEquals(element, zones.get(index.getAndIncrement()));
-      }
-
-      @Override public void onSuccess() {
-        // decoder shouldn't call onSuccess
-        fail();
-      }
-
-      @Override public void onFailure(Throwable cause) {
-        // decoder shouldn't call onFailure
-        fail();
-      }
-    };
-
-    IncrementalDecoder.TextStream.class.cast(bindings.decoders.iterator().next())
-        .decode(new StringReader(zonesJson), Zone.class, zoneCallback, new AtomicBoolean(true));
-
-    assertEquals(index.get(), 2);
-  }
-
   private String zonesJson = ""//
       + "[\n"//
       + "  {\n"//
diff --git a/jaxrs/src/test/java/feign/jaxrs/JAXRSContractTest.java b/jaxrs/src/test/java/feign/jaxrs/JAXRSContractTest.java
index 1669e369..7a573e00 100644
--- a/jaxrs/src/test/java/feign/jaxrs/JAXRSContractTest.java
+++ b/jaxrs/src/test/java/feign/jaxrs/JAXRSContractTest.java
@@ -19,8 +19,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.gson.reflect.TypeToken;
 import feign.MethodMetadata;
-import feign.Observable;
-import feign.Observer;
 import feign.Response;
 import org.testng.annotations.Test;
 
@@ -40,7 +38,6 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.lang.reflect.Type;
 import java.net.URI;
 import java.util.List;
 
@@ -345,38 +342,4 @@ public class JAXRSContractTest {
   public void emptyHeaderParam() throws Exception {
     contract.parseAndValidatateMetadata(HeaderParams.class.getDeclaredMethod("emptyHeaderParam", String.class));
   }
-
-  interface WithObservable {
-    @GET @Path("/") Observable> valid();
-
-    @GET @Path("/") Observable> wildcardExtends();
-
-    @GET @Path("/") ParameterizedObservable> subtype();
-
-    @GET @Path("/") Observable> alsoObserver(Observer> observer);
-  }
-
-  interface ParameterizedObservable> extends Observable {
-  }
-
-  static final List listString = null;
-
-  @Test public void methodCanHaveObservableReturn() throws Exception {
-    contract.parseAndValidatateMetadata(WithObservable.class.getDeclaredMethod("valid"));
-  }
-
-  @Test public void methodMetadataReturnTypeOnObservableMethodIsItsTypeParameter() throws Exception {
-    Type listStringType = getClass().getDeclaredField("listString").getGenericType();
-    MethodMetadata md = contract.parseAndValidatateMetadata(WithObservable.class.getDeclaredMethod("valid"));
-    assertEquals(md.incrementalType(), listStringType);
-    md = contract.parseAndValidatateMetadata(WithObservable.class.getDeclaredMethod("wildcardExtends"));
-    assertEquals(md.incrementalType(), listStringType);
-    md = contract.parseAndValidatateMetadata(WithObservable.class.getDeclaredMethod("subtype"));
-    assertEquals(md.incrementalType(), listStringType);
-  }
-
-  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Please return Observer as opposed to passing an Observable arg.*")
-  public void noObserverArgs() throws Exception {
-    contract.parseAndValidatateMetadata(WithObservable.class.getDeclaredMethod("alsoObserver", Observer.class));
-  }
 }
diff --git a/jaxrs/src/test/java/feign/jaxrs/examples/GitHubExample.java b/jaxrs/src/test/java/feign/jaxrs/examples/GitHubExample.java
index 80289f11..5e994244 100644
--- a/jaxrs/src/test/java/feign/jaxrs/examples/GitHubExample.java
+++ b/jaxrs/src/test/java/feign/jaxrs/examples/GitHubExample.java
@@ -19,17 +19,13 @@ import dagger.Module;
 import dagger.Provides;
 import feign.Feign;
 import feign.Logger;
-import feign.Observable;
-import feign.Observer;
 import feign.gson.GsonModule;
 import feign.jaxrs.JAXRSModule;
 
-import javax.inject.Named;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 
 /**
  * adapted from {@code com.example.retrofit.GitHubClient}
@@ -39,9 +35,6 @@ public class GitHubExample {
   interface GitHub {
     @GET @Path("/repos/{owner}/{repo}/contributors")
     List contributors(@PathParam("owner") String owner, @PathParam("repo") String repo);
-
-    @GET @Path("/repos/{owner}/{repo}/contributors")
-    Observable observable(@PathParam("owner") String owner, @PathParam("repo") String repo);
   }
 
   static class Contributor {
@@ -57,20 +50,6 @@ public class GitHubExample {
     for (Contributor contributor : contributors) {
       System.out.println(contributor.login + " (" + contributor.contributions + ")");
     }
-
-    System.out.println("Let's treat our contributors as an observable.");
-    Observable observable = github.observable("netflix", "feign");
-
-    CountDownLatch latch = new CountDownLatch(2);
-
-    System.out.println("Let's add 2 subscribers.");
-    observable.subscribe(new ContributorObserver(latch));
-    observable.subscribe(new ContributorObserver(latch));
-
-    // wait for the task to complete.
-    latch.await();
-
-    System.exit(0);
   }
 
   /**
@@ -87,29 +66,4 @@ public class GitHubExample {
       return new Logger.ErrorLogger();
     }
   }
-
-  static class ContributorObserver implements Observer {
-
-    private final CountDownLatch latch;
-    public int count;
-
-    public ContributorObserver(CountDownLatch latch) {
-      this.latch = latch;
-    }
-
-    // parsed directly from the text stream without an intermediate collection.
-    @Override public void onNext(Contributor contributor) {
-      count++;
-    }
-
-    @Override public void onSuccess() {
-      System.out.println("found " + count + " contributors");
-      latch.countDown();
-    }
-
-    @Override public void onFailure(Throwable cause) {
-      cause.printStackTrace();
-      latch.countDown();
-    }
-  }
 }