Browse Source

Updates for checkstyle.

pull/921/head
Spencer Gibb 6 years ago
parent
commit
fff66902df
No known key found for this signature in database
GPG Key ID: 7788A47380690861
  1. 13
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/autoconfigure/GatewayRSocketAutoConfiguration.java
  2. 35
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/autoconfigure/GatewayRSocketProperties.java
  3. 30
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/filter/AbstractFilterChain.java
  4. 2
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/filter/AbstractRSocketExchange.java
  5. 2
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/filter/FilterChain.java
  6. 3
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/filter/RSocketExchange.java
  7. 24
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/filter/RSocketFilter.java
  8. 98
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/metrics/MicrometerResponderRSocket.java
  9. 8
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/metrics/MicrometerResponderRSocketInterceptor.java
  10. 22
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/registry/LoadBalancedRSocket.java
  11. 20
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/registry/Registry.java
  12. 15
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/registry/RegistryRoutes.java
  13. 9
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/registry/RegistrySocketAcceptorFilter.java
  14. 109
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/route/Route.java
  15. 36
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/route/Routes.java
  16. 20
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayExchange.java
  17. 6
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayFilter.java
  18. 15
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayFilterChain.java
  19. 2
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayPredicate.java
  20. 125
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocket.java
  21. 46
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocketServer.java
  22. 82
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/PendingRequestRSocket.java
  23. 43
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/GatewaySocketAcceptor.java
  24. 5
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorExchange.java
  25. 6
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorFilter.java
  26. 17
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorFilterChain.java
  27. 2
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorPredicate.java
  28. 23
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorPredicateFilter.java
  29. 1
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/support/AsyncPredicate.java
  30. 35
      spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/support/Metadata.java
  31. 18
      spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/autoconfigure/GatewayRSocketAutoConfigurationTests.java
  32. 18
      spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocketIntegrationTests.java
  33. 133
      spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocketTests.java
  34. 89
      spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/socketacceptor/GatewaySocketAcceptorTests.java
  35. 37
      spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorPredicateFilterTests.java
  36. 13
      spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/support/MetadataTests.java
  37. 142
      spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/test/PingPongApp.java
  38. 14
      spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/test/SocketAcceptorFilterOrderTests.java
  39. 1
      src/checkstyle/checkstyle-suppressions.xml

13
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/autoconfigure/GatewayRSocketAutoConfiguration.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.autoconfigure;
@ -53,7 +52,7 @@ public class GatewayRSocketAutoConfiguration { @@ -53,7 +52,7 @@ public class GatewayRSocketAutoConfiguration {
return new Registry();
}
//TODO: CompositeRoutes
// TODO: CompositeRoutes
@Bean
public RegistryRoutes registryRoutes(Registry registry) {
RegistryRoutes registryRoutes = new RegistryRoutes();
@ -76,13 +75,15 @@ public class GatewayRSocketAutoConfiguration { @@ -76,13 +75,15 @@ public class GatewayRSocketAutoConfiguration {
public GatewayRSocketProperties gatewayRSocketProperties(Environment env) {
GatewayRSocketProperties properties = new GatewayRSocketProperties();
if (env.containsProperty("spring.application.name")) {
properties.setId(env.getProperty("spring.application.name")); // set default from env
properties.setId(env.getProperty("spring.application.name")); // set default
// from env
}
return properties;
}
@Bean
public SocketAcceptorPredicateFilter socketAcceptorPredicateFilter(List<SocketAcceptorPredicate> predicates) {
public SocketAcceptorPredicateFilter socketAcceptorPredicateFilter(
List<SocketAcceptorPredicate> predicates) {
return new SocketAcceptorPredicateFilter(predicates);
}
@ -90,7 +91,8 @@ public class GatewayRSocketAutoConfiguration { @@ -90,7 +91,8 @@ public class GatewayRSocketAutoConfiguration {
public GatewaySocketAcceptor socketAcceptor(GatewayRSocket.Factory rsocketFactory,
List<SocketAcceptorFilter> filters, MeterRegistry meterRegistry,
GatewayRSocketProperties properties) {
return new GatewaySocketAcceptor(rsocketFactory, filters, meterRegistry, properties);
return new GatewaySocketAcceptor(rsocketFactory, filters, meterRegistry,
properties);
}
@Bean
@ -98,4 +100,5 @@ public class GatewayRSocketAutoConfiguration { @@ -98,4 +100,5 @@ public class GatewayRSocketAutoConfiguration {
GatewayRSocketProperties properties, MeterRegistry meterRegistry) {
return new GatewayRSocketServer(properties, socketAcceptor, meterRegistry);
}
}

35
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/autoconfigure/GatewayRSocketProperties.java

@ -12,17 +12,16 @@ @@ -12,17 +12,16 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.autoconfigure;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.style.ToStringCreator;
import java.util.ArrayList;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.style.ToStringCreator;
@ConfigurationProperties("spring.cloud.gateway.rsocket")
public class GatewayRSocketProperties {
@ -57,11 +56,8 @@ public class GatewayRSocketProperties { @@ -57,11 +56,8 @@ public class GatewayRSocketProperties {
@Override
public String toString() {
return new ToStringCreator(this)
.append("enabled", enabled)
.append("id", id)
.append("server", server)
.toString();
return new ToStringCreator(this).append("enabled", enabled).append("id", id)
.append("server", server).toString();
}
/**
@ -69,8 +65,15 @@ public class GatewayRSocketProperties { @@ -69,8 +65,15 @@ public class GatewayRSocketProperties {
*/
public static class Server {
//TODO: other transports
public enum TransportType { TCP }
// TODO: other transports
public enum TransportType {
/**
* TCP Transport type.
*/
TCP
}
/**
* Tag names and values to be supplied to Micrometer Interceptor.
@ -80,7 +83,7 @@ public class GatewayRSocketProperties { @@ -80,7 +83,7 @@ public class GatewayRSocketProperties {
/**
* Server port.
*/
private int port = 7002; //TODO: different default port?
private int port = 7002; // TODO: different default port?
public Server() {
micrometerTags.add("component");
@ -118,12 +121,10 @@ public class GatewayRSocketProperties { @@ -118,12 +121,10 @@ public class GatewayRSocketProperties {
@Override
public String toString() {
return new ToStringCreator(this)
.append("micrometerTags", micrometerTags)
.append("port", port)
.append("transport", transport)
.toString();
return new ToStringCreator(this).append("micrometerTags", micrometerTags)
.append("port", port).append("transport", transport).toString();
}
}
}

30
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/filter/AbstractFilterChain.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.filter;
@ -31,14 +30,17 @@ import org.springframework.lang.Nullable; @@ -31,14 +30,17 @@ import org.springframework.lang.Nullable;
/**
* Default implementation of {@link FilterChain}.
*
* <p>Each instance of this class represents one link in the chain. The public
* constructor {@link #AbstractFilterChain(List)}
* initializes the full chain and represents its first link.
* <p>
* Each instance of this class represents one link in the chain. The public constructor
* {@link #AbstractFilterChain(List)} initializes the full chain and represents its first
* link.
*
* <p>This class is immutable and thread-safe. It can be created once and
* re-used to handle request concurrently.
* <p>
* This class is immutable and thread-safe. It can be created once and re-used to handle
* request concurrently.
*
* Copied from org.springframework.web.server.handler.AbstractFilterChain
*
* @since 5.0
*/
public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSocketExchange, FC extends AbstractFilterChain>
@ -54,7 +56,6 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo @@ -54,7 +56,6 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo
@Nullable
protected final FC next;
/**
* Public constructor with the list of filters and the target handler to use.
* @param filters the filters ahead of the handler
@ -63,8 +64,8 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo @@ -63,8 +64,8 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo
protected AbstractFilterChain(List<F> filters) {
this.allFilters = Collections.unmodifiableList(filters);
FC chain = initChain(filters);
this.currentFilter = (F)chain.currentFilter;
this.next = (FC)chain.next;
this.currentFilter = (F) chain.currentFilter;
this.next = (FC) chain.next;
}
private FC initChain(List<F> filters) {
@ -80,7 +81,7 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo @@ -80,7 +81,7 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo
* Private constructor to represent one link in the chain.
*/
protected AbstractFilterChain(List<F> allFilters, @Nullable F currentFilter,
@Nullable FC next) {
@Nullable FC next) {
this.allFilters = allFilters;
this.currentFilter = currentFilter;
@ -91,7 +92,7 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo @@ -91,7 +92,7 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo
* Private constructor to represent one link in the chain.
*/
protected abstract FC create(List<F> allFilters, @Nullable F currentFilter,
@Nullable FC next);
@Nullable FC next);
public List<F> getFilters() {
return this.allFilters;
@ -100,10 +101,8 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo @@ -100,10 +101,8 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo
@Override
@SuppressWarnings("unchecked")
public Mono<Success> filter(E exchange) {
return Mono.defer(() ->
this.currentFilter != null && this.next != null ?
this.currentFilter.filter(exchange, this.next) :
getMonoSuccess());
return Mono.defer(() -> this.currentFilter != null && this.next != null
? this.currentFilter.filter(exchange, this.next) : getMonoSuccess());
}
private Mono<Success> getMonoSuccess() {
@ -114,4 +113,5 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo @@ -114,4 +113,5 @@ public abstract class AbstractFilterChain<F extends RSocketFilter, E extends RSo
}
private static final Mono<Success> MONO_SUCCESS = Mono.just(Success.INSTANCE);
}

2
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/filter/AbstractRSocketExchange.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.filter;
@ -28,4 +27,5 @@ public abstract class AbstractRSocketExchange implements RSocketExchange { @@ -28,4 +27,5 @@ public abstract class AbstractRSocketExchange implements RSocketExchange {
public Map<String, Object> getAttributes() {
return this.attributes;
}
}

2
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/filter/FilterChain.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.filter;
@ -34,4 +33,5 @@ public interface FilterChain<E extends RSocketExchange> { @@ -34,4 +33,5 @@ public interface FilterChain<E extends RSocketExchange> {
* @return {@code Mono<Void>} to indicate when request handling is complete
*/
Mono<Success> filter(E exchange);
}

3
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/filter/RSocketExchange.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.filter;
@ -26,6 +25,7 @@ public interface RSocketExchange { @@ -26,6 +25,7 @@ public interface RSocketExchange {
/**
* Return a mutable map of request attributes for the current exchange.
* @return current attributes.
*/
Map<String, Object> getAttributes();
@ -66,4 +66,5 @@ public interface RSocketExchange { @@ -66,4 +66,5 @@ public interface RSocketExchange {
default <T> T getAttributeOrDefault(String name, T defaultValue) {
return (T) getAttributes().getOrDefault(name, defaultValue);
}
}

24
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/filter/RSocketFilter.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.filter;
@ -20,9 +19,9 @@ package org.springframework.cloud.gateway.rsocket.filter; @@ -20,9 +19,9 @@ package org.springframework.cloud.gateway.rsocket.filter;
import reactor.core.publisher.Mono;
/**
* Contract for interception-style, chained processing of Web requests that may
* be used to implement cross-cutting, application-agnostic requirements such
* as security, timeouts, and others.
* Contract for interception-style, chained processing of Web requests that may be used to
* implement cross-cutting, application-agnostic requirements such as security, timeouts,
* and others.
*
* Copied from WebFilter
*
@ -31,18 +30,23 @@ import reactor.core.publisher.Mono; @@ -31,18 +30,23 @@ import reactor.core.publisher.Mono;
public interface RSocketFilter<E extends RSocketExchange, FC extends FilterChain<E>> {
/**
* Enum to signal successful end of chain reached without the end being empty,
* i.e. Mono&lt;Void&gt; via Mono.empty(). This is because at the end of the chain
* an actual value needs to be returned. We can map success, but not empty.
* Enum to signal successful end of chain reached without the end being empty, i.e.
* Mono&lt;Void&gt; via Mono.empty(). This is because at the end of the chain an
* actual value needs to be returned. We can map success, but not empty.
*/
enum Success { INSTANCE } // should never have more than one value
enum Success {
INSTANCE
} // should never have more than one value
/**
* Process the Web request and (optionally) delegate to the next
* {@code RSocketFilter} through the given {@link FilterChain}.
* Process the Web request and (optionally) delegate to the next {@code RSocketFilter}
* through the given {@link FilterChain}.
* @param exchange the current RSocket exchange
* @param chain provides a way to delegate to the next filter
* @return {@code Mono<Success>} to indicate when request processing is complete.
*/
Mono<Success> filter(E exchange, FC chain);
}

98
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/metrics/MicrometerResponderRSocket.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.metrics;
@ -56,22 +55,27 @@ public class MicrometerResponderRSocket implements ResponderRSocket { @@ -56,22 +55,27 @@ public class MicrometerResponderRSocket implements ResponderRSocket {
/**
* Creates a new {@link RSocket}.
*
* @param delegate the {@link RSocket} to delegate to
* @param meterRegistry the {@link MeterRegistry} to use
* @param tags additional tags to attach to {@link Meter}s
* @throws IllegalArgumentException if {@code delegate} or {@code meterRegistry} is {@code null}
* @throws IllegalArgumentException if {@code delegate} or {@code meterRegistry} is
* {@code null}
*/
public MicrometerResponderRSocket(RSocket delegate, MeterRegistry meterRegistry, Tag... tags) {
public MicrometerResponderRSocket(RSocket delegate, MeterRegistry meterRegistry,
Tag... tags) {
Assert.notNull(delegate, "delegate must not be null");
Assert.notNull(meterRegistry, "meterRegistry must not be null");
this.delegate = delegate;
this.metadataPush = new InteractionCounters(meterRegistry, "metadata.push", tags);
this.requestChannel = new InteractionCounters(meterRegistry, "request.channel", tags);
this.requestFireAndForget = new InteractionCounters(meterRegistry, "request.fnf", tags);
this.requestResponse = new InteractionTimers(meterRegistry, "request.response", tags);
this.requestStream = new InteractionCounters(meterRegistry, "request.stream", tags);
this.requestChannel = new InteractionCounters(meterRegistry, "request.channel",
tags);
this.requestFireAndForget = new InteractionCounters(meterRegistry, "request.fnf",
tags);
this.requestResponse = new InteractionTimers(meterRegistry, "request.response",
tags);
this.requestStream = new InteractionCounters(meterRegistry, "request.stream",
tags);
}
@Override
@ -101,14 +105,12 @@ public class MicrometerResponderRSocket implements ResponderRSocket { @@ -101,14 +105,12 @@ public class MicrometerResponderRSocket implements ResponderRSocket {
@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.defer(
() -> {
Timer.Sample sample = requestResponse.start();
return delegate
.requestResponse(payload)
.doFinally(signalType -> requestResponse.accept(sample, signalType));
});
return Mono.defer(() -> {
Timer.Sample sample = requestResponse.start();
return delegate.requestResponse(payload)
.doFinally(signalType -> requestResponse.accept(sample, signalType));
});
}
@Override
@ -133,7 +135,8 @@ public class MicrometerResponderRSocket implements ResponderRSocket { @@ -133,7 +135,8 @@ public class MicrometerResponderRSocket implements ResponderRSocket {
private final Counter onError;
private InteractionCounters(MeterRegistry meterRegistry, String interactionModel, Tag... tags) {
private InteractionCounters(MeterRegistry meterRegistry, String interactionModel,
Tag... tags) {
this.cancel = counter(meterRegistry, interactionModel, CANCEL, tags);
this.onComplete = counter(meterRegistry, interactionModel, ON_COMPLETE, tags);
this.onError = counter(meterRegistry, interactionModel, ON_ERROR, tags);
@ -142,27 +145,29 @@ public class MicrometerResponderRSocket implements ResponderRSocket { @@ -142,27 +145,29 @@ public class MicrometerResponderRSocket implements ResponderRSocket {
@Override
public void accept(SignalType signalType) {
switch (signalType) {
case CANCEL:
cancel.increment();
break;
case ON_COMPLETE:
onComplete.increment();
break;
case ON_ERROR:
onError.increment();
break;
case CANCEL:
cancel.increment();
break;
case ON_COMPLETE:
onComplete.increment();
break;
case ON_ERROR:
onError.increment();
break;
}
}
private static Counter counter(
MeterRegistry meterRegistry, String interactionModel, SignalType signalType, Tag... tags) {
private static Counter counter(MeterRegistry meterRegistry,
String interactionModel, SignalType signalType, Tag... tags) {
return meterRegistry.counter(
"rsocket." + interactionModel, Tags.of(tags).and("signal.type", signalType.name()));
return meterRegistry.counter("rsocket." + interactionModel,
Tags.of(tags).and("signal.type", signalType.name()));
}
}
private static final class InteractionTimers implements BiConsumer<Timer.Sample, SignalType> {
private static final class InteractionTimers
implements BiConsumer<Timer.Sample, SignalType> {
private final Timer cancel;
@ -172,7 +177,8 @@ public class MicrometerResponderRSocket implements ResponderRSocket { @@ -172,7 +177,8 @@ public class MicrometerResponderRSocket implements ResponderRSocket {
private final Timer onError;
private InteractionTimers(MeterRegistry meterRegistry, String interactionModel, Tag... tags) {
private InteractionTimers(MeterRegistry meterRegistry, String interactionModel,
Tag... tags) {
this.meterRegistry = meterRegistry;
this.cancel = timer(meterRegistry, interactionModel, CANCEL, tags);
@ -183,15 +189,15 @@ public class MicrometerResponderRSocket implements ResponderRSocket { @@ -183,15 +189,15 @@ public class MicrometerResponderRSocket implements ResponderRSocket {
@Override
public void accept(Timer.Sample sample, SignalType signalType) {
switch (signalType) {
case CANCEL:
sample.stop(cancel);
break;
case ON_COMPLETE:
sample.stop(onComplete);
break;
case ON_ERROR:
sample.stop(onError);
break;
case CANCEL:
sample.stop(cancel);
break;
case ON_COMPLETE:
sample.stop(onComplete);
break;
case ON_ERROR:
sample.stop(onError);
break;
}
}
@ -199,11 +205,13 @@ public class MicrometerResponderRSocket implements ResponderRSocket { @@ -199,11 +205,13 @@ public class MicrometerResponderRSocket implements ResponderRSocket {
return Timer.start(meterRegistry);
}
private static Timer timer(
MeterRegistry meterRegistry, String interactionModel, SignalType signalType, Tag... tags) {
private static Timer timer(MeterRegistry meterRegistry, String interactionModel,
SignalType signalType, Tag... tags) {
return meterRegistry.timer(
"rsocket." + interactionModel, Tags.of(tags).and("signal.type", signalType.name()));
return meterRegistry.timer("rsocket." + interactionModel,
Tags.of(tags).and("signal.type", signalType.name()));
}
}
}

8
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/metrics/MicrometerResponderRSocketInterceptor.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.metrics;
@ -33,13 +32,14 @@ public class MicrometerResponderRSocketInterceptor implements RSocketInterceptor @@ -33,13 +32,14 @@ public class MicrometerResponderRSocketInterceptor implements RSocketInterceptor
/**
* Creates a new {@link RSocketInterceptor}.
*
* @param meterRegistry the {@link MeterRegistry} to use to create {@link Meter}s.
* @param tags the additional tags to attach to each {@link Meter}
* @throws NullPointerException if {@code meterRegistry} is {@code null}
*/
public MicrometerResponderRSocketInterceptor(MeterRegistry meterRegistry, Tag... tags) {
this.meterRegistry = Objects.requireNonNull(meterRegistry, "meterRegistry must not be null");
public MicrometerResponderRSocketInterceptor(MeterRegistry meterRegistry,
Tag... tags) {
this.meterRegistry = Objects.requireNonNull(meterRegistry,
"meterRegistry must not be null");
this.tags = tags;
}

22
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/registry/LoadBalancedRSocket.java

@ -12,13 +12,11 @@ @@ -12,13 +12,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.registry;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,6 +37,7 @@ public class LoadBalancedRSocket { @@ -39,6 +37,7 @@ public class LoadBalancedRSocket {
private final List<EnrichedRSocket> delegates = new CopyOnWriteArrayList<>();
private final String serviceName;
private final LoadBalancer loadBalancer;
public LoadBalancedRSocket(String serviceName) {
@ -59,15 +58,12 @@ public class LoadBalancedRSocket { @@ -59,15 +58,12 @@ public class LoadBalancedRSocket {
}
public void remove(Metadata metadata) {
//TODO: move delegates to a map for easy removal
// TODO: move delegates to a map for easy removal
this.delegates.stream()
.filter(enriched -> metadata.matches(enriched.getMetadata()))
.findFirst()
.filter(enriched -> metadata.matches(enriched.getMetadata())).findFirst()
.ifPresent(this.delegates::remove);
}
public List<EnrichedRSocket> getDelegates() {
return this.delegates;
}
@ -88,16 +84,20 @@ public class LoadBalancedRSocket { @@ -88,16 +84,20 @@ public class LoadBalancedRSocket {
public RSocket getSource() {
return this.source;
}
}
//TODO: Flux<RSocket> as input?
//TODO: reuse commons load balancer?
public interface LoadBalancer extends Function<List<EnrichedRSocket>, Mono<EnrichedRSocket>> {
// TODO: Flux<RSocket> as input?
// TODO: reuse commons load balancer?
public interface LoadBalancer
extends Function<List<EnrichedRSocket>, Mono<EnrichedRSocket>> {
}
public static class RoundRobinLoadBalancer implements LoadBalancer {
private final AtomicInteger position;
private final String serviceName;
public RoundRobinLoadBalancer(String serviceName) {
@ -123,5 +123,7 @@ public class LoadBalancedRSocket { @@ -123,5 +123,7 @@ public class LoadBalancedRSocket {
EnrichedRSocket rSocket = rSockets.get(pos % rSockets.size());
return Mono.just(rSocket);
}
}
}

20
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/registry/Registry.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.registry;
@ -37,27 +36,31 @@ import org.springframework.util.Assert; @@ -37,27 +36,31 @@ import org.springframework.util.Assert;
* When a new RSocket is registered, a RegisteredEvent is pushed onto a DirectProcessor
* that is acting as an event bus for registered Consumers.
*/
//TODO: name?
// TODO: name?
public class Registry {
private static final Log log = LogFactory.getLog(Registry.class);
private final Map<String, LoadBalancedRSocket> rsockets = new ConcurrentHashMap<>();
private final DirectProcessor<RegisteredEvent> registeredEvents = DirectProcessor.create();
private final FluxSink<RegisteredEvent> registeredEventsSink = registeredEvents.sink(FluxSink.OverflowStrategy.DROP);
private final DirectProcessor<RegisteredEvent> registeredEvents = DirectProcessor
.create();
private final FluxSink<RegisteredEvent> registeredEventsSink = registeredEvents
.sink(FluxSink.OverflowStrategy.DROP);
public Registry() {
}
//TODO: Mono<Void>?
// TODO: Mono<Void>?
public void register(Metadata metadata, RSocket rsocket) {
Assert.notNull(metadata, "metadata may not be null");
Assert.notNull(rsocket, "RSocket may not be null");
if (log.isDebugEnabled()) {
log.debug("Registering RSocket: " + metadata);
}
LoadBalancedRSocket composite = rsockets.computeIfAbsent(metadata.getName(), s ->
new LoadBalancedRSocket(metadata.getName()));
LoadBalancedRSocket composite = rsockets.computeIfAbsent(metadata.getName(),
s -> new LoadBalancedRSocket(metadata.getName()));
composite.addRSocket(rsocket, metadata);
registeredEventsSink.next(new RegisteredEvent(metadata, rsocket));
}
@ -82,7 +85,9 @@ public class Registry { @@ -82,7 +85,9 @@ public class Registry {
}
public static class RegisteredEvent {
private final Metadata routingMetadata;
private final RSocket rSocket;
public RegisteredEvent(Metadata routingMetadata, RSocket rSocket) {
@ -101,4 +106,5 @@ public class Registry { @@ -101,4 +106,5 @@ public class Registry {
}
}
}

15
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/registry/RegistryRoutes.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.registry;
@ -33,7 +32,7 @@ import org.springframework.cloud.gateway.rsocket.route.Routes; @@ -33,7 +32,7 @@ import org.springframework.cloud.gateway.rsocket.route.Routes;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
/**
* Creates routes from RegisteredEvents
* Creates routes from RegisteredEvents.
*/
public class RegistryRoutes implements Routes, Consumer<Registry.RegisteredEvent> {
@ -43,8 +42,8 @@ public class RegistryRoutes implements Routes, Consumer<Registry.RegisteredEvent @@ -43,8 +42,8 @@ public class RegistryRoutes implements Routes, Consumer<Registry.RegisteredEvent
@Override
public Flux<Route> getRoutes() {
//TODO: sorting
//TODO: caching
// TODO: sorting
// TODO: caching
Collection<Route> routeCollection = routes.values();
if (log.isDebugEnabled()) {
log.debug("Found routes: " + routeCollection);
@ -69,14 +68,13 @@ public class RegistryRoutes implements Routes, Consumer<Registry.RegisteredEvent @@ -69,14 +68,13 @@ public class RegistryRoutes implements Routes, Consumer<Registry.RegisteredEvent
}
private Route createRoute(String id, Metadata routingMetadata) {
Route route = Route.builder()
.id(id)
.routingMetadata(routingMetadata)
Route route = Route.builder().id(id).routingMetadata(routingMetadata)
.predicate(exchange -> {
// TODO: standard predicates
// TODO: allow customized predicates
Metadata incomingRouting = exchange.getRoutingMetadata();
boolean matches = incomingRouting.getName().equalsIgnoreCase(routingMetadata.getName());
boolean matches = incomingRouting.getName()
.equalsIgnoreCase(routingMetadata.getName());
return Mono.just(matches);
})
// TODO: allow customized filters
@ -88,4 +86,5 @@ public class RegistryRoutes implements Routes, Consumer<Registry.RegisteredEvent @@ -88,4 +86,5 @@ public class RegistryRoutes implements Routes, Consumer<Registry.RegisteredEvent
return route;
}
}

9
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/registry/RegistrySocketAcceptorFilter.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.registry;
@ -29,6 +28,7 @@ import org.springframework.util.StringUtils; @@ -29,6 +28,7 @@ import org.springframework.util.StringUtils;
* Filter that registers the SendingSocket.
*/
public class RegistrySocketAcceptorFilter implements SocketAcceptorFilter, Ordered {
private final Registry registry;
public RegistrySocketAcceptorFilter(Registry registry) {
@ -36,8 +36,10 @@ public class RegistrySocketAcceptorFilter implements SocketAcceptorFilter, Order @@ -36,8 +36,10 @@ public class RegistrySocketAcceptorFilter implements SocketAcceptorFilter, Order
}
@Override
public Mono<Success> filter(SocketAcceptorExchange exchange, SocketAcceptorFilterChain chain) {
if (exchange.getMetadata() != null && StringUtils.hasLength(exchange.getMetadata().getName())) {
public Mono<Success> filter(SocketAcceptorExchange exchange,
SocketAcceptorFilterChain chain) {
if (exchange.getMetadata() != null
&& StringUtils.hasLength(exchange.getMetadata().getName())) {
this.registry.register(exchange.getMetadata(), exchange.getSendingSocket());
}
@ -48,4 +50,5 @@ public class RegistrySocketAcceptorFilter implements SocketAcceptorFilter, Order @@ -48,4 +50,5 @@ public class RegistrySocketAcceptorFilter implements SocketAcceptorFilter, Order
public int getOrder() {
return HIGHEST_PRECEDENCE + 1000;
}
}

109
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/route/Route.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.route;
@ -32,7 +31,6 @@ import org.springframework.core.Ordered; @@ -32,7 +31,6 @@ import org.springframework.core.Ordered;
import org.springframework.core.style.ToStringCreator;
import org.springframework.util.Assert;
/**
* @author Spencer Gibb
*/
@ -52,7 +50,9 @@ public class Route implements Ordered { @@ -52,7 +50,9 @@ public class Route implements Ordered {
return new Builder();
}
private Route(String id, Metadata targetMetadata, int order, AsyncPredicate<GatewayExchange> predicate, List<GatewayFilter> gatewayFilters) {
private Route(String id, Metadata targetMetadata, int order,
AsyncPredicate<GatewayExchange> predicate,
List<GatewayFilter> gatewayFilters) {
this.id = id;
this.targetMetadata = targetMetadata;
this.order = order;
@ -60,7 +60,57 @@ public class Route implements Ordered { @@ -60,7 +60,57 @@ public class Route implements Ordered {
this.gatewayFilters = gatewayFilters;
}
public String getId() {
return this.id;
}
public Metadata getTargetMetadata() {
return this.targetMetadata;
}
public int getOrder() {
return order;
}
public AsyncPredicate<GatewayExchange> getPredicate() {
return this.predicate;
}
public List<GatewayFilter> getFilters() {
return Collections.unmodifiableList(this.gatewayFilters);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Route route = (Route) o;
return Objects.equals(id, route.id)
&& Objects.equals(targetMetadata, route.targetMetadata)
&& Objects.equals(order, route.order)
&& Objects.equals(predicate, route.predicate)
&& Objects.equals(gatewayFilters, route.gatewayFilters);
}
@Override
public int hashCode() {
return Objects.hash(id, targetMetadata, predicate, gatewayFilters);
}
@Override
public String toString() {
return new ToStringCreator(this).append("id", id)
.append("targetMetadata", targetMetadata).append("order", order)
.append("predicate", predicate).append("gatewayFilters", gatewayFilters)
.toString();
}
public static class Builder {
protected String id;
protected Metadata routingMetadata;
@ -71,7 +121,8 @@ public class Route implements Ordered { @@ -71,7 +121,8 @@ public class Route implements Ordered {
protected List<GatewayFilter> gatewayFilters = new ArrayList<>();
protected Builder() {}
protected Builder() {
}
public Builder id(String id) {
this.id = id;
@ -115,7 +166,6 @@ public class Route implements Ordered { @@ -115,7 +166,6 @@ public class Route implements Ordered {
return filters(Arrays.asList(gatewayFilters));
}
public Builder predicate(AsyncPredicate<GatewayExchange> predicate) {
this.predicate = predicate;
return this;
@ -144,55 +194,10 @@ public class Route implements Ordered { @@ -144,55 +194,10 @@ public class Route implements Ordered {
Assert.notNull(this.routingMetadata, "targetMetadata can not be null");
Assert.notNull(this.predicate, "predicate can not be null");
return new Route(this.id, this.routingMetadata, this.order, predicate, this.gatewayFilters);
return new Route(this.id, this.routingMetadata, this.order, predicate,
this.gatewayFilters);
}
}
public String getId() {
return this.id;
}
public Metadata getTargetMetadata() {
return this.targetMetadata;
}
public int getOrder() {
return order;
}
public AsyncPredicate<GatewayExchange> getPredicate() {
return this.predicate;
}
public List<GatewayFilter> getFilters() {
return Collections.unmodifiableList(this.gatewayFilters);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Route route = (Route) o;
return Objects.equals(id, route.id) &&
Objects.equals(targetMetadata, route.targetMetadata) &&
Objects.equals(order, route.order) &&
Objects.equals(predicate, route.predicate) &&
Objects.equals(gatewayFilters, route.gatewayFilters);
}
@Override
public int hashCode() {
return Objects.hash(id, targetMetadata, predicate, gatewayFilters);
}
@Override
public String toString() {
return new ToStringCreator(this)
.append("id", id)
.append("targetMetadata", targetMetadata)
.append("order", order)
.append("predicate", predicate)
.append("gatewayFilters", gatewayFilters)
.toString();
}
}

36
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/route/Routes.java

@ -12,46 +12,50 @@ @@ -12,46 +12,50 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.route;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.gateway.rsocket.server.GatewayExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.cloud.gateway.rsocket.server.GatewayExchange;
/**
* @author Spencer Gibb
*/
public interface Routes {
/** log. */
Log log = LogFactory.getLog(Routes.class);
Flux<Route> getRoutes();
default Mono<Route> findRoute(GatewayExchange exchange) {
return getRoutes()
//individually filter routes so that filterWhen error delaying is not a problem
.concatMap(route -> Mono.just(route)
.filterWhen(r -> {
// add the current route we are testing
// TODO: exchange attributes
// exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
//instead of immediately stopping main flux due to error, log and swallow it
.doOnError(e -> log.error("Error applying predicate for route: "+route.getId(), e))
.onErrorResume(e -> Mono.empty())
)
.next()
.map(route -> {
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
// TODO: exchange attributes
// exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR,
// r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> log.error(
"Error applying predicate for route: " + route.getId(),
e))
.onErrorResume(e -> Mono.empty()))
.next().map(route -> {
if (log.isDebugEnabled()) {
log.debug("Route matched: " + route.getId());
}
return route;
});
}
}

20
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayExchange.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.server;
@ -25,16 +24,23 @@ import org.apache.commons.logging.LogFactory; @@ -25,16 +24,23 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.gateway.rsocket.filter.AbstractRSocketExchange;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
/**
* Exchange object used in GatewayFilterChain started by GatewayRSocket.
*/
public class GatewayExchange extends AbstractRSocketExchange {
private static final Log log = LogFactory.getLog(GatewayExchange.class);
/**
* Key for the route object in attributes.
*/
public static final String ROUTE_ATTR = "__route_attr_";
enum Type {
FIRE_AND_FORGET("request.fnf"),
REQUEST_CHANNEL("request.channel"),
REQUEST_RESPONSE("request.response"),
REQUEST_STREAM("request.stream");
FIRE_AND_FORGET("request.fnf"), REQUEST_CHANNEL(
"request.channel"), REQUEST_RESPONSE(
"request.response"), REQUEST_STREAM("request.stream");
private String key;
@ -45,10 +51,13 @@ public class GatewayExchange extends AbstractRSocketExchange { @@ -45,10 +51,13 @@ public class GatewayExchange extends AbstractRSocketExchange {
String getKey() {
return this.key;
}
}
private final Type type;
private final Metadata routingMetadata;
private Tags tags = Tags.empty();
public static GatewayExchange fromPayload(Type type, Payload payload) {
@ -90,4 +99,5 @@ public class GatewayExchange extends AbstractRSocketExchange { @@ -90,4 +99,5 @@ public class GatewayExchange extends AbstractRSocketExchange {
public void setTags(Tags tags) {
this.tags = tags;
}
}

6
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayFilter.java

@ -12,11 +12,13 @@ @@ -12,11 +12,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.server;
import org.springframework.cloud.gateway.rsocket.filter.RSocketFilter;
public interface GatewayFilter extends RSocketFilter<GatewayExchange, GatewayFilterChain> {}
public interface GatewayFilter
extends RSocketFilter<GatewayExchange, GatewayFilterChain> {
}

15
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayFilterChain.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.server;
@ -29,24 +28,26 @@ public class GatewayFilterChain @@ -29,24 +28,26 @@ public class GatewayFilterChain
/**
* Public constructor with the list of filters and the target handler to use.
*
* @param filters the filters ahead of the handler
*/
private GatewayFilterChain(List<GatewayFilter> filters) {
super(filters);
}
protected GatewayFilterChain(List<GatewayFilter> allFilters, GatewayFilter currentFilter, GatewayFilterChain next) {
protected GatewayFilterChain(List<GatewayFilter> allFilters,
GatewayFilter currentFilter, GatewayFilterChain next) {
super(allFilters, currentFilter, next);
}
@Override
protected GatewayFilterChain create(List<GatewayFilter> allFilters, GatewayFilter currentFilter, GatewayFilterChain next) {
protected GatewayFilterChain create(List<GatewayFilter> allFilters,
GatewayFilter currentFilter, GatewayFilterChain next) {
return new GatewayFilterChain(allFilters, currentFilter, next);
}
public static Mono<Success> executeFilterChain(List<GatewayFilter> filters, GatewayExchange exchange) {
return new GatewayFilterChain(filters)
.filter(exchange);
public static Mono<Success> executeFilterChain(List<GatewayFilter> filters,
GatewayExchange exchange) {
return new GatewayFilterChain(filters).filter(exchange);
}
}

2
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayPredicate.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.server;
@ -20,4 +19,5 @@ package org.springframework.cloud.gateway.rsocket.server; @@ -20,4 +19,5 @@ package org.springframework.cloud.gateway.rsocket.server;
import org.springframework.cloud.gateway.rsocket.support.AsyncPredicate;
public interface GatewayPredicate extends AsyncPredicate<GatewayExchange> {
}

125
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocket.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.server;
@ -52,20 +51,24 @@ import static org.springframework.cloud.gateway.rsocket.server.GatewayExchange.T @@ -52,20 +51,24 @@ import static org.springframework.cloud.gateway.rsocket.server.GatewayExchange.T
import static org.springframework.cloud.gateway.rsocket.server.GatewayFilterChain.executeFilterChain;
/**
* Acts as a proxy to other registered sockets. Creates a GatewayExchange and attempts
* to locate a Route. If a Route is found, it is added to the exchange and the filter
* chains is executed againts the Route's filters. If the filter chain is successful,
* an attempt to locate a target RSocket via the Registry is executed. If not found
* a pending RSocket * is returned.
* Acts as a proxy to other registered sockets. Creates a GatewayExchange and attempts to
* locate a Route. If a Route is found, it is added to the exchange and the filter chains
* is executed againts the Route's filters. If the filter chain is successful, an attempt
* to locate a target RSocket via the Registry is executed. If not found a pending RSocket
* * is returned.
*/
public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket {
private static final Log log = LogFactory.getLog(GatewayRSocket.class);
private final Registry registry;
private final Routes routes;
private final MeterRegistry meterRegistry;
private final GatewayRSocketProperties properties;
private final Metadata metadata;
GatewayRSocket(Registry registry, Routes routes, MeterRegistry meterRegistry,
@ -75,21 +78,19 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -75,21 +78,19 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
this.meterRegistry = meterRegistry;
this.properties = properties;
this.metadata = metadata;
this.onClose()
.doOnSuccess(v -> registry.deregister(metadata))
//.doOnNext(v -> log.error("OnClose doOnNext"))
this.onClose().doOnSuccess(v -> registry.deregister(metadata))
// .doOnNext(v -> log.error("OnClose doOnNext"))
.doOnError(t -> {
if (log.isErrorEnabled()) {
log.error("Error received, deregistering " + metadata, t);
}
registry.deregister(metadata);
})
//.doOnTerminate(() -> log.error("OnClose doOnTerminate"))
//.doFinally(st -> log.error("OnClose doFinally"))
// .doOnTerminate(() -> log.error("OnClose doOnTerminate"))
// .doFinally(st -> log.error("OnClose doFinally"))
.subscribe();
}
protected Registry getRegistry() {
return registry;
}
@ -110,44 +111,42 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -110,44 +111,42 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
private GatewayExchange createExchange(GatewayExchange.Type type, Payload payload) {
GatewayExchange exchange = GatewayExchange.fromPayload(type, payload);
Tags tags = getTags(exchange);
exchange.setTags(tags);
exchange.setTags(tags);
return exchange;
}
private Tags getTags(GatewayExchange exchange) {
//TODO: add tags to exchange
// TODO: add tags to exchange
String responderName = this.metadata.getName();
String responderId = this.metadata.get("id");
String requestorName = exchange.getRoutingMetadata().getName();
Assert.hasText(responderName, "responderName must not be empty");
Assert.hasText(responderId, "responderId must not be empty");
Assert.hasText(requestorName, "requestorName must not be empty");
//requestor.id happens in a callback, later
return Tags.of("requester.name", requestorName,
"responder.name", responderName, "responder.id", responderId,
"gateway.id", this.properties.getId());
// requestor.id happens in a callback, later
return Tags.of("requester.name", requestorName, "responder.name", responderName,
"responder.id", responderId, "gateway.id", this.properties.getId());
}
@Override
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
GatewayExchange exchange = createExchange(REQUEST_CHANNEL, payload);
Tags responderTags = Tags.of("source", "responder");
return findRSocketOrCreatePending(exchange)
.flatMapMany(rSocket -> {
Tags requesterTags = Tags.of("source", "requester");
Flux<Payload> flux = Flux.from(payloads)
.doOnNext(s -> count(exchange, "payload", requesterTags))
.doOnError(t -> count(exchange, "error", requesterTags))
.doFinally(s -> count(exchange, requesterTags));
if (rSocket instanceof ResponderRSocket) {
ResponderRSocket socket = (ResponderRSocket) rSocket;
return socket.requestChannel(payload, flux)
.log(GatewayRSocket.class.getName()+".request-channel", Level.FINEST);
}
return rSocket.requestChannel(flux);
})
.doOnNext(s -> count(exchange, "payload", responderTags))
return findRSocketOrCreatePending(exchange).flatMapMany(rSocket -> {
Tags requesterTags = Tags.of("source", "requester");
Flux<Payload> flux = Flux.from(payloads)
.doOnNext(s -> count(exchange, "payload", requesterTags))
.doOnError(t -> count(exchange, "error", requesterTags))
.doFinally(s -> count(exchange, requesterTags));
if (rSocket instanceof ResponderRSocket) {
ResponderRSocket socket = (ResponderRSocket) rSocket;
return socket.requestChannel(payload, flux).log(
GatewayRSocket.class.getName() + ".request-channel",
Level.FINEST);
}
return rSocket.requestChannel(flux);
}).doOnNext(s -> count(exchange, "payload", responderTags))
.doOnError(t -> count(exchange, "error", responderTags))
.doFinally(s -> count(exchange, responderTags));
}
@ -188,7 +187,8 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -188,7 +187,8 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
.flatMap(rSocket -> rSocket.requestResponse(payload))
.doOnSubscribe(s -> timer.set(Timer.start(meterRegistry)))
.doOnError(t -> count(exchange, "error"))
.doFinally(s -> timer.get().stop(meterRegistry.timer(getMetricName(exchange), exchange.getTags())));
.doFinally(s -> timer.get().stop(meterRegistry
.timer(getMetricName(exchange), exchange.getTags())));
}
@Override
@ -203,9 +203,9 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -203,9 +203,9 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
}
/**
* Attempt to locate target RSocket via filter chain.
* If not found, create a pending RSocket.
* @param exchange
* Attempt to locate target RSocket via filter chain. If not found, create a pending
* RSocket.
* @param exchange GatewayExchange
* @return
*/
private Mono<RSocket> findRSocketOrCreatePending(GatewayExchange exchange) {
@ -224,18 +224,21 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -224,18 +224,21 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
return Mono.just(pending);
}
/* for testing */ PendingRequestRSocket constructPendingRSocket(GatewayExchange exchange) {
Function<Registry.RegisteredEvent, Mono<Route>> routeFinder = registeredEvent ->
getRouteMono(registeredEvent, exchange);
/* for testing */ PendingRequestRSocket constructPendingRSocket(
GatewayExchange exchange) {
Function<Registry.RegisteredEvent, Mono<Route>> routeFinder = registeredEvent -> getRouteMono(
registeredEvent, exchange);
return new PendingRequestRSocket(routeFinder, map -> {
Tags tags = exchange.getTags().and("requester.id", map.get("id"));
exchange.setTags(tags);
});
}
protected Mono<Route> getRouteMono(Registry.RegisteredEvent registeredEvent, GatewayExchange exchange) {
protected Mono<Route> getRouteMono(Registry.RegisteredEvent registeredEvent,
GatewayExchange exchange) {
return findRoute(exchange)
.log(PendingRequestRSocket.class.getName()+".find route pending", Level.FINEST)
.log(PendingRequestRSocket.class.getName() + ".find route pending",
Level.FINEST)
// can this be replaced with filter?
.flatMap(route -> {
return matchRoute(route, registeredEvent.getRoutingMetadata());
@ -244,11 +247,12 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -244,11 +247,12 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
private Mono<Route> findRoute(GatewayExchange exchange) {
Mono<Route> routeMono;
/*if (this.route != null) { //TODO: cache Route?
routeMono = Mono.just(route);
} else {*/
routeMono = this.routes.findRoute(exchange);
//}
/*
* if (this.route != null) { //TODO: cache Route? routeMono = Mono.just(route); }
* else {
*/
routeMono = this.routes.findRoute(exchange);
// }
return routeMono;
}
@ -261,14 +265,14 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -261,14 +265,14 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
}
/**
* First locate Route. If found, put route in exchange and execute filter chain.
* If successful, locate target RSocket.
* @param exchange
* First locate Route. If found, put route in exchange and execute filter chain. If
* successful, locate target RSocket.
* @param exchange GatewayExchange.
* @return Target RSocket or empty.
*/
private Mono<RSocket> findRSocket(GatewayExchange exchange) {
return this.routes.findRoute(exchange)
.log(GatewayRSocket.class.getName()+".find route", Level.FINEST)
.log(GatewayRSocket.class.getName() + ".find route", Level.FINEST)
.flatMap(route -> {
// put route in exchange for later use
exchange.getAttributes().put(ROUTE_ATTR, route);
@ -279,13 +283,12 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -279,13 +283,12 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
return loadBalancedRSocket.choose();
}).map(enrichedRSocket -> {
Metadata metadata = enrichedRSocket
.getMetadata();
Tags tags = exchange.getTags().and("requester.id", metadata.get("id"));
Metadata metadata = enrichedRSocket.getMetadata();
Tags tags = exchange.getTags().and("requester.id",
metadata.get("id"));
exchange.setTags(tags);
return enrichedRSocket;
}).cast(RSocket.class)
.switchIfEmpty(doOnEmpty(exchange));
}).cast(RSocket.class).switchIfEmpty(doOnEmpty(exchange));
});
// TODO: deal with connecting to cluster?
@ -293,15 +296,20 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -293,15 +296,20 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
private Mono<RSocket> doOnEmpty(GatewayExchange exchange) {
if (log.isDebugEnabled()) {
log.debug("Unable to find destination RSocket for " + exchange.getRoutingMetadata());
log.debug("Unable to find destination RSocket for "
+ exchange.getRoutingMetadata());
}
return Mono.empty();
}
public static class Factory {
private final Registry registry;
private final Routes routes;
private final MeterRegistry meterRegistry;
private final GatewayRSocketProperties properties;
public Factory(Registry registry, Routes routes, MeterRegistry meterRegistry,
@ -316,6 +324,7 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -316,6 +324,7 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
return new GatewayRSocket(this.registry, this.routes, this.meterRegistry,
this.properties, metadata);
}
}
}

46
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocketServer.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.server;
@ -43,20 +42,29 @@ import org.springframework.util.Assert; @@ -43,20 +42,29 @@ import org.springframework.util.Assert;
public class GatewayRSocketServer implements Ordered, SmartLifecycle {
private static final Log log = LogFactory.getLog(GatewayRSocketServer.class);
private static final RSocketInterceptor[] EMPTY_INTERCEPTORS = new RSocketInterceptor[0];
private final GatewayRSocketProperties properties;
private final SocketAcceptor socketAcceptor;
private final List<RSocketInterceptor> serverInterceptors;
private final AtomicBoolean running = new AtomicBoolean();
private CloseableChannel closeableChannel;
private final MeterRegistry meterRegistry;
public GatewayRSocketServer(GatewayRSocketProperties properties, SocketAcceptor socketAcceptor, MeterRegistry meterRegistry) {
public GatewayRSocketServer(GatewayRSocketProperties properties,
SocketAcceptor socketAcceptor, MeterRegistry meterRegistry) {
this(properties, socketAcceptor, meterRegistry, EMPTY_INTERCEPTORS);
}
public GatewayRSocketServer(GatewayRSocketProperties properties, SocketAcceptor socketAcceptor, MeterRegistry meterRegistry, RSocketInterceptor... interceptors) {
public GatewayRSocketServer(GatewayRSocketProperties properties,
SocketAcceptor socketAcceptor, MeterRegistry meterRegistry,
RSocketInterceptor... interceptors) {
Assert.notNull(properties, "properties may not be null");
Assert.notNull(socketAcceptor, "socketAcceptor may not be null");
Assert.notNull(meterRegistry, "meterRegistry may not be null");
@ -104,15 +112,17 @@ public class GatewayRSocketServer implements Ordered, SmartLifecycle { @@ -104,15 +112,17 @@ public class GatewayRSocketServer implements Ordered, SmartLifecycle {
TransportType transportType = server.getTransport();
TcpServerTransport transport;
switch (transportType) {
case TCP:
transport = TcpServerTransport.create(port);
break;
default:
throw new IllegalArgumentException("No support for server transport " + transportType);
case TCP:
transport = TcpServerTransport.create(port);
break;
default:
throw new IllegalArgumentException(
"No support for server transport " + transportType);
}
if (log.isInfoEnabled()) {
log.info("Starting Gateway RSocket Server on port: " + port + ", transport: " + transportType);
log.info("Starting Gateway RSocket Server on port: " + port + ", transport: "
+ transportType);
}
RSocketFactory.ServerRSocketFactory factory = RSocketFactory.receive();
@ -121,25 +131,23 @@ public class GatewayRSocketServer implements Ordered, SmartLifecycle { @@ -121,25 +131,23 @@ public class GatewayRSocketServer implements Ordered, SmartLifecycle {
List<String> micrometerTags = server.getMicrometerTags();
Tag[] tags = Tags.of(micrometerTags.toArray(new String[] {}))
.and("gateway.id", properties.getId())
.stream()
.collect(Collectors.toList())
.toArray(new Tag[]{});
.and("gateway.id", properties.getId()).stream()
.collect(Collectors.toList()).toArray(new Tag[] {});
factory
//TODO: add as bean like serverInterceptors above
.addConnectionPlugin(new MicrometerDuplexConnectionInterceptor(meterRegistry, tags))
// TODO: add as bean like serverInterceptors above
.addConnectionPlugin(
new MicrometerDuplexConnectionInterceptor(meterRegistry, tags))
.errorConsumer(throwable -> {
if (log.isDebugEnabled()) {
log.debug("Error with connection", throwable);
}
}) //TODO: add configurable errorConsumer
.acceptor(this.socketAcceptor)
.transport(transport)
.start()
}) // TODO: add configurable errorConsumer
.acceptor(this.socketAcceptor).transport(transport).start()
.map(closeableChannel -> {
this.closeableChannel = closeableChannel;
return closeableChannel;
}).subscribe();
}
}

82
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/server/PendingRequestRSocket.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.server;
@ -43,14 +42,19 @@ import static org.springframework.cloud.gateway.rsocket.server.GatewayExchange.R @@ -43,14 +42,19 @@ import static org.springframework.cloud.gateway.rsocket.server.GatewayExchange.R
import static org.springframework.cloud.gateway.rsocket.server.GatewayExchange.Type.REQUEST_STREAM;
import static org.springframework.cloud.gateway.rsocket.server.GatewayFilterChain.executeFilterChain;
public class PendingRequestRSocket extends AbstractRSocket implements ResponderRSocket, Consumer<RegisteredEvent> {
public class PendingRequestRSocket extends AbstractRSocket
implements ResponderRSocket, Consumer<RegisteredEvent> {
private static final Log log = LogFactory.getLog(PendingRequestRSocket.class);
private final Function<RegisteredEvent, Mono<Route>> routeFinder;
private final Consumer<Metadata> metadataCallback;
private final MonoProcessor<RSocket> rSocketProcessor;
private Disposable subscriptionDisposable;
private Route route;
public PendingRequestRSocket(Function<RegisteredEvent, Mono<Route>> routeFinder,
@ -58,7 +62,8 @@ public class PendingRequestRSocket extends AbstractRSocket implements ResponderR @@ -58,7 +62,8 @@ public class PendingRequestRSocket extends AbstractRSocket implements ResponderR
this(routeFinder, metadataCallback, MonoProcessor.create());
}
/* for testing */ PendingRequestRSocket(Function<RegisteredEvent, Mono<Route>> routeFinder,
/* for testing */ PendingRequestRSocket(
Function<RegisteredEvent, Mono<Route>> routeFinder,
Consumer<Metadata> metadataCallback,
MonoProcessor<RSocket> rSocketProcessor) {
this.routeFinder = routeFinder;
@ -67,24 +72,23 @@ public class PendingRequestRSocket extends AbstractRSocket implements ResponderR @@ -67,24 +72,23 @@ public class PendingRequestRSocket extends AbstractRSocket implements ResponderR
}
/**
* Find route (if needed) using pendingExchange.
* If found, see if the route target matches the registered service.
* If it matches, send registered RSocket to processor.
* Then execute normal filter chain. If filter chain is successful, execute request.
* @param registeredEvent
* Find route (if needed) using pendingExchange. If found, see if the route target
* matches the registered service. If it matches, send registered RSocket to
* processor. Then execute normal filter chain. If filter chain is successful, execute
* request.
* @param registeredEvent the RegisteredEvent
*/
@Override
public void accept(RegisteredEvent registeredEvent) {
this.routeFinder.apply(registeredEvent)
.subscribe(route -> {
this.route = route;
this.metadataCallback.accept(registeredEvent.getRoutingMetadata());
this.rSocketProcessor.onNext(registeredEvent.getRSocket());
this.rSocketProcessor.onComplete();
if (this.subscriptionDisposable != null) {
this.subscriptionDisposable.dispose();
}
});
this.routeFinder.apply(registeredEvent).subscribe(route -> {
this.route = route;
this.metadataCallback.accept(registeredEvent.getRoutingMetadata());
this.rSocketProcessor.onNext(registeredEvent.getRSocket());
this.rSocketProcessor.onComplete();
if (this.subscriptionDisposable != null) {
this.subscriptionDisposable.dispose();
}
});
}
@Override
@ -107,33 +111,36 @@ public class PendingRequestRSocket extends AbstractRSocket implements ResponderR @@ -107,33 +111,36 @@ public class PendingRequestRSocket extends AbstractRSocket implements ResponderR
@Override
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
return processor("pending-request-rc", payload)
.flatMapMany(tuple -> {
RSocket rSocket = tuple.getT1();
if (rSocket instanceof ResponderRSocket) {
ResponderRSocket socket = (ResponderRSocket) rSocket;
return socket.requestChannel(payload, payloads);
}
return rSocket.requestChannel(payloads);
});
return processor("pending-request-rc", payload).flatMapMany(tuple -> {
RSocket rSocket = tuple.getT1();
if (rSocket instanceof ResponderRSocket) {
ResponderRSocket socket = (ResponderRSocket) rSocket;
return socket.requestChannel(payload, payloads);
}
return rSocket.requestChannel(payloads);
});
}
/**
* After processor receives onNext signal, get route from exchange attrs,
* create a new exchange from payload. Copy exchange attrs.
* Execute filter chain, if successful, execute request.
* @param logCategory
* @param payload
* After processor receives onNext signal, get route from exchange attrs, create a new
* exchange from payload. Copy exchange attrs. Execute filter chain, if successful,
* execute request.
* @param logCategory log category
* @param payload payload.
* @return
*/
protected Mono<Tuple2<RSocket, Success>> processor(String logCategory, Payload payload) {
protected Mono<Tuple2<RSocket, Success>> processor(String logCategory,
Payload payload) {
return rSocketProcessor
.log(PendingRequestRSocket.class.getName()+"."+logCategory, Level.FINEST)
.log(PendingRequestRSocket.class.getName() + "." + logCategory,
Level.FINEST)
.flatMap(rSocket -> {
GatewayExchange exchange = GatewayExchange.fromPayload(REQUEST_STREAM, payload);
GatewayExchange exchange = GatewayExchange.fromPayload(REQUEST_STREAM,
payload);
exchange.getAttributes().put(ROUTE_ATTR, route);
//exchange.getAttributes().putAll(pendingExchange.getAttributes());
return Mono.just(rSocket).zipWith(executeFilterChain(route.getFilters(), exchange));
// exchange.getAttributes().putAll(pendingExchange.getAttributes());
return Mono.just(rSocket)
.zipWith(executeFilterChain(route.getFilters(), exchange));
});
}
@ -141,4 +148,5 @@ public class PendingRequestRSocket extends AbstractRSocket implements ResponderR @@ -141,4 +148,5 @@ public class PendingRequestRSocket extends AbstractRSocket implements ResponderR
public void setSubscriptionDisposable(Disposable subscriptionDisposable) {
this.subscriptionDisposable = subscriptionDisposable;
}
}

43
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/GatewaySocketAcceptor.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.socketacceptor;
@ -37,12 +36,16 @@ import org.springframework.cloud.gateway.rsocket.support.Metadata; @@ -37,12 +36,16 @@ import org.springframework.cloud.gateway.rsocket.support.Metadata;
public class GatewaySocketAcceptor implements SocketAcceptor {
private final SocketAcceptorFilterChain filterChain;
private final GatewayRSocket.Factory rSocketFactory;
private final MeterRegistry meterRegistry;
private final GatewayRSocketProperties properties;
public GatewaySocketAcceptor(GatewayRSocket.Factory rSocketFactory, List<SocketAcceptorFilter> filters,
MeterRegistry meterRegistry, GatewayRSocketProperties properties) {
public GatewaySocketAcceptor(GatewayRSocket.Factory rSocketFactory,
List<SocketAcceptorFilter> filters, MeterRegistry meterRegistry,
GatewayRSocketProperties properties) {
this.rSocketFactory = rSocketFactory;
this.filterChain = new SocketAcceptorFilterChain(filters);
this.meterRegistry = meterRegistry;
@ -53,31 +56,41 @@ public class GatewaySocketAcceptor implements SocketAcceptor { @@ -53,31 +56,41 @@ public class GatewaySocketAcceptor implements SocketAcceptor {
@SuppressWarnings("Duplicates")
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
//decorate sendingSocket with metrics
// decorate sendingSocket with metrics
// current gateway id, type requester, service name (from metadata), service id
Tags requesterTags = Tags.of("gateway.id", properties.getId(), "type", "requester");
Tags requesterTags = Tags.of("gateway.id", properties.getId(), "type",
"requester");
Tags metadataTags;
SocketAcceptorExchange exchange;
if (setup.hasMetadata()) { //TODO: and setup.metadataMimeType() is Announcement metadata or composite
if (setup.hasMetadata()) { // TODO: and setup.metadataMimeType() is Announcement
// metadata or composite
Metadata metadata = Metadata.decodeMetadata(setup.sliceMetadata());
metadataTags = Tags.of("service.name", metadata.getName())
.and("service.id", metadata.get("id"));
metadataTags = Tags.of("service.name", metadata.getName()).and("service.id",
metadata.get("id"));
// enrich exchange to have metadata
exchange = new SocketAcceptorExchange(setup, decorate(sendingSocket, requesterTags.and(metadataTags)), metadata);
} else {
exchange = new SocketAcceptorExchange(setup,
decorate(sendingSocket, requesterTags.and(metadataTags)), metadata);
}
else {
metadataTags = Tags.empty();
exchange = new SocketAcceptorExchange(setup, decorate(sendingSocket, requesterTags));
exchange = new SocketAcceptorExchange(setup,
decorate(sendingSocket, requesterTags));
}
Tags responderTags = Tags.of("gateway.id", properties.getId(), "type", "responder")
Tags responderTags = Tags
.of("gateway.id", properties.getId(), "type", "responder")
.and(metadataTags);
//decorate with metrics gateway id, type responder, service name, service id (instance id)
// decorate with metrics gateway id, type responder, service name, service id
// (instance id)
return this.filterChain.filter(exchange)
.log(GatewaySocketAcceptor.class.getName()+".socket acceptor filter chain", Level.FINEST)
.map(success -> decorate(this.rSocketFactory.create(exchange.getMetadata()), responderTags));
.log(GatewaySocketAcceptor.class.getName()
+ ".socket acceptor filter chain", Level.FINEST)
.map(success -> decorate(
this.rSocketFactory.create(exchange.getMetadata()),
responderTags));
}
private RSocket decorate(RSocket rSocket, Tags tags) {

5
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorExchange.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.socketacceptor;
@ -26,8 +25,11 @@ import org.springframework.cloud.gateway.rsocket.filter.AbstractRSocketExchange; @@ -26,8 +25,11 @@ import org.springframework.cloud.gateway.rsocket.filter.AbstractRSocketExchange;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
public class SocketAcceptorExchange extends AbstractRSocketExchange {
private final ConnectionSetupPayload setup;
private final RSocket sendingSocket;
private final Metadata metadata;
public SocketAcceptorExchange(ConnectionSetupPayload setup, RSocket sendingSocket) {
@ -52,4 +54,5 @@ public class SocketAcceptorExchange extends AbstractRSocketExchange { @@ -52,4 +54,5 @@ public class SocketAcceptorExchange extends AbstractRSocketExchange {
public Metadata getMetadata() {
return metadata;
}
}

6
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorFilter.java

@ -12,11 +12,13 @@ @@ -12,11 +12,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.socketacceptor;
import org.springframework.cloud.gateway.rsocket.filter.RSocketFilter;
public interface SocketAcceptorFilter extends RSocketFilter<SocketAcceptorExchange, SocketAcceptorFilterChain> {}
public interface SocketAcceptorFilter
extends RSocketFilter<SocketAcceptorExchange, SocketAcceptorFilterChain> {
}

17
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorFilterChain.java

@ -12,33 +12,34 @@ @@ -12,33 +12,34 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.socketacceptor;
import org.springframework.cloud.gateway.rsocket.filter.AbstractFilterChain;
import java.util.List;
public class SocketAcceptorFilterChain
extends AbstractFilterChain<SocketAcceptorFilter, SocketAcceptorExchange, SocketAcceptorFilterChain> {
import org.springframework.cloud.gateway.rsocket.filter.AbstractFilterChain;
public class SocketAcceptorFilterChain extends
AbstractFilterChain<SocketAcceptorFilter, SocketAcceptorExchange, SocketAcceptorFilterChain> {
/**
* Public constructor with the list of filters and the target handler to use.
*
* @param filters the filters ahead of the handler
*/
public SocketAcceptorFilterChain(List<SocketAcceptorFilter> filters) {
super(filters);
}
public SocketAcceptorFilterChain(List<SocketAcceptorFilter> allFilters, SocketAcceptorFilter currentFilter, SocketAcceptorFilterChain next) {
public SocketAcceptorFilterChain(List<SocketAcceptorFilter> allFilters,
SocketAcceptorFilter currentFilter, SocketAcceptorFilterChain next) {
super(allFilters, currentFilter, next);
}
@Override
protected SocketAcceptorFilterChain create(List<SocketAcceptorFilter> allFilters, SocketAcceptorFilter currentFilter, SocketAcceptorFilterChain next) {
protected SocketAcceptorFilterChain create(List<SocketAcceptorFilter> allFilters,
SocketAcceptorFilter currentFilter, SocketAcceptorFilterChain next) {
return new SocketAcceptorFilterChain(allFilters, currentFilter, next);
}
}

2
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorPredicate.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.socketacceptor;
@ -20,4 +19,5 @@ package org.springframework.cloud.gateway.rsocket.socketacceptor; @@ -20,4 +19,5 @@ package org.springframework.cloud.gateway.rsocket.socketacceptor;
import org.springframework.cloud.gateway.rsocket.support.AsyncPredicate;
public interface SocketAcceptorPredicate extends AsyncPredicate<SocketAcceptorExchange> {
}

23
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorPredicateFilter.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.socketacceptor;
@ -29,12 +28,13 @@ public class SocketAcceptorPredicateFilter implements SocketAcceptorFilter, Orde @@ -29,12 +28,13 @@ public class SocketAcceptorPredicateFilter implements SocketAcceptorFilter, Orde
private final AsyncPredicate<SocketAcceptorExchange> predicate;
//TODO: change from List to Flux?
// TODO: change from List to Flux?
public SocketAcceptorPredicateFilter(List<SocketAcceptorPredicate> predicates) {
Assert.notNull(predicates, "predicates may not be null");
if (predicates.isEmpty()) {
predicate = exchange -> Mono.just(true);
} else {
}
else {
AsyncPredicate<SocketAcceptorExchange> combined = predicates.get(0);
for (SocketAcceptorPredicate p : predicates.subList(1, predicates.size())) {
combined = combined.and(p);
@ -49,13 +49,14 @@ public class SocketAcceptorPredicateFilter implements SocketAcceptorFilter, Orde @@ -49,13 +49,14 @@ public class SocketAcceptorPredicateFilter implements SocketAcceptorFilter, Orde
}
@Override
public Mono<Success> filter(SocketAcceptorExchange exchange, SocketAcceptorFilterChain chain) {
return Mono.from(predicate.apply(exchange))
.flatMap(test -> {
if (test) {
return chain.filter(exchange);
}
return Mono.empty();
});
public Mono<Success> filter(SocketAcceptorExchange exchange,
SocketAcceptorFilterChain chain) {
return Mono.from(predicate.apply(exchange)).flatMap(test -> {
if (test) {
return chain.filter(exchange);
}
return Mono.empty();
});
}
}

1
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/support/AsyncPredicate.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.support;

35
spring-cloud-gateway-rsocket/src/main/java/org/springframework/cloud/gateway/rsocket/support/Metadata.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.support;
@ -32,9 +31,19 @@ import org.springframework.util.Assert; @@ -32,9 +31,19 @@ import org.springframework.util.Assert;
public class Metadata {
/**
* Mime type of routing extension.
*/
public static final String ROUTING_MIME_TYPE = "message/x.rsocket.routing.v0";
/**
* The logical name.
*/
private final String name;
/**
* Keys and values associated with name.
*/
private final Map<String, String> properties;
public Metadata(String name, Map<String, String> properties) {
@ -60,10 +69,8 @@ public class Metadata { @@ -60,10 +69,8 @@ public class Metadata {
@Override
public String toString() {
return new ToStringCreator(this)
.append("name", name)
.append("properties", properties)
.toString();
return new ToStringCreator(this).append("name", name)
.append("properties", properties).toString();
}
public static Builder from(String name) {
@ -110,7 +117,8 @@ public class Metadata { @@ -110,7 +117,8 @@ public class Metadata {
String name = decodeString(byteBuf, offset);
Map<String, String> properties = new LinkedHashMap<>();
while (offset.get() < byteBuf.readableBytes()) { //TODO: What is the best conditional here?
while (offset.get() < byteBuf.readableBytes()) { // TODO: What is the best
// conditional here?
String key = decodeString(byteBuf, offset);
String value = null;
if (offset.get() < byteBuf.readableBytes()) {
@ -148,10 +156,12 @@ public class Metadata { @@ -148,10 +156,12 @@ public class Metadata {
* equal values (ignoring case) of leftMetadata.
* @param leftMetadata first metadata to compare.
* @param rightMetadata second metadata to compare.
* @return true if all keys and values (case-insensitive) from leftMetadata are in rightMetadata.
* @return true if all keys and values (case-insensitive) from leftMetadata are in
* rightMetadata.
*/
//TODO: find a way to make this more performant
public static boolean matches(Map<String, String> leftMetadata, Map<String, String> rightMetadata) {
// TODO: find a way to make this more performant
public static boolean matches(Map<String, String> leftMetadata,
Map<String, String> rightMetadata) {
if (leftMetadata == null || rightMetadata == null) {
return false;
}
@ -159,17 +169,19 @@ public class Metadata { @@ -159,17 +169,19 @@ public class Metadata {
for (Map.Entry<String, String> entry : leftMetadata.entrySet()) {
String enrichedValue = rightMetadata.get(entry.getKey());
if (enrichedValue == null ||
//TODO: regex and possibly SpEL?
// TODO: regex and possibly SpEL?
!enrichedValue.equalsIgnoreCase(entry.getValue())) {
return false;
}
}
// all entries in metadata exist and match corresponding entries in enriched.metadata
// all entries in metadata exist and match corresponding entries in
// enriched.metadata
return true;
}
public static class Builder {
private final Metadata metadata;
public Builder(String name) {
@ -189,6 +201,7 @@ public class Metadata { @@ -189,6 +201,7 @@ public class Metadata {
public ByteBuf encode() {
return Metadata.encode(build());
}
}
}

18
spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/autoconfigure/GatewayRSocketAutoConfigurationTests.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.autoconfigure;
@ -23,11 +22,9 @@ import org.springframework.boot.actuate.autoconfigure.metrics.CompositeMeterRegi @@ -23,11 +22,9 @@ import org.springframework.boot.actuate.autoconfigure.metrics.CompositeMeterRegi
import org.springframework.boot.actuate.autoconfigure.metrics.MetricsAutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ReactiveWebApplicationContextRunner;
import org.springframework.cloud.gateway.rsocket.metrics.MicrometerResponderRSocketInterceptor;
import org.springframework.cloud.gateway.rsocket.registry.Registry;
import org.springframework.cloud.gateway.rsocket.registry.RegistryRoutes;
import org.springframework.cloud.gateway.rsocket.registry.RegistrySocketAcceptorFilter;
import org.springframework.cloud.gateway.rsocket.server.GatewayRSocket;
import org.springframework.cloud.gateway.rsocket.server.GatewayRSocketServer;
import org.springframework.cloud.gateway.rsocket.socketacceptor.GatewaySocketAcceptor;
import org.springframework.cloud.gateway.rsocket.socketacceptor.SocketAcceptorPredicate;
@ -40,21 +37,18 @@ public class GatewayRSocketAutoConfigurationTests { @@ -40,21 +37,18 @@ public class GatewayRSocketAutoConfigurationTests {
@Test
public void gatewayRSocketConfigured() {
new ReactiveWebApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(
GatewayRSocketAutoConfiguration.class,
CompositeMeterRegistryAutoConfiguration.class,
MetricsAutoConfiguration.class
))
.run(context -> assertThat(context)
.hasSingleBean(Registry.class)
.withConfiguration(
AutoConfigurations.of(GatewayRSocketAutoConfiguration.class,
CompositeMeterRegistryAutoConfiguration.class,
MetricsAutoConfiguration.class))
.run(context -> assertThat(context).hasSingleBean(Registry.class)
.hasSingleBean(RegistryRoutes.class)
.hasSingleBean(RegistrySocketAcceptorFilter.class)
.hasSingleBean(GatewayRSocketServer.class)
.hasSingleBean(GatewayRSocketProperties.class)
.hasSingleBean(GatewaySocketAcceptor.class)
.hasSingleBean(SocketAcceptorPredicateFilter.class)
.doesNotHaveBean(SocketAcceptorPredicate.class)
);
.doesNotHaveBean(SocketAcceptorPredicate.class));
}
}

18
spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocketIntegrationTests.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.server;
@ -34,9 +33,8 @@ import org.springframework.util.SocketUtils; @@ -34,9 +33,8 @@ import org.springframework.util.SocketUtils;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PingPongApp.class,
properties = { "ping.take=5" },
webEnvironment = WebEnvironment.RANDOM_PORT)
@SpringBootTest(classes = PingPongApp.class, properties = {
"ping.take=5" }, webEnvironment = WebEnvironment.RANDOM_PORT)
public class GatewayRSocketIntegrationTests {
private static int port;
@ -59,7 +57,8 @@ public class GatewayRSocketIntegrationTests { @@ -59,7 +57,8 @@ public class GatewayRSocketIntegrationTests {
@BeforeClass
public static void init() {
port = SocketUtils.findAvailableTcpPort();
System.setProperty("spring.cloud.gateway.rsocket.server.port", String.valueOf(port));
System.setProperty("spring.cloud.gateway.rsocket.server.port",
String.valueOf(port));
}
@AfterClass
@ -68,12 +67,9 @@ public class GatewayRSocketIntegrationTests { @@ -68,12 +67,9 @@ public class GatewayRSocketIntegrationTests {
}
@Test
public void contextLoads() {
StepVerifier.create(ping.getPongFlux())
.expectSubscription()
.then(() -> server.stop())
.thenConsumeWhile(s -> true)
.verifyComplete();
public void contextLoads() {
StepVerifier.create(ping.getPongFlux()).expectSubscription()
.then(() -> server.stop()).thenConsumeWhile(s -> true).verifyComplete();
assertThat(ping.getPongsReceived()).isGreaterThan(0);
assertThat(pong.getPingsReceived()).isGreaterThan(0);

133
spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocketTests.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.server;
@ -46,9 +45,7 @@ import org.springframework.cloud.gateway.rsocket.route.Route; @@ -46,9 +45,7 @@ import org.springframework.cloud.gateway.rsocket.route.Route;
import org.springframework.cloud.gateway.rsocket.route.Routes;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -61,9 +58,10 @@ public class GatewayRSocketTests { @@ -61,9 +58,10 @@ public class GatewayRSocketTests {
private static Log logger = LogFactory.getLog(GatewayRSocketTests.class);
private Registry registry;
private Payload incomingPayload;
//TODO: add tests for metrics and other request types
// TODO: add tests for metrics and other request types
@Before
public void init() {
@ -83,95 +81,94 @@ public class GatewayRSocketTests { @@ -83,95 +81,94 @@ public class GatewayRSocketTests {
.thenReturn(Mono.just(DefaultPayload.create("response")));
}
@Test
public void multipleFilters() {
public void multipleFilters() {
TestFilter filter1 = new TestFilter();
TestFilter filter2 = new TestFilter();
TestFilter filter3 = new TestFilter();
Payload payload = new TestGatewayRSocket(registry, new TestRoutes(filter1, filter2, filter3))
.requestResponse(incomingPayload)
.block(Duration.ZERO);
Payload payload = new TestGatewayRSocket(registry,
new TestRoutes(filter1, filter2, filter3))
.requestResponse(incomingPayload).block(Duration.ZERO);
assertTrue(filter1.invoked());
assertTrue(filter2.invoked());
assertTrue(filter3.invoked());
assertNotNull(payload);
assertThat(filter1.invoked()).isTrue();
assertThat(filter2.invoked()).isTrue();
assertThat(filter3.invoked()).isTrue();
assertThat(payload).isNotNull();
}
@Test
public void zeroFilters() {
public void zeroFilters() {
Payload payload = new TestGatewayRSocket(registry, new TestRoutes())
.requestResponse(incomingPayload)
.block(Duration.ZERO);
.requestResponse(incomingPayload).block(Duration.ZERO);
assertNotNull(payload);
assertThat(payload).isNotNull();
}
@Test
public void shortcircuitFilter() {
public void shortcircuitFilter() {
TestFilter filter1 = new TestFilter();
ShortcircuitingFilter filter2 = new ShortcircuitingFilter();
TestFilter filter3 = new TestFilter();
TestGatewayRSocket gatewayRSocket = new TestGatewayRSocket(registry, new TestRoutes(filter1, filter2, filter3));
TestGatewayRSocket gatewayRSocket = new TestGatewayRSocket(registry,
new TestRoutes(filter1, filter2, filter3));
Mono<Payload> response = gatewayRSocket.requestResponse(incomingPayload);
// a false filter will create a pending rsocket that blocks forever
// this tweaks the rsocket to compelte.
gatewayRSocket.processor.onNext(null);
StepVerifier.withVirtualTime(() -> response)
.expectSubscription()
StepVerifier.withVirtualTime(() -> response).expectSubscription()
.verifyComplete();
assertTrue(filter1.invoked());
assertTrue(filter2.invoked());
assertFalse(filter3.invoked());
assertThat(filter1.invoked()).isTrue();
assertThat(filter2.invoked()).isTrue();
assertThat(filter3.invoked()).isFalse();
}
@Test
public void asyncFilter() {
public void asyncFilter() {
AsyncFilter filter = new AsyncFilter();
Payload payload = new TestGatewayRSocket(registry, new TestRoutes(filter))
.requestResponse(incomingPayload)
.block(Duration.ofSeconds(5));
.requestResponse(incomingPayload).block(Duration.ofSeconds(5));
assertTrue(filter.invoked());
assertNotNull(payload);
assertThat(filter.invoked()).isTrue();
assertThat(payload).isNotNull();
}
//TODO: add exception handlers?
// TODO: add exception handlers?
@Test(expected = IllegalStateException.class)
public void handleErrorFromFilter() {
public void handleErrorFromFilter() {
ExceptionFilter filter = new ExceptionFilter();
new TestGatewayRSocket(registry, new TestRoutes(filter))
.requestResponse(incomingPayload)
.block(Duration.ofSeconds(5));
.requestResponse(incomingPayload).block(Duration.ofSeconds(5));
// assertNull(socket);
}
private static Metadata getMetadata() {
return Metadata.from("service").with("id", "service1").build();
}
private static class TestGatewayRSocket extends GatewayRSocket {
private final MonoProcessor<RSocket> processor = MonoProcessor.create();
public TestGatewayRSocket(Registry registry, Routes routes) {
super(registry, routes, new SimpleMeterRegistry(), new GatewayRSocketProperties(),
getMetadata());
TestGatewayRSocket(Registry registry, Routes routes) {
super(registry, routes, new SimpleMeterRegistry(),
new GatewayRSocketProperties(), getMetadata());
}
@Override
PendingRequestRSocket constructPendingRSocket(GatewayExchange exchange) {
Function<Registry.RegisteredEvent, Mono<Route>> routeFinder = registeredEvent ->
getRouteMono(registeredEvent, exchange);
Function<Registry.RegisteredEvent, Mono<Route>> routeFinder = registeredEvent -> getRouteMono(
registeredEvent, exchange);
return new PendingRequestRSocket(routeFinder, map -> {
Tags tags = exchange.getTags().and("requester.id", map.get("id"));
exchange.setTags(tags);
@ -181,42 +178,36 @@ public class GatewayRSocketTests { @@ -181,42 +178,36 @@ public class GatewayRSocketTests {
public MonoProcessor<RSocket> getProcessor() {
return processor;
}
}
private static Metadata getMetadata() {
return Metadata.from("service")
.with("id", "service1")
.build();
}
private static class TestRoutes implements Routes {
private final Route route;
private List<GatewayFilter> filters;
public TestRoutes() {
TestRoutes() {
this(Collections.emptyList());
}
public TestRoutes(GatewayFilter... filters) {
TestRoutes(GatewayFilter... filters) {
this(Arrays.asList(filters));
}
public TestRoutes(List<GatewayFilter> filters) {
TestRoutes(List<GatewayFilter> filters) {
this.filters = filters;
route = Route.builder()
.id("route1")
route = Route.builder().id("route1")
.routingMetadata(Metadata.from("mock").build())
.predicate(exchange -> Mono.just(true))
.filters(filters)
.build();
.predicate(exchange -> Mono.just(true)).filters(filters).build();
}
@Override
public Flux<Route> getRoutes() {
return Flux.just(route);
}
}
}
private static class TestFilter implements GatewayFilter {
@ -232,24 +223,28 @@ public class GatewayRSocketTests { @@ -232,24 +223,28 @@ public class GatewayRSocketTests {
return doFilter(exchange, chain);
}
public Mono<Success> doFilter(GatewayExchange exchange, GatewayFilterChain chain) {
public Mono<Success> doFilter(GatewayExchange exchange,
GatewayFilterChain chain) {
return chain.filter(exchange);
}
}
}
private static class ShortcircuitingFilter extends TestFilter {
@Override
public Mono<Success> doFilter(GatewayExchange exchange, GatewayFilterChain chain) {
public Mono<Success> doFilter(GatewayExchange exchange,
GatewayFilterChain chain) {
return Mono.empty();
}
}
private static class AsyncFilter extends TestFilter {
@Override
public Mono<Success> doFilter(GatewayExchange exchange, GatewayFilterChain chain) {
public Mono<Success> doFilter(GatewayExchange exchange,
GatewayFilterChain chain) {
return doAsyncWork().flatMap(asyncResult -> {
logger.debug("Async result: " + asyncResult);
return chain.filter(exchange);
@ -259,8 +254,8 @@ public class GatewayRSocketTests { @@ -259,8 +254,8 @@ public class GatewayRSocketTests {
private Mono<String> doAsyncWork() {
return Mono.delay(Duration.ofMillis(100L)).map(l -> "123");
}
}
}
private static class ExceptionFilter implements GatewayFilter {
@ -268,18 +263,16 @@ public class GatewayRSocketTests { @@ -268,18 +263,16 @@ public class GatewayRSocketTests {
public Mono<Success> filter(GatewayExchange exchange, GatewayFilterChain chain) {
return Mono.error(new IllegalStateException("boo"));
}
}
/*private static class TestExceptionHandler implements WebExceptionHandler {
private Throwable ex;
}
@Override
public Mono<Void> handle(GatewayExchange exchange, Throwable ex) {
this.ex = ex;
return Mono.error(ex);
}
}*/
/*
* private static class TestExceptionHandler implements WebExceptionHandler {
*
* private Throwable ex;
*
* @Override public Mono<Void> handle(GatewayExchange exchange, Throwable ex) {
* this.ex = ex; return Mono.error(ex); } }
*/
}

89
spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/socketacceptor/GatewaySocketAcceptorTests.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.socketacceptor;
@ -49,10 +48,14 @@ public class GatewaySocketAcceptorTests { @@ -49,10 +48,14 @@ public class GatewaySocketAcceptorTests {
private static Log logger = LogFactory.getLog(GatewaySocketAcceptorTests.class);
private GatewayRSocket.Factory factory;
private ConnectionSetupPayload setupPayload;
private RSocket sendingSocket;
private MeterRegistry meterRegistry;
private GatewayRSocketProperties properties = new GatewayRSocketProperties();
private GatewayRSocketProperties properties = new GatewayRSocketProperties();
@Before
public void init() {
@ -61,10 +64,11 @@ public class GatewaySocketAcceptorTests { @@ -61,10 +64,11 @@ public class GatewaySocketAcceptorTests {
this.sendingSocket = mock(RSocket.class);
this.meterRegistry = new SimpleMeterRegistry();
when(this.factory.create(any(Metadata.class))).thenReturn(mock(GatewayRSocket.class));
when(this.factory.create(any(Metadata.class)))
.thenReturn(mock(GatewayRSocket.class));
}
//TODO: test metrics
// TODO: test metrics
@Test
public void multipleFilters() {
@ -72,10 +76,10 @@ public class GatewaySocketAcceptorTests { @@ -72,10 +76,10 @@ public class GatewaySocketAcceptorTests {
TestFilter filter2 = new TestFilter();
TestFilter filter3 = new TestFilter();
RSocket socket = new GatewaySocketAcceptor(this.factory, Arrays.asList(filter1, filter2, filter3),
this.meterRegistry, this.properties)
.accept(this.setupPayload, this.sendingSocket)
.block(Duration.ZERO);
RSocket socket = new GatewaySocketAcceptor(this.factory,
Arrays.asList(filter1, filter2, filter3), this.meterRegistry,
this.properties).accept(this.setupPayload, this.sendingSocket)
.block(Duration.ZERO);
assertThat(filter1.invoked()).isTrue();
assertThat(filter2.invoked()).isTrue();
@ -85,10 +89,10 @@ public class GatewaySocketAcceptorTests { @@ -85,10 +89,10 @@ public class GatewaySocketAcceptorTests {
@Test
public void zeroFilters() {
RSocket socket = new GatewaySocketAcceptor(this.factory, Collections.emptyList(),
RSocket socket = new GatewaySocketAcceptor(this.factory, Collections.emptyList(),
this.meterRegistry, this.properties)
.accept(this.setupPayload, this.sendingSocket)
.block(Duration.ZERO);
.accept(this.setupPayload, this.sendingSocket)
.block(Duration.ZERO);
assertThat(socket).isNotNull();
}
@ -100,11 +104,10 @@ public class GatewaySocketAcceptorTests { @@ -100,11 +104,10 @@ public class GatewaySocketAcceptorTests {
ShortcircuitingFilter filter2 = new ShortcircuitingFilter();
TestFilter filter3 = new TestFilter();
RSocket socket = new GatewaySocketAcceptor(this.factory, Arrays.asList(filter1, filter2, filter3),
this.meterRegistry, this.properties)
.accept(this.setupPayload, this.sendingSocket)
.block(Duration.ZERO);
RSocket socket = new GatewaySocketAcceptor(this.factory,
Arrays.asList(filter1, filter2, filter3), this.meterRegistry,
this.properties).accept(this.setupPayload, this.sendingSocket)
.block(Duration.ZERO);
assertThat(filter1.invoked()).isTrue();
assertThat(filter2.invoked()).isTrue();
@ -117,25 +120,24 @@ public class GatewaySocketAcceptorTests { @@ -117,25 +120,24 @@ public class GatewaySocketAcceptorTests {
AsyncFilter filter = new AsyncFilter();
RSocket socket = new GatewaySocketAcceptor(this.factory, singletonList(filter),
RSocket socket = new GatewaySocketAcceptor(this.factory, singletonList(filter),
this.meterRegistry, this.properties)
.accept(this.setupPayload, this.sendingSocket)
.block(Duration.ofSeconds(5));
.accept(this.setupPayload, this.sendingSocket)
.block(Duration.ofSeconds(5));
assertThat(filter.invoked()).isTrue();
assertThat(socket).isNotNull();
}
//TODO: add exception handlers?
// TODO: add exception handlers?
@Test(expected = IllegalStateException.class)
public void handleErrorFromFilter() {
ExceptionFilter filter = new ExceptionFilter();
new GatewaySocketAcceptor(this.factory, singletonList(filter),
this.meterRegistry, this.properties)
.accept(this.setupPayload, this.sendingSocket)
.block(Duration.ofSeconds(5));
new GatewaySocketAcceptor(this.factory, singletonList(filter), this.meterRegistry,
this.properties).accept(this.setupPayload, this.sendingSocket)
.block(Duration.ofSeconds(5));
}
@ -148,29 +150,34 @@ public class GatewaySocketAcceptorTests { @@ -148,29 +150,34 @@ public class GatewaySocketAcceptorTests {
}
@Override
public Mono<Success> filter(SocketAcceptorExchange exchange, SocketAcceptorFilterChain chain) {
public Mono<Success> filter(SocketAcceptorExchange exchange,
SocketAcceptorFilterChain chain) {
this.invoked = true;
return doFilter(exchange, chain);
}
public Mono<Success> doFilter(SocketAcceptorExchange exchange, SocketAcceptorFilterChain chain) {
public Mono<Success> doFilter(SocketAcceptorExchange exchange,
SocketAcceptorFilterChain chain) {
return chain.filter(exchange);
}
}
}
private static class ShortcircuitingFilter extends TestFilter {
@Override
public Mono<Success> doFilter(SocketAcceptorExchange exchange, SocketAcceptorFilterChain chain) {
public Mono<Success> doFilter(SocketAcceptorExchange exchange,
SocketAcceptorFilterChain chain) {
return Mono.empty();
}
}
private static class AsyncFilter extends TestFilter {
@Override
public Mono<Success> doFilter(SocketAcceptorExchange exchange, SocketAcceptorFilterChain chain) {
public Mono<Success> doFilter(SocketAcceptorExchange exchange,
SocketAcceptorFilterChain chain) {
return doAsyncWork().flatMap(asyncResult -> {
logger.debug("Async result: " + asyncResult);
return chain.filter(exchange);
@ -180,26 +187,26 @@ public class GatewaySocketAcceptorTests { @@ -180,26 +187,26 @@ public class GatewaySocketAcceptorTests {
private Mono<String> doAsyncWork() {
return Mono.delay(Duration.ofMillis(100L)).map(l -> "123");
}
}
private static class ExceptionFilter implements SocketAcceptorFilter {
@Override
public Mono<Success> filter(SocketAcceptorExchange exchange, SocketAcceptorFilterChain chain) {
public Mono<Success> filter(SocketAcceptorExchange exchange,
SocketAcceptorFilterChain chain) {
return Mono.error(new IllegalStateException("boo"));
}
}
/*private static class TestExceptionHandler implements WebExceptionHandler {
private Throwable ex;
}
@Override
public Mono<Void> handle(SocketAcceptorExchange exchange, Throwable ex) {
this.ex = ex;
return Mono.error(ex);
}
}*/
/*
* private static class TestExceptionHandler implements WebExceptionHandler {
*
* private Throwable ex;
*
* @Override public Mono<Void> handle(SocketAcceptorExchange exchange, Throwable ex) {
* this.ex = ex; return Mono.error(ex); } }
*/
}

37
spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorPredicateFilterTests.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.socketacceptor;
@ -38,18 +37,14 @@ public class SocketAcceptorPredicateFilterTests { @@ -38,18 +37,14 @@ public class SocketAcceptorPredicateFilterTests {
@Test
public void noPredicateWorks() {
Mono<Success> result = runFilter(Collections.emptyList());
StepVerifier.create(result)
.expectNext(Success.INSTANCE)
.verifyComplete();
StepVerifier.create(result).expectNext(Success.INSTANCE).verifyComplete();
}
@Test
public void singleTruePredicateWorks() {
TestPredicate predicate = new TestPredicate(true);
Mono<Success> result = runFilter(predicate);
StepVerifier.create(result)
.expectNext(Success.INSTANCE)
.verifyComplete();
StepVerifier.create(result).expectNext(Success.INSTANCE).verifyComplete();
assertThat(predicate.invoked()).isTrue();
}
@ -57,8 +52,7 @@ public class SocketAcceptorPredicateFilterTests { @@ -57,8 +52,7 @@ public class SocketAcceptorPredicateFilterTests {
public void singleFalsePredicateWorks() {
TestPredicate predicate = new TestPredicate(false);
Mono<Success> result = runFilter(predicate);
StepVerifier.create(result)
.verifyComplete();
StepVerifier.create(result).verifyComplete();
assertThat(predicate.invoked()).isTrue();
}
@ -68,11 +62,10 @@ public class SocketAcceptorPredicateFilterTests { @@ -68,11 +62,10 @@ public class SocketAcceptorPredicateFilterTests {
TestPredicate predicate = new TestPredicate(false);
TestPredicate predicate2 = new TestPredicate(false);
Mono<Success> result = runFilter(predicate, predicate2);
StepVerifier.create(result)
.verifyComplete();
StepVerifier.create(result).verifyComplete();
assertThat(predicate.invoked()).isTrue();
assertThat(predicate2.invoked()).isTrue(); //Async predicates don't short circuit
assertThat(predicate2.invoked()).isTrue(); // Async predicates don't short circuit
}
@Test
@ -80,8 +73,7 @@ public class SocketAcceptorPredicateFilterTests { @@ -80,8 +73,7 @@ public class SocketAcceptorPredicateFilterTests {
TestPredicate truePredicate = new TestPredicate(true);
TestPredicate falsePredicate = new TestPredicate(false);
Mono<Success> result = runFilter(truePredicate, falsePredicate);
StepVerifier.create(result)
.verifyComplete();
StepVerifier.create(result).verifyComplete();
assertThat(truePredicate.invoked()).isTrue();
assertThat(falsePredicate.invoked()).isTrue();
}
@ -91,9 +83,7 @@ public class SocketAcceptorPredicateFilterTests { @@ -91,9 +83,7 @@ public class SocketAcceptorPredicateFilterTests {
TestPredicate truePredicate = new TestPredicate(true);
TestPredicate truePredicate2 = new TestPredicate(true);
Mono<Success> result = runFilter(truePredicate, truePredicate2);
StepVerifier.create(result)
.expectNext(Success.INSTANCE)
.verifyComplete();
StepVerifier.create(result).expectNext(Success.INSTANCE).verifyComplete();
assertThat(truePredicate.invoked()).isTrue();
assertThat(truePredicate2.invoked()).isTrue();
}
@ -107,9 +97,12 @@ public class SocketAcceptorPredicateFilterTests { @@ -107,9 +97,12 @@ public class SocketAcceptorPredicateFilterTests {
}
private Mono<Success> runFilter(List<SocketAcceptorPredicate> predicates) {
SocketAcceptorPredicateFilter filter = new SocketAcceptorPredicateFilter(predicates);
SocketAcceptorExchange exchange = new SocketAcceptorExchange(mock(ConnectionSetupPayload.class), mock(RSocket.class));
SocketAcceptorFilterChain filterChain = new SocketAcceptorFilterChain(Collections.singletonList(filter));
SocketAcceptorPredicateFilter filter = new SocketAcceptorPredicateFilter(
predicates);
SocketAcceptorExchange exchange = new SocketAcceptorExchange(
mock(ConnectionSetupPayload.class), mock(RSocket.class));
SocketAcceptorFilterChain filterChain = new SocketAcceptorFilterChain(
Collections.singletonList(filter));
return filter.filter(exchange, filterChain);
}
@ -119,7 +112,7 @@ public class SocketAcceptorPredicateFilterTests { @@ -119,7 +112,7 @@ public class SocketAcceptorPredicateFilterTests {
private final Mono<Boolean> test;
public TestPredicate(boolean value) {
TestPredicate(boolean value) {
test = Mono.just(value);
}
@ -132,5 +125,7 @@ public class SocketAcceptorPredicateFilterTests { @@ -132,5 +125,7 @@ public class SocketAcceptorPredicateFilterTests {
public boolean invoked() {
return invoked;
}
}
}

13
spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/support/MetadataTests.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.support;
@ -39,14 +38,11 @@ public class MetadataTests { @@ -39,14 +38,11 @@ public class MetadataTests {
@Test
public void encodeAndDecodeWorks() {
ByteBuf byteBuf = Metadata.from("test1")
.with("key1111", "val111111")
.with("key22", "val222")
.encode();
ByteBuf byteBuf = Metadata.from("test1").with("key1111", "val111111")
.with("key22", "val222").encode();
Metadata metadata = assertMetadata(byteBuf, "test1");
Map<String, String> properties = metadata.getProperties();
assertThat(properties).hasSize(2)
.containsOnlyKeys("key1111", "key22")
assertThat(properties).hasSize(2).containsOnlyKeys("key1111", "key22")
.containsValues("val111111", "val222");
}
@ -82,7 +78,8 @@ public class MetadataTests { @@ -82,7 +78,8 @@ public class MetadataTests {
private Map<String, String> metadata(int size) {
Assert.isTrue(size > 0, "size must be > 0");
HashMap<String, String> metadata = new HashMap<>();
IntStream.rangeClosed(1, size).forEach(i -> metadata.put("key"+i, "val"+i));
IntStream.rangeClosed(1, size).forEach(i -> metadata.put("key" + i, "val" + i));
return metadata;
}
}

142
spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/test/PingPongApp.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.test;
@ -90,17 +89,18 @@ public class PingPongApp { @@ -90,17 +89,18 @@ public class PingPongApp {
in = in.substring(0, 4);
}
switch (in.toLowerCase()) {
case "ping":
return "pong";
case "pong":
return "ping";
default:
throw new IllegalArgumentException("Value must be ping or pong, not " + in);
case "ping":
return "pong";
case "pong":
return "ping";
default:
throw new IllegalArgumentException("Value must be ping or pong, not " + in);
}
}
@Slf4j
public static class Ping implements Ordered, ApplicationListener<ApplicationReadyEvent> {
public static class Ping
implements Ordered, ApplicationListener<ApplicationReadyEvent> {
@Autowired
private MeterRegistry meterRegistry;
@ -108,6 +108,7 @@ public class PingPongApp { @@ -108,6 +108,7 @@ public class PingPongApp {
private final String id;
private final AtomicInteger pongsReceived = new AtomicInteger();
private Flux<String> pongFlux;
public Ping(String id) {
@ -121,44 +122,53 @@ public class PingPongApp { @@ -121,44 +122,53 @@ public class PingPongApp {
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
log.info("Starting Ping"+id);
log.info("Starting Ping" + id);
ConfigurableEnvironment env = event.getApplicationContext().getEnvironment();
Integer take = env.getProperty("ping.take", Integer.class, null);
Integer gatewayPort = env.getProperty("spring.cloud.gateway.rsocket.server.port",
Integer.class, 7002);
Integer gatewayPort = env.getProperty(
"spring.cloud.gateway.rsocket.server.port", Integer.class, 7002);
log.debug("ping.take: " + take);
MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(meterRegistry, Tag.of("component", "ping"));
ByteBuf announcementMetadata = Metadata.from("ping").with("id", "ping"+id).encode();
MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(
meterRegistry, Tag.of("component", "ping"));
ByteBuf announcementMetadata = Metadata.from("ping").with("id", "ping" + id)
.encode();
pongFlux = RSocketFactory.connect()
.metadataMimeType(Metadata.ROUTING_MIME_TYPE)
.setupPayload(DefaultPayload.create(EMPTY_BUFFER, announcementMetadata))
.setupPayload(
DefaultPayload.create(EMPTY_BUFFER, announcementMetadata))
.addClientPlugin(interceptor)
.transport(TcpClientTransport.create(gatewayPort)) // proxy
.start()
.flatMapMany(socket ->
{
Flux<String> pong = socket.requestChannel(
Flux.interval(Duration.ofSeconds(1))
.map(i -> {
ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "ping" + id);
ByteBuf routingMetadata = Metadata.from("pong").encode();
return DefaultPayload.create(data, routingMetadata);
})
.onBackpressureDrop(payload -> log.debug("Dropped payload " + payload.getDataUtf8())) // this is needed in case pong is not available yet
).map(Payload::getDataUtf8)
.doOnNext(str -> {
int received = pongsReceived.incrementAndGet();
log.info("received " + str + "(" + received + ") in Ping" + id);
})
.doFinally(signal -> socket.dispose());
if (take != null) {
return pong.take(take);
}
return pong;
}
);
.start().flatMapMany(socket -> {
Flux<String> pong = socket.requestChannel(
Flux.interval(Duration.ofSeconds(1)).map(i -> {
ByteBuf data = ByteBufUtil.writeUtf8(
ByteBufAllocator.DEFAULT, "ping" + id);
ByteBuf routingMetadata = Metadata.from("pong")
.encode();
return DefaultPayload.create(data, routingMetadata);
}).onBackpressureDrop(payload -> log.debug(
"Dropped payload " + payload.getDataUtf8())) // this
// is
// needed
// in
// case
// pong
// is
// not
// available
// yet
).map(Payload::getDataUtf8).doOnNext(str -> {
int received = pongsReceived.incrementAndGet();
log.info("received " + str + "(" + received + ") in Ping"
+ id);
}).doFinally(signal -> socket.dispose());
if (take != null) {
return pong.take(take);
}
return pong;
});
pongFlux.subscribe();
}
@ -170,10 +180,12 @@ public class PingPongApp { @@ -170,10 +180,12 @@ public class PingPongApp {
public int getPongsReceived() {
return pongsReceived.get();
}
}
@Slf4j
public static class Pong implements Ordered, ApplicationListener<ApplicationReadyEvent> {
public static class Pong
implements Ordered, ApplicationListener<ApplicationReadyEvent> {
@Autowired
private MeterRegistry meterRegistry;
@ -191,22 +203,23 @@ public class PingPongApp { @@ -191,22 +203,23 @@ public class PingPongApp {
Integer pongDelay = env.getProperty("pong.delay", Integer.class, 5000);
try {
Thread.sleep(pongDelay);
} catch (InterruptedException e) {
}
catch (InterruptedException e) {
e.printStackTrace();
}
log.info("Starting Pong");
Integer gatewayPort = env.getProperty("spring.cloud.gateway.rsocket.server.port",
Integer.class, 7002);
MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(meterRegistry, Tag.of("component", "pong"));
ByteBuf announcementMetadata = Metadata.from("pong").with("id", "pong1").encode();
RSocketFactory.connect()
.metadataMimeType(Metadata.ROUTING_MIME_TYPE)
.setupPayload(DefaultPayload.create(EMPTY_BUFFER, announcementMetadata))
.addClientPlugin(interceptor)
.acceptor(this::accept)
Integer gatewayPort = env.getProperty(
"spring.cloud.gateway.rsocket.server.port", Integer.class, 7002);
MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(
meterRegistry, Tag.of("component", "pong"));
ByteBuf announcementMetadata = Metadata.from("pong").with("id", "pong1")
.encode();
RSocketFactory.connect().metadataMimeType(Metadata.ROUTING_MIME_TYPE)
.setupPayload(
DefaultPayload.create(EMPTY_BUFFER, announcementMetadata))
.addClientPlugin(interceptor).acceptor(this::accept)
.transport(TcpClientTransport.create(gatewayPort)) // proxy
.start()
.block();
.start().block();
}
@SuppressWarnings("Duplicates")
@ -215,18 +228,15 @@ public class PingPongApp { @@ -215,18 +228,15 @@ public class PingPongApp {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads)
.map(Payload::getDataUtf8)
.doOnNext(str -> {
int received = pingsReceived.incrementAndGet();
log.info("received " + str + "("+received+") in Pong");
})
.map(PingPongApp::reply)
.map(reply -> {
ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, reply);
ByteBuf routingMetadata = Metadata.from("ping").encode();
return DefaultPayload.create(data, routingMetadata);
});
return Flux.from(payloads).map(Payload::getDataUtf8).doOnNext(str -> {
int received = pingsReceived.incrementAndGet();
log.info("received " + str + "(" + received + ") in Pong");
}).map(PingPongApp::reply).map(reply -> {
ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
reply);
ByteBuf routingMetadata = Metadata.from("ping").encode();
return DefaultPayload.create(data, routingMetadata);
});
}
};
return pong;
@ -235,10 +245,12 @@ public class PingPongApp { @@ -235,10 +245,12 @@ public class PingPongApp {
public int getPingsReceived() {
return pingsReceived.get();
}
}
@Slf4j
public static class MyGatewayFilter implements GatewayFilter {
private AtomicBoolean invoked = new AtomicBoolean(false);
@Override
@ -251,6 +263,7 @@ public class PingPongApp { @@ -251,6 +263,7 @@ public class PingPongApp {
public boolean invoked() {
return invoked.get();
}
}
@Slf4j
@ -259,7 +272,8 @@ public class PingPongApp { @@ -259,7 +272,8 @@ public class PingPongApp {
private AtomicBoolean invoked = new AtomicBoolean(false);
@Override
public Mono<Success> filter(SocketAcceptorExchange exchange, SocketAcceptorFilterChain chain) {
public Mono<Success> filter(SocketAcceptorExchange exchange,
SocketAcceptorFilterChain chain) {
log.info("in custom socket acceptor filter");
invoked.compareAndSet(false, true);
return chain.filter(exchange);
@ -273,7 +287,7 @@ public class PingPongApp { @@ -273,7 +287,7 @@ public class PingPongApp {
public boolean invoked() {
return invoked.get();
}
}
}
}

14
spring-cloud-gateway-rsocket/src/test/java/org/springframework/cloud/gateway/rsocket/test/SocketAcceptorFilterOrderTests.java

@ -12,7 +12,6 @@ @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.springframework.cloud.gateway.rsocket.test;
@ -36,12 +35,15 @@ public class SocketAcceptorFilterOrderTests { @@ -36,12 +35,15 @@ public class SocketAcceptorFilterOrderTests {
@Test
public void predicateFilterAfterRegistryFilter() {
SocketAcceptorFilter predicateFilter = new SocketAcceptorPredicateFilter(Collections.emptyList());
SocketAcceptorFilter registryFilter = new RegistrySocketAcceptorFilter(mock(Registry.class));
List<SocketAcceptorFilter> filters = Arrays.asList(predicateFilter, registryFilter);
SocketAcceptorFilter predicateFilter = new SocketAcceptorPredicateFilter(
Collections.emptyList());
SocketAcceptorFilter registryFilter = new RegistrySocketAcceptorFilter(
mock(Registry.class));
List<SocketAcceptorFilter> filters = Arrays.asList(predicateFilter,
registryFilter);
OrderComparator.sort(filters);
assertThat(filters)
.containsExactly(registryFilter, predicateFilter);
assertThat(filters).containsExactly(registryFilter, predicateFilter);
}
}

1
src/checkstyle/checkstyle-suppressions.xml

@ -3,6 +3,7 @@ @@ -3,6 +3,7 @@
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<suppress files=".*AbstractFilterChain\.java" checks="LineLengthCheck"/>
<suppress files=".*AbstractNameValueGatewayFilterFactory\.java" checks="LineLengthCheck"/>
<suppress files=".*AdhocTestSuite\.java" checks="LineLengthCheck"/>
<suppress files=".*CloudFoundryRouteServiceRoutePredicateFactory\.java" checks="JavadocTagContinuationIndentationCheck"/>

Loading…
Cancel
Save