ActiveMQ 使用笔记(三) ActiveMQ 消息发送与接收
需要导入的jar包:
activemq所需的jar包
一段发送消息的代码:
Java
public static void send(){
try {
// 创建一个连接工厂
String url = "tcp://localhost:61616";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置
connectionFactory.setUserName("system");
connectionFactory.setPassword("manager");
// 创建连接
Connection connection = connectionFactory.createConnection();
connection.start();
// 创建Session,参数解释:
// 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。
// 第二个参数消息的确认模式:
// AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。
// CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息)
// DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标,就创建主题也可以创建队列
Destination destination = session.createQueue("test");
// 创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 创建消息
String text = "Hello ActiveMQ!";
TextMessage message = session.createTextMessage(text);
// 发送消息到ActiveMQ
producer.send(message);
System.out.println("Message is sent!");
// 关闭资源
session.close();
connection.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
执行了上面的发送方法之后,在ActiveMQ的监视控制可以看到有一个test队列,并且有一条消息,如图:
点击队列名test,然后点击消息ID即可查看消息内容,如图:
如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。
一段接收消息的代码:
Java
public static void get(){
try{
String url = "tcp://localhost:61616";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置
connectionFactory.setUserName("system");
connectionFactory.setPassword("manager");
// 创建连接
Connection connection = connectionFactory.createConnection();
connection.start();
// 创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标,就创建主题也可以创建队列
Destination destination = session.createQueue("test");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(destination);
// 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
} else {
System.out.println("Received: " + message);
}
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
执行了上面的接收方法之后,在ActiveMQ的监视控制可以看到test队列的消息已经被消费了,如图:
这里的代码只是测试用,在正式开发中一般与Spring结合使用jmsTemplate来发送消息,现实JMS的MessageListener来监听消息。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。