Spring 之 JMS 发送和接收消息

JMS(Java Message Service)可以提高应用的伸缩性,并能有效地避免服务被压垮。在 Java EE 平台中,为了发送和接收 JMS 消息,必要的步骤有如下七步:

  1. 在一个消息代理上创建一个JMS连接工厂
  2. 从JMS连接工厂中打开一个JMS连接
  3. 创建一个JMS的目的地,可以是一个队列(Queue)也可以是一个主题(Topic)
  4. 从连接中获取一个JMS会话
  5. 用消息生产者或者消息消费者发送或接收一个消息
  6. 处理JMSException
  7. 关闭JMS会话和连接
public class ProducerTool {   
	  
    private String user = ActiveMQConnection.DEFAULT_USER;   
  
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;   
  
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;   
  
    private String subject = "TOOL.DEFAULT";   
  
    private Destination destination = null;   
  
    private Connection connection = null;   
  
    private Session session = null;   
  
    private MessageProducer producer = null;   
  
    // 初始化   
    private void initialize() throws JMSException, Exception {   
    	// 1创建 factory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(   
                user, password, url);   
        // 2、创建factory
        connection = connectionFactory.createConnection();   
        // 3、创建session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);   
        // 4.创建目的地
        destination = session.createQueue(subject);   
        // 5.创建消息消生产者
        producer = session.createProducer(destination);   
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
    }   
  
    /**
     * 将传入的String 以TextMessage 格式发送出去
     * @param message
     * @throws JMSException
     * @throws Exception
     */
    public void produceMessage(String message) throws JMSException, Exception {   
        initialize();   
        // 创建文本消息
        TextMessage msg = session.createTextMessage(message);   
        // 打开连接
        connection.start();  
        // 发送消息
        producer.send(msg);   
    }   
  
    // 关闭连接  
    public void close() throws JMSException {   
        System.out.println("Producer:->Closing connection");   
        if (producer != null)   
            producer.close();   
        if (session != null)   
            session.close();   
        if (connection != null)   
            connection.close();   
    }   

?

/**
 * Consumes text messages from an ActiveMQ queue using the plain JMS API and
 * forwards their content as SMS messages.
 *
 * <p>The connection, session and consumer are created on the first call to
 * {@link #consumeMessage()} and reused until {@link #close()} is invoked.
 *
 * <p>This class performs no synchronization of its own.
 */
public class ConsumerTool implements MessageListener {

	/** JMS user name; defaults to ActiveMQ's default user. */
	private String user = ActiveMQConnection.DEFAULT_USER;

	/** JMS user password; defaults to ActiveMQ's default password. */
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;

	/** Broker URL; defaults to ActiveMQ's default broker URL. */
	private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

	/** Name of the queue to consume from. */
	private String subject = "TOOL.DEFAULT";

	/** Queue resolved from {@link #subject}; {@code null} until initialized. */
	private Destination destination = null;

	/** JMS connection; {@code null} until initialized and again after {@link #close()}. */
	private Connection connection = null;

	/** JMS session created from {@link #connection}. */
	private Session session = null;

	/** Consumer bound to {@link #destination}. */
	private MessageConsumer consumer = null;

	/**
	 * Creates the connection, session, queue and consumer if they do not exist yet.
	 *
	 * <p>Calling this method when the consumer is already set up is a no-op, so
	 * repeated calls do not open (and leak) additional connections. If any step
	 * fails, the freshly opened connection is closed before the exception is
	 * propagated.
	 *
	 * @throws JMSException if the broker cannot be reached or a JMS object cannot be created
	 * @throws Exception declared for backward compatibility with existing callers
	 */
	private void initialize() throws JMSException, Exception {
		if (consumer != null) {
			return;
		}
		// 1. Create the connection factory.
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				user, password, url);
		// 2. Open a connection.
		Connection newConnection = connectionFactory.createConnection();
		try {
			// 3. Create a non-transacted, auto-acknowledging session.
			Session newSession = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 4. Resolve the source queue.
			Destination newDestination = newSession.createQueue(subject);
			// 5. Create the message consumer.
			MessageConsumer newConsumer = newSession.createConsumer(newDestination);

			// Publish to the fields only once every step has succeeded.
			connection = newConnection;
			session = newSession;
			destination = newDestination;
			consumer = newConsumer;
		} catch (JMSException e) {
			try {
				newConnection.close();
			} catch (JMSException ignored) {
				// The original failure is the one worth reporting to the caller.
			}
			throw e;
		}
	}

	/**
	 * Starts consuming: registers this object as the consumer's message
	 * listener and starts the connection.
	 *
	 * <p>The listener is registered before the connection is started so that
	 * delivery only begins once setup is complete.
	 *
	 * @throws JMSException if the JMS objects cannot be created or started
	 * @throws Exception declared for backward compatibility with existing callers
	 */
	public void consumeMessage() throws JMSException, Exception {
		initialize();
		// Register the listener first, then let the broker deliver.
		consumer.setMessageListener(this);
		connection.start();
	}

	/**
	 * Closes the consumer, session and connection and resets this object so a
	 * later {@link #consumeMessage()} re-initializes it.
	 *
	 * <p>The connection is closed even when closing the consumer or the session
	 * fails, so a failure in an earlier step cannot leak the connection.
	 *
	 * @throws JMSException if closing any of the JMS objects fails
	 */
	public void close() throws JMSException {
		System.out.println("Consumer:->Closing connection");
		try {
			if (consumer != null) {
				consumer.close();
			}
			if (session != null) {
				session.close();
			}
		} finally {
			try {
				if (connection != null) {
					connection.close();
				}
			} finally {
				consumer = null;
				session = null;
				destination = null;
				connection = null;
			}
		}
	}

	/**
	 * Handles an incoming message.
	 *
	 * <p>A {@code TextMessage} is split with {@code FileUtil.splitMessage}; the
	 * second element is turned into one or more content parts by
	 * {@code FileUtil.buildContent}, and each part is sent as an SMS addressed
	 * with the first element. Example payload from the original author:
	 * {@code sendTo:13919306243;content:hello!} (presumably the format
	 * {@code splitMessage} expects - confirm against {@code FileUtil}).
	 *
	 * <p>Messages that do not split into at least two elements, or whose content
	 * element is null or empty, are reported and skipped instead of raising an
	 * exception out of the listener. Any other message type is only printed.
	 *
	 * @param message the message delivered by the JMS provider
	 */
	public void onMessage(Message message) {
		if (!(message instanceof TextMessage)) {
			System.out.println("Consumer:->Received: " + message);
			return;
		}
		try {
			String msg = ((TextMessage) message).getText();
			String[] msgs = FileUtil.splitMessage(msg);
			if (!hasContent(msgs)) {
				System.out.println("Consumer:->Ignored message without content: " + msg);
				return;
			}
			for (String s : FileUtil.buildContent(msgs[1])) {
				SMSSender.getInstance().send(new OutboundMessage(msgs[0], s));
			}
		} catch (GatewayException e) {
			reportFailure("Consumer:->Failed to send SMS", e);
		} catch (IOException e) {
			reportFailure("Consumer:->Failed to send SMS", e);
		} catch (JMSException e) {
			reportFailure("Consumer:->Failed to read message", e);
		}
	}

	/** Returns whether the split message has a non-empty content element at index 1. */
	private static boolean hasContent(String[] msgs) {
		return msgs != null && msgs.length >= 2 && msgs[1] != null && !msgs[1].equals("");
	}

	/** Prints a short description of the failure followed by its stack trace. */
	private static void reportFailure(String description, Exception e) {
		System.err.println(description + ": " + e);
		e.printStackTrace();
	}
}

?

可以发现,实现一个简单的消息发送需要许多编码。Spring提供了一个基于模板的解决方案,用于简化JMS消息实现的代码。

?

<?xml version="1.0" encoding="UTF-8"?>
<!--
	Spring application context for the JMS demo.
	Declares an ActiveMQ connection factory, a JmsTemplate that uses it,
	one queue destination and one topic.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:task="http://www.springframework.org/schema/task" xmlns:util="http://www.springframework.org/schema/util"
	xmlns:jaxws="http://cxf.apache.org/jaxws"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/context
		http://www.springframework.org/schema/context/spring-context-3.0.xsd
		http://www.springframework.org/schema/aop 
		http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
		http://www.springframework.org/schema/tx
		http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
		http://www.springframework.org/schema/jee 
		http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
		http://www.springframework.org/schema/task  
        http://www.springframework.org/schema/task/spring-task-3.1.xsd  
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util-3.0.xsd
        http://cxf.apache.org/jaxws
        http://cxf.apache.org/schemas/jaxws.xsd
		">
	<!-- Enable AspectJ-style auto-proxying. -->
	<aop:aspectj-autoproxy />

	<!-- Enable annotation processing and scan the demo packages for components. -->
	<!-- NOTE(review): the trailing ".*" in base-package presumably limits the scan to sub-packages of com.cathy.demo.jms; confirm this is intended. -->
	<context:annotation-config />
	<context:component-scan base-package="com.cathy.demo.jms.*" />
	<!-- Connection factory: connects to the ActiveMQ broker at tcp://localhost:61616. -->
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616"/>
	</bean>
	<!-- JmsTemplate: uses the connection factory above, defaults to the "destination" queue, receive timeout 60000. -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory"/>
		<property name="defaultDestination" ref="destination"/>
		<property name="receiveTimeout" value="60000"/>
	</bean>
	<!-- Queue destination named "message.queue". -->
	<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="message.queue"/>
	</bean>
	<!-- Topic named "notifyTopic"; not referenced by any other bean in this file. -->
	<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg index="0" value="notifyTopic"/>
	</bean>

</beans>

?

/**
 * {@link Producer} implementation that sends a fixed test {@code MapMessage}
 * through Spring's {@code JmsTemplate}.
 *
 * @author zhangwei_david
 * @version $Id: ProducerImpl.java, v 0.1 2015-01-31 20:25:36 zhangwei_david Exp $
 */
@Component
public class ProducerImpl implements Producer {

    /** Template used to perform the send. */
    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Destination the message is sent to.
     * NOTE(review): the field name matches the bean id "destination"; keep the
     * name in case injection relies on it - confirm before renaming.
     */
    @Autowired
    private Destination destination;

    /**
     * Sends one map message containing the entry {@code "key" -> "test"} to the
     * injected destination.
     */
    public void send() {
        MessageCreator creator = newTestMessageCreator();
        jmsTemplate.send(destination, creator);
    }

    /** Builds the creator that produces the fixed test map message. */
    private static MessageCreator newTestMessageCreator() {
        return new MessageCreator() {

            public Message createMessage(Session jmsSession) throws JMSException {
                MapMessage payload = jmsSession.createMapMessage();
                payload.setString("key", "test");
                return payload;
            }
        };
    }
}

?

/**
 * {@link Receiver} implementation that synchronously receives one message
 * through Spring's {@code JmsTemplate} and prints it.
 *
 * @author zhangwei_david
 * @version $Id: ReceiverImpl.java, v 0.1 2015-01-31 20:53:49 zhangwei_david Exp $
 */
@Component
public class ReceiverImpl implements Receiver {

    /** Template used to perform the receive. */
    @Autowired
    private JmsTemplate jmsTemplate;

    /** Destination the message is received from. */
    @Autowired
    private Destination destination;

    /**
     * Receives one message from the injected destination and prints it.
     *
     * <p>Nothing is printed when {@code receive} returns {@code null}
     * (presumably because the template's receive timeout elapsed - confirm
     * against the template configuration). A message that is not a
     * {@code MapMessage} is reported instead of failing with a
     * {@code ClassCastException}.
     *
     * @see com.cathy.demo.jms.receiver.Receiver#receive()
     */
    public void receive() {
        // Held as Object so the type check below replaces the former blind cast.
        Object received = jmsTemplate.receive(destination);
        if (received == null) {
            return;
        }
        if (received instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) received;
            System.out.println(mapMessage);
        } else {
            System.out.println("Receiver:->Unexpected message type: " + received);
        }
    }
}

?

/**
 * Spring-driven JUnit test that exercises the injected {@link Producer} and
 * {@link Receiver} against the context loaded from
 * {@code META-INF/spring/jms-beans.xml}.
 *
 * <p>NOTE(review): neither test asserts anything, and the two tests appear to
 * depend on a reachable broker - confirm how they are meant to be run.
 *
 * @author zhangwei_david
 * @version $Id: Sender.java, v 0.1 2015-01-31 20:47:18 zhangwei_david Exp $
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:META-INF/spring/jms-beans.xml")
public class Sender {
    /** Producer under test, injected from the Spring context. */
    @Autowired
    private Producer producer;

    /** Receiver under test, injected from the Spring context. */
    @Autowired
    private Receiver receiver;

    /** Calls {@code producer.send()}; passes if no exception is thrown. */
    @Test
    public void testSend() {
        producer.send();
    }

    /** Calls {@code receiver.receive()}; passes if no exception is thrown. */
    @Test
    public void testReceive() {
        receiver.receive();
    }
}

?

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。