Spring Boot整合RabbitMQ
1. 环境准备
- Spring Boot 2.1.3.RELEASE
- RabbitMQ 3.x
- JDK 8 或以上
- Maven 3.5+
2. 安装Erlang、RabbitMQ
2.1 安装前准备
RabbitMQ 依赖 Erlang 环境,需确保两者的版本匹配,官方兼容性参考:RabbitMQ & Erlang 版本对照表。
2.2 下载安装包
- Erlang RPM 包:从 GitHub Releases 或 RabbitMQ 官方仓库 下载对应版本。
- RabbitMQ RPM 包:从 RabbitMQ 官网 下载与 Erlang 兼容的版本。
我这里直接下载的otp_src_26.1.tar.gz压缩包文件
2.1 安装Erlang
- 解压 otp_src_26.1.tar.gz
tar -xvzf otp_src_${ERLANG_VERSION}.tar.gz cd otp_src_${ERLANG_VERSION}
- 安装 Erlang 需要一些依赖库
- Debian/Ubuntu
sudo apt update sudo apt install -y build-essential autoconf libncurses5-dev libssl-dev libwxgtk3.0-gtk3-dev libwxbase3.0-dev libwxgtk3.0-dev libgl1-mesa-dev libglu1-mesa-dev libpng-dev libssh-dev unixodbc-dev xsltproc fop libxml2-utils
- CentOS/Rocky Linux
sudo yum groupinstall -y "Development Tools" sudo yum install -y epel-release sudo yum install -y ncurses-devel openssl-devel wxBase wxGTK3 wxGTK3-devel libpng-devel unixODBC-devel libxslt fop libxml2
- 运行 ./configure 以检查系统环境,并生成 Makefile:
./configure --prefix=/usr/local/erlang --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac
• --prefix=/usr/local/erlang 指定安装目录 • --enable-threads 允许多线程支持 • --enable-smp-support 允许多核 CPU 并行支持 • --enable-kernel-poll 启用内核轮询(提高 I/O 性能) • --enable-hipe 启用 HiPE(高性能 Erlang 虚拟机) • --without-javac 不编译 Java 相关功能(可选)
- 编译:
make -j$(nproc) # 使用所有 CPU 核心进行编译
- 安装
sudo make install
- 配置环境变量
echo 'export PATH=/usr/local/erlang/bin:$PATH' >> ~/.bashrc source ~/.bashrc
2.2 安装RabbitMQ
- 添加 RabbitMQ 官方仓库
# 添加 RabbitMQ 仓库密钥 curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq.gpg # 添加 RabbitMQ 软件源 echo "deb [signed-by=/usr/share/keyrings/rabbitmq.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list # 更新软件包列表 sudo apt update
- 安装 RabbitMQ 3.8.x 版本
# 列出可用的 RabbitMQ 版本 apt-cache madison rabbitmq-server # 从输出中找到 3.8.x 系列的版本号: sudo apt install -y rabbitmq-server=3.8.2-0ubuntu1
-
启动 RabbitMQ 服务:sudo systemctl start rabbitmq-server
-
设置开机自启:sudo systemctl enable rabbitmq-server
-
使用 RabbitMQ 的 Web 管理界面,可以启用管理插件(可选):sudo rabbitmq-plugins enable rabbitmq_management
-
验证安装:sudo systemctl status rabbitmq-server
通过浏览器访问 http://服务器IP地址:15672,使用默认用户名 guest 和密码 guest 登录管理界面。不过需要注意的是,guest 用户默认只能通过本地访问,如果需要远程访问,需要创建新用户并授予相应权限。
2.2 开启远程访问
2.2.1 创建新用户
使用 rabbitmqctl 命令来创建新用户,命令格式如下:
sudo rabbitmqctl add_user
例如,创建一个名为 admin,密码为 admin123 的用户:
sudo rabbitmqctl add_user admin admin123
2.2.2 设置用户角色
RabbitMQ 有几种不同的用户角色,每个角色具有不同的权限:
- administrator:可以管理所有方面,包括用户、虚拟主机、权限等。
- monitoring:可以查看 RabbitMQ 节点的监控信息。
- policymaker:可以管理策略和参数。
- management:可以访问管理界面。
- none:没有特殊权限。
使用以下命令为用户设置角色:
sudo rabbitmqctl set_user_tags
如果要将 admin 用户设置为 administrator 角色,可以执行:
sudo rabbitmqctl set_user_tags admin administrator
2.2.3 授予用户权限
用户需要对特定的虚拟主机(Virtual Host)具有相应的权限才能进行操作。默认情况下,RabbitMQ 有一个名为 / 的虚拟主机。可以使用以下命令为用户授予权限:
sudo rabbitmqctl set_permissions -p ".*" ".*" ".*"
其中,三个 ".*" 分别表示对虚拟主机的配置(configure)、写入(write)和读取(read)权限。如果要为 admin 用户授予对 / 虚拟主机的所有权限,可以执行:
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
2.2.4 验证用户权限
你可以使用新创建的用户登录 RabbitMQ 的管理界面(http://服务器IP地址:15672),或者使用客户端工具(如 pika 等)连接到 RabbitMQ 服务器,验证是否可以正常访问。
2.2.5 删除默认 guest 用户(可选)
为了提高安全性,如果你已经创建了新的管理员用户,可以考虑删除默认的 guest 用户:
sudo rabbitmqctl delete_user guest
通过以上步骤,就可以创建新用户并授予相应权限,实现 RabbitMQ 的远程访问。
3. 添加Spring Boot RabbitMQ依赖
在pom.xml中添加依赖:
org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp
执行mvn clean install。
4. 配置RabbitMQ
在application.yml中添加连接配置:
spring: rabbitmq: host: 10.211.55.5 port: 5672 username: admin password: admin123 virtualHost: /
5. 定义消息队列
5.1 创建RabbitMqConfig.java,定义交换机、队列和绑定:
import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMqConfig { public static final String EXCHANGE_NAME = "test-exchange"; public static final String ROUTING_KEY = "test-routing-key"; public static final String QUEUE_NAME = "test-queue"; @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // 创建队列 @Bean public Queue queue() { /** * @param name 队列名字 * @param durable 声明一个持久队列(该队列将在服务器重启后继续存在),则为true */ return new Queue(QUEUE_NAME, true); } // 创建交换机(Direct类型) @Bean public DirectExchange exchange() { return new DirectExchange(EXCHANGE_NAME); } // 绑定队列到交换机 @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY); } }
5.2 生产者(Producer)
创建RabbitMqProducer.java发送消息:
import com.example.rabbitmq.config.RabbitMqConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RabbitMqProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY, message); } }
5.3 创建API测试发送消息
创建MessageController.java测试:
import com.example.rabbitmq.producer.RabbitMqProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/message") public class MessageController { @Autowired private RabbitMqProducer producer; @GetMapping("/send") public String sendMessage(@RequestParam String msg) { producer.sendMessage(msg); return "Message sent: " + msg; } }
访问http://localhost:8080/message/send?msg=Hello RabbitMQ测试。
5.4 消费者(Consumer)
创建RabbitMqConsumer.java接收消息:
import com.example.rabbitmq.config.RabbitMqConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RabbitMqConsumer { @RabbitListener(queues = RabbitMqConfig.QUEUE_NAME) public void receiveMessage(String message) { System.out.println("Received message: " + message); } }
重新启动项目,控制台应输出:
Received message: Hello RabbitMQ
6. 发送对象消息
6.1 定义消息对象
import java.io.Serializable; public class CustomMessage implements Serializable { private static final long serialVersionUID = 1L; private String content; private int id; public CustomMessage() {} public CustomMessage(String content, int id) { this.content = content; this.id = id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public int getId() { return id; } public void setId(int id) { this.id = id; } @Override public String toString() { return "CustomMessage{id=" + id + ", content='" + content + "'}"; } }
6.2 发送对象
import com.weigang.config.RabbitMqConfig; import com.weigang.model.CustomMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RabbitMqProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY, message); } public void sendObjectMessage(CustomMessage message) { rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY, message); } }
6.3 通过API发送对象
@GetMapping("/sendObj") public String sendMessage(@RequestParam Integer id,@RequestParam String content) { CustomMessage customMessage = new CustomMessage(content, id); producer.sendObjectMessage(customMessage); return "Message sent: " + customMessage; }
然后访问:http://localhost:8080/message/sendObj?id=1001&content=Hello RabbitMQ
6.4 消费对象
import com.weigang.config.RabbitMqConfig; import com.weigang.model.CustomMessage; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RabbitMqConsumer { @RabbitListener(queues = RabbitMqConfig.QUEUE_NAME) public void receiveMessage(String message) { System.out.println("Received message: " + message); } @RabbitListener(queues = RabbitMqConfig.QUEUE_NAME) public void receiveMessageObj(CustomMessage message) { System.out.println("Received object message: " + message.toString()); } }
重新启动项目,控制台应输出:
Received object message: CustomMessage{content='Hello RabbitMQ', id=1001}
- CentOS/Rocky Linux