ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。
JMS支持两种消息发送和接收模型。一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。
另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。
ActiveMQ的安装
下载最新的安装包apache-activemq-5.13.2-bin.tar.gz
下载之后解压: tar -zvxf apache-activemq-5.13.2-bin.tar.gz
ActiveMQ目录内容有:
bin目录包含ActiveMQ的启动脚本
conf目录包含ActiveMQ的所有配置文件
data目录包含日志文件和持久性消息数据
example: ActiveMQ的示例
lib: ActiveMQ运行所需要的lib
webapps: ActiveMQ的web控制台和一些相关的demo
ActiveMQ的默认服务端口为61616,这个可以在conf/activemq.xml配置文件中修改:
JMS的规范流程
获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。
利用factory构造JMS connection
启动connection
通过connection创建JMS session.
指定JMS destination.
创建JMS producer或者创建JMS message并提供destination.
创建JMS consumer或注册JMS message listener.
发送和接收JMS message.
关闭所有JMS资源,包括connection, session, producer, consumer等。
案例(整合Spring)
pom.xml
junit junit 4.11 test org.apache.activemq activemq-all 5.11.1 org.springframework spring-jms 4.1.4.RELEASE org.springframework spring-test 4.1.4.RELEASE
Queue类型消息
spring配置文件
testSpringQueue
推送代码
package com.mq.spring.queue;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import javax.annotation.Resource;import javax.jms.*;@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations={"classpath:spring-mq-queue.xml"})public class QueueSender { @Resource private JmsTemplate jmsTemplate; @Test public void send(){ sendMqMessage(null,"spring activemq queue type message !"); } /** * 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination * @param destination * @param message */ public void sendMqMessage(Destination destination, final String message){ if(null == destination){ destination = jmsTemplate.getDefaultDestination(); } jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); System.out.println("spring send message..."); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; }}
消费代码
package com.mq.spring.queue;import org.junit.Test;import javax.jms.*;import org.junit.runner.RunWith;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import javax.annotation.Resource;import javax.jms.Message;@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations={"classpath:spring-mq-queue.xml"})public class QueueReceiver { @Resource private JmsTemplate jmsTemplate; @Test public void receiveMqMessage(){ Destination destination = jmsTemplate.getDefaultDestination(); receive(destination); } /** * 接受消息 */ public void receive(Destination destination) { TextMessage tm = (TextMessage) jmsTemplate.receive(destination); try { System.out.println("从队列" + destination.toString() + "收到了消息:\t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; }}
说明:上面的生产者和消费者使用同一套配置文件,使用独立的程序去接收消息,spring jms也提供了消息监听处理.接下来我们换成监听式消费
配置文件
testSpringQueue
监听器代码
public class ConsumerMessageListener implements MessageListener{ @Override public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("ConsumerMessageListener收到了文本消息:\t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } }}
这样我们的消息消费就可以在监听器中处理消费了.生产的代码不变,修改发送者的消息体内容,执行生产程序
Topic类型消息
在使用 Spring JMS的时候,主题( Topic)和队列消息的主要差异体现在JmsTemplate中 "pubSubDomain"是否设置为 True。如果为 True,则是 Topic;如果是false或者默认,则是 queue。
topic类型消费配置文件说明
testSpringTopic
生产者代码
package com.mq.spring.topic;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import javax.annotation.Resource;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations={"classpath:spring-mq-topic.xml"})public class TopicSender { @Resource private JmsTemplate jmsTemplate; @Test public void send(){ sendMqMessage(null,"spring activemq topic type message[with listener] !"); } /** * 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination * @param destination * @param message */ public void sendMqMessage(Destination destination, final String message){ if(null == destination){ destination = jmsTemplate.getDefaultDestination(); } jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); System.out.println("spring send message..."); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; }}
监听器代码
public class ConsumerMessageListener implements MessageListener{ @Override public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("ConsumerMessageListener收到了文本消息:\t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } }}
参考资料
更多内容可以关注微信公众号,或者访问网站