Browse Source

CachedSessionInvocationHandler properly caches createSharedConsumer results

Issue: SPR-11956
pull/581/head
Juergen Hoeller 11 years ago
parent
commit
7a7641bd23
  1. 39
      spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java

39
spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2014 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -55,10 +55,6 @@ import org.springframework.util.ReflectionUtils;
* {@link #setSessionCacheSize "sessionCacheSize" value} in case of a * {@link #setSessionCacheSize "sessionCacheSize" value} in case of a
* high-concurrency environment. * high-concurrency environment.
* *
* <p><b>NOTE: This ConnectionFactory decorator requires JMS 1.1 or higher.</b>
* You may use it through the JMS 1.0.2 API; however, the target JMS driver
* needs to be compliant with JMS 1.1.
*
* <p>When using the JMS 1.0.2 API, this ConnectionFactory will switch * <p>When using the JMS 1.0.2 API, this ConnectionFactory will switch
* into queue/topic mode according to the JMS API methods used at runtime: * into queue/topic mode according to the JMS API methods used at runtime:
* {@code createQueueConnection} and {@code createTopicConnection} will * {@code createQueueConnection} and {@code createTopicConnection} will
@ -82,6 +78,9 @@ import org.springframework.util.ReflectionUtils;
*/ */
public class CachingConnectionFactory extends SingleConnectionFactory { public class CachingConnectionFactory extends SingleConnectionFactory {
private static final Method createSharedConsumerMethod = ClassUtils.getMethodIfAvailable(
Session.class, "createSharedConsumer", Topic.class, String.class, String.class);
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable( private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class); Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);
@ -339,7 +338,8 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
return getCachedConsumer(dest, return getCachedConsumer(dest,
(args.length > 1 ? (String) args[1] : null), (args.length > 1 ? (String) args[1] : null),
(args.length > 2 && (Boolean) args[2]), (args.length > 2 && (Boolean) args[2]),
null); null,
false);
} }
} }
else if (methodName.equals("createDurableConsumer") || methodName.equals("createDurableSubscriber")) { else if (methodName.equals("createDurableConsumer") || methodName.equals("createDurableSubscriber")) {
@ -348,7 +348,18 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
return getCachedConsumer(dest, return getCachedConsumer(dest,
(args.length > 2 ? (String) args[2] : null), (args.length > 2 ? (String) args[2] : null),
(args.length > 3 && (Boolean) args[3]), (args.length > 3 && (Boolean) args[3]),
(String) args[1]); (String) args[1],
true);
}
}
else if (methodName.equals("createSharedConsumer")) {
Destination dest = (Destination) args[0];
if (dest != null) {
return getCachedConsumer(dest,
(args.length > 2 ? (String) args[2] : null),
null,
(String) args[1],
false);
} }
} }
else if (methodName.equals("createSharedDurableConsumer")) { else if (methodName.equals("createSharedDurableConsumer")) {
@ -357,7 +368,8 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
return getCachedConsumer(dest, return getCachedConsumer(dest,
(args.length > 2 ? (String) args[2] : null), (args.length > 2 ? (String) args[2] : null),
null, null,
(String) args[1]); (String) args[1],
true);
} }
} }
} }
@ -389,7 +401,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
} }
private MessageConsumer getCachedConsumer( private MessageConsumer getCachedConsumer(
Destination dest, String selector, Boolean noLocal, String subscription) throws JMSException { Destination dest, String selector, Boolean noLocal, String subscription, boolean durable) throws JMSException {
ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription); ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription);
MessageConsumer consumer = this.cachedConsumers.get(cacheKey); MessageConsumer consumer = this.cachedConsumers.get(cacheKey);
@ -401,10 +413,11 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
else { else {
if (dest instanceof Topic) { if (dest instanceof Topic) {
if (noLocal == null) { if (noLocal == null) {
// createSharedConsumer((Topic) dest, subscription, selector);
// createSharedDurableConsumer((Topic) dest, subscription, selector); // createSharedDurableConsumer((Topic) dest, subscription, selector);
Method method = (durable ? createSharedDurableConsumerMethod : createSharedConsumerMethod);
try { try {
consumer = (MessageConsumer) createSharedDurableConsumerMethod.invoke consumer = (MessageConsumer) method.invoke(this.target, dest, subscription, selector);
(this.target, dest, subscription, selector);
} }
catch (InvocationTargetException ex) { catch (InvocationTargetException ex) {
if (ex.getTargetException() instanceof JMSException) { if (ex.getTargetException() instanceof JMSException) {
@ -417,7 +430,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
} }
} }
else { else {
consumer = (subscription != null ? consumer = (durable ?
this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) : this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) :
this.target.createConsumer(dest, selector, noLocal)); this.target.createConsumer(dest, selector, noLocal));
} }
@ -550,7 +563,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
@Override @Override
public boolean equals(Object other) { public boolean equals(Object other) {
if (other == this) { if (this == other) {
return true; return true;
} }
ConsumerCacheKey otherKey = (ConsumerCacheKey) other; ConsumerCacheKey otherKey = (ConsumerCacheKey) other;

Loading…
Cancel
Save