博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ+Spring工程创建详解(附工程文件)
阅读量:5810 次
发布时间:2019-06-18

本文共 7025 字,大约阅读时间需要 23 分钟。

ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。

JMS支持两种消息发送和接收模型。一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。

这里写图片描述

另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。

这里写图片描述

ActiveMQ的安装

下载最新的安装包apache-activemq-5.13.2-bin.tar.gz

下载之后解压: tar -zvxf apache-activemq-5.13.2-bin.tar.gz

ActiveMQ目录内容有:

  • bin目录包含ActiveMQ的启动脚本

  • conf目录包含ActiveMQ的所有配置文件

  • data目录包含日志文件和持久性消息数据

  • example: ActiveMQ的示例

  • lib: ActiveMQ运行所需要的lib

  • webapps: ActiveMQ的web控制台和一些相关的demo

ActiveMQ的默认服务端口为61616,这个可以在conf/activemq.xml配置文件中修改:

JMS的规范流程

  1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。

  2. 利用factory构造JMS connection

  3. 启动connection

  4. 通过connection创建JMS session.

  5. 指定JMS destination.

  6. 创建JMS producer或者创建JMS message并提供destination.

  7. 创建JMS consumer或注册JMS message listener.

  8. 发送和接收JMS message.

  9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

案例(整合Spring)

pom.xml

junit
junit
4.11
test
org.apache.activemq
activemq-all
5.11.1
org.springframework
spring-jms
4.1.4.RELEASE
org.springframework
spring-test
4.1.4.RELEASE

Queue类型消息

spring配置文件

testSpringQueue

推送代码

package com.mq.spring.queue;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import javax.annotation.Resource;import javax.jms.*;@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations={"classpath:spring-mq-queue.xml"})public class QueueSender {  @Resource  private JmsTemplate jmsTemplate;  @Test  public void send(){    sendMqMessage(null,"spring activemq queue type message !");  }  /**   * 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination   * @param destination   * @param message   */  public void sendMqMessage(Destination destination, final String message){    if(null == destination){      destination = jmsTemplate.getDefaultDestination();    }    jmsTemplate.send(destination, new MessageCreator() {      public Message createMessage(Session session) throws JMSException {        return session.createTextMessage(message);      }    });    System.out.println("spring send message...");  }  public void setJmsTemplate(JmsTemplate jmsTemplate) {    this.jmsTemplate = jmsTemplate;  }}

消费代码

package com.mq.spring.queue;import org.junit.Test;import javax.jms.*;import org.junit.runner.RunWith;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import javax.annotation.Resource;import javax.jms.Message;@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations={"classpath:spring-mq-queue.xml"})public class QueueReceiver {  @Resource  private JmsTemplate jmsTemplate;  @Test  public void receiveMqMessage(){    Destination destination = jmsTemplate.getDefaultDestination();    receive(destination);  }  /**   * 接受消息   */  public void receive(Destination destination) {    TextMessage tm = (TextMessage) jmsTemplate.receive(destination);    try {      System.out.println("从队列" + destination.toString() + "收到了消息:\t" + tm.getText());    } catch (JMSException e) {      e.printStackTrace();    }  }  public void setJmsTemplate(JmsTemplate jmsTemplate) {    this.jmsTemplate = jmsTemplate;  }}

说明:上面的生产者和消费者使用同一套配置文件,使用独立的程序去接收消息,spring jms也提供了消息监听处理.接下来我们换成监听式消费

配置文件

testSpringQueue

监听器代码

public class ConsumerMessageListener implements MessageListener{  @Override  public void onMessage(Message message) {    TextMessage tm = (TextMessage) message;    try {      System.out.println("ConsumerMessageListener收到了文本消息:\t" + tm.getText());    } catch (JMSException e) {      e.printStackTrace();    }  }}

这样我们的消息消费就可以在监听器中处理消费了.生产的代码不变,修改发送者的消息体内容,执行生产程序

Topic类型消息

在使用 Spring JMS的时候,主题( Topic)和队列消息的主要差异体现在JmsTemplate中 "pubSubDomain"是否设置为 True。如果为 True,则是 Topic;如果是false或者默认,则是 queue。

topic类型消费配置文件说明

testSpringTopic

生产者代码

package com.mq.spring.topic;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import javax.annotation.Resource;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations={"classpath:spring-mq-topic.xml"})public class TopicSender {  @Resource  private JmsTemplate jmsTemplate;  @Test  public void send(){    sendMqMessage(null,"spring activemq topic type message[with listener] !");  }  /**   * 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination   * @param destination   * @param message   */  public void sendMqMessage(Destination destination, final String message){    if(null == destination){      destination = jmsTemplate.getDefaultDestination();    }    jmsTemplate.send(destination, new MessageCreator() {      public Message createMessage(Session session) throws JMSException {        return session.createTextMessage(message);      }    });    System.out.println("spring send message...");  }  public void setJmsTemplate(JmsTemplate jmsTemplate) {    this.jmsTemplate = jmsTemplate;  }}

监听器代码

public class ConsumerMessageListener implements MessageListener{  @Override  public void onMessage(Message message) {    TextMessage tm = (TextMessage) message;    try {      System.out.println("ConsumerMessageListener收到了文本消息:\t" + tm.getText());    } catch (JMSException e) {      e.printStackTrace();    }  }}

参考资料

更多内容可以关注微信公众号,或者访问网站

http://7xp64w.com1.z0.glb.clouddn.com/qrcode_for_gh_3e33976a25c9_258.jpg

你可能感兴趣的文章
linux下的haproxy的搭建
查看>>
SQL注入(dvwa环境)
查看>>
HyperLeger Fabric开发(五)——HyperLeger Fabric账本存储
查看>>
原生ajax
查看>>
pymongo(看后转载,在原基础上添加了类连接和简单调用)
查看>>
Maven学习总结(3)——使用Maven构建项目
查看>>
[8-30]Bash环境变量知识梳理
查看>>
Mysql学习总结(10)——MySql触发器使用讲解
查看>>
syscall参数表
查看>>
日志记录函数
查看>>
Windows Server 2012R2 DHCP故障转移(2)
查看>>
一天一个Linux基础命令之列出目录信息命令ls
查看>>
struts2详细配置指导
查看>>
Hibernate原生SQL查询
查看>>
c#3.0的新增语言功能
查看>>
RancherOS install
查看>>
1、Linux 磁盘、文件、目录结构
查看>>
zabbix安装部署(二)
查看>>
部署NFS与测试NFS
查看>>
流媒体传输协议系列之----RTP/RTCP协议解析
查看>>