RabbitMQ 安装,配置,java接入使用(详细教程)
一 RabbitMQ下载
RabbitMQ 官网最新版下载:
RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ v3.13.6版本下载:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.6/rabbitmq-server-3.13.6-1.el8.noarch.rpm
RabbitMQ依赖erlang-26.2.5.2-1.el7.x86_64.rpm下载:
https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm
二 RabbitMQ安装
1 安装erlang环境
安装RabbitMQ前要先安装erlang环境,因为RabbitMQ是用erlang开发的
执行安装指令如下:
rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm
执行后如下图:
验证 erlang 安装是否成功,执行erl可以查看版本,说明安装成功如下图:
2 安装RabbitMQ
执行安装RabbitMQ指令如下:
rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm
执行安装中,如下图:
注意:如果erlang环境没有安装好,或者版本与当前rabbitMQ不匹配则会报错以下错误,提示需要指定范围的依赖版本,如下图:
如果出现上图的错误,请参考上一步重新安装erlang环境即可。
安装结束后,消息队列数据保存在哪?日记在哪?想了解更多的信息?
只需一条指令可查询当前状态信息:
rabbitmq-diagnostics status
执行后如下图:
从上图状态中可以看出目前没有使用任何配置文件,以可以看到以下有用的信息:
- 数据目录: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
- 日记文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log
上图信息很详细,可以说开发者开发这个工具非常的细心,对软件有足够了解使用也安心!
3 配置RabbitMQ(可选项)
安装好后RabbitMQ没有使用任何的配置文件(也没有默认配置文件),但会生成一个空目录位置在:/etc/rabbitmq/ ,在这里你可以按照自己的需求参考官方网站配置自己的项目,格式支持有多种,下面我这里要变更默认端口为例创建一个配置文件:
vi /etc/rabbitmq/rabbitmq.config
配置文件内容:
[ {rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]}, {rabbitmq_management, [ {listener, [{port,59876}, {ssl, false}]} ]} ].
通过配置配置文件实现变更:
- 客户端 51091 用于消费或生产端连接,IP 0.0.0.0 代表绑定服务器内外网IP。
- 管理端口 59876 用于RabbitMQ的Web管理。
再次执行 rabbitmq-diagnostics status 查看新增的配置文件是否被使用,如下图:
上图可以看到刚刚创建的配置文件已被引用状态。
4 RabbitMQ 启动与关闭
RabbitMQ安装好后最终是服务状态,可以通过服务管理控制:
#启动 systemctl start rabbitmq-server #停止关闭 systemctl stop rabbitmq-server #重启 systemctl restart rabbitmq-server #开机启动 systemctl enable rabbitmq-server #查看状态 systemctl status rabbitmq-server
5 开启RabbitMQ的Web管理界面(可选项,强烈建议开启)
RabbitMQ的安装后自带Web管理界面,但是需要执行以下指令开启:
rabbitmq-plugins enable rabbitmq_management
我们平时只需要一名管员即可,后面要增加用户或设置权限直接在Web操作即可。
新增一位 RabbitMQ的Web管理员并增加设置管理权限 ,用于管理RabbitMQ.
#新增人员 rabbitmqctl add_user hua abc123uuPP #设置权限 rabbitmqctl set_permissions -p / hua ".*" ".*" ".*" #设置为管理员 rabbitmqctl set_user_tags hua administrator
* 表示授予该用户对该虚拟主机上所有队列和交换机的 configure、write 和 read 权限。
- 第一个 ".*" 表示用户可以配置任意队列和交换机。
- 第二个 ".*" 表示用户可以向任意队列和交换机发送消息。
- 第三个 ".*" 表示用户可以从任意队列中消费消息。
执行上面命令增加一个Web管理员:
- 用户名称:hua
- 密码:abc123uuPP
- 权限 :管理员
如果只在本地localhost登陆RabbitWeb管理平台,用默认的账号登陆即可:
- 默认用户:guest
- 默认密码:guest
三 RabbitMQ Web 管理
1 RabbitMQ Web 登陆
进入RabbitMQ Web 登陆页面如下:
首先我们使用默认账号密码尝试登陆,为了安全确实限制本地登陆,如下图:
使用上面新建的账号hua登陆,登陆成功如下图:
2 用户管理
用户管理,用户增加操作简单,如下图:
用户管理,用户权限设置操作简单,如下图:
用户操作界面非常人性化,可以很方便设置权限,修改用户资料。
3 虚拟主机(重要)
虚拟主机(vhost)是 RabbitMQ 中的一种逻辑隔离机制,它相当于一个独立的命名空间。每个虚拟主机内部可以拥有自己独立的队列、交换机、绑定等资源,彼此之间相互隔离,不能共享资源。
- 命名空间:每个虚拟主机都有自己的队列、交换机、绑定等资源。
- 资源隔离:不同虚拟主机之间的资源(如队列和交换机)完全隔离,防止不同应用间的资源冲突。
- 用户权限:不同的用户可以被授予不同虚拟主机的访问权限,确保用户只能访问指定的虚拟主机中的资源。
虚拟主机提供了一种隔离和权限管理的方式,适用于以下场景:
- 多租户架构:在 SaaS(软件即服务)或多租户应用中,你可以为不同的租户创建不同的虚拟主机,以确保数据隔离。
- 开发与生产环境隔离:你可以为开发环境和生产环境创建不同的虚拟主机,避免资源冲突和干扰。
- 权限管理:不同的用户或应用可以通过虚拟主机进行权限分离,确保只有特定用户才能访问某些资源。
默认虚拟主机
RabbitMQ 默认创建一个虚拟主机 /,这是一个特殊的虚拟主机,通常用于测试或默认情况下的资源管理。生产环境中,建议创建和使用新的虚拟主机,以更好地管理资源和权限。
虚拟主机操作也非常简单,如下图:
在用户管理界面选择用户绑定指定的虚拟主机,非常方便,如下图:
功能强大,非常好用。
四 java代码接入
方式一 java通用:
1 引入mvn依赖
com.rabbitmq amqp-client 5.20.0
JAVA 连接RabbitMQ生产消息与接收消费测试代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author hua * @date 2024-08-21 18:01 */ public class TestRabbitMQ { private final static String QUEUE_NAME = "hello"; public static void main1(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("xx.xx.xx.xx"); factory.setPort(51091); factory.setUsername("java_producer"); factory.setPassword("java_producer"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] argv) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("xx.xx.xx.xx"); factory.setPort(51091); factory.setUsername("java_consumer"); factory.setPassword("java_consumer"); // 连接到 RabbitMQ 服务器 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列(确保队列存在) channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 定义回调函数,当有消息送达时执行 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; // 消费消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } } }
测试运行发送消息,发送成功。如下图:
测试运行接收消息,消费成功。如下图:
上面测试通过后,改成服务类方便生产环境使用来发送消息代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author hua * @date 2024-08-22 */ @Service public class RabbitMqServiceImpl { private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class); private static final String QUEUE_NAME = "test"; private Connection connection; private Channel channel; public RabbitMqServiceImpl() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("xx.xx.xx.xx"); factory.setPort(51091); factory.setUsername("java_producer"); factory.setPassword("java_producer"); //如果不指定虚拟机默认会使用/ factory.setVirtualHost("test"); try { this.connection = factory.newConnection(); this.channel = connection.createChannel(); this.channel.queueDeclare(QUEUE_NAME, false, false, false, null); logger.info("RabbitMqServiceImpl initialized successfully."); } catch (IOException | TimeoutException e) { e.printStackTrace(); logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage()); throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e); } } public void sendMessage(String message) { try { channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException e) { e.printStackTrace(); logger.error("Failed to send message: {}", e.getMessage()); } } public void close() { try { if (channel != null && channel.isOpen()) { channel.close(); } if (connection != null && connection.isOpen()) { connection.close(); } } catch (IOException | TimeoutException e) { e.printStackTrace(); } } }
上面的代码存在问题,未确认发送成功,有丢失风险,再改善如下:
import com.qyhua.common.table.db_hex_fail_log.entity.DbHexFailLog; import com.qyhua.common.table.db_hex_fail_log.service.impl.DbHexFailLogServiceImpl; import com.rabbitmq.client.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.io.IOException; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeoutException; /** * @author hua * @date 2024-08-22 */ @Service public class RabbitMqServiceImpl { private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class); private static final String QUEUE_NAME = "hex_kyc"; private Connection connection; private Channel channel; //存放所有消息,确认时删除,没确认的保存到数据库 private ConcurrentNavigableMap outstandingConfirms = new ConcurrentSkipListMap(); @Autowired DbHexFailLogServiceImpl dbHexFailLogService; @PostConstruct public void init() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("xx.xx.xx.xx"); factory.setPort(xxx); factory.setUsername("java_producer"); factory.setPassword("java_producer"); factory.setVirtualHost("xxxx"); factory.setConnectionTimeout(3000); try { this.connection = factory.newConnection(); this.channel = connection.createChannel(); this.channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 启用发布者确认模式 this.channel.confirmSelect(); setupConfirmListener(); logger.info("RabbitMqServiceImpl initialized successfully."); } catch (IOException | TimeoutException e) { logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage(), e); throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e); } } public void sendMessage(String message) { try { long nextSeqNo = channel.getNextPublishSeqNo(); outstandingConfirms.put(nextSeqNo, message); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); logger.info(" [x] Sent '{}'", message); } catch (Exception e) { logger.error("Failed to send message: {}", e.getMessage(), e); saveFailedMessageToDatabase(message,"CF"); } } // 设置接收监听器,记录未确认的消息 private void setupConfirmListener() { ConfirmCallback ackCallback = (deliveryTag, multiple) -> { if (multiple) { outstandingConfirms.headMap(deliveryTag + 1).clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("Message confirmed ok deliveryTag="+deliveryTag); }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { if (multiple) { // 获取从起点到 `deliveryTag + 1` 之间的所有未确认的消息 ConcurrentNavigableMap unconfirmedMessages = outstandingConfirms.headMap(deliveryTag + 1); List FailList= new ArrayList(); for (Map.Entry entry : unconfirmedMessages.entrySet()) { String failedMessage = entry.getValue(); logger.error("Message not confirmed: deliveryTag={}, message={}", entry.getKey(), failedMessage); FailList.add(failedMessage); } saveFailedMessageToDatabaseBy(FailList); // 批量保存到数据库 unconfirmedMessages.clear(); // 清除这些未确认的消息 } else { String failedMessage = outstandingConfirms.get(deliveryTag); logger.error("Message not confirmed: deliveryTag={}, message={}", deliveryTag, failedMessage); saveFailedMessageToDatabase(failedMessage,"SF"); outstandingConfirms.remove(deliveryTag); // 移除单条未确认的消息 } }; channel.addConfirmListener(ackCallback, nackCallback); } private void saveFailedMessageToDatabaseBy(List failList) { List list=new ArrayList(failList.size()); LocalDateTime now = LocalDateTime.now(); for (String message : failList) { DbHexFailLog f=new DbHexFailLog(); f.setInHexStr(message); f.setCtime(now); f.setFlag("SF"); list.add(f); } dbHexFailLogService.saveBatch(list,list.size()); failList.clear(); } private void saveFailedMessageToDatabase(String message,String flag) { DbHexFailLog f=new DbHexFailLog(); f.setInHexStr(message); f.setCtime(LocalDateTime.now()); f.setFlag(flag); dbHexFailLogService.save(f); } @PreDestroy public void close() { try { if (channel != null && channel.isOpen()) { channel.close(); } if (connection != null && connection.isOpen()) { connection.close(); } logger.info("RabbitMqServiceImpl resources closed successfully."); } catch (IOException | TimeoutException e) { logger.error("Failed to close RabbitMqServiceImpl resources: {}", e.getMessage(), e); } } }
上面的代码优化后,主要增加了三项如下:
1 Publisher Confirms 机制:
- 启用 channel.confirmSelect() 来激活发布者确认模式。
- 使用 ConfirmCallback 和 NackCallback 来处理消息的确认与未确认逻辑。
- 未确认的消息会被保存到数据库中。
2 保存失败的消息到数据库。
3 在 @PreDestroy 方法中关闭 Channel 和 Connection,确保服务销毁时正确关闭资源。
方式二 SpringBoot框架使用
mvn依赖包:
org.springframework.boot spring-boot-starter-amqp
spring配置文件:
Spring: rabbitmq: host: xx.xx.xx.xx port: 51091 username: java_consumer password: java_consumer virtual-host: hellow connection-timeout: 6000
JAVA代码:
发送消息java代码:
import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author hua * @date 2024-08-22 */ @Component public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Queue queue; public void sendMessage(String message) { rabbitTemplate.convertAndSend(queue.getName(), message); System.out.println(" [x] Sent '" + message + "'"); } }
接收消息java代码:
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author hua * @date 2024-08-22 */ @Component public class RabbitListener { private static final Logger logger = LogManager.getLogger(RabbitListener.class); @RabbitListener(queues = "test") public void receiveMessage(String message) { try { System.out.println("rabbit rev