`

Spring JMS 几个类介绍

阅读更多

1 DestinationResolver 
    DestinationResolver接口的作用是将指定的目的地名解析为目的地实例。其定义如下:

Java代码  收藏代码
  1. public interface DestinationResolver {  
  2.     Destination resolveDestinationName(Session session, String destinationName,   
  3.         boolean pubSubDomain) throws JMSException;  
  4. }  

    参数pubSubDomain用于指定是使用“发布/订阅”模式(解析后的目的地是Topic),还是使用“点对点”模式(解析后的目的地是Queue)。

 

    CachingDestinationResolver接口继承了DestinationResolver,增加了缓存的功能,其接口定义如下:

Java代码  收藏代码
  1. public interface CachingDestinationResolver extends DestinationResolver {  
  2.     void removeFromCache(String destinationName);  
  3.     void clearCache();  
  4. }  

    在目的地失效的时候,removeFromCache方法会被调用;在JMS provider失效的时候,clearCache方法会被调用。

1.1 DynamicDestinationResolver 
    DynamicDestinationResolver实现了DestinationResolver接口。根据指定的目的地名,DynamicDestinationResolver会动态创建目的地实例。针对JMS1.1规范,它采用如下方法创建目的地:

Java代码  收藏代码
  1. session.createTopic(topicName)  
  2. session.createQueue(queueName);  


1.2 JndiDestinationResolver 
    JndiDestinationResolver继承自JndiLocatorSupport, 同时实现了CachingDestinationResolver接口。如果在JMS provider中配置了静态目的地,那么JndiDestinationResolver通过JNDI查找的方式获得目的地实例。


    JndiDestinationResolver的fallbackToDynamicDestination属性用于指定在JNDI查找失败后,是否使用动态目的地,默认值是false。JndiDestinationResolver的cache属性用于指定是否对目的地实例进行缓存,默认值是true。

 

1.3 BeanFactoryDestinationResolver 
    BeanFactoryDestinationResolver实现了DestinationResolver接口和BeanFactoryAware接口。它会根据指定的目的地名从BeanFactory中查找目的地实例。以下是相关的代码:

Java代码  收藏代码
  1. public Destination resolveDestinationName(Session session, String destinationName,   
  2. boolean pubSubDomain) throws JMSException {  
  3.     Assert.state(this.beanFactory != null"BeanFactory is required");  
  4.     try {  
  5.         return (Destination) this.beanFactory.getBean(destinationName, Destination.class);  
  6.     }  
  7.     catch (BeansException ex) {  
  8.         throw new DestinationResolutionException(  
  9.             "Failed to look up Destinaton bean with name '" + destinationName + "'", ex);  
  10.     }  
  11. }  

 

2 JmsAccessor 
    抽象类JmsAccessor是JmsTemplate、SimpleMessageListenerContainer和DefaultMessageListenerContainer等concrete class的基类。JmsAccessor定义了如下几个用于访问JMS服务的共通属性。

Java代码  收藏代码
  1. private ConnectionFactory connectionFactory;  
  2. private boolean sessionTransacted = false;  
  3. private int sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;  


    JmsAccessor提供了创建Connection和Session的方法,如下:

Java代码  收藏代码
  1. protected Connection createConnection() throws JMSException {  
  2.     return getConnectionFactory().createConnection();  
  3. }  
  4.   
  5. protected Session createSession(Connection con) throws JMSException {  
  6.     return con.createSession(isSessionTransacted(), getSessionAcknowledgeMode());  
  7. }  

 
2.1 JmsDestinationAccessor 
    抽象类JmsDestinationAccessor继承自JmsAccessor,增加了destinationResolver和pubSubDomain属性。destinationResolver的默认值是DynamicDestinationResolver的实例,也就是说默认采用动态目的地解析的方式;pubSubDomain用于指定是使用“发布/订阅”模式还是使用“点对点”模式,默认值是false。

    JmsDestinationAccessor提供了用于解析目的地的方法,如下:

Java代码  收藏代码
  1. protected Destination resolveDestinationName(Session session, String destinationName)   
  2. throws JMSException {  
  3.     return getDestinationResolver().resolveDestinationName(session, destinationName,   
  4.         isPubSubDomain());  
  5. }  


2.2 AbstractJmsListeningContainer 
    AbstractJmsListeningContainer继承自JmsDestinationAccessor,作为所有Message Listener Container的公共基类。它主要提供了JMS connection的生命周期管理的功能,但是没有对消息接收的方式(主动接收方式或者异步接收方式)等做任何假定。该类主要的属性如下:

Java代码  收藏代码
  1. private String clientId;  
  2. private Connection sharedConnection;  

    clientId通常用于持久订阅;sharedConnection保存了被共享的JMS connection。

 

    该类定义了如下的抽象方法,以便子类可以决定是否使用共享的JMS connection。

Java代码  收藏代码
  1. protected abstract boolean sharedConnectionEnabled();  

 

2.3 AbstractMessageListenerContainer 
    AbstractMessageListenerContainer继承自AbstractJmsListeningContainer,也是作为所有Message Listener Container的公共基类。该类主要的属性如下:

Java代码  收藏代码
  1. private volatile Object destination;  
  2. private volatile Object messageListener;  
  3. private boolean exposeListenerSession = true;  

    destination用于指定接收消息的目的地。
    messageListener用于指定处理消息的listener。对于messageListener,它既可以是符合JMS规范的javax.jms.MessageListener,也可以是Spring特有的org.springframework.jms.listener.SessionAwareMessageListener。SessionAwareMessageListener的定义如下:

Java代码  收藏代码
  1. public interface SessionAwareMessageListener {  
  2.     void onMessage(Message message, Session session) throws JMSException;  
  3. }  

    跟javax.jms.MessageListener相比,这个接口的onMessage方法增加了一个Session 类型的参数,可以通过这个session发送回复消息(reply message)。


    如果使用了SessionAwareMessageListener 类型的message listener,那么exposeListenerSession参数指定了传入onMessage方法的session参数是否是创建了MessageConsumer的session,默认值是true。如果是false,那么AbstractMessageListenerContainer会在connection上新建一个session,并传入onMessage方法。

2.4 AbstractPollingMessageListenerContainer 
    AbstractPollingMessageListenerContainer继承自AbstractMessageListenerContainer,它提供了对于主动接收消息(polling)的支持,以及支持外部的事务管理。

Java代码  收藏代码
  1. private boolean pubSubNoLocal = false;  
  2. private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;  
  3. private PlatformTransactionManager transactionManager;  

    如果使用“发布/订阅”模式,那么pubSubNoLocal 属性指定通过某个连接发送到某个Topic的消息,是否应该被投递回这个连接。


    receiveTimeout属性用于指定调用MessageConsumer的receive方法时的超时时间,默认值是1秒。需要注意的是,这个值应该比transactionManager 中指定的事务超时时间略小。


    通常情况下,应该为transactionManager设置一个org.springframework.transaction.jta.JtaTransactionManager的实例,此外也要设置一个支持XA的ConnectionFactory。需要注意的是,XA 事务对性能有较大的影响。
    如果只是希望使用local JMS transaction,那么只要设置sessionTransacted为true或者使用JmsTransactionManager即可。实际上,如果设置了非JTA的transactionManager,那么sessionTransacted属性会自动被设置成true。
    由于local JMS transaction无法同其它local transaction(例如local database transaction)进行协调,因此客户端程序可能需要对重发的消息进行检查。JMS规范要求:JMS provider应该将重发消息的JMSRedelivered属性设置为true。

2.5 SimpleMessageListenerContainer 
    SimpleMessageListenerContainer继承自AbstractMessageListenerContainer,使用异步方式接收消息(也就是通过MessageConsumer上注册MessageListener的方式接收消息)。该类主要的属性如下:

Java代码  收藏代码
  1. private boolean pubSubNoLocal = false;  
  2. private int concurrentConsumers = 1;  
  3. private Set sessions;  
  4. private Set consumers;  
  5. private TaskExecutor taskExecutor;  

    如果使用“发布/订阅”模式,那么pubSubNoLocal 属性指定通过某个连接发送到某个Topic的消息,是否应该被投递回这个连接。

 

    SimpleMessageListenerContainer允许创建多个Session和MessageConsumer来接收消息。具体的个数由concurrentConsumers属性指定。需要注意的是,应该只是在Destination为Queue的时候才使用多个MessageConsumer(Queue中的一个消息只能被一个Consumer接收),虽然使用多个MessageConsumer会提高消息处理的性能,但是消息处理的顺序却得不到保证:消息被接收的顺序仍然是消息发送时的顺序,但是由于消息可能会被并发处理,因此消息处理的顺序可能和消息发送的顺序不同。此外,不应该在Destination为Topic的时候使用多个MessageConsumer,这是因为多个MessageConsumer会接收到同样的消息。

    SimpleMessageListenerContainer创建的Session和MessageConsumer分别保存在sessions和consumers属性中。


    taskExecutor属性的默认值是null,也就是说,对MessageListener(或者SessionAwareMessageListener)的回调是在MessageConsumer的内部线程中执行。如果指定了taskExecutor,那么回调是在TaskExecutor内部的线程中执行。以下是相关的代码:

Java代码  收藏代码
  1. protected MessageConsumer createListenerConsumer(final Session session)   
  2. throws JMSException {  
  3.     Destination destination = getDestination();  
  4.     if (destination == null) {  
  5.         destination = resolveDestinationName(session, getDestinationName());  
  6.     }  
  7.     MessageConsumer consumer = createConsumer(session, destination);  
  8.   
  9.     if (this.taskExecutor != null) {  
  10.         consumer.setMessageListener(new MessageListener() {  
  11.             public void onMessage(final Message message) {  
  12.                 taskExecutor.execute(new Runnable() {  
  13.                     public void run() {  
  14.                         processMessage(message, session);  
  15.                     }  
  16.                 });  
  17.             }  
  18.         });  
  19.     }  
  20.     else {  
  21.         consumer.setMessageListener(new MessageListener() {  
  22.             public void onMessage(Message message) {  
  23.                 processMessage(message, session);  
  24.             }  
  25.         });  
  26.     }  
  27.   
  28.     return consumer;  
  29. }  

    需要注意的是,如果指定了taskExecutor,那么消息在被taskExecutor内部的线程处理前,可能已经被确认过了(外层的onMessage方法可能已经执行结束了)。因此如果使用事务Session或者Session.CLIENT_ACKNOWLEDGE类型的确认模式,那么可能会导致问题。

 

    该类的sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定义)总是返回true,因此SimpleMessageListenerContainer会使用共享的JMS connection。

2.6 DefaultMessageListenerContainer 
    DefaultMessageListenerContainer继承自AbstractPollingMessageListenerContainer,主要使用同步的方式接收消息(也就是通过循环调用MessageConsumer.receive的方式接收消息)。该类主要的属性如下:

Java代码  收藏代码
  1. private int concurrentConsumers = 1;  
  2. private int maxConcurrentConsumers = 1;  
  3. private int maxMessagesPerTask = Integer.MIN_VALUE;  
  4. private int idleTaskExecutionLimit = 1;  
  5. private final Set scheduledInvokers = new HashSet();  
  6. private TaskExecutor taskExecutor;  
  7. private int cacheLevel = CACHE_AUTO;  

    跟SimpleMessageListenerContainer一样,DefaultMessageListenerContainer也支持创建多个Session和MessageConsumer来接收消息。跟SimpleMessageListenerContainer不同的是,DefaultMessageListenerContainer创建了concurrentConsumers所指定个数的AsyncMessageListenerInvoker(实现了SchedulingAwareRunnable接口),并交给taskExecutor运行。


    maxMessagesPerTask属性的默认值是Integer.MIN_VALUE,但是如果设置的taskExecutor(默认值是SimpleAsyncTaskExecutor)实现了SchedulingTaskExecutor接口并且其prefersShortLivedTasks方法返回true(也就是说该TaskExecutor倾向于短期任务),那么maxMessagesPerTask属性会自动被设置为10。
    如果maxMessagesPerTask属性的值小于0,那么AsyncMessageListenerInvoker.run方法会在循环中反复尝试接收消息,并在接收到消息后调用MessageListener(或者SessionAwareMessageListener);如果maxMessagesPerTask属性的值不小于0,那么AsyncMessageListenerInvoker.run方法里最多会尝试接收消息maxMessagesPerTask次,每次接收消息的超时时间由其父类AbstractPollingMessageListenerContainer的receiveTimeout属性指定。如果在这些尝试中都没有接收到消息,那么AsyncMessageListenerInvoker的idleTaskExecutionCount属性会被累加。在run方法执行完毕前会对idleTaskExecutionCount进行检查,如果该值超过了DefaultMessageListenerContainer.idleTaskExecutionLimit(默认值1),那么这个AsyncMessageListenerInvoker可能会被销毁。


    所有AsyncMessageListenerInvoker实例都保存在scheduledInvokers中,实例的个数可以在concurrentConsumers和maxConcurrentConsumers之间浮动。跟SimpleMessageListenerContainer一样,应该只是在Destination为Queue的时候才使用多个AsyncMessageListenerInvoker实例。

 

    cacheLevel属性用于指定是否对JMS资源进行缓存,可选的值是CACHE_NONE = 0、CACHE_CONNECTION = 1、CACHE_SESSION = 2、CACHE_CONSUMER = 3和CACHE_AUTO = 4。默认情况下,如果transactionManager属性不为null,那么cacheLevel被自动设置为CACHE_NONE(不进行缓存),否则cacheLevel被自动设置为CACHE_CONSUMER。


    如果cacheLevel属性值大于等于CACHE_CONNECTION,那么sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定义)返回true,也就是说使用共享的JMS连接。

 

 

3 SingleConnectionFactory 
    SingleConnectionFactory实现了ConnectionFactory接口,其createConnection方法总是返回相同的Connection。可以在SingleConnectionFactory的构造函数中传入Connection对象或者ConnectionFactory对象,用来创建被代理的连接对象。SingleConnectionFactory.createConnection方法返回的连接是个代理,它忽略了对stop和close方法的调用(连接会在SingleConnectionFactory.destroy方法中关闭)。


    SingleConnectionFactory的reconnectOnException属性用来指定是否在连接抛出JMSException的时候,对连接进行重置,重置后如果再调用createConnection方法,那么会返回一个新的连接。


    需要注意的是,AbstractJmsListeningContainer类的抽象方法sharedConnectionEnabled指定了是否在message listener container内部使用共享的JMS连接。因此通常情况下不需要为单独的message listener container设置SingleConnectionFactory(及其子类);如果希望在不同的message listener container之间共享JMS连接,那么可以考虑使用SingleConnectionFactory。

3.1 CachingConnectionFactory 
    CachingConnectionFactory继承自SingleConnectionFactory,增加了对Session和MessageProducer缓存的功能。该类主要的属性如下:

Java代码  收藏代码
  1. private int sessionCacheSize = 1;  
  2. private boolean cacheProducers = true;  

    sessionCacheSize属性指定了被缓存的Session实例的个数(默认值是1),也就是说,如果同时请求的Session个数大于sessionCacheSize,那么这些Session不会被缓存,而是正常的被创建和销毁。


    cacheProducers属性指定了是否对MessageProducer进行缓存,默认值是true。

 出自:http://whitesock.iteye.com/blog/306776

分享到:
评论

相关推荐

    Spring 实现远程访问详解——jms和activemq

    前几章我们分别利用spring rmi、httpinvoker、httpclient、webservice技术实现不同服务器间的远程访问。本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。 一. 简介 1. ...

    Spring_Framework_ API_5.0.5 (CHM格式)

    Spring5 是一个重要的版本,距离Spring...随着 Java、JavaEE 和其他一些框架基准版本的增加,SpringFramework5 取消了对几个框架的支持。例如: Portlet Velocity JasperReports XMLBeans JDO Guava

    Spring in Action(第2版)中文版

    15.2.6几个结束工作 15.3高级web流程技术 15.3.1使用decision状态 15.3.2提炼子流程并使用子状态 15.4集成springwebflow与其他框架 15.4.1jakartsstruts 15.4.2javaserverface 15.5小结 第16章集成其他web...

    Spring in Action(第二版 中文高清版).part2

    15.2.6 几个结束工作 15.3 高级Web流程技术 15.3.1 使用decision状态 15.3.2 提炼子流程并使用子状态 15.4 集成Spring Web Flow与其他框架 15.4.1 Jakarts Struts 15.4.2 JavaServer Face 15.5 小结 第16...

    Spring in Action(第二版 中文高清版).part1

    15.2.6 几个结束工作 15.3 高级Web流程技术 15.3.1 使用decision状态 15.3.2 提炼子流程并使用子状态 15.4 集成Spring Web Flow与其他框架 15.4.1 Jakarts Struts 15.4.2 JavaServer Face 15.5 小结 第16...

    jms activeMQ 经典代码

    写spring配置文件的时候, 要把MessageProducer, MessageConsumer,MessageListener,MessageListenerContainer几个地方弄清楚: 1.可以有一个或者多个消息生产者向同一个destination发送消息. 2.queue类型的只能有一个...

    gwt-spring-jpa-jta-jms-comet-hibernate

    目的为 GWT、Spring、JMS、Comet、JTA 提供一个集成的启动项目,由 Hibernate 和 Derby 提供支持。 该项目演示了各种开箱即用的有用功能,并可用作真正的 Web 应用程序的起点。技术MavenHibernateJPA 注释德比GWT ...

    SpringRest:带有AngularJS前端的Spring REST后端

    问题描述-多个用户登录系统。 它们具有通知窗口(消息控制台)。 基于事件,将根据业务逻辑从后端创建JMS消息并将其发布到Queue或Topic。 有些事件必须传递到用户的通知窗口,并允许用户采取措施。 [解决方案] JMS...

    java面试宝典

    244、客服端调用EJB对象的几个基本步骤 56 245、 如何给weblogic指定大小的内存? 56 246、如何设定的weblogic的热启动模式(开发模式)与产品发布模式? 57 247、如何启动时不需输入用户名与密码? 57 248、在...

    分布式事务实践 解决数据一致性

    介绍了分布式系统的定义、实现原则和几种形式,详细介绍了微服务架构的分布式系统,并使用Spring Cloud框架演示了一个完整的微服务系统的实现过程。 5-1 CAP原则和BASE理论简介 5-2 分布式系统综述 5-3 SpringCloud...

    java面试题

    Hibernate源码中几个包的作用简要介绍 65 72. struts 66 72.1. struts 简介 66 72.2. STRUTS的应用(如STRUTS架构) 66 72.3. 请写出Struts的工作原理、工作机制 67 72.4. struts的处理流程。 67 72.5. Struts 2框架...

    千方百计笔试题大全

    244、客服端调用EJB对象的几个基本步骤 56 245、 如何给weblogic指定大小的内存? 56 246、如何设定的weblogic的热启动模式(开发模式)与产品发布模式? 57 247、如何启动时不需输入用户名与密码? 57 248、在weblogic...

    黑马品优购项目

    4.2. 举几个简单模块的例子 4.2.1. 品牌管理 单表 分页、新增、删除、修改 4.2.2. 规格管理 2张表 分页、新增、删除、修改、显示优化(显示列表内容的一部分) 4.2.3. 模板管理 2张表 分页、新增、删除、修改、...

    Java面试宝典2010版

    16. hibernate进行多表查询每个表中各取几个字段,也就是说查询出来的结果集没有一个实体类与之对应如何解决; 17.介绍一下Hibernate的二级缓存 18、Spring 的依赖注入是什么意思? 给一个 Bean 的 message 属性, ...

    最新Java面试宝典pdf版

    16. hibernate进行多表查询每个表中各取几个字段,也就是说查询出来的结果集没有一个实体类与之对应如何解决; 123 17.介绍一下Hibernate的二级缓存 123 18、Spring 的依赖注入是什么意思? 给一个 Bean 的 message ...

    Java面试笔试资料大全

    16. hibernate进行多表查询每个表中各取几个字段,也就是说查询出来的结果集没有一个实体类与之对应如何解决; 123 17.介绍一下Hibernate的二级缓存 123 18、Spring 的依赖注入是什么意思? 给一个 Bean 的 message ...

    Java面试宝典2012版

    16. hibernate进行多表查询每个表中各取几个字段,也就是说查询出来的结果集没有一个实体类与之对应如何解决; 123 17.介绍一下Hibernate的二级缓存 123 18、Spring 的依赖注入是什么意思? 给一个 Bean 的 message...

    JAVA面试宝典2010

    16. hibernate进行多表查询每个表中各取几个字段,也就是说查询出来的结果集没有一个实体类与之对应如何解决; 123 17.介绍一下Hibernate的二级缓存 123 18、Spring 的依赖注入是什么意思? 给一个 Bean 的 message ...

    Java面试宝典-经典

    16. hibernate进行多表查询每个表中各取几个字段,也就是说查询出来的结果集没有一个实体类与之对应如何解决; 123 17.介绍一下Hibernate的二级缓存 123 18、Spring 的依赖注入是什么意思? 给一个 Bean 的 message ...

Global site tag (gtag.js) - Google Analytics