Browse Source

STOMP client session supports sending ack/nack

Issue: SPR-14208
pull/1045/head
Rossen Stoyanchev 9 years ago
parent
commit
06b2d2b89e
  1. 19
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java
  2. 14
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java
  3. 38
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java

19
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -310,6 +310,23 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { @@ -310,6 +310,23 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
return subscription;
}
@Override
public Receiptable acknowledge(String messageId, boolean consumed) {
StompHeaders stompHeaders = new StompHeaders();
stompHeaders.setId(messageId);
String receiptId = checkOrAddReceipt(stompHeaders);
Receiptable receiptable = new ReceiptHandler(receiptId);
StompCommand command = (consumed ? StompCommand.ACK : StompCommand.NACK);
StompHeaderAccessor accessor = createHeaderAccessor(command);
accessor.addNativeHeaders(stompHeaders);
Message<byte[]> message = createMessage(accessor, null);
execute(message);
return receiptable;
}
private void unsubscribe(String id) {
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE);
accessor.setSubscriptionId(id);

14
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -86,6 +86,18 @@ public interface StompSession { @@ -86,6 +86,18 @@ public interface StompSession {
*/
Subscription subscribe(StompHeaders headers, StompFrameHandler handler);
/**
* Send an acknowledgement whether a message was consumed or not resulting
* in an ACK or NACK frame respectively.
* <p><strong>Note:</strong> to use this when subscribing you must set the
* {@link StompHeaders#setAck(String) ack} header to "client" or
* "client-individual" in order ot use this.
* @param messageId the id of the message
* @param consumed whether the message was consumed or not
* @return a Receiptable for tracking events
*/
Receiptable acknowledge(String messageId, boolean consumed);
/**
* Disconnect the session by sending a DISCONNECT frame.
*/

38
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -491,6 +491,42 @@ public class DefaultStompSessionTests { @@ -491,6 +491,42 @@ public class DefaultStompSessionTests {
assertEquals(subscription.getSubscriptionId(), stompHeaders.getId());
}
@Test
public void ack() throws Exception {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
String messageId = "123";
this.session.acknowledge(messageId, true);
Message<byte[]> message = this.messageCaptor.getValue();
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.ACK, accessor.getCommand());
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
assertEquals(stompHeaders.toString(), 1, stompHeaders.size());
assertEquals(messageId, stompHeaders.getId());
}
@Test
public void nack() throws Exception {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
String messageId = "123";
this.session.acknowledge(messageId, false);
Message<byte[]> message = this.messageCaptor.getValue();
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.NACK, accessor.getCommand());
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
assertEquals(stompHeaders.toString(), 1, stompHeaders.size());
assertEquals(messageId, stompHeaders.getId());
}
@Test
public void receiptReceived() throws Exception {

Loading…
Cancel
Save