23. JMS (Java Message Service)

23.1 Introduction

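Spring provides a JMS integration framework that simplifies the use of the JMS API, much as Spring's integration does for the JDBC API.

JMS can be roughly divided into two areas of functionality: the production and the consumption of messages. The JmsTemplate class is used for message production and synchronous message reception. For asynchronous reception similar to Java EE's message-driven bean style, Spring provides a number of message listener containers that are used to create Message-Driven POJOs (MDPs). Spring also provides a declarative way of creating message listeners.

The package org.springframework.jms.core provides the core functionality for using JMS. It contains JMS template classes that simplify the use of JMS by handling the creation and release of resources, much like the JdbcTemplate does for JDBC. The design principle common to Spring template classes is to provide helper methods that perform common operations and, for more sophisticated usage, to delegate the essence of the processing task to user-implemented callback interfaces. The JMS template follows the same design. It offers various convenience methods for sending messages, consuming a message synchronously, and exposing the JMS session and message producer to the user.

The package org.springframework.jms.support provides JMSException translation functionality. The translation converts the checked JMSException hierarchy to a mirrored hierarchy of unchecked exceptions. If there are any provider-specific subclasses of the checked javax.jms.JMSException, such an exception is wrapped in the unchecked UncategorizedJmsException.

The package org.springframework.jms.support.converter provides a MessageConverter abstraction to convert between Java objects and JMS messages.

The package org.springframework.jms.support.destination provides various strategies for managing JMS destinations, such as a service locator for destinations stored in JNDI.

The package org.springframework.jms.annotation provides the necessary infrastructure to support annotation-driven listener endpoints using @JmsListener.

The package org.springframework.jms.config provides the parser implementation for the jms namespace as well as the Java config support to configure listener containers and create listener endpoints.

Finally, the package org.springframework.jms.connection provides an implementation of the ConnectionFactory suitable for use in standalone applications. It also contains an implementation of Spring's PlatformTransactionManager for JMS, named JmsTransactionManager. This allows for seamless integration of JMS as a transactional resource into Spring's transaction management mechanisms.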

23.2 Using Spring JMS

23.2.1 JmsTemplate

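The JmsTemplate class is the central class in the JMS core package. It simplifies the use of JMS since it handles the creation and release of resources when sending or synchronously receiving messages.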

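Code that uses the JmsTemplate only needs to implement callback interfaces, which give it a clearly defined high-level contract. The MessageCreator callback interface creates a message given a Session provided by the calling code in JmsTemplate. To allow for more complex usage of the JMS API, the callback SessionCallback provides the user with the JMS session, and the callback ProducerCallback exposes a Session and MessageProducer pair.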

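The JMS API exposes two types of send methods: one that takes delivery mode, priority, and time-to-live as Quality of Service (QOS) parameters, and one that takes no QOS parameters and uses default values. Since there are many send methods in JmsTemplate, the QOS parameters are exposed as bean properties to avoid duplicating the send methods. Similarly, the timeout value for synchronous receive calls is set using the property setReceiveTimeout.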

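Some JMS providers allow default QOS values to be set administratively through the configuration of the ConnectionFactory. As a result, a call to MessageProducer's send method send(Destination destination, Message message) will use QOS default values that differ from those specified in the JMS specification. To provide consistent management of QOS values, the JmsTemplate must therefore be specifically enabled to use its own QOS values by setting the boolean property isExplicitQosEnabled to true.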

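For convenience, JmsTemplate also exposes a basic request-reply operation that sends a message and waits for a reply on a temporary queue that is created as part of the operation.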

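Note

Instances of the JmsTemplate class are thread-safe once configured. This is important because it means that you can configure a single instance of a JmsTemplate and then safely inject this shared reference into multiple collaborators. To be clear, the JmsTemplate is stateful, in that it maintains a reference to a ConnectionFactory, but this state is not conversational state.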

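As of Spring Framework 4.1, JmsMessagingTemplate is built on top of JmsTemplate and provides an integration with the messaging abstraction, i.e. org.springframework.messaging.Message. This allows you to create the message to send in a generic manner.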

23.2.2 Connections

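The JmsTemplate requires a reference to a ConnectionFactory. The ConnectionFactory is part of the JMS specification and serves as the entry point for working with JMS. It is used by the client application as a factory to create connections with the JMS provider, and it encapsulates various configuration parameters, many of which are vendor specific, such as SSL configuration options.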

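When using JMS inside an EJB, the vendor provides implementations of the JMS interfaces so that they can participate in declarative transaction management and perform pooling of connections and sessions. To use this implementation, Java EE containers typically require that you declare a JMS connection factory as a resource-ref inside the EJB or servlet deployment descriptors. To ensure the use of these features with the JmsTemplate inside an EJB, the client application should ensure that it references the managed implementation of the ConnectionFactory.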

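Caching Messaging Resources

The standard API involves creating many intermediate objects. To send a message, the following API walk is performed: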

ConnectionFactory->Connection->Session->MessageProducer->send

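Between the ConnectionFactory and the send operation there are three intermediate objects that are created and destroyed. To optimise resource usage and increase performance, Spring provides two implementations of ConnectionFactory.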

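SingleConnectionFactory

Spring provides an implementation of the ConnectionFactory interface, SingleConnectionFactory, that returns the same Connection on all createConnection() calls and ignores calls to close(). This is useful for testing and standalone environments, so that the same connection can be used for multiple JmsTemplate calls that may span any number of transactions. SingleConnectionFactory takes a reference to a standard ConnectionFactory that would typically come from JNDI.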

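CachingConnectionFactory

The CachingConnectionFactory extends the functionality of SingleConnectionFactory and adds the caching of Sessions, MessageProducers, and MessageConsumers. The initial cache size is set to 1; use the property sessionCacheSize to increase the number of cached sessions. Note that the number of actual cached sessions can be higher than that number, because sessions are cached based on their acknowledgment mode: there can be up to 4 cached session instances when sessionCacheSize is set to one, one for each acknowledgment mode. MessageProducers and MessageConsumers are cached within their owning session, and their unique properties are taken into account when caching. MessageProducers are cached based on their destination. MessageConsumers are cached based on a key composed of the destination, selector, noLocal delivery flag, and the durable subscription name (if creating durable consumers).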
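The arithmetic behind "up to 4 cached sessions for a cache size of 1" can be illustrated with a toy model. The sketch below is plain Java and is not how CachingConnectionFactory is implemented; the class SessionCacheModel, its methods, and the use of a String as a stand-in for a JMS Session are all assumptions made for illustration. The four integer keys correspond to the four JMS acknowledge modes (SESSION_TRANSACTED, AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of a session cache that keeps one idle list per acknowledge
 * mode. Illustrative only: this is not CachingConnectionFactory, and a
 * String stands in for a JMS Session.
 */
final class SessionCacheModel {

    private final int sessionCacheSize;
    private final Map<Integer, Deque<String>> idleByAckMode = new HashMap<>();

    SessionCacheModel(int sessionCacheSize) {
        this.sessionCacheSize = sessionCacheSize;
    }

    /**
     * Hands an idle session back. Returns true if it was cached, or false if
     * the list for this acknowledge mode is full (a real cache would close it).
     */
    boolean release(int ackMode, String session) {
        Deque<String> idle = idleByAckMode.computeIfAbsent(ackMode, mode -> new ArrayDeque<>());
        if (idle.size() >= sessionCacheSize) {
            return false;
        }
        idle.push(session);
        return true;
    }

    /** Total number of idle sessions across all acknowledge modes. */
    int cachedSessions() {
        int total = 0;
        for (Deque<String> idle : idleByAckMode.values()) {
            total += idle.size();
        }
        return total;
    }
}
```

With a cache size of 1, releasing one session for each of the four acknowledge modes leaves four sessions cached, while a second session for a mode that already has one is rejected.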

23.2.3 Destination Management

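Destinations, like ConnectionFactories, are JMS administered objects that can be stored and retrieved in JNDI. When configuring a Spring application context you can use the JNDI factory class JndiObjectFactoryBean / <jee:jndi-lookup> to perform dependency injection on your object's references to JMS destinations. However, this strategy is often cumbersome if there are a large number of destinations in the application or if there are advanced destination management features unique to the JMS provider. Examples of such advanced destination management are the creation of dynamic destinations or support for a hierarchical namespace of destinations. The JmsTemplate delegates the resolution of a destination name to a JMS destination object to an implementation of the interface DestinationResolver. DynamicDestinationResolver is the default implementation used by JmsTemplate and accommodates resolving dynamic destinations. A JndiDestinationResolver is also provided; it acts as a service locator for destinations contained in JNDI and optionally falls back to the behavior contained in DynamicDestinationResolver.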

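Quite often the destinations used in a JMS application are only known at runtime and therefore cannot be administratively created when the application is deployed. This is often because interacting system components share application logic that creates destinations at runtime according to a well-known naming convention. Even though the creation of dynamic destinations is not part of the JMS specification, most vendors provide this functionality. Dynamic destinations are created with a name defined by the user, which differentiates them from temporary destinations, and they are often not registered in JNDI. The API used to create dynamic destinations varies from provider to provider, since the properties associated with the destination are vendor specific. However, a simple implementation choice that vendors sometimes make is to disregard the warnings in the JMS specification and to use the TopicSession method createTopic(String topicName) or the QueueSession method createQueue(String queueName) to create a new destination with default destination properties. Depending on the vendor implementation, DynamicDestinationResolver may then also create a physical destination instead of only resolving one.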

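The boolean property pubSubDomain is used to configure the JmsTemplate with knowledge of which JMS domain is being used. By default the value of this property is false, indicating that the point-to-point domain, Queues, will be used. This property determines the behavior of dynamic destination resolution via implementations of the DestinationResolver interface.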

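You can also configure the JmsTemplate with a default destination via the property defaultDestination. The default destination is used with send and receive operations that do not refer to a specific destination.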

23.2.4 Message Listener Containers

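One of the most common uses of JMS messages in the EJB world is to drive message-driven beans (MDBs). Spring offers a solution for creating message-driven POJOs (MDPs) in a way that does not tie a user to an EJB container. (See Section 23.4.2, “Asynchronous Reception - Message-Driven POJOs” for detailed coverage of Spring's MDP support.) As of Spring Framework 4.1, endpoint methods can simply be annotated with @JmsListener; see Section 23.6, “Annotation-driven listener endpoints” for more details.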

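A message listener container is used to receive messages from a JMS message queue and drive the MessageListener that is injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between an MDP and a messaging provider, and takes care of registering to receive messages, participating in transactions, resource acquisition and release, exception conversion, and the like. This allows you as an application developer to write the (possibly complex) business logic associated with receiving a message (and possibly responding to it), and delegates boilerplate JMS infrastructure concerns to the framework.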

There are two standard JMS message listener containers packaged with Spring, each with its own specialised feature set.

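SimpleMessageListenerContainer

This message listener container is the simpler of the two standard flavors. It creates a fixed number of JMS sessions and consumers at startup, registers the listener using the standard JMS MessageConsumer.setMessageListener() method, and leaves it up to the JMS provider to perform listener callbacks. This variant does not allow for dynamic adaptation to runtime demands or for participation in externally managed transactions. Compatibility-wise, it stays very close to the spirit of the standalone JMS specification, but it is generally not compatible with Java EE's JMS restrictions.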

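DefaultMessageListenerContainer

This message listener container is the one used in most cases. In contrast to SimpleMessageListenerContainer, this container variant does allow for dynamic adaptation to runtime demands and is able to participate in externally managed transactions. Each received message is registered with an XA transaction when the container is configured with a JtaTransactionManager, so processing may take advantage of XA transaction semantics. This listener container strikes a good balance between low requirements on the JMS provider, advanced functionality such as participation in externally managed transactions, and compatibility with Java EE environments.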

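The cache level of the container can be customized. Note that when no caching is enabled, a new connection and a new session are created for each message reception. Combining this with a non-durable subscription under high load may lead to message loss. Make sure to use a proper cache level in such a case.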

This container also has recoverable capabilities when the broker goes down. By default, a simple BackOff implementation retries every 5 seconds. It is possible to specify a custom BackOff implementation for more fine-grained recovery options, see ExponentialBackOff for an example.
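To make the difference between a fixed retry interval and an exponential one concrete, the following plain-Java sketch computes the delays a recovery loop would wait between attempts. It is a toy model and not Spring's BackOff or ExponentialBackOff implementation; the class BackOffSchedule, its method, and the numbers passed to it are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of a back-off schedule. This is NOT Spring's ExponentialBackOff
 * class; the names and numbers here are illustrative.
 */
final class BackOffSchedule {

    /**
     * Returns the delays (in milliseconds) before the first {@code attempts}
     * recovery attempts: start at initialMs, multiply after every attempt,
     * and never exceed maxMs.
     */
    static List<Long> delays(long initialMs, double multiplier, long maxMs, int attempts) {
        List<Long> result = new ArrayList<>();
        long current = Math.min(initialMs, maxMs);
        for (int i = 0; i < attempts; i++) {
            result.add(current);
            current = Math.min((long) (current * multiplier), maxMs);
        }
        return result;
    }

    public static void main(String[] args) {
        // Fixed policy: the same 5-second wait before every attempt.
        System.out.println(delays(5000, 1.0, 5000, 4));
        // Growing policy: the wait doubles until it reaches the 30-second cap.
        System.out.println(delays(2000, 2.0, 30000, 6));
    }
}
```

A growing schedule of this kind keeps early retries quick while avoiding a tight reconnect loop against a broker that stays down for a long time.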

23.2.5 Transaction management

Spring provides a JmsTransactionManager that manages transactions for a single JMS ConnectionFactory. This allows JMS applications to leverage the managed transaction features of Spring as described in Chapter 11, Transaction Management. The JmsTransactionManager performs local resource transactions, binding a JMS Connection/Session pair from the specified ConnectionFactory to the thread. JmsTemplate automatically detects such transactional resources and operates on them accordingly.

In a Java EE environment, the ConnectionFactory will pool Connections and Sessions, so those resources are efficiently reused across transactions. In a standalone environment, using Spring’s SingleConnectionFactory will result in a shared JMS Connection, with each transaction having its own independent Session. Alternatively, consider the use of a provider-specific pooling adapter such as ActiveMQ’s PooledConnectionFactory class.

JmsTemplate can also be used with the JtaTransactionManager and an XA-capable JMS ConnectionFactory for performing distributed transactions. Note that this requires the use of a JTA transaction manager as well as a properly XA-configured ConnectionFactory! (Check your Java EE server’s / JMS provider’s documentation.)

Reusing code across a managed and unmanaged transactional environment can be confusing when using the JMS API to create a Session from a Connection. This is because the JMS API has only one factory method to create a Session and it requires values for the transaction and acknowledgement modes. In a managed environment, setting these values is the responsibility of the environment’s transactional infrastructure, so these values are ignored by the vendor’s wrapper to the JMS Connection. When using the JmsTemplate in an unmanaged environment you can specify these values through the use of the properties sessionTransacted and sessionAcknowledgeMode. When using a PlatformTransactionManager with JmsTemplate, the template will always be given a transactional JMS Session.

23.3 Sending a Message

The JmsTemplate contains many convenience methods to send a message. There are send methods that specify the destination using a javax.jms.Destination object and those that specify the destination using a string for use in a JNDI lookup. The send method that takes no destination argument uses the default destination.

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.JmsTemplate;

public class JmsQueueSender {

    private JmsTemplate jmsTemplate;
    private Queue queue;

    public void setConnectionFactory(ConnectionFactory cf) {
        this.jmsTemplate = new JmsTemplate(cf);
    }

    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    public void simpleSend() {
        this.jmsTemplate.send(this.queue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("hello queue world");
            }
        });
    }
}

This example uses the MessageCreator callback to create a text message from the supplied Session object. The JmsTemplate is constructed by passing a reference to a ConnectionFactory. As an alternative, a zero-argument constructor and a connectionFactory property are provided and can be used for constructing the instance in JavaBean style (using a BeanFactory or plain Java code). Alternatively, consider deriving from Spring’s JmsGatewaySupport convenience base class, which provides pre-built bean properties for JMS configuration.

The method send(String destinationName, MessageCreator creator) lets you send a message using the string name of the destination. If these names are registered in JNDI, you should set the destinationResolver property of the template to an instance of JndiDestinationResolver.

If you created the JmsTemplate and specified a default destination, the send(MessageCreator c) method sends a message to that destination.

23.3.1 Using Message Converters

In order to facilitate the sending of domain model objects, the JmsTemplate has various send methods that take a Java object as an argument for a message’s data content. The overloaded methods convertAndSend() and receiveAndConvert() in JmsTemplate delegate the conversion process to an instance of the MessageConverter interface. This interface defines a simple contract to convert between Java objects and JMS messages. The default implementation SimpleMessageConverter supports conversion between String and TextMessage, byte[] and BytesMessage, and java.util.Map and MapMessage. By using the converter, you and your application code can focus on the business object that is being sent or received via JMS and not be concerned with the details of how it is represented as a JMS message.

The sandbox currently includes a MapMessageConverter which uses reflection to convert between a JavaBean and a MapMessage. Other popular implementation choices you might implement yourself are Converters that use an existing XML marshalling package, such as JAXB, Castor, XMLBeans, or XStream, to create a TextMessage representing the object.

To accommodate the setting of a message’s properties, headers, and body that can not be generically encapsulated inside a converter class, the MessagePostProcessor interface gives you access to the message after it has been converted, but before it is sent. The example below demonstrates how to modify a message header and a property after a java.util.Map is converted to a message.

public void sendWithConversion() {
    Map<String, Object> map = new HashMap<>();
    map.put("Name", "Mark");
    map.put("Age", 47);
    jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws JMSException {
            message.setIntProperty("AccountID", 1234);
            message.setJMSCorrelationID("123-00001");
            return message;
        }
    });
}

This results in a message of the form:

MapMessage={
	Header={
		... standard headers ...
		CorrelationID={123-00001}
	}
	Properties={
		AccountID={Integer:1234}
	}
	Fields={
		Name={String:Mark}
		Age={Integer:47}
	}
}

23.3.2 SessionCallback and ProducerCallback

While the send operations cover many common usage scenarios, there are cases when you want to perform multiple operations on a JMS Session or MessageProducer. The SessionCallback and ProducerCallback expose the JMS Session and Session / MessageProducer pair respectively. The execute() methods on JmsTemplate execute these callback methods.

23.4 Receiving a message

23.4.1 Synchronous Reception

While JMS is typically associated with asynchronous processing, it is possible to consume messages synchronously. The overloaded receive(..) methods provide this functionality. During a synchronous receive, the calling thread blocks until a message becomes available. This can be a dangerous operation since the calling thread can potentially be blocked indefinitely. The property receiveTimeout specifies how long the receiver should wait before giving up waiting for a message.
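The effect of a receive timeout can be shown without a broker. In the sketch below a java.util.concurrent.BlockingQueue stands in for the JMS destination, so none of this is JMS or Spring API; the class TimedReceiver and its methods are hypothetical. The point it illustrates is that a bounded wait hands control back to the caller with no message (here, null) instead of blocking indefinitely.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Stand-in for a synchronous receiver. A BlockingQueue plays the role of the
 * JMS destination; no JMS or Spring classes are involved.
 */
final class TimedReceiver {

    private final BlockingQueue<String> destination = new LinkedBlockingQueue<>();

    void send(String text) {
        destination.add(text);
    }

    /**
     * Waits at most timeoutMs for a message and returns null when none
     * arrives in time, so the caller is never blocked indefinitely.
     */
    String receive(long timeoutMs) {
        try {
            return destination.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        TimedReceiver receiver = new TimedReceiver();
        receiver.send("hello");
        System.out.println(receiver.receive(100)); // a message is waiting
        System.out.println(receiver.receive(100)); // nothing arrives: gives up after 100 ms
    }
}
```

Callers of a timed receive therefore need to handle the "no message" result explicitly.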

23.4.2 Asynchronous Reception - Message-Driven POJOs

Note

Spring also supports annotated-listener endpoints through the use of the @JmsListener annotation and provides an open infrastructure to register endpoints programmatically. This is by far the most convenient way to set up an asynchronous receiver; see Section 23.6.1, “Enable listener endpoint annotations” for more details.

In a fashion similar to a Message-Driven Bean (MDB) in the EJB world, the Message-Driven POJO (MDP) acts as a receiver for JMS messages. The one restriction (but see also below for the discussion of the MessageListenerAdapter class) on an MDP is that it must implement the javax.jms.MessageListener interface. Please also be aware that in the case where your POJO will be receiving messages on multiple threads, it is important to ensure that your implementation is thread-safe.

Below is a simple implementation of an MDP:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println(((TextMessage) message).getText());
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException("Message must be of type TextMessage");
        }
    }

}

Once you’ve implemented your MessageListener, it’s time to create a message listener container.

Find below an example of how to define and configure one of the message listener containers that ships with Spring (in this case the DefaultMessageListenerContainer).

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener" />

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener" />
</bean>

Please refer to the Spring javadocs of the various message listener containers for a full description of the features supported by each implementation.

23.4.3 the SessionAwareMessageListener interface

The SessionAwareMessageListener interface is a Spring-specific interface that provides a similar contract to the JMS MessageListener interface, but also provides the message handling method with access to the JMS Session from which the Message was received.

package org.springframework.jms.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

public interface SessionAwareMessageListener {

    void onMessage(Message message, Session session) throws JMSException;

}

You can choose to have your MDPs implement this interface (in preference to the standard JMS MessageListener interface) if you want your MDPs to be able to respond to any received messages (using the Session supplied in the onMessage(Message, Session) method). All of the message listener container implementations that ship with Spring have support for MDPs that implement either the MessageListener or SessionAwareMessageListener interface. Classes that implement the SessionAwareMessageListener come with the caveat that they are then tied to Spring through the interface. The choice of whether or not to use it is left entirely up to you as an application developer or architect.

Please note that the 'onMessage(..)' method of the SessionAwareMessageListener interface throws JMSException. In contrast to the standard JMS MessageListener interface, when using the SessionAwareMessageListener interface, it is the responsibility of the client code to handle any exceptions thrown.

23.4.4 the MessageListenerAdapter

The MessageListenerAdapter class is the final component in Spring’s asynchronous messaging support: in a nutshell, it allows you to expose almost any class as an MDP (there are of course some constraints).

Consider the following interface definition. Notice that although the interface extends neither the MessageListener nor SessionAwareMessageListener interfaces, it can still be used as an MDP via the use of the MessageListenerAdapter class. Notice also how the various message handling methods are strongly typed according to the contents of the various Message types that they can receive and handle.

import java.io.Serializable;
import java.util.Map;

public interface MessageDelegate {

    void handleMessage(String message);

    void handleMessage(Map message);

    void handleMessage(byte[] message);

    void handleMessage(Serializable message);

}

public class DefaultMessageDelegate implements MessageDelegate {
    // implementation elided for clarity...
}

In particular, note how the above implementation of the MessageDelegate interface (the above DefaultMessageDelegate class) has no JMS dependencies at all. It truly is a POJO that we will make into an MDP via the following configuration.

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultMessageDelegate"/>
    </constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener" />
</bean>
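The idea of routing an extracted payload to a strongly typed method on a plain delegate can be sketched with reflection. The code below is a simplified illustration and not the implementation of MessageListenerAdapter: the classes ReflectiveDelegateAdapter and PrintingDelegate are hypothetical, message conversion is assumed to have happened already, and overload resolution is reduced to "first method whose parameter accepts the payload", which is why the demo delegate uses non-overlapping parameter types.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

/**
 * Toy adapter that forwards an already-extracted payload to a method named
 * "handleMessage" on a plain delegate object. Illustrative only; it is not
 * Spring's MessageListenerAdapter.
 */
final class ReflectiveDelegateAdapter {

    private final Object delegate;

    ReflectiveDelegateAdapter(Object delegate) {
        this.delegate = delegate;
    }

    /** Invokes the first public handleMessage(..) method whose single parameter accepts the payload. */
    Object dispatch(Object payload) {
        for (Method method : delegate.getClass().getMethods()) {
            if (method.getName().equals("handleMessage")
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(payload)) {
                try {
                    method.setAccessible(true); // the delegate class itself may not be public
                    return method.invoke(delegate, payload);
                } catch (IllegalAccessException | InvocationTargetException ex) {
                    throw new IllegalStateException("Listener method failed", ex);
                }
            }
        }
        throw new IllegalStateException("No handleMessage method accepts " + payload);
    }
}

/** Hypothetical delegate with no messaging dependencies at all. */
final class PrintingDelegate {

    public String handleMessage(String text) {
        return "text:" + text;
    }

    public String handleMessage(Map<?, ?> map) {
        return "map:" + map.size();
    }
}
```

Calling new ReflectiveDelegateAdapter(new PrintingDelegate()).dispatch("hi") reaches the String variant, while a java.util.Map payload reaches the Map variant; a payload with no matching method is rejected with an IllegalStateException.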

Below is an example of another MDP that can only handle the receiving of JMS TextMessage messages. Notice how the message handling method is actually called 'receive' (the name of the message handling method in a MessageListenerAdapter defaults to 'handleMessage'), but it is configurable (as you will see below). Notice also how the 'receive(..)' method is strongly typed to receive and respond only to JMS TextMessage messages.

public interface TextMessageDelegate {

    void receive(TextMessage message);

}

public class DefaultTextMessageDelegate implements TextMessageDelegate {
    // implementation elided for clarity...
}

The configuration of the attendant MessageListenerAdapter would look like this:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultTextMessageDelegate"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="receive"/>
    <!-- we don't want automatic message context extraction -->
    <property name="messageConverter">
        <null/>
    </property>
</bean>

Please note that if the above 'messageListener' receives a JMS Message of a type other than TextMessage, an IllegalStateException will be thrown (and subsequently swallowed). Another of the capabilities of the MessageListenerAdapter class is the ability to automatically send back a response Message if a handler method returns a non-void value. Consider the interface and class:

public interface ResponsiveTextMessageDelegate {

    // notice the return type...
    String receive(TextMessage message);

}

public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
    // implementation elided for clarity...
}

If the above DefaultResponsiveTextMessageDelegate is used in conjunction with a MessageListenerAdapter then any non-null value that is returned from the execution of the 'receive(..)' method will (in the default configuration) be converted into a TextMessage. The resulting TextMessage will then be sent to the Destination (if one exists) defined in the JMS Reply-To property of the original Message, or the default Destination set on the MessageListenerAdapter (if one has been configured); if no Destination is found then an InvalidDestinationException will be thrown (and please note that this exception will not be swallowed and will propagate up the call stack).

23.4.5 Processing messages within transactions

Invoking a message listener within a transaction only requires reconfiguration of the listener container.

Local resource transactions can simply be activated through the sessionTransacted flag on the listener container definition. Each message listener invocation will then operate within an active JMS transaction, with message reception rolled back in case of listener execution failure. Sending a response message (via SessionAwareMessageListener) will be part of the same local transaction, but any other resource operations (such as database access) will operate independently. This usually requires duplicate message detection in the listener implementation, covering the case where database processing has committed but message processing failed to commit.

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="sessionTransacted" value="true"/>
</bean>
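The duplicate message detection mentioned above can take the form of an idempotent handler that remembers which message IDs it has already processed. The following plain-Java sketch shows the idea only; the class DeduplicatingHandler is hypothetical, and the in-memory set is an assumption for brevity. In a real application the record of processed IDs would typically be stored in the same database transaction as the business update, so that both commit or roll back together.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Toy idempotent receiver: remembers the IDs of messages it has already
 * processed so that a redelivered message is skipped. The in-memory set is
 * for illustration; it does not survive a restart.
 */
final class DeduplicatingHandler {

    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> businessLogic;

    DeduplicatingHandler(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /** Returns true if the payload was processed, false if the ID was seen before. */
    boolean handle(String messageId, String payload) {
        if (!processedIds.add(messageId)) {
            return false; // redelivery of a message whose work was already done
        }
        try {
            businessLogic.accept(payload);
            return true;
        } catch (RuntimeException ex) {
            processedIds.remove(messageId); // allow a retry after a failure
            throw ex;
        }
    }
}
```

A listener would pass the JMS message ID (or a business key carried in the message) as messageId, so that a message redelivered after a failed JMS commit does not repeat database work that has already been committed.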

For participating in an externally managed transaction, you will need to configure a transaction manager and use a listener container which supports externally managed transactions: typically DefaultMessageListenerContainer.

To configure a message listener container for XA transaction participation, you’ll want to configure a JtaTransactionManager (which, by default, delegates to the Java EE server’s transaction subsystem). Note that the underlying JMS ConnectionFactory needs to be XA-capable and properly registered with your JTA transaction coordinator! (Check your Java EE server’s configuration of JNDI resources.) This allows message reception as well as e.g. database access to be part of the same transaction (with unified commit semantics, at the expense of XA transaction log overhead).

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

Then you just need to add it to our earlier container configuration. The container will take care of the rest.

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="transactionManager" ref="transactionManager"/>
</bean>

23.5 Support for JCA Message Endpoints

Beginning with version 2.5, Spring also provides support for a JCA-based MessageListener container. The JmsMessageEndpointManager will attempt to automatically determine the ActivationSpec class name from the provider’s ResourceAdapter class name. Therefore, it is typically possible to just provide Spring’s generic JmsActivationSpecConfig as shown in the following example.

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecConfig">
        <bean class="org.springframework.jms.listener.endpoint.JmsActivationSpecConfig">
            <property name="destinationName" value="myQueue"/>
        </bean>
    </property>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

Alternatively, you may set up a JmsMessageEndpointManager with a given ActivationSpec object. The ActivationSpec object may also come from a JNDI lookup (using <jee:jndi-lookup>).

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpec">
        <bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
            <property name="destination" value="myQueue"/>
            <property name="destinationType" value="javax.jms.Queue"/>
        </bean>
    </property>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

Using Spring’s ResourceAdapterFactoryBean, the target ResourceAdapter may be configured locally as depicted in the following example.

<bean id="resourceAdapter" class="org.springframework.jca.support.ResourceAdapterFactoryBean">
    <property name="resourceAdapter">
        <bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
            <property name="serverUrl" value="tcp://localhost:61616"/>
        </bean>
    </property>
    <property name="workManager">
        <bean class="org.springframework.jca.work.SimpleTaskWorkManager"/>
    </property>
</bean>

The specified WorkManager may also point to an environment-specific thread pool - typically through SimpleTaskWorkManager's "asyncTaskExecutor" property. Consider defining a shared thread pool for all your ResourceAdapter instances if you happen to use multiple adapters.

In some environments (e.g. WebLogic 9 or above), the entire ResourceAdapter object may be obtained from JNDI instead (using <jee:jndi-lookup>). The Spring-based message listeners can then interact with the server-hosted ResourceAdapter, also using the server’s built-in WorkManager.

Please consult the JavaDoc for JmsMessageEndpointManager, JmsActivationSpecConfig, and ResourceAdapterFactoryBean for more details.

Spring also provides a generic JCA message endpoint manager which is not tied to JMS: org.springframework.jca.endpoint.GenericMessageEndpointManager. This component allows for using any message listener type (e.g. a CCI MessageListener) and any provider-specific ActivationSpec object. Check out your JCA provider’s documentation to find out about the actual capabilities of your connector, and consult GenericMessageEndpointManager's JavaDoc for the Spring-specific configuration details.

Note

JCA-based message endpoint management is very analogous to EJB 2.1 Message-Driven Beans; it uses the same underlying resource provider contract. Like with EJB 2.1 MDBs, any message listener interface supported by your JCA provider can be used in the Spring context as well. Spring nevertheless provides explicit convenience support for JMS, simply because JMS is the most common endpoint API used with the JCA endpoint management contract.

23.6 Annotation-driven listener endpoints

The easiest way to receive a message asynchronously is to use the annotated listener endpoint infrastructure. In a nutshell, it allows you to expose a method of a managed bean as a JMS listener endpoint.

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(String data) { ... }
}

The idea of the example above is that whenever a message is available on the javax.jms.Destination "myDestination", the processOrder method is invoked accordingly (in this case, with the content of the JMS message similarly to what the MessageListenerAdapter provides).

The annotated endpoint infrastructure creates a message listener container behind the scenes for each annotated method, using a JmsListenerContainerFactory.

23.6.1 Enable listener endpoint annotations

To enable support for @JmsListener annotations add @EnableJms to one of your @Configuration classes.

@Configuration
@EnableJms
public class AppConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setDestinationResolver(destinationResolver());
        factory.setConcurrency("3-10");
        return factory;
    }
}

By default, the infrastructure looks for a bean named jmsListenerContainerFactory as the source for the factory to use to create message listener containers. In this case, and ignoring the JMS infrastructure setup, the processOrder method can be invoked with a core pool size of 3 threads and a maximum pool size of 10 threads.
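The concurrency value "3-10" is a "lower-upper" range. As a rough illustration of how such a string maps to a minimum and maximum number of consumers, the sketch below parses it in plain Java. The class ConcurrencyRange is hypothetical and is not part of Spring; treating a single number as an upper limit with a lower limit of 1 is the interpretation assumed here.

```java
/**
 * Parses a "lower-upper" or plain "upper" concurrency string.
 * Illustrative helper; the class and field names are not part of Spring.
 */
final class ConcurrencyRange {

    final int min;
    final int max;

    private ConcurrencyRange(int min, int max) {
        this.min = min;
        this.max = max;
    }

    static ConcurrencyRange parse(String value) {
        int dash = value.indexOf('-');
        // Without a dash only an upper limit is given; assume a lower limit of 1.
        int min = (dash < 0) ? 1 : Integer.parseInt(value.substring(0, dash).trim());
        int max = Integer.parseInt(value.substring(dash + 1).trim());
        if (min < 1 || max < min) {
            throw new IllegalArgumentException("Invalid concurrency range: " + value);
        }
        return new ConcurrencyRange(min, max);
    }

    public static void main(String[] args) {
        ConcurrencyRange range = parse("3-10");
        System.out.println(range.min + " to " + range.max + " consumers");
    }
}
```

For "3-10" this yields a minimum of 3 and a maximum of 10, matching the pool sizes described above.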

It is possible to customize the listener container factory to use per annotation or an explicit default can be configured by implementing the JmsListenerConfigurer interface. The default is only required if at least one endpoint is registered without a specific container factory. See the javadoc for full details and examples.

If you prefer XML configuration use the <jms:annotation-driven> element.

<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory"
        class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationResolver" ref="destinationResolver"/>
    <property name="concurrency" value="3-10"/>
</bean>

23.6.2 Programmatic endpoints registration

JmsListenerEndpoint provides a model of a JMS endpoint and is responsible for configuring the container for that model. The infrastructure allows you to configure endpoints programmatically in addition to the ones that are detected by the JmsListener annotation.

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId("myJmsEndpoint");
        endpoint.setDestination("anotherQueue");
        endpoint.setMessageListener(message -> {
            // processing
        });
        registrar.registerEndpoint(endpoint);
    }
}

In the example above, we used SimpleJmsListenerEndpoint which provides the actual MessageListener to invoke but you could just as well build your own endpoint variant describing a custom invocation mechanism.

It should be noted that you could just as well skip the use of @JmsListener altogether and only register your endpoints programmatically through JmsListenerConfigurer.

23.6.3 Annotated endpoint method signature

So far, we have been injecting a simple String in our endpoint but it can actually have a very flexible method signature. Let’s rewrite it to inject the Order with a custom header:

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(Order order, @Header("order_type") String orderType) {
        ...
    }
}

These are the main elements you can inject in JMS listener endpoints:

  • The raw javax.jms.Message or any of its subclasses (provided of course that it matches the incoming message type).
  • The javax.jms.Session for optional access to the native JMS API e.g. for sending a custom reply.
  • The org.springframework.messaging.Message representing the incoming JMS message. Note that this message holds both the custom and the standard headers (as defined by JmsHeaders).
  • @Header-annotated method arguments to extract a specific header value, including standard JMS headers.
  • @Headers-annotated argument that must also be assignable to java.util.Map for getting access to all headers.
  • A non-annotated element that is not one of the supported types (i.e. Message and Session) is considered to be the payload. You can make that explicit by annotating the parameter with @Payload. You can also turn on validation by adding an extra @Valid.

The ability to inject Spring’s Message abstraction is particularly useful to benefit from all the information stored in the transport-specific message without relying on transport-specific API.

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

Handling of method arguments is provided by DefaultMessageHandlerMethodFactory which can be further customized to support additional method arguments. The conversion and validation support can be customized there as well.

For instance, if we want to make sure our Order is valid before processing it, we can annotate the payload with @Valid and configure the necessary validator as follows:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(myValidator());
        return factory;
    }
}

23.6.4 Reply management

The existing support in MessageListenerAdapter already allows your method to have a non-void return type. When that’s the case, the result of the invocation is encapsulated in a javax.jms.Message that is sent either to the destination specified in the JMSReplyTo header of the original message or to the default destination configured on the listener. That default destination can now be set using the @SendTo annotation of the messaging abstraction.

Assuming our processOrder method should now return an OrderStatus, it is possible to write it as follows to automatically send a reply:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

If you need to set additional headers in a transport-independent manner, you could return a Message instead, for example:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
}
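
The choice of reply destination described in this section can be illustrated with a small, framework-free sketch. It is not Spring's implementation: the class name ReplyDestinationSketch and the use of plain strings for destinations are assumptions made for illustration only. It shows just the precedence rule, where a JMSReplyTo value on the request wins over the default destination configured on the listener.

```java
import java.util.Optional;

// Illustrative sketch only; the names here are hypothetical and not part of Spring.
final class ReplyDestinationSketch {

    /**
     * Picks the destination a reply would be sent to.
     *
     * @param jmsReplyTo         value of the request's JMSReplyTo header, or null if absent
     * @param defaultDestination default destination of the listener (for example from @SendTo), or null
     * @return the chosen destination, or an empty Optional if neither is available
     */
    static Optional<String> resolve(String jmsReplyTo, String defaultDestination) {
        if (jmsReplyTo != null) {
            // The destination requested by the sender takes precedence.
            return Optional.of(jmsReplyTo);
        }
        // Otherwise fall back to the listener's default, which may itself be missing.
        return Optional.ofNullable(defaultDestination);
    }

    public static void main(String[] args) {
        System.out.println(resolve("client.replies", "status").orElse("<none>")); // prints "client.replies"
        System.out.println(resolve(null, "status").orElse("<none>"));             // prints "status"
    }
}
```

When neither value is present the sketch returns an empty Optional; a real listener would need some way to report that the reply cannot be routed.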

23.7 JMS Namespace Support

Spring provides an XML namespace for simplifying JMS configuration. To use the JMS namespace elements you will need to reference the JMS schema:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- bean definitions here -->

</beans>

The namespace consists of three top-level elements: <annotation-driven/>, <listener-container/> and <jca-listener-container/>. <annotation-driven/> enables the use of annotation-driven listener endpoints. <listener-container/> and <jca-listener-container/> define shared listener container configuration and may contain <listener/> child elements. Here is an example of a basic configuration for two listeners.

<jms:listener-container>

    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

    <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

The example above is equivalent to creating two distinct listener container bean definitions and two distinct MessageListenerAdapter bean definitions as demonstrated in Section 23.4.4, “the MessageListenerAdapter”. In addition to the attributes shown above, the listener element may contain several optional ones. The following table describes all available attributes:

Table 23.1. Attributes of the JMS <listener> element

Attribute

Description

id

A bean name for the hosting listener container. If not specified, a bean name will be automatically generated.

destination (required)

The destination name for this listener, resolved through the DestinationResolver strategy.

ref (required)

The bean name of the handler object.

method

The name of the handler method to invoke. If the ref points to a MessageListener or Spring SessionAwareMessageListener, this attribute may be omitted.

response-destination

The name of the default response destination to send response messages to. This will be applied in case of a request message that does not carry a "JMSReplyTo" field. The type of this destination will be determined by the listener-container’s "destination-type" attribute. Note: This only applies to a listener method with a return value, for which each result object will be converted into a response message.

subscription

The name of the durable subscription, if any.

selector

An optional message selector for this listener.

concurrency

The number of concurrent sessions/consumers to start for this listener. Can either be a simple number indicating the maximum number (e.g. "5") or a range indicating the lower as well as the upper limit (e.g. "3-5"). Note that a specified minimum is just a hint and might be ignored at runtime. Default is the value provided by the container.
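
The two forms accepted by the concurrency attribute can be made concrete with a short parsing sketch. This is only an illustration of the format described in the table, not the container's own code: the class name ConcurrencyLimits is hypothetical, and treating a simple number as a range starting at 1 is an assumption of this sketch.

```java
// Illustrative sketch only; ConcurrencyLimits is a hypothetical helper, not a Spring class.
final class ConcurrencyLimits {

    final int min;
    final int max;

    private ConcurrencyLimits(int min, int max) {
        if (min < 1 || max < min) {
            throw new IllegalArgumentException("Invalid concurrency limits: " + min + "-" + max);
        }
        this.min = min;
        this.max = max;
    }

    /** Parses either a simple number ("5") or a lower-upper range ("3-5"). */
    static ConcurrencyLimits parse(String value) {
        String trimmed = value.trim();
        int separator = trimmed.indexOf('-');
        if (separator == -1) {
            // A simple number is the upper limit; the lower limit of 1 is an assumption here.
            return new ConcurrencyLimits(1, Integer.parseInt(trimmed));
        }
        int min = Integer.parseInt(trimmed.substring(0, separator).trim());
        int max = Integer.parseInt(trimmed.substring(separator + 1).trim());
        return new ConcurrencyLimits(min, max);
    }

    public static void main(String[] args) {
        ConcurrencyLimits range = parse("3-5");
        System.out.println(range.min + ".." + range.max); // prints "3..5"
    }
}
```

The sketch rejects inverted ranges such as "5-3" because no sensible interpretation exists for them; how a real container reacts to such input is not covered here.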


The <listener-container/> element also accepts several optional attributes. This allows for customization of the various strategies (for example, taskExecutor and destinationResolver) as well as basic JMS settings and resource references. Using these attributes, it is possible to define highly-customized listener containers while still benefiting from the convenience of the namespace.

Such settings can be automatically exposed as a JmsListenerContainerFactory by specifying the id of the bean to expose through the factory-id attribute.

<jms:listener-container connection-factory="myConnectionFactory"
        task-executor="myTaskExecutor"
        destination-resolver="myDestinationResolver"
        transaction-manager="myTransactionManager"
        concurrency="10">

    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

    <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

The following table describes all available attributes. Consult the class-level javadocs of the AbstractMessageListenerContainer and its concrete subclasses for more details on the individual properties. The javadocs also provide a discussion of transaction choices and message redelivery scenarios.

Table 23.2. Attributes of the JMS <listener-container> element

Attribute

Description

container-type

The type of this listener container. Available options are: default, simple, default102, or simple102 (the default value is 'default').

container-class

A custom listener container implementation class as fully qualified class name. Default is Spring’s standard DefaultMessageListenerContainer or SimpleMessageListenerContainer, according to the "container-type" attribute.

factory-id

Exposes the settings defined by this element as a JmsListenerContainerFactory with the specified id so that they can be reused with other endpoints.

connection-factory

A reference to the JMS ConnectionFactory bean (the default bean name is 'connectionFactory').

task-executor

A reference to the Spring TaskExecutor for the JMS listener invokers.

destination-resolver

A reference to the DestinationResolver strategy for resolving JMS Destinations.

message-converter

A reference to the MessageConverter strategy for converting JMS Messages to listener method arguments. Default is a SimpleMessageConverter.

error-handler

A reference to an ErrorHandler strategy for handling any uncaught Exceptions that may occur during the execution of the MessageListener.

destination-type

The JMS destination type for this listener: queue, topic, durableTopic, sharedTopic or sharedDurableTopic. Depending on the value, this enables the pubSubDomain, subscriptionDurable and subscriptionShared properties of the container. The default is queue (i.e. all three properties disabled).

client-id

The JMS client id for this listener container. Needs to be specified when using durable subscriptions.

cache

The cache level for JMS resources: none, connection, session, consumer or auto. By default (auto), the cache level will effectively be "consumer", unless an external transaction manager has been specified - in which case the effective default will be none (assuming Java EE-style transaction management where the given ConnectionFactory is an XA-aware pool).

acknowledge

The native JMS acknowledge mode: auto, client, dups-ok or transacted. A value of transacted activates a locally transacted Session. As an alternative, specify the transaction-manager attribute described below. Default is auto.

transaction-manager

A reference to an external PlatformTransactionManager (typically an XA-based transaction coordinator, e.g. Spring’s JtaTransactionManager). If not specified, native acknowledging will be used (see "acknowledge" attribute).

concurrency

The number of concurrent sessions/consumers to start for each listener. Can either be a simple number indicating the maximum number (e.g. "5") or a range indicating the lower as well as the upper limit (e.g. "3-5"). Note that a specified minimum is just a hint and might be ignored at runtime. Default is 1; keep concurrency limited to 1 in case of a topic listener or if queue ordering is important; consider raising it for general queues.

prefetch

The maximum number of messages to load into a single session. Note that raising this number might lead to starvation of concurrent consumers!

receive-timeout

The timeout to use for receive calls (in milliseconds). The default is 1000 ms (1 sec); -1 indicates no timeout at all.

back-off

Specify the BackOff instance to use to compute the interval between recovery attempts. If the BackOffExecution implementation returns BackOffExecution#STOP, the listener container will not further attempt to recover. The recovery-interval value is ignored when this property is set. The default is a FixedBackOff with an interval of 5000 ms, that is 5 seconds.

recovery-interval

Specify the interval between recovery attempts, in milliseconds. This is a convenient way to create a FixedBackOff with the specified interval. For more recovery options, consider specifying a BackOff instance instead. The default is 5000 ms, that is 5 seconds.

phase

The lifecycle phase within which this container should start and stop. The lower the value, the earlier this container will start and the later it will stop. The default is Integer.MAX_VALUE, meaning the container will start as late as possible and stop as soon as possible.
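
As an illustration of the destination-type row in the table above, the following sketch maps each allowed value to the three container properties it switches on. It is a plain-Java approximation for explanatory purposes: the class name DestinationTypeSketch is hypothetical, and the mapping is inferred from the value names and the property names given in the table rather than copied from the namespace parser.

```java
// Illustrative sketch only; DestinationTypeSketch is a hypothetical class, not part of Spring.
final class DestinationTypeSketch {

    final boolean pubSubDomain;
    final boolean subscriptionDurable;
    final boolean subscriptionShared;

    private DestinationTypeSketch(boolean pubSubDomain, boolean durable, boolean shared) {
        this.pubSubDomain = pubSubDomain;
        this.subscriptionDurable = durable;
        this.subscriptionShared = shared;
    }

    /** Translates a destination-type value into the three flags it implies. */
    static DestinationTypeSketch forType(String destinationType) {
        switch (destinationType) {
            case "queue":
                return new DestinationTypeSketch(false, false, false); // the default
            case "topic":
                return new DestinationTypeSketch(true, false, false);
            case "durableTopic":
                return new DestinationTypeSketch(true, true, false);
            case "sharedTopic":
                return new DestinationTypeSketch(true, false, true);
            case "sharedDurableTopic":
                return new DestinationTypeSketch(true, true, true);
            default:
                throw new IllegalArgumentException("Unknown destination type: " + destinationType);
        }
    }

    public static void main(String[] args) {
        DestinationTypeSketch flags = forType("durableTopic");
        System.out.println(flags.pubSubDomain + " " + flags.subscriptionDurable + " " + flags.subscriptionShared);
        // prints "true true false"
    }
}
```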


Configuring a JCA-based listener container with the "jms" schema support is very similar.

<jms:jca-listener-container resource-adapter="myResourceAdapter"
        destination-resolver="myDestinationResolver"
        transaction-manager="myTransactionManager"
        concurrency="10">

    <jms:listener destination="queue.orders" ref="myMessageListener"/>

</jms:jca-listener-container>

The available configuration options for the JCA variant are described in the following table:

Table 23.3. Attributes of the JMS <jca-listener-container/> element

Attribute

Description

factory-id

Exposes the settings defined by this element as a JmsListenerContainerFactory with the specified id so that they can be reused with other endpoints.

resource-adapter

A reference to the JCA ResourceAdapter bean (the default bean name is 'resourceAdapter').

activation-spec-factory

A reference to the JmsActivationSpecFactory. The default is to autodetect the JMS provider and its ActivationSpec class (see DefaultJmsActivationSpecFactory).

destination-resolver

A reference to the DestinationResolver strategy for resolving JMS Destinations.

message-converter

A reference to the MessageConverter strategy for converting JMS Messages to listener method arguments. Default is a SimpleMessageConverter.

destination-type

The JMS destination type for this listener: queue, topic, durableTopic, sharedTopic or sharedDurableTopic. Depending on the value, this enables the pubSubDomain, subscriptionDurable and subscriptionShared properties of the container. The default is queue (i.e. all three properties disabled).

client-id

The JMS client id for this listener container. Needs to be specified when using durable subscriptions.

acknowledge

The native JMS acknowledge mode: auto, client, dups-ok or transacted. A value of transacted activates a locally transacted Session. As an alternative, specify the transaction-manager attribute described below. Default is auto.

transaction-manager

A reference to a Spring JtaTransactionManager or a javax.transaction.TransactionManager for kicking off an XA transaction for each incoming message. If not specified, native acknowledging will be used (see the "acknowledge" attribute).

concurrency

The number of concurrent sessions/consumers to start for each listener. Can either be a simple number indicating the maximum number (e.g. "5") or a range indicating the lower as well as the upper limit (e.g. "3-5"). Note that a specified minimum is just a hint and will typically be ignored at runtime when using a JCA listener container. Default is 1.

prefetch

The maximum number of messages to load into a single session. Note that raising this number might lead to starvation of concurrent consumers!