RabbitMQ 安装,配置,java接入使用(详细教程)

06-01 996阅读

一 RabbitMQ下载

RabbitMQ 官网最新版下载:

RabbitMQ: One broker to queue them all | RabbitMQ

RabbitMQ v3.13.6版本下载:

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.6/rabbitmq-server-3.13.6-1.el8.noarch.rpm

RabbitMQ依赖erlang-26.2.5.2-1.el7.x86_64.rpm下载:

https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm

二 RabbitMQ安装

   1 安装erlang环境

  安装RabbitMQ前要先安装erlang环境,因为RabbitMQ是用erlang开发的

  执行安装指令如下:

rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm

   执行后如下图:

RabbitMQ 安装,配置,java接入使用(详细教程)

  验证 erlang 安装是否成功,执行erl可以查看版本,说明安装成功如下图:

 RabbitMQ 安装,配置,java接入使用(详细教程)

2 安装RabbitMQ

  执行安装RabbitMQ指令如下:

rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm 

  执行安装中,如下图: 

RabbitMQ 安装,配置,java接入使用(详细教程)

 注意:如果erlang环境没有安装好,或者版本与当前rabbitMQ不匹配则会报错以下错误,提示需要指定范围的依赖版本,如下图: 

RabbitMQ 安装,配置,java接入使用(详细教程)

如果出现上图的错误,请参考上一步重新安装erlang环境即可。

 安装结束后,消息队列数据保存在哪?日记在哪?想了解更多的信息?

只需一条指令可查询当前状态信息:

rabbitmq-diagnostics status

执行后如下图:

RabbitMQ 安装,配置,java接入使用(详细教程)

RabbitMQ 安装,配置,java接入使用(详细教程)

从上图状态中可以看出目前没有使用任何配置文件,以可以看到以下有用的信息:

  • 数据目录: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
  • 日记文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log

     上图信息很详细,可以说开发者开发这个工具非常的细心,对软件有足够了解使用也安心!

    3 配置RabbitMQ(可选项)

      安装好后RabbitMQ没有使用任何的配置文件(也没有默认配置文件),但会生成一个空目录位置在:/etc/rabbitmq/ ,在这里你可以按照自己的需求参考官方网站配置自己的项目,格式支持有多种,下面我这里要变更默认端口为例创建一个配置文件:

    vi /etc/rabbitmq/rabbitmq.config

    配置文件内容:

    [
      {rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]},
      {rabbitmq_management, [
      {listener, [{port,59876}, {ssl, false}]}
      ]}
    ].

      通过配置配置文件实现变更:

    1.   客户端 51091 用于消费或生产端连接,IP 0.0.0.0 代表绑定服务器内外网IP。
    2.   管理端口 59876 用于RabbitMQ的Web管理。

    再次执行 rabbitmq-diagnostics status 查看新增的配置文件是否被使用,如下图:

    RabbitMQ 安装,配置,java接入使用(详细教程)

     上图可以看到刚刚创建的配置文件已被引用状态。

    4 RabbitMQ 启动与关闭

       RabbitMQ安装好后最终是服务状态,可以通过服务管理控制:

    #启动
    systemctl start rabbitmq-server
    #停止关闭
    systemctl stop rabbitmq-server
    #重启
    systemctl restart rabbitmq-server
    #开机启动
    systemctl enable rabbitmq-server
    #查看状态
    systemctl status rabbitmq-server

      操作如下图:RabbitMQ 安装,配置,java接入使用(详细教程)

    5 开启RabbitMQ的Web管理界面(可选项,强烈建议开启)

    RabbitMQ的安装后自带Web管理界面,但是需要执行以下指令开启:

    rabbitmq-plugins enable rabbitmq_management

     我们平时只需要一名管员即可,后面要增加用户或设置权限直接在Web操作即可。

       新增一位 RabbitMQ的Web管理员并增加设置管理权限 ,用于管理RabbitMQ.

    #新增人员
    rabbitmqctl add_user hua abc123uuPP
    #设置权限
    rabbitmqctl set_permissions -p / hua ".*" ".*" ".*"
    #设置为管理员
    rabbitmqctl set_user_tags hua administrator
    

    * 表示授予该用户对该虚拟主机上所有队列和交换机的 configure、write 和 read 权限。

    • 第一个 ".*" 表示用户可以配置任意队列和交换机。
    • 第二个 ".*" 表示用户可以向任意队列和交换机发送消息。
    • 第三个 ".*" 表示用户可以从任意队列中消费消息。

       执行过程如图:RabbitMQ 安装,配置,java接入使用(详细教程)

      执行上面命令增加一个Web管理员:

      • 用户名称:hua
      • 密码:abc123uuPP
      • 权限 :管理员

          如果只在本地localhost登陆RabbitWeb管理平台,用默认的账号登陆即可:

        • 默认用户:guest
        • 默认密码:guest

          三 RabbitMQ Web 管理

          1 RabbitMQ Web 登陆

           进入RabbitMQ Web 登陆页面如下:

          RabbitMQ 安装,配置,java接入使用(详细教程)

          首先我们使用默认账号密码尝试登陆,为了安全确实限制本地登陆,如下图:

          RabbitMQ 安装,配置,java接入使用(详细教程)

          使用上面新建的账号hua登陆,登陆成功如下图:

          RabbitMQ 安装,配置,java接入使用(详细教程)

          2 用户管理

           用户管理,用户增加操作简单,如下图:

          RabbitMQ 安装,配置,java接入使用(详细教程)

            用户管理,用户权限设置操作简单,如下图:

          RabbitMQ 安装,配置,java接入使用(详细教程)

          用户操作界面非常人性化,可以很方便设置权限,修改用户资料。 

          3 虚拟主机(重要)

          虚拟主机(vhost)是 RabbitMQ 中的一种逻辑隔离机制,它相当于一个独立的命名空间。每个虚拟主机内部可以拥有自己独立的队列、交换机、绑定等资源,彼此之间相互隔离,不能共享资源。

          • 命名空间:每个虚拟主机都有自己的队列、交换机、绑定等资源。
          • 资源隔离:不同虚拟主机之间的资源(如队列和交换机)完全隔离,防止不同应用间的资源冲突。
          • 用户权限:不同的用户可以被授予不同虚拟主机的访问权限,确保用户只能访问指定的虚拟主机中的资源。

            虚拟主机提供了一种隔离和权限管理的方式,适用于以下场景:

            • 多租户架构:在 SaaS(软件即服务)或多租户应用中,你可以为不同的租户创建不同的虚拟主机,以确保数据隔离。
            • 开发与生产环境隔离:你可以为开发环境和生产环境创建不同的虚拟主机,避免资源冲突和干扰。
            • 权限管理:不同的用户或应用可以通过虚拟主机进行权限分离,确保只有特定用户才能访问某些资源。

              默认虚拟主机

              RabbitMQ 默认创建一个虚拟主机 /,这是一个特殊的虚拟主机,通常用于测试或默认情况下的资源管理。生产环境中,建议创建和使用新的虚拟主机,以更好地管理资源和权限。 

              虚拟主机操作也非常简单,如下图:

              RabbitMQ 安装,配置,java接入使用(详细教程) 在用户管理界面选择用户绑定指定的虚拟主机,非常方便,如下图:

               RabbitMQ 安装,配置,java接入使用(详细教程)

              功能强大,非常好用。 

              四 java代码接入

                方式一 java通用:

                1 引入mvn依赖

                      
                          com.rabbitmq
                          amqp-client
                          5.20.0
                      

                JAVA 连接RabbitMQ生产消息与接收消费测试代码:

              import com.rabbitmq.client.Channel;
              import com.rabbitmq.client.Connection;
              import com.rabbitmq.client.ConnectionFactory;
              import com.rabbitmq.client.DeliverCallback;
              import java.io.IOException;
              import java.util.concurrent.TimeoutException;
              /**
               * @author hua
               * @date 2024-08-21 18:01
               */
              public class TestRabbitMQ {
                  private final static String QUEUE_NAME = "hello";
                  public static void main1(String[] args) {
                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("xx.xx.xx.xx");
                      factory.setPort(51091);
                      factory.setUsername("java_producer");
                      factory.setPassword("java_producer");
                      try (Connection connection = factory.newConnection();
                           Channel channel = connection.createChannel()) {
                          channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                          String message = "Hello World!";
                          channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                          System.out.println(" [x] Sent '" + message + "'");
                      } catch (TimeoutException e) {
                          e.printStackTrace();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
                  public static void main(String[] argv) throws Exception {
                      // 创建连接工厂
                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("xx.xx.xx.xx");
                      factory.setPort(51091);
                      factory.setUsername("java_consumer");
                      factory.setPassword("java_consumer");
                      // 连接到 RabbitMQ 服务器
                      try (Connection connection = factory.newConnection();
                           Channel channel = connection.createChannel()) {
                          // 声明队列(确保队列存在)
                          channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                          System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
                          // 定义回调函数,当有消息送达时执行
                          DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                              String message = new String(delivery.getBody(), "UTF-8");
                              System.out.println(" [x] Received '" + message + "'");
                          };
                          // 消费消息
                          channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
                      }
                  }
              }
              

               测试运行发送消息,发送成功。如下图:

               RabbitMQ 安装,配置,java接入使用(详细教程)

               测试运行接收消息,消费成功。如下图:

              RabbitMQ 安装,配置,java接入使用(详细教程)

               上面测试通过后,改成服务类方便生产环境使用来发送消息代码:

              import com.rabbitmq.client.Channel;
              import com.rabbitmq.client.Connection;
              import com.rabbitmq.client.ConnectionFactory;
              import com.rabbitmq.client.MessageProperties;
              import org.apache.logging.log4j.LogManager;
              import org.apache.logging.log4j.Logger;
              import org.springframework.stereotype.Service;
              import java.io.IOException;
              import java.util.concurrent.TimeoutException;
              /**
               * @author hua
               * @date 2024-08-22
               */
              @Service
              public class RabbitMqServiceImpl {
                  private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
                  private static final String QUEUE_NAME = "test";
                  private Connection connection;
                  private Channel channel;
                  public RabbitMqServiceImpl() {
                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("xx.xx.xx.xx");
                      factory.setPort(51091);
                      factory.setUsername("java_producer");
                      factory.setPassword("java_producer");
                      //如果不指定虚拟机默认会使用/
                      factory.setVirtualHost("test");
                      try {
                          this.connection = factory.newConnection();
                          this.channel = connection.createChannel();
                          this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                          logger.info("RabbitMqServiceImpl initialized successfully.");
                      } catch (IOException | TimeoutException e) {
                          e.printStackTrace();
                          logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage());
                          throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
                      }
                  }
                  public void sendMessage(String message) {
                      try {
                          channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                          System.out.println(" [x] Sent '" + message + "'");
                      } catch (IOException e) {
                          e.printStackTrace();
                          logger.error("Failed to send message: {}", e.getMessage());
                      }
                  }
                  public void close() {
                      try {
                          if (channel != null && channel.isOpen()) {
                              channel.close();
                          }
                          if (connection != null && connection.isOpen()) {
                              connection.close();
                          }
                      } catch (IOException | TimeoutException e) {
                          e.printStackTrace();
                      }
                  }
              }
              

                 上面的代码存在问题,未确认发送成功,有丢失风险,再改善如下:

              import com.qyhua.common.table.db_hex_fail_log.entity.DbHexFailLog;
              import com.qyhua.common.table.db_hex_fail_log.service.impl.DbHexFailLogServiceImpl;
              import com.rabbitmq.client.*;
              import org.apache.logging.log4j.LogManager;
              import org.apache.logging.log4j.Logger;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.stereotype.Service;
              import javax.annotation.PostConstruct;
              import javax.annotation.PreDestroy;
              import java.io.IOException;
              import java.time.LocalDateTime;
              import java.util.ArrayList;
              import java.util.List;
              import java.util.Map;
              import java.util.concurrent.ConcurrentNavigableMap;
              import java.util.concurrent.ConcurrentSkipListMap;
              import java.util.concurrent.TimeoutException;
              /**
               * @author hua
               * @date 2024-08-22
               */
              @Service
              public class RabbitMqServiceImpl {
                  private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
                  private static final String QUEUE_NAME = "hex_kyc";
                  private Connection connection;
                  private Channel channel;
                  //存放所有消息,确认时删除,没确认的保存到数据库
                  private ConcurrentNavigableMap outstandingConfirms = new ConcurrentSkipListMap();
                  @Autowired
                  DbHexFailLogServiceImpl dbHexFailLogService;
                  @PostConstruct
                  public void init() {
                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost("xx.xx.xx.xx");
                      factory.setPort(xxx);
                      factory.setUsername("java_producer");
                      factory.setPassword("java_producer");
                      factory.setVirtualHost("xxxx");
                      factory.setConnectionTimeout(3000);
                      try {
                          this.connection = factory.newConnection();
                          this.channel = connection.createChannel();
                          this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                          // 启用发布者确认模式
                          this.channel.confirmSelect();
                          setupConfirmListener();
                          logger.info("RabbitMqServiceImpl initialized successfully.");
                      } catch (IOException | TimeoutException e) {
                          logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage(), e);
                          throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
                      }
                  }
                  public void sendMessage(String message) {
                      try {
                          long nextSeqNo = channel.getNextPublishSeqNo();
                          outstandingConfirms.put(nextSeqNo, message);
                          channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                          logger.info(" [x] Sent '{}'", message);
                      } catch (Exception e) {
                          logger.error("Failed to send message: {}", e.getMessage(), e);
                          saveFailedMessageToDatabase(message,"CF");
                      }
                  }
                  // 设置接收监听器,记录未确认的消息
                  private void setupConfirmListener() {
                      ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
                          if (multiple) {
                              outstandingConfirms.headMap(deliveryTag + 1).clear();
                          } else {
                              outstandingConfirms.remove(deliveryTag);
                          }
                          System.out.println("Message confirmed ok deliveryTag="+deliveryTag);
                      };
                      ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
                          if (multiple) {
                              // 获取从起点到 `deliveryTag + 1` 之间的所有未确认的消息
                              ConcurrentNavigableMap unconfirmedMessages = outstandingConfirms.headMap(deliveryTag + 1);
                              List FailList= new ArrayList();
                              for (Map.Entry entry : unconfirmedMessages.entrySet()) {
                                  String failedMessage = entry.getValue();
                                  logger.error("Message not confirmed: deliveryTag={}, message={}", entry.getKey(), failedMessage);
                                  FailList.add(failedMessage);
                              }
                              saveFailedMessageToDatabaseBy(FailList);  // 批量保存到数据库
                              unconfirmedMessages.clear();  // 清除这些未确认的消息
                          } else {
                              String failedMessage = outstandingConfirms.get(deliveryTag);
                              logger.error("Message not confirmed: deliveryTag={}, message={}", deliveryTag, failedMessage);
                              saveFailedMessageToDatabase(failedMessage,"SF");
                              outstandingConfirms.remove(deliveryTag);  // 移除单条未确认的消息
                          }
                      };
                      channel.addConfirmListener(ackCallback, nackCallback);
                  }
                  private void saveFailedMessageToDatabaseBy(List failList) {
                      List list=new ArrayList(failList.size());
                      LocalDateTime now = LocalDateTime.now();
                      for (String message : failList) {
                          DbHexFailLog f=new DbHexFailLog();
                          f.setInHexStr(message);
                          f.setCtime(now);
                          f.setFlag("SF");
                          list.add(f);
                      }
                      dbHexFailLogService.saveBatch(list,list.size());
                      failList.clear();
                  }
                  private void saveFailedMessageToDatabase(String message,String flag) {
                      DbHexFailLog f=new DbHexFailLog();
                      f.setInHexStr(message);
                      f.setCtime(LocalDateTime.now());
                      f.setFlag(flag);
                      dbHexFailLogService.save(f);
                  }
                  @PreDestroy
                  public void close() {
                      try {
                          if (channel != null && channel.isOpen()) {
                              channel.close();
                          }
                          if (connection != null && connection.isOpen()) {
                              connection.close();
                          }
                          logger.info("RabbitMqServiceImpl resources closed successfully.");
                      } catch (IOException | TimeoutException e) {
                          logger.error("Failed to close RabbitMqServiceImpl resources: {}", e.getMessage(), e);
                      }
                  }
              }
              

              上面的代码优化后,主要增加了三项如下:

               1 Publisher Confirms 机制:

              • 启用 channel.confirmSelect() 来激活发布者确认模式。
              • 使用 ConfirmCallback 和 NackCallback 来处理消息的确认与未确认逻辑。
              • 未确认的消息会被保存到数据库中。

                2 保存失败的消息到数据库。

                3 在 @PreDestroy 方法中关闭 Channel 和 Connection,确保服务销毁时正确关闭资源。

                方式二 SpringBoot框架使用

                mvn依赖包:

                 
                        
                            org.springframework.boot
                            spring-boot-starter-amqp
                        

                spring配置文件:

                Spring: 
                 rabbitmq:
                    host: xx.xx.xx.xx
                    port: 51091
                    username: java_consumer
                    password: java_consumer
                    virtual-host: hellow
                    connection-timeout: 6000

                JAVA代码:

                  发送消息java代码:

                import org.springframework.amqp.core.Queue;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.stereotype.Service;
                /**
                 * @author hua
                 * @date 2024-08-22
                 */
                @Component
                public class MessageProducer {
                    @Autowired
                    private RabbitTemplate rabbitTemplate;
                    @Autowired
                    private Queue queue;
                    public void sendMessage(String message) {
                        rabbitTemplate.convertAndSend(queue.getName(), message);
                        System.out.println(" [x] Sent '" + message + "'");
                    }
                }
                

                  接收消息java代码:

                import org.springframework.amqp.rabbit.annotation.RabbitListener;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.stereotype.Component;
                /**
                 * @author hua
                 * @date 2024-08-22
                 */
                @Component
                public class RabbitListener {
                    private static final Logger logger = LogManager.getLogger(RabbitListener.class);
                    @RabbitListener(queues = "test")
                    public void receiveMessage(String message) {
                        try {
                            System.out.println("rabbit rev 
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码