9.3 Spring Boot 集成 ActiveMQ

ActiveMQ 是一种基于 JMS 1.1 规范的开源的消息中间件,ActiveMQ 的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

ActiveMQ 非常成熟,功能强大,在早些年业内大量的公司以及项目中都有应用,偶尔会有较低概率丢失消息。现在社区活跃度在降低,国内应用越来越少。官方现在对 ActiveMQ 5.x 维护越来越少,几个月才发布一个版本。

9.2.1安装配置

到官网下载最新的 apache-activemq-5.15.10-bin.zip 编译好的 zip 压缩包。将其解压到磁盘,例如C:\Java\apache-activemq-5.15.10这个目录。打开命令终端,进入 ActiveMQ 安装目录下的 bin 目录,执行activemq.bat start启动 ActiveMQ 服务器。

访问 http://localhost:8161/index.html 控制台,点击 Manage ActiveMQ broker 用户名和密码都是admin,进入管理界面,检查 ActiveMQ 是否正确安装。

因为 ActiveMQ 应用得越来越少,我们就不做集群安装配置演示了。

9.2.2 Spring Boot 集成 ActiveMQ

Spring Boot 为 ActiveMQ 提供了启动器(starter),集成 ActiveMQ 很方便。

Spring 为 JMS 提供了org.springframework.jms.core.JmsTemplate模板类,封装了常用的消息操作,使用起来非常方便。

新建一个 Spring Boot 项目,选择 Spring Web 和 Spring for Apache ActiveMQ 5 这两个 starter 依赖。

项目 pom 文件,最主要的就是:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

9.2.2.1 点对点(Queue)模式

ActiveMQ 的默认配置为点对点模式(spring.jms.pub-sub-domain=false)。在配置文件 application.yml 中配置到 ActiveMQ 的连接信息:

spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin

创建一个 Controller,接收用户输入,向 MQ 发送消息:

package com.example.activemq.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/activemq/")
public class ActiveMQController {
	@Autowired
	private JmsTemplate jmsTemplate;
	@RequestMapping("/send")
	public String sendMsg(String msg) {
		jmsTemplate.convertAndSend("Q1", msg);
		return msg + " Sended to Q1.";
	}
}

在 Controller 中注入private JmsTemplate jmsTemplate;,然后使用 JmsTemplate 的 send 方法向 Q1 队列发送消息。

创建一个服务类,监听消息队列 Q1,并简单地将接收到地消息输出到控制台。

package com.example.activemq.service;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
@Service
public class MessageListener {
	@JmsListener(destination = "Q1")
	public void msgReceive(String msg) {
		System.out.println("Message: " + msg + " Received.");
	}
}

注解@JmsListener(destination = "Q1")表示 msgReceive 方法监听 Q1 这个队列。

运行 Spring Boot 应用程序,访问 http://localhost:8080/activemq/send?msg=Hello%20Kevin. Kevin.) ,向消息队列中发送“Hello Kevin.”这样一条消息。

然后观察 Spring Boot 应用的控制台,可以看到由 MessageListener.msgReceive 方法从 Q1 接收到消息后在控制台输出的信息。

ActiveMQ 的管理控制台,也可以看到由程序创建的 Q1 队列,及消息消费者和消息相关的信息。

9.2.2.2 发布订阅(Topic)模式

在 Spring Boot 应用中通过spring.jms.pub-sub-domain=true配置,打开 Spring 对 ActiveMQ 发布订阅模式的支持。

新创建一个 Spring Boot 项目,选择 Spring Web 和 Spring for Apache ActiveMQ 5 这两个 starter 依赖。在配置文件 application.yml 中配置到 ActiveMQ 的连接信息:

spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    packages:
      trust-all: true
  jms:
    pub-sub-domain: true

com.example.activemq.topic.producer包下面创建消息生产者类 TopicProducer,将消息发送到“Topic-Weather”这个 Topic 上。

package com.example.activemq.topic.producer;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class TopicProducer {
	@Autowired
	private JmsTemplate jmsTemplate;
	public void sendMessage(String msg) {
		ActiveMQTopic destination = new ActiveMQTopic("Topic-Weather");
		jmsTemplate.convertAndSend(destination, msg);
	}
}

com.example.activemq.topic.consumer包下创建消息消费者 TopicConsumer,通过两个消息监听方法模拟两个订阅“Topic-Weather”主题的消息消费者(subscriber1 和 subscriber2)。

package com.example.activemq.topic.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer {
	/**
	 * 消息消费者1,监听"Topic-Weather"上的消息
	 * @param msg 消息
	 */
	@JmsListener(destination = "Topic-Weather")
	public void subscriber1(String msg) {
		System.out.println("Consumer1 consume message: " + msg);
	}
	/**
	 * 消息消费者2,监听"Topic-Weather"上的消息
	 * @param msg 消息
	 */
	@JmsListener(destination = "Topic-Weather")
	public void subscriber2(String msg) {
		System.out.println("Consumer2 consume message: " + msg);
	}
}

com.example.activemq.topic.controller包下创建和用户交互发送消息的控制器,在其中调用消息生产者 TopicProducer 类来发送消息。

package com.example.activemq.topic.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.activemq.topic.producer.TopicProducer;
@RestController
@RequestMapping("/activemq/")
public class TopicController {
	@Autowired
	private TopicProducer topicProducer;
	@RequestMapping("/send")
	public String sendMsg(String msg) {
		topicProducer.sendMessage(msg);
		return msg + " Sended to Topic-Weather.";
	}
}

运行 Spring Boot 应用程序,访问 http://localhost:8080/activemq/send?msg=明天气温6-8度 ,向消息队列中发送“Hello Kevin.”这样一条消息。

然后观察 Spring Boot 应用的控制台,可以看到由 TopicConsumer.subscriber1 方法和 TopicConsumer.subscriber2 方法订阅Topic-Weather这个主题后接收到消息,在控制台输出的信息。

ActiveMQ 的管理控制台,也可以看到 Topic-Weather 及其消费者数量等信息。

下一节:RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息中间件,基于 erlang 开发,并发能力很强,性能极好,延时很低,是当前大量部署使用的消息中间件。