diff --git a/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java b/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java index ec586de9cb..a5cfbcba54 100644 --- a/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,10 +55,6 @@ import org.springframework.util.ReflectionUtils; * {@link #setSessionCacheSize "sessionCacheSize" value} in case of a * high-concurrency environment. * - *
NOTE: This ConnectionFactory decorator requires JMS 1.1 or higher. - * You may use it through the JMS 1.0.2 API; however, the target JMS driver - * needs to be compliant with JMS 1.1. - * *
When using the JMS 1.0.2 API, this ConnectionFactory will switch * into queue/topic mode according to the JMS API methods used at runtime: * {@code createQueueConnection} and {@code createTopicConnection} will @@ -82,6 +78,9 @@ import org.springframework.util.ReflectionUtils; */ public class CachingConnectionFactory extends SingleConnectionFactory { + private static final Method createSharedConsumerMethod = ClassUtils.getMethodIfAvailable( + Session.class, "createSharedConsumer", Topic.class, String.class, String.class); + private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable( Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class); @@ -339,7 +338,8 @@ public class CachingConnectionFactory extends SingleConnectionFactory { return getCachedConsumer(dest, (args.length > 1 ? (String) args[1] : null), (args.length > 2 && (Boolean) args[2]), - null); + null, + false); } } else if (methodName.equals("createDurableConsumer") || methodName.equals("createDurableSubscriber")) { @@ -348,7 +348,18 @@ public class CachingConnectionFactory extends SingleConnectionFactory { return getCachedConsumer(dest, (args.length > 2 ? (String) args[2] : null), (args.length > 3 && (Boolean) args[3]), - (String) args[1]); + (String) args[1], + true); + } + } + else if (methodName.equals("createSharedConsumer")) { + Destination dest = (Destination) args[0]; + if (dest != null) { + return getCachedConsumer(dest, + (args.length > 2 ? (String) args[2] : null), + null, + (String) args[1], + false); } } else if (methodName.equals("createSharedDurableConsumer")) { @@ -357,7 +368,8 @@ public class CachingConnectionFactory extends SingleConnectionFactory { return getCachedConsumer(dest, (args.length > 2 ? (String) args[2] : null), null, - (String) args[1]); + (String) args[1], + true); } } } @@ -389,7 +401,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory { } private MessageConsumer getCachedConsumer( - Destination dest, String selector, Boolean noLocal, String subscription) throws JMSException { + Destination dest, String selector, Boolean noLocal, String subscription, boolean durable) throws JMSException { ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription); MessageConsumer consumer = this.cachedConsumers.get(cacheKey); @@ -401,10 +413,11 @@ public class CachingConnectionFactory extends SingleConnectionFactory { else { if (dest instanceof Topic) { if (noLocal == null) { + // createSharedConsumer((Topic) dest, subscription, selector); // createSharedDurableConsumer((Topic) dest, subscription, selector); + Method method = (durable ? createSharedDurableConsumerMethod : createSharedConsumerMethod); try { - consumer = (MessageConsumer) createSharedDurableConsumerMethod.invoke - (this.target, dest, subscription, selector); + consumer = (MessageConsumer) method.invoke(this.target, dest, subscription, selector); } catch (InvocationTargetException ex) { if (ex.getTargetException() instanceof JMSException) { @@ -417,7 +430,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory { } } else { - consumer = (subscription != null ? + consumer = (durable ? this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) : this.target.createConsumer(dest, selector, noLocal)); } @@ -550,7 +563,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory { @Override public boolean equals(Object other) { - if (other == this) { + if (this == other) { return true; } ConsumerCacheKey otherKey = (ConsumerCacheKey) other;