Browse Source

Rename Reactor support classes

This change modifies the names of the Reactor support classes in order
to align with the same changes in the 4.0.x line which now supports
both Reactor 1.1 and 1.0.

Issue: SPR-11636
pull/529/head
Rossen Stoyanchev 11 years ago
parent
commit
08f0395033
  1. 6
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java
  2. 10
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  3. 8
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java
  4. 4
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java
  5. 16
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java

6
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCodec.java → spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java

@ -33,7 +33,7 @@ import java.nio.ByteBuffer; @@ -33,7 +33,7 @@ import java.nio.ByteBuffer;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompCodec implements Codec<Buffer, Message<byte[]>, Message<byte[]>> {
public class Reactor11StompCodec implements Codec<Buffer, Message<byte[]>, Message<byte[]>> {
private final StompDecoder stompDecoder;
@ -42,11 +42,11 @@ public class StompCodec implements Codec<Buffer, Message<byte[]>, Message<byte[] @@ -42,11 +42,11 @@ public class StompCodec implements Codec<Buffer, Message<byte[]>, Message<byte[]
private final Function<Message<byte[]>, Buffer> encodingFunction;
public StompCodec() {
public Reactor11StompCodec() {
this(new StompEncoder(), new StompDecoder());
}
public StompCodec(StompEncoder encoder, StompDecoder decoder) {
public Reactor11StompCodec(StompEncoder encoder, StompDecoder decoder) {
Assert.notNull(encoder, "'encoder' is required");
Assert.notNull(decoder, "'decoder' is required");
this.stompEncoder = encoder;

10
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -35,7 +35,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; @@ -35,7 +35,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorTcpClient;
import org.springframework.messaging.tcp.reactor.Reactor11TcpClient;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@ -311,7 +311,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -311,7 +311,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Configure a TCP client for managing TCP connections to the STOMP broker.
* By default {@link org.springframework.messaging.tcp.reactor.ReactorTcpClient} is used.
* By default {@link org.springframework.messaging.tcp.reactor.Reactor11TcpClient} is used.
*/
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient;
@ -354,7 +354,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -354,7 +354,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (this.tcpClient == null) {
StompDecoder decoder = new StompDecoder();
decoder.setHeaderInitializer(getHeaderInitializer());
StompCodec codec = new StompCodec(new StompEncoder(), decoder);
Reactor11StompCodec codec = new Reactor11StompCodec(new StompEncoder(), decoder);
this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec);
}
@ -838,8 +838,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -838,8 +838,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private static class StompTcpClientFactory {
public TcpOperations<byte[]> create(String relayHost, int relayPort, StompCodec codec) {
return new ReactorTcpClient<byte[]>(relayHost, relayPort, codec);
public TcpOperations<byte[]> create(String relayHost, int relayPort, Reactor11StompCodec codec) {
return new Reactor11TcpClient<byte[]>(relayHost, relayPort, codec);
}
}

8
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java → spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java

@ -55,7 +55,7 @@ import reactor.tuple.Tuple2; @@ -55,7 +55,7 @@ import reactor.tuple.Tuple2;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ReactorTcpClient<P> implements TcpOperations<P> {
public class Reactor11TcpClient<P> implements TcpOperations<P> {
public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
@ -77,7 +77,7 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -77,7 +77,7 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
* @param port the port to connect to
* @param codec the codec to use for encoding and decoding the TCP stream
*/
public ReactorTcpClient(String host, int port, Codec<Buffer, Message<P>, Message<P>> codec) {
public Reactor11TcpClient(String host, int port, Codec<Buffer, Message<P>, Message<P>> codec) {
// Revisit in 1.1: is Environment still required w/ sync dispatcher?
this.environment = new Environment(new SynchronousDispatcherConfigReader());
@ -98,7 +98,7 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -98,7 +98,7 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
*
* @param tcpClient the TcpClient to use
*/
public ReactorTcpClient(TcpClient<Message<P>, Message<P>> tcpClient) {
public Reactor11TcpClient(TcpClient<Message<P>, Message<P>> tcpClient) {
Assert.notNull(tcpClient, "'tcpClient' must not be null");
this.tcpClient = tcpClient;
this.environment = null;
@ -178,7 +178,7 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -178,7 +178,7 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
connectionHandler.afterConnectionClosed();
}
});
connectionHandler.afterConnected(new ReactorTcpConnection<P>(connection));
connectionHandler.afterConnected(new Reactor11TcpConnection<P>(connection));
}
});
}

4
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpConnection.java → spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java

@ -33,12 +33,12 @@ import reactor.net.NetChannel; @@ -33,12 +33,12 @@ import reactor.net.NetChannel;
*
* @author Rossen Stoyanchev
*/
public class ReactorTcpConnection<P> implements TcpConnection<P> {
public class Reactor11TcpConnection<P> implements TcpConnection<P> {
private final NetChannel<Message<P>, Message<P>> channel;
public ReactorTcpConnection(NetChannel<Message<P>, Message<P>> connection) {
public Reactor11TcpConnection(NetChannel<Message<P>, Message<P>> connection) {
this.channel = connection;
}

16
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java

@ -33,7 +33,7 @@ import reactor.io.Buffer; @@ -33,7 +33,7 @@ import reactor.io.Buffer;
import static org.junit.Assert.*;
/**
* Test fixture for {@link StompCodec}.
* Test fixture for {@link Reactor11StompCodec}.
*
* @author Andy Wilkinson
*/
@ -41,7 +41,7 @@ public class StompCodecTests { @@ -41,7 +41,7 @@ public class StompCodecTests {
private final ArgumentCapturingConsumer<Message<byte[]>> consumer = new ArgumentCapturingConsumer<Message<byte[]>>();
private final Function<Buffer, Message<byte[]>> decoder = new StompCodec().decoder(consumer);
private final Function<Buffer, Message<byte[]>> decoder = new Reactor11StompCodec().decoder(consumer);
@Test
public void decodeFrameWithCrLfEols() {
@ -176,7 +176,7 @@ public class StompCodecTests { @@ -176,7 +176,7 @@ public class StompCodecTests {
Buffer buffer = Buffer.wrap(frame1 + frame2);
final List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
new StompCodec().decoder(new Consumer<Message<byte[]>>() {
new Reactor11StompCodec().decoder(new Consumer<Message<byte[]>>() {
@Override
public void accept(Message<byte[]> message) {
messages.add(message);
@ -234,7 +234,7 @@ public class StompCodecTests { @@ -234,7 +234,7 @@ public class StompCodecTests {
Buffer buffer = Buffer.wrap(frame);
final List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
new StompCodec().decoder(new Consumer<Message<byte[]>>() {
new Reactor11StompCodec().decoder(new Consumer<Message<byte[]>>() {
@Override
public void accept(Message<byte[]> message) {
messages.add(message);
@ -251,7 +251,7 @@ public class StompCodecTests { @@ -251,7 +251,7 @@ public class StompCodecTests {
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
assertEquals("DISCONNECT\n\n\0", new StompCodec().encoder().apply(frame).asString());
assertEquals("DISCONNECT\n\n\0", new Reactor11StompCodec().encoder().apply(frame).asString());
}
@Test
@ -262,7 +262,7 @@ public class StompCodecTests { @@ -262,7 +262,7 @@ public class StompCodecTests {
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
String frameString = new StompCodec().encoder().apply(frame).asString();
String frameString = new Reactor11StompCodec().encoder().apply(frame).asString();
assertTrue(frameString.equals("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0") ||
frameString.equals("CONNECT\nhost:github.org\naccept-version:1.2\n\n\0"));
@ -276,7 +276,7 @@ public class StompCodecTests { @@ -276,7 +276,7 @@ public class StompCodecTests {
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0",
new StompCodec().encoder().apply(frame).asString());
new Reactor11StompCodec().encoder().apply(frame).asString());
}
@Test
@ -287,7 +287,7 @@ public class StompCodecTests { @@ -287,7 +287,7 @@ public class StompCodecTests {
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0",
new StompCodec().encoder().apply(frame).asString());
new Reactor11StompCodec().encoder().apply(frame).asString());
}
private void assertIncompleteDecode(String partialFrame) {

Loading…
Cancel
Save