Browse Source

Moves from Map<String, String> to Metadata type.

Aligns closer to future changes to announcement and routing metadata
extensions.
pull/921/head
Spencer Gibb 6 years ago
parent
commit
f0a9ce02c8
No known key found for this signature in database
GPG Key ID: 7788A47380690861
  1. 4
      src/main/java/org/springframework/cloud/gateway/rsocket/autoconfigure/GatewayRSocketAutoConfiguration.java
  2. 12
      src/main/java/org/springframework/cloud/gateway/rsocket/registry/LoadBalancedRSocket.java
  3. 55
      src/main/java/org/springframework/cloud/gateway/rsocket/registry/Registry.java
  4. 14
      src/main/java/org/springframework/cloud/gateway/rsocket/registry/RegistryRoutes.java
  5. 4
      src/main/java/org/springframework/cloud/gateway/rsocket/registry/RegistrySocketAcceptorFilter.java
  6. 14
      src/main/java/org/springframework/cloud/gateway/rsocket/route/Route.java
  7. 21
      src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayExchange.java
  8. 19
      src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocket.java
  9. 18
      src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocketServer.java
  10. 8
      src/main/java/org/springframework/cloud/gateway/rsocket/server/PendingRequestRSocket.java
  11. 9
      src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/GatewaySocketAcceptor.java
  12. 10
      src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorExchange.java
  13. 180
      src/main/java/org/springframework/cloud/gateway/rsocket/support/Metadata.java
  14. 18
      src/test/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocketTests.java
  15. 5
      src/test/java/org/springframework/cloud/gateway/rsocket/socketacceptor/GatewaySocketAcceptorTests.java
  16. 27
      src/test/java/org/springframework/cloud/gateway/rsocket/support/MetadataTests.java
  17. 8
      src/test/java/org/springframework/cloud/gateway/rsocket/test/PingPongApp.java

4
src/main/java/org/springframework/cloud/gateway/rsocket/autoconfigure/GatewayRSocketAutoConfiguration.java

@ -95,7 +95,7 @@ public class GatewayRSocketAutoConfiguration { @@ -95,7 +95,7 @@ public class GatewayRSocketAutoConfiguration {
@Bean
public GatewayRSocketServer gatewayApp(GatewaySocketAcceptor socketAcceptor,
GatewayRSocketProperties properties) {
return new GatewayRSocketServer(properties, socketAcceptor);
GatewayRSocketProperties properties, MeterRegistry meterRegistry) {
return new GatewayRSocketServer(properties, socketAcceptor, meterRegistry);
}
}

12
src/main/java/org/springframework/cloud/gateway/rsocket/registry/LoadBalancedRSocket.java

@ -54,14 +54,14 @@ public class LoadBalancedRSocket { @@ -54,14 +54,14 @@ public class LoadBalancedRSocket {
return this.loadBalancer.apply(this.delegates);
}
public void addRSocket(RSocket rsocket, Map<String, String> metadata) {
public void addRSocket(RSocket rsocket, Metadata metadata) {
this.delegates.add(new EnrichedRSocket(rsocket, metadata));
}
public void remove(Map<String, String> metadata) {
public void remove(Metadata metadata) {
//TODO: move delegates to a map for easy removal
this.delegates.stream()
.filter(enriched -> Metadata.matches(metadata, enriched.getMetadata()))
.filter(enriched -> metadata.matches(enriched.getMetadata()))
.findFirst()
.ifPresent(this.delegates::remove);
}
@ -74,14 +74,14 @@ public class LoadBalancedRSocket { @@ -74,14 +74,14 @@ public class LoadBalancedRSocket {
public static class EnrichedRSocket extends RSocketProxy {
private final Map<String, String> metadata;
private final Metadata metadata;
public EnrichedRSocket(RSocket source, Map<String, String> metadata) {
public EnrichedRSocket(RSocket source, Metadata metadata) {
super(source);
this.metadata = metadata;
}
public Map<String, String> getMetadata() {
public Metadata getMetadata() {
return this.metadata;
}

55
src/main/java/org/springframework/cloud/gateway/rsocket/registry/Registry.java

@ -20,7 +20,6 @@ package org.springframework.cloud.gateway.rsocket.registry; @@ -20,7 +20,6 @@ package org.springframework.cloud.gateway.rsocket.registry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import io.rsocket.RSocket;
import org.apache.commons.logging.Log;
@ -29,8 +28,8 @@ import reactor.core.Disposable; @@ -29,8 +28,8 @@ import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.FluxSink;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
* The Registry handles all RSocket connections that have been made that have associated
@ -46,56 +45,36 @@ public class Registry { @@ -46,56 +45,36 @@ public class Registry {
private final DirectProcessor<RegisteredEvent> registeredEvents = DirectProcessor.create();
private final FluxSink<RegisteredEvent> registeredEventsSink = registeredEvents.sink(FluxSink.OverflowStrategy.DROP);
private final Function<Map<String, String>, String> keyFunction;
public Registry() {
this(tags -> {
if (CollectionUtils.isEmpty(tags)) {
return null; // throw error?
}
//TODO: key generation
return tags.get("name");
});
}
public Registry(Function<Map<String, String>, String> keyFunction) {
this.keyFunction = keyFunction;
}
public String computeKey(Map<String, String> properties) {
return keyFunction.apply(properties);
}
//TODO: Mono<Void>?
public void register(Map<String, String> properties, RSocket rsocket) {
Assert.notEmpty(properties, "properties may not be empty");
public void register(Metadata metadata, RSocket rsocket) {
Assert.notNull(metadata, "metadata may not be null");
Assert.notNull(rsocket, "RSocket may not be null");
if (log.isDebugEnabled()) {
log.debug("Registering RSocket: " + properties);
log.debug("Registering RSocket: " + metadata);
}
LoadBalancedRSocket composite = rsockets.computeIfAbsent(computeKey(properties), s ->
new LoadBalancedRSocket(properties.get("name")));
composite.addRSocket(rsocket, properties);
registeredEventsSink.next(new RegisteredEvent(properties, rsocket));
LoadBalancedRSocket composite = rsockets.computeIfAbsent(metadata.getName(), s ->
new LoadBalancedRSocket(metadata.getName()));
composite.addRSocket(rsocket, metadata);
registeredEventsSink.next(new RegisteredEvent(metadata, rsocket));
}
public void deregister(Map<String, String> metadata) {
Assert.notEmpty(metadata, "metadata may not be empty");
public void deregister(Metadata metadata) {
Assert.notNull(metadata, "metadata may not be null");
if (log.isDebugEnabled()) {
log.debug("Deregistering RSocket: " + metadata);
}
String key = computeKey(metadata);
LoadBalancedRSocket loadBalanced = this.rsockets.get(key);
LoadBalancedRSocket loadBalanced = this.rsockets.get(metadata.getName());
if (loadBalanced != null) {
loadBalanced.remove(metadata);
}
}
public LoadBalancedRSocket getRegistered(Map<String, String> properties) {
if (CollectionUtils.isEmpty(properties)) {
return null;
}
return rsockets.get(computeKey(properties));
public LoadBalancedRSocket getRegistered(Metadata metadata) {
return rsockets.get(metadata.getName());
}
public Disposable addListener(Consumer<RegisteredEvent> consumer) {
@ -103,17 +82,17 @@ public class Registry { @@ -103,17 +82,17 @@ public class Registry {
}
public static class RegisteredEvent {
private final Map<String, String> routingMetadata;
private final Metadata routingMetadata;
private final RSocket rSocket;
public RegisteredEvent(Map<String, String> routingMetadata, RSocket rSocket) {
Assert.notEmpty(routingMetadata, "routingMetadata may not be empty");
public RegisteredEvent(Metadata routingMetadata, RSocket rSocket) {
Assert.notNull(routingMetadata, "routingMetadata may not be null");
Assert.notNull(rSocket, "RSocket may not be null");
this.routingMetadata = routingMetadata;
this.rSocket = rSocket;
}
public Map<String, String> getRoutingMetadata() {
public Metadata getRoutingMetadata() {
return routingMetadata;
}

14
src/main/java/org/springframework/cloud/gateway/rsocket/registry/RegistryRoutes.java

@ -30,6 +30,7 @@ import reactor.core.publisher.Mono; @@ -30,6 +30,7 @@ import reactor.core.publisher.Mono;
import org.springframework.cloud.gateway.rsocket.route.Route;
import org.springframework.cloud.gateway.rsocket.route.Routes;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
/**
* Creates routes from RegisteredEvents
@ -53,30 +54,29 @@ public class RegistryRoutes implements Routes, Consumer<Registry.RegisteredEvent @@ -53,30 +54,29 @@ public class RegistryRoutes implements Routes, Consumer<Registry.RegisteredEvent
@Override
public void accept(Registry.RegisteredEvent registeredEvent) {
Map<String, String> routingMetadata = registeredEvent.getRoutingMetadata();
Metadata routingMetadata = registeredEvent.getRoutingMetadata();
String id = getId(routingMetadata);
routes.computeIfAbsent(id, key -> createRoute(id, routingMetadata));
}
private String getId(Map<String, String> routingMetadata) {
String id = routingMetadata.get("name");
private String getId(Metadata routingMetadata) {
String id = routingMetadata.getName();
if (id == null) {
id = UUID.randomUUID().toString();
}
return id;
}
private Route createRoute(String id, Map<String, String> routingMetadata) {
private Route createRoute(String id, Metadata routingMetadata) {
Route route = Route.builder()
.id(id)
.routingMetadata(routingMetadata)
.predicate(exchange -> {
// TODO: standard predicates
// TODO: allow customized predicates
Map<String, String> incomingRouting = exchange.getRoutingMetadata();
boolean matches = incomingRouting.containsKey("name")
&& incomingRouting.get("name").equals(routingMetadata.get("name"));
Metadata incomingRouting = exchange.getRoutingMetadata();
boolean matches = incomingRouting.getName().equalsIgnoreCase(routingMetadata.getName());
return Mono.just(matches);
})
// TODO: allow customized filters

4
src/main/java/org/springframework/cloud/gateway/rsocket/registry/RegistrySocketAcceptorFilter.java

@ -23,7 +23,7 @@ import org.springframework.cloud.gateway.rsocket.socketacceptor.SocketAcceptorEx @@ -23,7 +23,7 @@ import org.springframework.cloud.gateway.rsocket.socketacceptor.SocketAcceptorEx
import org.springframework.cloud.gateway.rsocket.socketacceptor.SocketAcceptorFilter;
import org.springframework.cloud.gateway.rsocket.socketacceptor.SocketAcceptorFilterChain;
import org.springframework.core.Ordered;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
/**
* Filter that registers the SendingSocket.
@ -37,7 +37,7 @@ public class RegistrySocketAcceptorFilter implements SocketAcceptorFilter, Order @@ -37,7 +37,7 @@ public class RegistrySocketAcceptorFilter implements SocketAcceptorFilter, Order
@Override
public Mono<Success> filter(SocketAcceptorExchange exchange, SocketAcceptorFilterChain chain) {
if (!CollectionUtils.isEmpty(exchange.getMetadata())) {
if (exchange.getMetadata() != null && StringUtils.hasLength(exchange.getMetadata().getName())) {
this.registry.register(exchange.getMetadata(), exchange.getSendingSocket());
}

14
src/main/java/org/springframework/cloud/gateway/rsocket/route/Route.java

@ -22,12 +22,12 @@ import java.util.Arrays; @@ -22,12 +22,12 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.springframework.cloud.gateway.rsocket.server.GatewayExchange;
import org.springframework.cloud.gateway.rsocket.server.GatewayFilter;
import org.springframework.cloud.gateway.rsocket.support.AsyncPredicate;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
import org.springframework.core.Ordered;
import org.springframework.core.style.ToStringCreator;
import org.springframework.util.Assert;
@ -40,7 +40,7 @@ public class Route implements Ordered { @@ -40,7 +40,7 @@ public class Route implements Ordered {
private final String id;
private final Map<String, String> targetMetadata;
private final Metadata targetMetadata;
private final int order;
@ -52,7 +52,7 @@ public class Route implements Ordered { @@ -52,7 +52,7 @@ public class Route implements Ordered {
return new Builder();
}
private Route(String id, Map<String, String> targetMetadata, int order, AsyncPredicate<GatewayExchange> predicate, List<GatewayFilter> gatewayFilters) {
private Route(String id, Metadata targetMetadata, int order, AsyncPredicate<GatewayExchange> predicate, List<GatewayFilter> gatewayFilters) {
this.id = id;
this.targetMetadata = targetMetadata;
this.order = order;
@ -63,7 +63,7 @@ public class Route implements Ordered { @@ -63,7 +63,7 @@ public class Route implements Ordered {
public static class Builder {
protected String id;
protected Map<String, String> routingMetadata;
protected Metadata routingMetadata;
protected int order = 0;
@ -91,7 +91,7 @@ public class Route implements Ordered { @@ -91,7 +91,7 @@ public class Route implements Ordered {
return this.predicate;
}
public Builder routingMetadata(Map<String, String> routingMetadata) {
public Builder routingMetadata(Metadata routingMetadata) {
this.routingMetadata = routingMetadata;
return this;
}
@ -141,7 +141,7 @@ public class Route implements Ordered { @@ -141,7 +141,7 @@ public class Route implements Ordered {
public Route build() {
Assert.notNull(this.id, "id can not be null");
Assert.notEmpty(this.routingMetadata, "targetMetadata can not be null or empty");
Assert.notNull(this.routingMetadata, "targetMetadata can not be null");
Assert.notNull(this.predicate, "predicate can not be null");
return new Route(this.id, this.routingMetadata, this.order, predicate, this.gatewayFilters);
@ -152,7 +152,7 @@ public class Route implements Ordered { @@ -152,7 +152,7 @@ public class Route implements Ordered {
return this.id;
}
public Map<String, String> getTargetMetadata() {
public Metadata getTargetMetadata() {
return this.targetMetadata;
}

21
src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayExchange.java

@ -17,8 +17,6 @@ @@ -17,8 +17,6 @@
package org.springframework.cloud.gateway.rsocket.server;
import java.util.Map;
import io.micrometer.core.instrument.Tags;
import io.rsocket.Payload;
import org.apache.commons.logging.Log;
@ -26,7 +24,6 @@ import org.apache.commons.logging.LogFactory; @@ -26,7 +24,6 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.gateway.rsocket.filter.AbstractRSocketExchange;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
import org.springframework.util.CollectionUtils;
public class GatewayExchange extends AbstractRSocketExchange {
@ -51,33 +48,29 @@ public class GatewayExchange extends AbstractRSocketExchange { @@ -51,33 +48,29 @@ public class GatewayExchange extends AbstractRSocketExchange {
}
private final Type type;
private final Map<String, String> routingMetadata;
private final Metadata routingMetadata;
private Tags tags = Tags.empty();
public static GatewayExchange fromPayload(Type type, Payload payload) {
return new GatewayExchange(type, getRoutingMetadata(payload));
}
private static Map<String, String> getRoutingMetadata(Payload payload) {
private static Metadata getRoutingMetadata(Payload payload) {
if (payload == null || !payload.hasMetadata()) { // and metadata is routing
return null;
}
// TODO: deal with composite metadata
Map<String, String> properties = Metadata.decodeProperties(payload.sliceMetadata());
if (CollectionUtils.isEmpty(properties)) {
return null;
}
Metadata metadata = Metadata.decodeMetadata(payload.sliceMetadata());
if (log.isDebugEnabled()) {
log.debug("found routing metadata " + properties);
log.debug("found routing metadata " + metadata);
}
return properties;
return metadata;
}
public GatewayExchange(Type type, Map<String, String> routingMetadata) {
public GatewayExchange(Type type, Metadata routingMetadata) {
this.type = type;
this.routingMetadata = routingMetadata;
}
@ -86,7 +79,7 @@ public class GatewayExchange extends AbstractRSocketExchange { @@ -86,7 +79,7 @@ public class GatewayExchange extends AbstractRSocketExchange {
return type;
}
public Map<String, String> getRoutingMetadata() {
public Metadata getRoutingMetadata() {
return routingMetadata;
}

19
src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocket.java

@ -17,7 +17,6 @@ @@ -17,7 +17,6 @@
package org.springframework.cloud.gateway.rsocket.server;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.logging.Level;
@ -67,10 +66,10 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -67,10 +66,10 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
private final Routes routes;
private final MeterRegistry meterRegistry;
private final GatewayRSocketProperties properties;
private final Map<String, String> metadata;
private final Metadata metadata;
GatewayRSocket(Registry registry, Routes routes, MeterRegistry meterRegistry,
GatewayRSocketProperties properties, Map<String, String> metadata) {
GatewayRSocketProperties properties, Metadata metadata) {
this.registry = registry;
this.routes = routes;
this.meterRegistry = meterRegistry;
@ -117,9 +116,9 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -117,9 +116,9 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
private Tags getTags(GatewayExchange exchange) {
//TODO: add tags to exchange
String responderName = this.metadata.get("name");
String responderName = this.metadata.getName();
String responderId = this.metadata.get("id");
String requestorName = exchange.getRoutingMetadata().get("name");
String requestorName = exchange.getRoutingMetadata().getName();
Assert.hasText(responderName, "responderName must not be empty");
Assert.hasText(responderId, "responderId must not be empty");
Assert.hasText(requestorName, "requestorName must not be empty");
@ -253,9 +252,9 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -253,9 +252,9 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
return routeMono;
}
private Mono<Route> matchRoute(Route route, Map<String, String> annoucementMetadata) {
Map<String, String> targetMetadata = route.getTargetMetadata();
if (Metadata.matches(targetMetadata, annoucementMetadata)) {
private Mono<Route> matchRoute(Route route, Metadata annoucementMetadata) {
Metadata targetMetadata = route.getTargetMetadata();
if (targetMetadata.matches(annoucementMetadata)) {
return Mono.just(route);
}
return Mono.empty();
@ -280,7 +279,7 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -280,7 +279,7 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
return loadBalancedRSocket.choose();
}).map(enrichedRSocket -> {
Map<String, String> metadata = enrichedRSocket
Metadata metadata = enrichedRSocket
.getMetadata();
Tags tags = exchange.getTags().and("requester.id", metadata.get("id"));
exchange.setTags(tags);
@ -313,7 +312,7 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket @@ -313,7 +312,7 @@ public class GatewayRSocket extends AbstractRSocket implements ResponderRSocket
this.properties = properties;
}
public GatewayRSocket create(Map<String, String> metadata) {
public GatewayRSocket create(Metadata metadata) {
return new GatewayRSocket(this.registry, this.routes, this.meterRegistry,
this.properties, metadata);
}

18
src/main/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocketServer.java

@ -34,7 +34,6 @@ import io.rsocket.transport.netty.server.TcpServerTransport; @@ -34,7 +34,6 @@ import io.rsocket.transport.netty.server.TcpServerTransport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.rsocket.autoconfigure.GatewayRSocketProperties;
import org.springframework.cloud.gateway.rsocket.autoconfigure.GatewayRSocketProperties.Server.TransportType;
import org.springframework.context.SmartLifecycle;
@ -51,19 +50,20 @@ public class GatewayRSocketServer implements Ordered, SmartLifecycle { @@ -51,19 +50,20 @@ public class GatewayRSocketServer implements Ordered, SmartLifecycle {
private final List<RSocketInterceptor> serverInterceptors;
private final AtomicBoolean running = new AtomicBoolean();
private CloseableChannel closeableChannel;
@Autowired
MeterRegistry meterRegistry;
private final MeterRegistry meterRegistry;
public GatewayRSocketServer(GatewayRSocketProperties properties, SocketAcceptor socketAcceptor) {
this(properties, socketAcceptor, EMPTY_INTERCEPTORS);
public GatewayRSocketServer(GatewayRSocketProperties properties, SocketAcceptor socketAcceptor, MeterRegistry meterRegistry) {
this(properties, socketAcceptor, meterRegistry, EMPTY_INTERCEPTORS);
}
public GatewayRSocketServer(GatewayRSocketProperties properties, SocketAcceptor socketAcceptor, RSocketInterceptor... interceptors) {
public GatewayRSocketServer(GatewayRSocketProperties properties, SocketAcceptor socketAcceptor, MeterRegistry meterRegistry, RSocketInterceptor... interceptors) {
Assert.notNull(properties, "properties may not be null");
Assert.notNull(socketAcceptor, "socketAcceptor may not be null");
Assert.notNull(meterRegistry, "meterRegistry may not be null");
Assert.notNull(interceptors, "interceptors may not be null");
this.properties = properties;
this.socketAcceptor = socketAcceptor;
this.meterRegistry = meterRegistry;
this.serverInterceptors = Arrays.asList(interceptors);
}
@ -129,7 +129,11 @@ public class GatewayRSocketServer implements Ordered, SmartLifecycle { @@ -129,7 +129,11 @@ public class GatewayRSocketServer implements Ordered, SmartLifecycle {
factory
//TODO: add as bean like serverInterceptors above
.addConnectionPlugin(new MicrometerDuplexConnectionInterceptor(meterRegistry, tags))
//.errorConsumer() TODO: add errorConsumer
.errorConsumer(throwable -> {
if (log.isDebugEnabled()) {
log.debug("Error with connection", throwable);
}
}) //TODO: add configurable errorConsumer
.acceptor(this.socketAcceptor)
.transport(transport)
.start()

8
src/main/java/org/springframework/cloud/gateway/rsocket/server/PendingRequestRSocket.java

@ -17,7 +17,6 @@ @@ -17,7 +17,6 @@
package org.springframework.cloud.gateway.rsocket.server;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
@ -38,6 +37,7 @@ import reactor.util.function.Tuple2; @@ -38,6 +37,7 @@ import reactor.util.function.Tuple2;
import org.springframework.cloud.gateway.rsocket.filter.RSocketFilter.Success;
import org.springframework.cloud.gateway.rsocket.registry.Registry.RegisteredEvent;
import org.springframework.cloud.gateway.rsocket.route.Route;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
import static org.springframework.cloud.gateway.rsocket.server.GatewayExchange.ROUTE_ATTR;
import static org.springframework.cloud.gateway.rsocket.server.GatewayExchange.Type.REQUEST_STREAM;
@ -48,18 +48,18 @@ public class PendingRequestRSocket extends AbstractRSocket implements ResponderR @@ -48,18 +48,18 @@ public class PendingRequestRSocket extends AbstractRSocket implements ResponderR
private static final Log log = LogFactory.getLog(PendingRequestRSocket.class);
private final Function<RegisteredEvent, Mono<Route>> routeFinder;
private final Consumer<Map<String, String>> metadataCallback;
private final Consumer<Metadata> metadataCallback;
private final MonoProcessor<RSocket> rSocketProcessor;
private Disposable subscriptionDisposable;
private Route route;
public PendingRequestRSocket(Function<RegisteredEvent, Mono<Route>> routeFinder,
Consumer<Map<String, String>> metadataCallback) {
Consumer<Metadata> metadataCallback) {
this(routeFinder, metadataCallback, MonoProcessor.create());
}
/* for testing */ PendingRequestRSocket(Function<RegisteredEvent, Mono<Route>> routeFinder,
Consumer<Map<String, String>> metadataCallback,
Consumer<Metadata> metadataCallback,
MonoProcessor<RSocket> rSocketProcessor) {
this.routeFinder = routeFinder;
this.metadataCallback = metadataCallback;

9
src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/GatewaySocketAcceptor.java

@ -18,7 +18,6 @@ @@ -18,7 +18,6 @@
package org.springframework.cloud.gateway.rsocket.socketacceptor;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.stream.Collectors;
@ -62,11 +61,11 @@ public class GatewaySocketAcceptor implements SocketAcceptor { @@ -62,11 +61,11 @@ public class GatewaySocketAcceptor implements SocketAcceptor {
Tags metadataTags;
SocketAcceptorExchange exchange;
if (setup.hasMetadata()) { //TODO: and setup.metadataMimeType() is Announcement metadata or composite
Map<String, String> properties = Metadata.decodeProperties(setup.sliceMetadata());
metadataTags = Tags.of("service.name", properties.get("name"))
.and("service.id", properties.get("id"));
Metadata metadata = Metadata.decodeMetadata(setup.sliceMetadata());
metadataTags = Tags.of("service.name", metadata.getName())
.and("service.id", metadata.get("id"));
// enrich exchange to have metadata
exchange = new SocketAcceptorExchange(setup, decorate(sendingSocket, requesterTags.and(metadataTags)), properties);
exchange = new SocketAcceptorExchange(setup, decorate(sendingSocket, requesterTags.and(metadataTags)), metadata);
} else {
metadataTags = Tags.empty();
exchange = new SocketAcceptorExchange(setup, decorate(sendingSocket, requesterTags));

10
src/main/java/org/springframework/cloud/gateway/rsocket/socketacceptor/SocketAcceptorExchange.java

@ -18,24 +18,24 @@ @@ -18,24 +18,24 @@
package org.springframework.cloud.gateway.rsocket.socketacceptor;
import java.util.Collections;
import java.util.Map;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import org.springframework.cloud.gateway.rsocket.filter.AbstractRSocketExchange;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
public class SocketAcceptorExchange extends AbstractRSocketExchange {
private final ConnectionSetupPayload setup;
private final RSocket sendingSocket;
private final Map<String, String> metadata;
private final Metadata metadata;
public SocketAcceptorExchange(ConnectionSetupPayload setup, RSocket sendingSocket) {
this(setup, sendingSocket, Collections.emptyMap());
this(setup, sendingSocket, new Metadata(null, Collections.emptyMap()));
}
public SocketAcceptorExchange(ConnectionSetupPayload setup, RSocket sendingSocket,
Map<String, String> metadata) {
Metadata metadata) {
this.setup = setup;
this.sendingSocket = sendingSocket;
this.metadata = metadata;
@ -49,7 +49,7 @@ public class SocketAcceptorExchange extends AbstractRSocketExchange { @@ -49,7 +49,7 @@ public class SocketAcceptorExchange extends AbstractRSocketExchange {
return sendingSocket;
}
public Map<String, String> getMetadata() {
public Metadata getMetadata() {
return metadata;
}
}

180
src/main/java/org/springframework/cloud/gateway/rsocket/support/Metadata.java

@ -18,78 +18,129 @@ @@ -18,78 +18,129 @@
package org.springframework.cloud.gateway.rsocket.support;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.rsocket.util.NumberUtils;
import org.springframework.core.style.ToStringCreator;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
public abstract class Metadata {
public class Metadata {
public static final String ROUTING_MIME_TYPE = "message/x.rsocket.routing.v0";
private Metadata() {}
private final String name;
private final Map<String, String> properties;
public static ByteBuf encodeProperties(Map<String, String> properties) {
return encodeProperties(ByteBufAllocator.DEFAULT, properties);
public Metadata(String name, Map<String, String> properties) {
this.name = name;
this.properties = properties;
}
public static ByteBuf encodeProperties(ByteBufAllocator allocator, Map<String, String> properties) {
Assert.notEmpty(properties, "tags may not be null or empty"); //TODO: is this true?
List<String> pairs = properties.entrySet().stream()
.map(entry -> entry.getValue() + ":" + entry.getValue())
.collect(Collectors.toList());
return encodeTags(allocator, pairs.toArray(new String[0]));
public String getName() {
return this.name;
}
public static ByteBuf encodeTags(String... tags) {
return encodeTags(ByteBufAllocator.DEFAULT, tags);
public Map<String, String> getProperties() {
return this.properties;
}
public static ByteBuf encodeTags(ByteBufAllocator allocator, String... tags) {
Assert.notEmpty(tags, "tags may not be null or empty"); //TODO: is this true?
public String get(String key) {
return this.properties.get(key);
}
public String put(String key, String value) {
return this.properties.put(key, value);
}
@Override
public String toString() {
return new ToStringCreator(this)
.append("name", name)
.append("properties", properties)
.toString();
}
public static Builder from(String name) {
return new Builder(name);
}
public static ByteBuf encode(Metadata metadata) {
return encode(ByteBufAllocator.DEFAULT, metadata);
}
public static ByteBuf encode(ByteBufAllocator allocator, Metadata metadata) {
return encode(allocator, metadata.getName(), metadata.getProperties());
}
public static ByteBuf encode(String name, Map<String, String> properties) {
return encode(ByteBufAllocator.DEFAULT, name, properties);
}
public static ByteBuf encode(ByteBufAllocator allocator, String name,
Map<String, String> properties) {
Assert.hasText(name, "name may not be empty");
Assert.notNull(properties, "properties may not be null");
Assert.notNull(allocator, "allocator may not be null");
ByteBuf byteBuf = allocator.buffer();
for (String tag : tags) {
int tagLength = NumberUtils.requireUnsignedByte(ByteBufUtil.utf8Bytes(tag));
byteBuf.writeByte(tagLength);
ByteBufUtil.reserveAndWriteUtf8(byteBuf, tag, tagLength);
}
encodeString(byteBuf, name);
properties.entrySet().stream().forEach(entry -> {
encodeString(byteBuf, entry.getKey());
encodeString(byteBuf, entry.getValue());
});
return byteBuf;
}
public static Map<String, String> decodeProperties(ByteBuf byteBuf) {
return decodeTags(byteBuf).stream()
.map(Pair::parse)
.filter(Objects::nonNull)
.collect(LinkedHashMap::new,
(map, pair) -> map.put(pair.name, pair.value),
HashMap::putAll);
private static void encodeString(ByteBuf byteBuf, String s) {
int length = NumberUtils.requireUnsignedByte(ByteBufUtil.utf8Bytes(s));
byteBuf.writeByte(length);
ByteBufUtil.reserveAndWriteUtf8(byteBuf, s, length);
}
public static List<String> decodeTags(ByteBuf byteBuf) {
ArrayList<String> tags = new ArrayList<>();
public static Metadata decodeMetadata(ByteBuf byteBuf) {
AtomicInteger offset = new AtomicInteger(0);
int offset = 0;
while (offset < byteBuf.readableBytes()) { //TODO: What is the best conditional here?
int tagLength = byteBuf.getByte(offset);
offset += Byte.BYTES;
String tag = byteBuf.toString(offset, tagLength, StandardCharsets.UTF_8);
tags.add(tag);
offset += tagLength;
String name = decodeString(byteBuf, offset);
Map<String, String> properties = new LinkedHashMap<>();
while (offset.get() < byteBuf.readableBytes()) { //TODO: What is the best conditional here?
String key = decodeString(byteBuf, offset);
String value = null;
if (offset.get() < byteBuf.readableBytes()) {
value = decodeString(byteBuf, offset);
}
properties.put(key, value);
}
return tags;
return new Metadata(name, properties);
}
private static String decodeString(ByteBuf byteBuf, AtomicInteger offset) {
int length = byteBuf.getByte(offset.get());
int index = offset.addAndGet(Byte.BYTES);
String s = byteBuf.toString(index, length, StandardCharsets.UTF_8);
offset.addAndGet(length);
return s;
}
public boolean matches(Metadata other) {
if (other == null) {
return false;
}
if (other.getName() == null) {
return false;
}
if (!getName().equalsIgnoreCase(other.getName())) {
return false;
}
return matches(getProperties(), other.getProperties());
}
/**
@ -118,47 +169,26 @@ public abstract class Metadata { @@ -118,47 +169,26 @@ public abstract class Metadata {
return true;
}
/**
* A single name value pair.
*/
public static class Pair {
private String name;
private String value;
public static class Builder {
private final Metadata metadata;
public Pair(String name, String value) {
Assert.hasLength(name, "Name must not be empty");
this.name = name;
this.value = value;
public Builder(String name) {
Assert.hasText(name, "Name must not be empty.");
this.metadata = new Metadata(name, new LinkedHashMap<>());
}
public static Pair parse(String pair) {
int index = getSeparatorIndex(pair);
String name = (index > 0) ? pair.substring(0, index) : pair;
String value = (index > 0) ? pair.substring(index + 1) : "";
return of(name.trim(), value.trim());
public Builder with(String key, String value) {
this.metadata.put(key, value);
return this;
}
private static int getSeparatorIndex(String pair) {
int colonIndex = pair.indexOf(':');
int equalIndex = pair.indexOf('=');
if (colonIndex == -1) {
return equalIndex;
}
if (equalIndex == -1) {
return colonIndex;
}
return Math.min(colonIndex, equalIndex);
public Metadata build() {
return this.metadata;
}
private static Pair of(String name, String value) {
if (StringUtils.isEmpty(name) && StringUtils.isEmpty(value)) {
return null;
}
return new Pair(name, value);
public ByteBuf encode() {
return Metadata.encode(build());
}
}
}

18
src/test/java/org/springframework/cloud/gateway/rsocket/server/GatewayRSocketTests.java

@ -20,9 +20,7 @@ package org.springframework.cloud.gateway.rsocket.server; @@ -20,9 +20,7 @@ package org.springframework.cloud.gateway.rsocket.server;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import io.micrometer.core.instrument.Tags;
@ -52,7 +50,6 @@ import static org.junit.Assert.assertFalse; @@ -52,7 +50,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -72,11 +69,11 @@ public class GatewayRSocketTests { @@ -72,11 +69,11 @@ public class GatewayRSocketTests {
public void init() {
registry = mock(Registry.class);
incomingPayload = DefaultPayload.create(Unpooled.EMPTY_BUFFER,
Metadata.encodeTags("name:mock", "id:mock1"));
Metadata.from("mock").with("id", "mock1").encode());
RSocket rSocket = mock(RSocket.class);
LoadBalancedRSocket loadBalancedRSocket = mock(LoadBalancedRSocket.class);
when(registry.getRegistered(anyMap())).thenReturn(loadBalancedRSocket);
when(registry.getRegistered(any(Metadata.class))).thenReturn(loadBalancedRSocket);
Mono<EnrichedRSocket> mono = Mono
.just(new EnrichedRSocket(rSocket, getMetadata()));
@ -186,11 +183,10 @@ public class GatewayRSocketTests { @@ -186,11 +183,10 @@ public class GatewayRSocketTests {
}
}
private static Map<String, String> getMetadata() {
HashMap<String, String> map = new HashMap<>();
map.put("name", "service");
map.put("id", "service1");
return map;
private static Metadata getMetadata() {
return Metadata.from("service")
.with("id", "service1")
.build();
}
private static class TestRoutes implements Routes {
@ -209,7 +205,7 @@ public class GatewayRSocketTests { @@ -209,7 +205,7 @@ public class GatewayRSocketTests {
this.filters = filters;
route = Route.builder()
.id("route1")
.routingMetadata(Collections.singletonMap("name", "mock"))
.routingMetadata(Metadata.from("mock").build())
.predicate(exchange -> Mono.just(true))
.filters(filters)
.build();

5
src/test/java/org/springframework/cloud/gateway/rsocket/socketacceptor/GatewaySocketAcceptorTests.java

@ -33,10 +33,11 @@ import reactor.core.publisher.Mono; @@ -33,10 +33,11 @@ import reactor.core.publisher.Mono;
import org.springframework.cloud.gateway.rsocket.autoconfigure.GatewayRSocketProperties;
import org.springframework.cloud.gateway.rsocket.server.GatewayRSocket;
import org.springframework.cloud.gateway.rsocket.support.Metadata;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -60,7 +61,7 @@ public class GatewaySocketAcceptorTests { @@ -60,7 +61,7 @@ public class GatewaySocketAcceptorTests {
this.sendingSocket = mock(RSocket.class);
this.meterRegistry = new SimpleMeterRegistry();
when(this.factory.create(anyMap())).thenReturn(mock(GatewayRSocket.class));
when(this.factory.create(any(Metadata.class))).thenReturn(mock(GatewayRSocket.class));
}
//TODO: test metrics

27
src/test/java/org/springframework/cloud/gateway/rsocket/support/MetadataTests.java

@ -21,6 +21,7 @@ import java.util.HashMap; @@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
import io.netty.buffer.ByteBuf;
import org.junit.Test;
import org.springframework.util.Assert;
@ -30,6 +31,32 @@ import static org.springframework.cloud.gateway.rsocket.support.Metadata.matches @@ -30,6 +31,32 @@ import static org.springframework.cloud.gateway.rsocket.support.Metadata.matches
public class MetadataTests {
@Test
public void encodeAndDecodeJustName() {
ByteBuf byteBuf = Metadata.from("test").encode();
assertMetadata(byteBuf, "test");
}
@Test
public void encodeAndDecodeWorks() {
ByteBuf byteBuf = Metadata.from("test1")
.with("key1111", "val111111")
.with("key22", "val222")
.encode();
Metadata metadata = assertMetadata(byteBuf, "test1");
Map<String, String> properties = metadata.getProperties();
assertThat(properties).hasSize(2)
.containsOnlyKeys("key1111", "key22")
.containsValues("val111111", "val222");
}
private Metadata assertMetadata(ByteBuf byteBuf, String name) {
Metadata metadata = Metadata.decodeMetadata(byteBuf);
assertThat(metadata).isNotNull();
assertThat(metadata.getName()).isEqualTo(name);
return metadata;
}
@Test
public void nullMetadataDoesNotMatch() {
assertThat(matches(null, new HashMap<>())).isFalse();

8
src/test/java/org/springframework/cloud/gateway/rsocket/test/PingPongApp.java

@ -130,7 +130,7 @@ public class PingPongApp { @@ -130,7 +130,7 @@ public class PingPongApp {
log.debug("ping.take: " + take);
MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(meterRegistry, Tag.of("component", "ping"));
ByteBuf announcementMetadata = Metadata.encodeTags("name:ping", "id:ping"+id);
ByteBuf announcementMetadata = Metadata.from("ping").with("id", "ping"+id).encode();
pongFlux = RSocketFactory.connect()
.metadataMimeType(Metadata.ROUTING_MIME_TYPE)
.setupPayload(DefaultPayload.create(EMPTY_BUFFER, announcementMetadata))
@ -143,7 +143,7 @@ public class PingPongApp { @@ -143,7 +143,7 @@ public class PingPongApp {
Flux.interval(Duration.ofSeconds(1))
.map(i -> {
ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "ping" + id);
ByteBuf routingMetadata = Metadata.encodeTags("name:pong");
ByteBuf routingMetadata = Metadata.from("pong").encode();
return DefaultPayload.create(data, routingMetadata);
})
.onBackpressureDrop(payload -> log.debug("Dropped payload " + payload.getDataUtf8())) // this is needed in case pong is not available yet
@ -198,7 +198,7 @@ public class PingPongApp { @@ -198,7 +198,7 @@ public class PingPongApp {
Integer gatewayPort = env.getProperty("spring.cloud.gateway.rsocket.server.port",
Integer.class, 7002);
MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(meterRegistry, Tag.of("component", "pong"));
ByteBuf announcementMetadata = Metadata.encodeTags("name:pong", "id:pong1");
ByteBuf announcementMetadata = Metadata.from("pong").with("id", "pong1").encode();
RSocketFactory.connect()
.metadataMimeType(Metadata.ROUTING_MIME_TYPE)
.setupPayload(DefaultPayload.create(EMPTY_BUFFER, announcementMetadata))
@ -224,7 +224,7 @@ public class PingPongApp { @@ -224,7 +224,7 @@ public class PingPongApp {
.map(PingPongApp::reply)
.map(reply -> {
ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, reply);
ByteBuf routingMetadata = Metadata.encodeTags("name:ping", "id:");
ByteBuf routingMetadata = Metadata.from("ping").encode();
return DefaultPayload.create(data, routingMetadata);
});
}

Loading…
Cancel
Save