RabbitMq详解

06-02 1388阅读

一、概念

1. 什么是RabbitMQ

使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品、不同开发语言等条件的限制

RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

2. 核心特点

RabbitMQ 是基于 Erlang 语言开发、遵循 AMQP(高级消息队列协议)的开源消息中间件,具备以下显著特性:

  1. 高可靠性保障:通过消息持久化、传输确认及发布确认等机制,确保消息在生产、传输与消费过程中的可靠性,避免数据丢失风险。
  2. 灵活的消息分发策略:依托 Exchange(交换机)实现多样化消息路由,支持简单模式、工作队列模式、发布订阅模式、路由模式及通配符模式,可灵活适配不同业务场景需求。
  3. 集群化部署能力:支持多台 RabbitMQ 服务器组成集群,构建逻辑统一的 Broker,提升系统可用性与负载均衡能力。
  4. 多协议兼容性:除 AMQP 外,还支持 STOMP、MQTT 等多种消息队列协议,便于与不同技术栈的系统集成。
  5. 广泛的客户端支持:提供对 Java、.NET、Ruby 等几乎所有常用编程语言的客户端支持,降低开发接入门槛。
  6. 可视化管理界面:内置直观易用的管理界面,方便用户实时监控消息队列状态、管理 Broker 资源及配置参数。
  7. 插件化扩展体系:具备丰富的官方插件,并支持自定义开发,可根据业务需求灵活扩展功能,如权限控制、消息追踪等。

3. RabbitMQ能做什么

RabbitMQ可以帮助开发者构建可靠、高效、可扩展的分布式系统,实现异步通信、任务分发和事件驱动等功能。它被广泛应用于各种场景,包括微服务架构、消息驱动的架构、实时数据处理等。

4. RabbitMQ的工作流程

RabbitMQ的工作流程主要基于生产者-消费者模型和AMQP协议。以下是RabbitMQ消息传递的基本流程:

  • 生产者发送消息:生产者通过RabbitMQ的客户端库创建消息,并指定交换机的名称和路由键。然后,生产者将消息发送到RabbitMQ服务器上的指定交换机。
  • 交换机接收并路由消息:交换机接收到生产者的消息后,根据配置的路由规则和路由键将消息分发到相应的队列中。如果消息没有匹配到任何队列,则可能会被丢弃或返回给生产者。
  • 消费者消费消息:消费者连接到RabbitMQ服务器,并监听指定的队列。当队列中有新消息时,消费者从队列中获取并处理消息。处理完成后,消费者可以选择发送确认消息给RabbitMQ服务器,以表示消息已被成功处理。
  • 消息确认机制:RabbitMQ使用消息确认机制来确保消息的可靠传递。生产者在发送消息后会收到一个确认,表示消息已成功发送到交换机。消费者在处理完

    5. 消息队列优缺点

    消息队列在特殊场景下具备显著优势,主要体现在解耦、异步处理以及削峰等方面。然而,引入消息队列也伴随着一些不可忽视的缺点:

    1. 系统可用性降低:随着外部依赖的增加,系统的稳定性面临挑战。例如,原本 A 系统只需调用 B、C、D 三个系统的接口,系统运行正常。但引入消息队列(MQ)后,若 MQ 出现故障,可能导致整个系统崩溃。因此,如何保障消息队列的高可用性成为关键,可点击 [具体链接] 查看相关解决方案。
    2. 系统复杂度提升:消息队列的加入使得系统逻辑变得复杂。例如,需要处理消息的重复消费问题,即确保每条消息仅被处理一次;处理消息丢失的情况,通过可靠的消息传递机制保证消息不丢失;以及保证消息传递的顺序性,尤其是在对消息顺序有严格要求的场景中。这些问题增加了系统开发和维护的难度。
    3. 数据一致性问题:当 A 系统处理请求并返回成功后,可能出现数据不一致的情况。例如,B、D 系统写库成功,而 C 系统写库失败,这会导致数据状态的不一致。在使用消息队列时,需要额外的技术方案来确保数据的一致性,例如采用分布式事务、消息补偿机制等。

    消息队列虽然功能强大,能够带来诸多好处,但同时也引入了系统复杂性。在使用消息队列时,需要针对其缺点制定相应的技术架构和解决方案,以平衡系统的性能、可用性和一致性。

    6. AMQP和JMS

    MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

    二者的区别与联系:

    1. 接口与协议:JMS(Java Message Service)定义了统一的接口,用于规范消息操作;而 AMQP(Advanced Message Queuing Protocol)则是通过制定协议,来统一数据交互的格式。
    2. 语言限制:JMS 主要限定在 Java 语言环境中使用,专为 Java 平台提供消息服务;AMQP 作为一种协议,不限制实现语言,具备跨语言的特性,可在多种编程语言的项目中使用。
    3. 消息模型:JMS 规定了特定的两种消息模型,满足 Java 平台下的消息通信需求;相比之下,AMQP 的消息模型更为丰富多样,能够适应更多不同场景下的消息通信需求。

    7. 常见MQ产品

    消息队列(MQ)作为重要的中间件,在消息通信中起着关键作用。目前主流的 MQ 产品有 Kafka、ActiveMQ、RocketMQ 和 RabbitMQ,它们各自具有不同的特点和适用场景。

    1. ActiveMQ:基于 JMS(Java Message Service),在早期项目中广泛使用。但随着技术发展,其官网活跃度较低,更新频率低。单机吞吐量为万级,对于高并发的互联网项目,性能上无法满足需求。高可用方面采用主从架构实现,不过消息可靠性较低,存在数据丢失的可能。由于其性能和可靠性的限制,在现代项目中逐渐被替代,可被 RabbitMQ 取代。
    2. RabbitMQ:基于 AMQP 协议,使用 Erlang 语言开发。社区活跃,单机吞吐量同样为万级,无法应对超高并发场景。高可用上采用镜像集群模式,能保障系统的高可用性。消息可靠性较高,可保证数据不丢失,还支持消息重试、死信队列等高级功能。然而,由于其使用 Erlang 语言开发,国内精通该语言的人较少,导致源码阅读困难。适合中小型公司,在不需要面对复杂技术挑战的场景下使用。
    3. RocketMQ:阿里巴巴开源的消息中间件,基于 JMS。单机吞吐量可达 10 万级,能应对互联网项目的高并发挑战。高可用方面采用分布式架构,可搭建大规模集群,性能表现出色。消息可靠性高,通过配置能确保数据不丢失,同时支持延迟消息、事务消息、消息回溯、死信队列等丰富的高级功能。使用 Java 语言开发,便于阅读源码理解底层原理。在技术选型中是一个优秀的选择,适合大型互联网公司和中小型公司使用,商业版存在收费情况。
    4. Kafka:分布式消息系统,单机吞吐量极高,可达十几万的并发量。高可用上支持分布式集群部署。消息可靠性方面,由于消息先存储在磁盘缓冲区,机器故障时可能导致缓冲区数据丢失,即异步性能和数据可靠性存在一定矛盾。功能相对单一,主要用于消息的接收与发送。在行业内常用于大数据领域,如采集用户行为日志并进行计算,实现 “猜你喜欢” 等功能。如果项目没有大数据相关需求,一般不会选择 Kafka。

    RabbitMq详解

    7.1. 选择 RabbitMQ 的原因

    1. 对比 ActiveMQ:ActiveMQ 性能不佳,在高并发场景下无法满足需求,虽然其 API 完善,但在高并发场景下不适用,而 RabbitMQ 的稳定性和可靠性更好,能满足更多场景需求。
    2. 对比 Kafka:Kafka 强调高性能,但在业务对消息可靠性要求高时,其可能丢失消息的特性无法满足要求。而 RabbitMQ 能保证消息不丢失,更适合对消息可靠性有要求的业务场景。
    3. 对比 RocketMQ:RocketMQ 具备高性能、高可靠性、支持分布式事务、水平扩展和大量消息堆积等优点,但商业版收费,某些功能不对外提供。RabbitMQ 社区活跃,功能也较为丰富,对于没有 RocketMQ 高级功能需求且不想承担商业版费用的项目来说,是一个不错的选择。

    8. 核心组件

    RabbitMq详解

    RabbitMq详解

    RabbitMq详解

    • Publisher(生产者):消息的生产者,也是一个向交换器发布消息的客户端应用程序,负责将业务数据封装为消息,并推送至 RabbitMQ 消息服务器。
    • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
    • Exchange(交换机):消息分发枢纽,接收来自生产者的消息,并根据消息携带的 RoutingKey(路由键)及绑定规则,将消息分配至对应的 Queue(队列)。
    • Binding:绑定,用于将消息队列和交换器之间建立关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将它理解成一个由绑定构成的路由表。
    • Queue(队列):消息存储容器,用于暂存待消费的消息,直到被消费者获取处理。
    • Connection:网络连接,比如一个 TCP 连接
    • 消息服务器(RabbitMQ):作为消息中转站,本身不生产或消费消息,仅负责接收生产者的消息,并根据规则路由至对应的消费者。
    • ConnectionFactory:连接管理器,负责创建并管理应用程序(生产者或消费者)与 RabbitMQ 之间的网络连接,确保通信链路的稳定建立与维护。
    • Channel(信道):消息传输通道,所有消息的发布、接收操作均通过信道完成。多个信道可复用同一物理连接,提升资源利用率与并发性能。
      • RoutingKey:消息携带的路由标识,作为交换机路由消息的依据,决定消息最终投递到哪个队列。
      • BindingKey:队列与交换机的绑定标识,通过 BindingKey 建立 Queue 与 Exchange 之间的关联关系,使交换机能够按规则将消息路由至目标队列 。
      • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /
      • Broker:表示消息队列服务器实体
      • Message:消息实体,它由消息头和消息体组成。消息头主要由路由键、交换器、队列、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等属性组成,而消息体就是指具体的业务对象。

        相比传统的 JMS 模型,AMQP 主要多了 Exchange、Binding 这个新概念。

        在 AMQP 模型中,消息的生产者不是直接将消息发送到Queue队列,而是将消息发送到Exchange交换机,其中还新加了一个中间层Binding绑定,作用就是通过路由键Key将交换机和队列建立绑定关系。

        就好比类似用户表和角色表,中间通过用户角色表来将用户和角色建立关系,从而实现关系绑定,在 RabbitMQ 中,消息生产者不直接跟队列建立关系,而是将消息发送到交换器之后,由交换器通过已经建立好的绑定关系,将消息发送到对应的队列。

        RabbitMQ 最终的架构模型,核心部分就变成如下图所示:

        RabbitMq详解

        从图中很容易看出,与 JMS 模型最明显的差别就是消息的生产者不直接将消息发送给队列,而是由Binding绑定决定交换器的消息应该发送到哪个队列,进一步实现了在消息的推送方面,更加灵活!

        8.1. 生产者发送消息流程:

        1、生产者和Broker建立TCP连接。

        2、生产者和Broker建立通道。

        3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

        4、Exchange将消息转发到指定的Queue(队列)

        8.2. 消费者接收消息流程:

        1、消费者和Broker建立TCP连接

        2、消费者和Broker建立通道

        3、消费者监听指定的Queue(队列)

        4、当有消息到达Queue时Broker默认将消息推送给消费者。

        5、消费者接收到消息。

        6、ack回复

        二、安装使用

        1. Windows安装

        1.1. 安装erlang

        进入erlang的官方下载页面进行下载:Downloads - Erlang/OTP

        在下载过程中一定要对应匹配RabbitMQ的版本

        双击安装并配置环境变量

        RabbitMq详解

        RabbitMq详解

        RabbitMq详解

        1.2. 下载RabbitMQ

        RabbitMQ下载地址:Installing RabbitMQ | RabbitMQ

        RabbitMq详解

        双击安装,安装完成后,开始安装RabbitMQ-Plugins插件

        先cd D:\software\RabbitMQ\rabbitmq_server-3.8.8\sbin

        然后运行命令:rabbitmq-plugins enable rabbitmq_management

        RabbitMq详解

        执行rabbitmqctl status,出现以下内容,说明成功

        RabbitMq详解

        然后双击运行rabbitmq-server.bat

        RabbitMq详解

        进入登录页面,发现启动成功

        然后我们可以将RabbitMQ做成Windows服务

        以管理员身份运行cmd

        cd D:\software\RabbitMQ\rabbitmq_server-3.8.8\sbin

        执行rabbitmq-service.bat install

        RabbitMq详解

        可以通过任务管理器去查看RabbitMQ服务

        RabbitMq详解

        以上就是Windows安装RabbitMQ的全部过程,页面设置跟上面的Linux一样。

        2. 管理界面介绍

        2.1. 概览

        RabbitMq详解

        RabbitMq详解

        connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况

        channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。

        Exchanges:交换机,用来实现消息的路由

        Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

        端口:

        RabbitMq详解

        2.2. 连接

        RabbitMq详解

        2.3. 通道

        RabbitMq详解

        2.4. 交换机

        交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

        RabbitMq详解

        Type

        解释

        direct

        它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中

        fanout

        它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中

        headers

        headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。(headers 类型的交换器性能差,不实用,基本不会使用。)

        topic

        与 direct 模型相比,多了个可以使用通配符!,这种模型 Routing key 一般都是由一个或多个单词组成,多个单词之间以 "." 分割,例如:item.insert 。星号 匹配一个词,例 audit.* ;# 号匹配一个或多个词 audit.#

        x-delayed-message

        延迟交换机,可以延迟接收消息

        Features

        解释

        D

        d是 durable的缩写,代表这个队列中的消息支持持久化

        AD

        ad是 autoDelete的缩写,代表当前队列的最后一个消费者退订时被自动删除。注意:此时不管队列中是否还存在消息,队列都会删除

        excl

        是 exclusive

        的缩写,代表这是一个排他队列。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。需注意:

        1. 排他队列基于连接可见,同一连接的不同信道可访问同一连接创建的排他队列;

        2. 首次声明后,其他连接不允许创建同名排他队列;

        3. 即使队列持久化,连接关闭或客户端退出,排他队列仍会自动删除。适用于单个客户端发送读取消息的场景

        Args

        是 arguments的缩写,代表该队列配置了 arguments参数

        TTL

        是 x-message-ttl的缩写,用于设置队列中所有消息的生存周期(统一配置),也可在发布消息时单独为某个消息指定剩余生存时间,单位为毫秒

        Exp

        Auto Expire,是 x-expires 配置的缩写。当队列在指定时间内未被访问(如 consume、basicGet、queueDeclare 等操作),将会被删除

        Lim

        说明该队列配置了 x-max-length,用于限定队列中消息的最大数量,超过指定长度将删除最早的消息

        Lim B

        说明队列配置了 x-max-length-bytes,用于限定队列最大占用空间大小,通常受内存、磁盘空间限制

        DLX

        说明该队列配置了 x-dead-letter-exchange。当队列消息长度超过限制、过期等情况发生时,从队列中删除的消息将被推送到指定的交换机,而非直接丢弃

        DLK

        x-dead-letter-routing-key的缩写,用于将删除的消息推送到指定交换机的指定路由键对应的队列中

        Pri

        x-max-priority的缩写,标识该队列为优先级队列。需先定义最大优先级值(通常不宜过大),发布消息时可指定消息优先级,数值更大(优先级更高)的消息优先被消费

        Ovfl

        x-overflow的缩写,用于配置队列消息溢出时的处理方式。可选择 drop-head(默认,丢弃队列头部消息)或 reject-publish(拒绝接收生产者后续发送的所有消息)

        ha-all

        镜像队列。all表示将队列镜像到集群上的所有节点,ha-params参数将被忽略

        2.5. 队列

        RabbitMq详解

        点击名称进去,可以看到队列的详细信息

        get Message可以看到消息的内容

        RabbitMq详解

        arguments具体参数如下:

        参数名

        作用

        x-message-ttl

        发送到队列的消息在被丢弃之前可存活的时间(单位:毫秒)

        x-max-length

        限定队列可容纳的最大消息数量

        x-expires

        队列在自动删除前可使用的时长(单位:毫秒)

        x-max-length-bytes

        以字节为单位限制队列的容量大小,通过控制队列占用的存储空间来达到容量限制目的,需设置为非负整数

        x-dead-letter-exchange

        配置队列溢出行为(有效值为 drop-head

        或 reject-publish

        );同时指定当消息被拒绝或过期时,将消息重新发布到的交换机名称(可选)

        x-dead-letter-routing-key

        当消息变为死信时,可选的替换路由键;若未设置,则使用消息的原始路由键

        x-max-priority

        定义队列支持的最大优先级数值;若未设置,队列不支持消息优先级功能

        x-queue-mode

        将队列设置为延迟模式,使队列尽可能将消息存储在磁盘以降低内存占用;若未设置,队列将保留内存缓存以实现消息快速传递

        x-queue-master-locator

        设置队列的主位置模式,确定在集群节点上声明队列时,队列主节点所在位置的规则

        2.6. Admin用户和虚拟主机管理

        2.6.1. 添加用户

        RabbitMq详解

        上面的Tags选项,其实是指定用户的角色,可选的有以下几个:

        • 超级管理员 (administrator):能够登录管理控制台,可查看所有信息,还能对用户和策略(policy)进行操作。
        • 监控者 (monitoring):可以登录管理控制台,同时能查看 RabbitMQ 节点的相关信息,如进程数、内存使用情况、磁盘使用情况等。
        • 策略制定者 (policymaker):可登录管理控制台,对策略(policy)进行管理,但无法查看节点的相关信息(如进程数、内存使用情况等,即上图红框标识的部分) 。
        • 普通管理者 (management):仅能登录管理控制台,既无法看到节点信息,也不能对策略进行管理。
        • 其他:不具备登录管理控制台的权限,通常作为普通的生产者和消费者,专注于消息的生产和消费操作。
          2.6.2. 创建虚拟主机

          RabbitMq详解

          为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

          RabbitMq详解

          创建好虚拟主机,我们还要给用户添加访问权限:

          点击添加好的虚拟主机:

          RabbitMq详解

          进入虚拟主机设置界面:

          RabbitMq详解

          2.7. 开启日志

          进入容器,输入

          rabbitmq-plugins enable rabbitmq_tracing

          此时会多一个tracing标签,输入信息添加日志。

          RabbitMq详解

          Name:自定义,建议标准点容易区分

          Format:表示输出的消息日志格式,有Text和JSON两种,Text格式的日志方便人类阅读,JSON的方便程序解析。

          JSON格式的payload(消息体)默认会采用Base64进行编码,如上面的“trace test payload.”会被编码成“dHJhY2Ug dGVzdCBwYXlsb2FkLg==”。

          Max payload bytes: 表示每条消息的最大限制,单位为B。比如设置了了此值为10,那么当有超过10B的消息经过Rabbit MQ流转时,在记录到trace文件的时候会被截断。如上text日志格式中“trace test payload.”会被截断成“trace tes

          Pattern:

          #:追踪所有进入和离开MQ的消息

          publish.#:追踪所有进入MQ的消息

          publish.myExchage:追踪所有进入到myExchange的消息

          deliver.#:跟踪所有离开MQ的消息

          deliver.myQueue:追踪所有从myQueue离开的消息

          #.myQueue:实测效果等同于deliver.myQueue

          添加后,点击即可查看日志

          RabbitMq详解

          RabbitMq详解

          如果出现错误,是因为插件默认是使用 guest 用户,是因为把 guest 用户删除了,或者在配置文件里面使用其他用户

          RabbitMq详解

          解决: 配置/etc/rabbitmq/rabbitmq.config,添加配置

          {rabbitmq_tracing,
             [
                 {directory, "/var/vcap/sys/log/rabbitmq-server/tracing"},
                 {username, },
                 {password, }
             ]
          },

          三、详细说明

          1. RabbitMQ 的 5 种核心消息模式

          RabbitMQ 提供五种核心工作模式,适用于不同业务场景,满足多样化消息传递需求:

          1.1. 简单模式(HelloWorld)

          架构:单生产者、单消费者、单队列,无需手动配置交换机,默认使用 RabbitMQ 内置的 Direct 交换机。

          原理:生产者发送消息直接进入队列,消费者从队列获取并处理消息。

          应用场景:适用于快速验证消息收发功能,如基础程序间通信测试、简单任务处理,可实现高效的消息生产与消费。

          1.2. 工作队列模式(Work Queue)

          架构:一个生产者对应多个消费者,依赖默认交换机,消费者间形成竞争关系。

          原理:生产者将任务消息发送至队列,多个消费者从队列拉取任务,同一消息仅被一个消费者处理,实现任务并行处理。若消费者宕机,未确认的消息会在一段时间后重新分配,保障任务可靠性。

          应用场景:适用于异步处理耗时任务(如文件处理、数据分析)、批量执行后台任务(发送邮件、生成报表)等场景,提升系统处理能力。

          RabbitMq详解

          1.3. 发布订阅模式(Publish/Subscribe)

          架构:需配置fanout类型交换机,交换机绑定多个队列,队列与消费者解耦。

          原理:生产者将消息发送至fanout交换机,交换机无视路由键,直接将消息广播至所有绑定队列,实现一对多消息分发。

          应用场景:常用于实时通知(如系统告警、新闻推送)、日志广播等场景,确保消息快速扩散至多个接收端。

          RabbitMq详解

          1.4. 路由模式(Routing)

          架构:采用direct类型交换机,交换机与队列绑定需指定精确的routing key(路由键)。

          原理:生产者发送消息时携带routing key,交换机根据该键值将消息精准投递至匹配的队列,实现按条件筛选消息的定向分发。

          应用场景:适用于根据业务规则分类处理消息的场景,如订单系统中按订单类型(普通订单、加急订单)分发至不同处理队列。

          RabbitMq详解

          1.5. 主题模式(Topic)

          架构:使用topic类型交换机,支持通配符规则(*匹配单个单词,#匹配零个或多个单词)。

          原理:生产者发送消息时携带routing key,交换机依据通配符规则将消息路由至多个符合条件的队列,实现灵活的消息过滤与动态分发。

          应用场景:适用于复杂消息分类场景,如日志系统按级别(info.*、error.#)和模块(user.log、order.log)归档消息,或物联网设备状态监控中的多维度数据分发。

          RabbitMq详解

          通过灵活选择不同工作模式,RabbitMQ 能够高效支持从基础通信到复杂业务场景的消息处理需求,保障系统的可靠性与扩展性。

          RabbitMq详解

          RabbitMq详解

          2. 交换机分发策略

          当消息的生产者将消息发送到交换器之后,是不会存储消息的,而是通过中间层绑定关系将消息分发到不同的队列上,其中交换器的分发策略分为四种:Direct、Topic、Headers、Fanout!

          • Direct:直连类型,即在绑定时设定一个 routing_key, 消息的 routing_key 匹配时, 才会被交换器投送到绑定的队列中去,原则是先匹配、后投送;
          • Topic:按照规则转发类型,支持通配符匹配,和 Direct 功能一样,但是在匹配 routing_key的时候,更加灵活,支持通配符匹配,原则也是先匹配、后投送;
          • Headers:头部信息匹配转发类型,根据消息头部中的 header attribute 参数类型,将消息转发到对应的队列,原则也是先匹配、后投送;
          • Fanout:广播类型,将消息转发到所有与该交互机绑定的队列上,不关心 routing_key;

            2.1. Direct(直接交换机)

            Direct 交换机是 RabbitMQ 的默认交换机类型,遵循精确匹配路由规则:只有当消息携带的路由键(routing key)与队列绑定到交换机时配置的绑定键(binding key)完全一致,消息才会被转发至该队列。这种模式属于单播机制,确保消息精准投递。

            RabbitMQ 默认提供了一个名称为空字符串的 Direct 交换机(在 Web 管理界面显示为(AMQP default)),该交换机自动绑定到所有队列,且每个队列的绑定键为自身名称。因此,在未显式指定交换机的场景下,生产者发送的消息实际通过此默认交换机完成路由,这也是部分开发者误以为 “不配置交换机也能收发消息” 的原因。

            例如,若交换机绑定队列时设置绑定键为black,仅当消息路由键也为black时,消息才会被转发至该队列;若消息路由键为black.green,则不会被投递。Direct 交换机以严格的一对一匹配逻辑,实现消息的确定性路由。

            RabbitMq详解

            示例1:

            如图,假设生产发送消息到直接交换机,消息路由键为:green,那么消息将被投放入Queue1队列中,因为Queue1的绑定建和消息路由键精确匹配。

            RabbitMq详解

            示例2:

            如图,假设生产发送消息到直接交换机,消息路由键为:green,那么消息将被投放入Queue1和Queue2队列中,因为Queue1,Queue2的绑定建和消息路由键精确匹配。

            RabbitMq详解

            2.2. Fanout(扇出交换机)

            扇出(fanout)类型的交换机与其他类型不同,它采用消息广播模式。其消息转发规则是:会将所有发送到该交换机的消息转发到与之绑定的所有消息队列中,完全不考虑路由键和绑定键,即便配置了路由键也会被忽略。

            这就类似于子网广播,子网内的每台主机都会获得一份复制的消息。在 RabbitMQ 的四种交换机类型(Direct、Topic、Headers、Fanout)中,扇出交换机转发消息的速度是最快的,能够高效地将消息同时分发给多个绑定队列。

            RabbitMq详解

            示例:

            假设生产发送消息到直接交换机,消息路由键为:quick,那么最后消息将被投放入所有队列中(和路由键,绑定键无关)

            RabbitMq详解

            2.3. Topic(主题交换机)

            Topic 类型交换机基于通配符匹配规则实现消息路由,相比 Direct 交换机更加灵活:仅当消息携带的路由键(routing key)符合队列绑定的通配符规则时,才会将消息转发至对应队列。该模式支持两种特殊通配符:

            • *(星号):匹配单个单词。例如绑定键为logs.*,仅接收路由键中包含两个单词且第一个单词为logs的消息(如logs.error)。
            • #(井号):匹配零个或多个单词。若绑定键设置为logs.#,可接收logs开头的任意长度单词消息(如logs、logs.error、logs.error.server);当绑定键为#时,该队列将接收交换机的所有消息,效果等同于 Fanout 交换机。

              当绑定键中不使用通配符时,Topic 交换机将退化为 Direct 交换机的精确匹配模式。例如,若绑定键为black,仅当消息路由键也为black时才会投递;而使用通配符(如black.#),则可匹配black、black.green等包含black前缀的多种路由键,实现动态、灵活的消息过滤与分发,适用于日志分级处理、多维度消息分类等场景。

              RabbitMq详解

              示例:

              假设

              • 生产发送消息到主题交换机,消息路由键为:quick.orange.rabbit、lazy.orange.element,那么消息将被投放入所有队列中。
              • 生产发送消息到主题交换机,消息路由键为:quick.orange.fox,那么消息将被投放入Queue1队列中。
              • 生产发送消息到主题交换机,消息路由键为: lazy.pink.rabbit",那么消息将被投放入Queue2队列中,且只会放入一次,虽然匹配两个绑定键。
              • 生产发送消息到主题交换机,消息路由键为:quick.brown.fox、 orange、quick.orange.new.rabbit,那么消息将不会被投放入任何队列中。
              • 生产发送消息到主题交换机,消息路由键为:lazy.orange.new.rabbit,那么消息将被投放入Queue2队列中。

                RabbitMq详解

                2.4. Headers(消息头交换机)

                头部交换机(Headers)依据消息的自定义消息头属性(headers)进行消息路由匹配,与路由键和绑定键无关,具体规则如下:

                2.4.1. 绑定指定键值对且包含 x - match

                当在绑定队列和交换机时指定了一组键值对,并且其中包含一个键为 x - match,其值为 any 或 all:

                • all:消息携带的键值对(即消息头)必须全部匹配消息和队列绑定时配置的全部键值对(消息携带的键值对可以包含绑定键值中没有的键值对),消息才会被转发到该队列。
                • any:消息携带的键值对只要匹配消息和队列绑定时配置的任一键值对,消息就会被转发到该队列。
                  2.4.2. 绑定指定键值对但未指定 x - match

                  如果在绑定队列和交换机时指定了一组键值对,但没有指定 x - match 键值对,那么默认 x - match 为 all。

                  2.4.3. 绑定未指定键值对

                  若在绑定队列和交换机时未指定键值对,交换机也会把消息发送到该队列。

                  2.4.4. 特殊匹配说明

                  对于 any 和 all 模式,以字符串 x - 开头的消息头不会用于路由匹配。若将 x - match 设置为 any - with - x 或 all - with - x,则会使用以字符串 x - 开头的请求头进行路由匹配。

                  2.4.5. 与其他交换机对比

                  Headers 交换机同样是基于规则匹配来路由消息,与 Direct 和 Topic 交换机固定使用路由键不同,它通过自定义的消息头部键值对规则进行匹配。在队列与交换机绑定时设定一组键值对规则,消息中也包含一组键值对(即 headers 属性),当这些键值对部分或全部匹配时,消息就会被投送到对应队列。不过,Headers 交换机和 Direct 交换机功能类似,但性能远不如 Direct 交换机,目前在实际应用中几乎很少使用。

                  RabbitMq详解

                  2.5. 不同交换机使用场景

                  不同类型的交换机性能排序:fanout > direct > topic > headers。

                  不同类型的 RabbitMQ 交换机适用于不同的使用场景,以下是它们的一些常见用例:

                  • Direct Exchange(直连交换机):适用于一对一的消息分发,如订单处理系统,每个订单只发送到特定的处理队列。
                  • Fanout Exchange(扇出交换机):适用于广播消息给多个消费者,不关心消息的具体内容。如:实时日志处理,将日志广播给多个监控系统。
                  • Topic Exchange(主题交换机):根据消息的特定属性选择性地将消息路由到不同的队列。如:新闻发布系统,根据主题和地区将新闻分发给不同的订阅者。
                  • Headers Exchange(头交换机)尽量不要用,性能太低。根据消息的头部属性来选择性地将消息路由到队列。
                  • Default Exchange(默认交换机):用于简单的队列名称与绑定键相同的情况,不需要显式声明交换机。如:简单的消息发布和订阅,不需要复杂的路由规则。

                    选择适当的交换机类型取决于您的应用程序需求。通常情况下,您可以根据以下考虑来选择:

                    • 如果消息需要精确路由到特定队列,使用直连交换机。
                    • 如果需要将消息广播给多个队列,使用扇出交换机。
                    • 如果消息需要根据多个属性进行选择性路由,使用主题交换机
                    • 如果只是简单的队列名称与绑定键相同,可以使用默认交换机。

                      3. 队列 -- Queue

                      RabbitMQ中的队列是一个缓冲区,用于为消费者应用程序存储消息。这些应用程序可以创建、使用和删除队列。RabbitMQ中的队列可以是持久的、临时的或自动删除的。持久队列一直保留,直到它们被删除。临时队列一直存在,直到服务器关闭。自动删除队列在不再使用时被删除。

                      3.1. Queue的属性

                      Type:指定Queue类型。RabbitMQ提供了三种Type:

                      • Classic Queues:经典队列。这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性
                      • Quorum Queues:仲裁队列。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。
                      • Stream Queues:流式队列。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景

                        Durability:声明队列是否持久化,即服务器重启后是否还存在

                        Auto delete:是否自动删除,如果为true,当没有消费者连接到这个队列的时候,队列会自动删除

                        Exclusive:exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除;基本上不设置它,设置成false

                        Arguments:队列的其他属性,例如指定DLX(死信交换机等);

                        通过设置不同的属性,可以实现不同功能的Queue,如优先级队列,死信队列,临时队列等

                        3.2. 3.2 优先级队列

                        RabbitMq详解

                        3.2.1 什么是优先级队列

                        优先级队列是一种特殊类型的队列,它根据消息的优先级进行排序和发送,在这种队列中,高优先级的消息将先被消费。

                        优先级可以设置为1~255,但建议设置为1~5,如果最大值5满足不了需求建议1~10。更高的优先级需要更多的CPU和内存资源,因为RabbitMq需要在内部为每个优先级维护一个子队列,从1到你配置最大值。

                        优先级队列与普通队列有相同特性,如支持持久化,分页和镜像等功能,但需要注意的是消息的过期机制,过期的消息是从队列的头部开始过期的,即使你设置了队列级别的TTL,低优先级的过期消息仍然会被高优先级的未过期消息阻塞,导致无法传递,但它会出现在队列统计信息中;

                        另外,如果你队列设置了最大长度限制,RabbitMq会按正常流程从队列中删除消息,无论是高优先级还是低优先级。

                        3.2.2 如何配置优先级队列

                        • 创建Queue时,需要添加 x-max-priority 参数。此参数应为介于 1 和 255 之间的正整数,指示队列应支持的最大优先级。
                        • 创建Exchange及Binding
                        • 发布者发布消息时,在Properties中添加 priority 属性,指定消息的优先级。值越大,优先级越高。

                          没有 priority 属性的消息其优先级被视为 0。优先级高于队列最大值的消息将被视为以最大优先级发布。

                          3.2.3 优先级队列的应用

                          • 订单催付,客户下单后,商家希望在客户设定的时间内完成付款,如果为付款,我们需要推送催付短信。对于大客户的订单,我们就可以使用优先队列去处理。
                          • 任务调度,后台调度任务中,有些任务可以稍后执行,有些需要立即执行,所以我们可以用优先队列发送一些紧急任务。

                            3.3. 死信队列

                            3.3.1 什么是死信

                            死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

                            死信的来源主要有三类:

                            • 消息 TTL 过期:通过配置 x-message-ttl 参数指定消息过期实践
                            • 队列达到最大长度(队列满了,无法再添加数据到 mq 中):通过配置 x-max-length参数指定队列最大长度
                            • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

                              3.3.2 如何配置死信队列

                              RabbitMq详解

                              上图是死信队列的一个例子。我们通过交换机 tempreture.ex 接收消息,消息路由到 队列 tempreture.queue。在队列中队列 tempreture.queue中,配置超时时间(如3秒)和死信交换机tempreture.dlx.ex。 消息超时后,消息会被转移到死信交换机 tempreture.dlx.ex 中,该死信交换机绑定队列tempreture.dlx.queue。

                              RabbitMq详解

                              RabbitMq详解

                              如上例子中,队列tempreture.queue的 arguments配置了:

                              x-dead-letter-exchange:

                              tempreture.dlx.ex

                              x-message-ttl:

                              5000

                              3.4. 自动删除队列 -- Auto delete

                              临时队列是一种自动删除队列。当这个队列被创建后,如果没有消费者监听,则会一直存在,还可以不断向其发布消息。但是一旦的消费者开始监听,然后断开监听后,它就会被自动删除。

                              PS:有些地方将自动删除队列称为 临时队列。不过临时队列有时还指 非持久化队列 (与持久化队列相对应,对应参数Durability)

                              自动删除队列创建时,将Auto delete配置成Yes 即可。

                              3.5. 惰性队列

                              在RabbitMQ 3.12之前,Classic的队列可以配置为在惰性模式下运行,这意味着它们会将所有消息写入磁盘,而不会将消息保存在内存中。 3.12之后,该配置被忽略。当前的行为是:

                              • 通常情况下,消息被短暂的存储在内存中,知道缓存的消息写入磁盘
                              • 一小部分消息仍保留在内存中,以满足消息的快速传递(存储在内存中的消息数量,取决于消费者的消费速度)
                              • 如果发布的消息可以立即传递给消费者,并且消费者在消息被写入磁盘之前确认了消息,那么消息不会被写入磁盘(此时消息已经传递并得到确认,因此不需要写入)

                                3.6. 延迟队列

                                RabbitMQ本身不直接支持延迟消息队列,但是可以通过一些高级特性配合使用来实现延迟消息的功能。一种常见的方法是使用RabbitMQ的"死信队列"(Dead Letter Queues)特性配合消息的TTL(Time-To-Live)。

                                • 首先,设置一个交换机(Exchange)和队列(Queue),并将它们绑定,用于存放需要延迟处理的消息。
                                • 将消息的TTL设置为所需的延迟时间。
                                • 当消息过期,它们会被自动发送到一个死信交换机(Dead Letter Exchange)。
                                • 创建一个死信队列,并将其绑定到死信交换机。
                                • 消费者从死信队列中获取消息进行处理。
                                  3.6.1. 延迟队列使用场景
                                  • 订单在十分钟之内未支付则自动取消
                                  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
                                  • 用户注册成功后,如果三天内没有登陆则进行短信提醒。
                                  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
                                  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

                                    4. 其它工作机制

                                    4.1. 消息确认机制

                                    RabbitMQ 提供两种消息确认模式,用于解决消息处理失败、消费者宕机等场景下的消息生命周期管理问题:

                                    1. 自动确认模式:消费者接收消息后,RabbitMQ 自动认为消息已被处理,无需额外确认操作。此模式虽高效,但存在消息丢失风险,适用于允许少量数据丢失的场景。
                                    2. 显式确认模式:消费者需调用basic.ack方法手动确认消息。确认时机灵活,可在消息接收后、持久化存储前或处理完成后执行。若消费者未发送确认即消亡,RabbitMQ 会将消息重新分配给其他消费者或等待新消费者接入,确保消息不丢失。但需注意,若消费者遗漏确认,可能导致队列消息堆积。

                                    4.2. 生产者确认机制

                                    为确保消息成功抵达 RabbitMQ,生产者可采用以下两种方式:

                                    1. 事务机制:通过Tx()、TxCommit()和TxRollback()方法实现事务控制。开启事务后,仅当消息成功提交至 RabbitMQ,事务才会生效;若提交前发生异常,可回滚事务并重发消息。但事务机制会使生产者处于阻塞状态,严重影响性能。
                                    2. 发送者确认机制:将信道设置为确认(confirm)模式后,每条消息会被分配唯一 ID。消息抵达匹配队列或持久化到磁盘后,RabbitMQ 会发送确认(Ack)或否定确认(Nack)。生产者可通过回调方法异步处理确认结果,支持批量确认(multiple参数)。该机制虽提升性能,但无法回滚,服务器崩溃时可能导致消息重复,需业务层实现去重逻辑。

                                    4.3. 消息持久化

                                    为防止 RabbitMQ 重启导致消息丢失,需将队列和消息均标记为持久化:

                                    1. 队列持久化:声明队列时设置durable=True,确保队列元数据存储在磁盘,如channel.queue_declare(queue='hello', durable=True)。
                                    2. 消息持久化:发送消息时设置delivery_mode = pika.DeliveryMode.Persistent,使消息写入磁盘。但需注意,持久化存在短暂未写入磁盘的窗口风险,结合发布确认机制可增强可靠性。

                                    4.4. 工作队列消息分发

                                    工作队列通过多个消费者处理同一队列消息,提升处理效率,支持两种分发策略:

                                    1. 轮询分发(Round-robin):RabbitMQ 按顺序将消息发送给消费者,实现负载均衡。但可能出现消息丢失(未处理即确认)、任务分配不均(复杂任务积压)等问题。
                                    2. 公平分发(Fair dispatch):消费者设置basic_qos(prefetch_count=1),每次仅接收一条消息,处理完成并手动确认后再获取下一条,避免忙闲不均。适用于任务处理耗时差异大的场景。

                                    4.5. 备用交换机(Alternate Exchange, AE)

                                    用于处理无法路由的消息(无匹配队列或绑定规则):当消息无法路由至目标队列时,交换机将其转发至配置的备用交换机。若 AE 仍无法路由,消息将继续沿 AE 链传递,直至成功路由或到达链尾。此机制可捕获异常消息、实现分级处理逻辑。

                                    4.6. 队列长度限制

                                    可通过策略或客户端参数设置队列最大长度(消息数或字节数):

                                    • 限制规则:仅计算就绪态消息(未确认消息不计入)。达到限制后,默认丢弃队首消息,可通过overflow参数调整行为(如reject-publish拒绝新消息、reject-publish-dlx将拒绝消息转为死信)。
                                    • 监控指标:通过messages_ready和message_bytes_ready查看就绪态消息数量及占用空间。

                                      4.7. 死信交换机(Dead Letter Exchange, DLX)

                                      当消息出现以下情况时,会被重新发布到死信交换机:

                                      1. 消费者使用basic.reject或basic.nack且requeue=false;
                                      2. 消息因 TTL 过期;
                                      3. 队列超过长度限制;
                                      4. 仲裁队列中消息重试次数超过delivery-limit。

                                      可通过客户端参数(x-dead-letter-exchange)或策略配置 DLX,死信消息将使用原路由键或指定路由键重新路由,需避免死信循环导致消息无限流转。

                                      4.8. 优先级队列

                                      RabbitMQ 支持为经典队列设置优先级(1-255,建议 1-5):

                                      1. 声明方式:通过x-max-priority参数声明队列支持的最大优先级,如args.put("x-max-priority", 10);发布消息时设置priority字段指定优先级。
                                      2. 行为特性:高优先级消息优先处理,但需注意内存、CPU 开销;未设置优先级的消息默认为 0;结合basic.qos可避免优先级失效(如消费者预取过多导致高优先级消息等待)。

                                      4.9. 延迟消息

                                      通过rabbitmq_delayed_message_exchange插件实现:发送消息时设置x-delay头(单位毫秒)指定延迟时间,到期后消息将路由至匹配队列。若无法路由,消息将被丢弃。

                                      4.10. 生存时间(Time-To-Live, TTL)

                                      可分别为消息和队列设置过期时间:

                                      1. 消息 TTL:通过策略(message-ttl)或消息属性(expiration)设置,取队列级和消息级 TTL 的最小值。过期消息将被丢弃或转为死信(取决于队列类型)。
                                      2. 队列 TTL:通过策略(expires)或参数(x-expires)设置,仅适用于非持久化经典队列,未使用(无消费者、未重声明、无basic.get调用)的队列将在到期后自动删除。

                                      四、SpringBoot中使用RabbitMQ

                                      1. 简单模式

                                      基础配置

                                      首先创建两个SpringBoot项目,一个作为消费者,一个作为生产者,并导入依赖项。

                                           org.springframework.boot
                                           spring-boot-starter-amqp
                                      

                                      在两个项目的yaml文件中写入rabbitmq的配置信息

                                      spring:
                                        rabbitmq:
                                          host: localhost # 主机名
                                          port: 5672 # 端口
                                          virtual-host: / # 虚拟主机
                                          username: guest # 用户名
                                          password: guest # 密码

                                      在rabbitmq中创建一个队列叫做queue1

                                      RabbitMq详解

                                      生产者

                                      在生产者中编写测试类,发送消息

                                      import org.junit.jupiter.api.Test;
                                      import org.springframework.amqp.rabbit.core.RabbitTemplate;
                                      import org.springframework.beans.factory.annotation.Autowired;
                                      import org.springframework.boot.test.context.SpringBootTest;
                                      @SpringBootTest
                                      class RabbitPublisherApplicationTests {
                                       
                                          @Autowired
                                          private RabbitTemplate rabbitTemplate;
                                       
                                          @Test
                                          public void SimpleQueue(){
                                              //队列名字
                                              String name = "queue1";
                                              //具体的消息
                                              String message = "你好!!!rabbit";
                                              //发送消息
                                              rabbitTemplate.convertAndSend(name,message);
                                          }
                                      }

                                      执行测试类后在rabbitmq的相应队列中就会有消息了。

                                      RabbitMq详解

                                      消费者

                                      随后编写消费者来消费消息。

                                      创建一个监听类,来监听具体的队列

                                      @Component
                                      public class SpringRabbitListener {
                                          /**
                                           * 从队列queue1中取消息
                                           * @param msg
                                           */
                                          @RabbitListener(queues = "queue1")
                                          public void listenSimpleQueueMessage(String msg) {
                                              System.out.println("spring 消费者接收到消息:" + msg);
                                          }
                                      }

                                      随后启动消费者项目,就可以从队列中成功获取到消息了。

                                      RabbitMq详解

                                      2. 工作队列模式

                                      编写方法发送20条消息

                                          @Test
                                          public void WorkQueue(){
                                              String name = "work.queue";
                                              for(int i=0;i {
                                                  // 发送消息的时候 延迟时长
                                                  msg.getMessageProperties().setDelay(expiration);
                                                  return msg;
                                              });
                                          }
                                          public CustomExchange createCustomExchange() {
                                              Map arguments = new HashMap();
                                              /**
                                      		 * 参数说明:
                                      		 * 1.交换机的名称
                                      		 * 2.交换机的类型
                                      		 * 3.是否需要持久化
                                      		 * 4.是否自动删除
                                      		 * 5.其它参数
                                      		 */
                                              arguments.put("x-delayed-type", "direct");
                                              return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", true, false, arguments);
                                          }
                                      }
                                      7.4.3. 生产者
                                      @Autowired
                                      private DelayedQueue delayedQueue;
                                      /**
                                      * 发送延迟队列
                                      * @param queueName 队列名称
                                      * @param params 消息内容
                                      * @param expiration 延迟时间 毫秒
                                      */
                                      @GetMapping("/test9")
                                      public void topicTest8() {
                                          delayedQueue.sendDelayedQueue("delayTest2","这是消息",5000);
                                      }
                                      
                                      7.4.4. 消费者
                                      @RabbitListener(queuesToDeclare = @Queue(value = "delayTest2",durable = "true"))
                                      public void declareExchange2(String message){
                                          System.out.println("delayTest2 = " + message);
                                      }

                                      7.5. TTL队列

                                      TTL是time to live的缩写,生存时间,RabbitMQ支持消息的过期时间,消息发送时可以指定,从消息入队列开始计算,只要超过队列的超时时间配置,消息没被接收,消息就会自动清除

                                      7.5.1. 封装发送TTL队列工具类
                                      import org.springframework.amqp.core.*;
                                      import org.springframework.amqp.rabbit.core.RabbitAdmin;
                                      import org.springframework.amqp.rabbit.core.RabbitTemplate;
                                      import org.springframework.beans.factory.annotation.Autowired;
                                      import org.springframework.stereotype.Component;
                                      import javax.annotation.Resource;
                                      import java.util.HashMap;
                                      import java.util.Map;
                                      @Component
                                      public class TtlQueue {
                                          // routingKey
                                          private static final String TTL_KEY = "ttl.routingkey";
                                          private static final String TTL_EXCHANGE = "ttl.exchange";
                                          @Autowired
                                          RabbitTemplate rabbitTemplate;
                                          @Resource
                                          RabbitAdmin rabbitAdmin;
                                          /**
                                      	 * 发送TTL队列
                                      	 * @param queueName 队列名称
                                      	 * @param params 消息内容
                                      	 * @param expiration 过期时间 毫秒
                                      	 */
                                          public void sendTtlQueue(String queueName, Object params, Integer expiration) {
                                              /**
                                      		 * ----------------------------------先创建一个ttl队列--------------------------------------------
                                      		 */
                                              Map map = new HashMap();
                                              // 队列设置存活时间,单位ms,必须是整形数据。
                                              map.put("x-message-ttl",expiration);
                                              /*参数1:队列名称  参数2:持久化  参数3:是否排他 参数4:自动删除队列  参数5:队列参数*/
                                              Queue queue = new Queue(queueName,true,false,false,map);
                                              rabbitAdmin.declareQueue(queue);
                                              /**
                                      		 * ---------------------------------创建交换机---------------------------------------------
                                      		 */
                                              DirectExchange directExchange = new DirectExchange(TTL_EXCHANGE, true, false);
                                              rabbitAdmin.declareExchange(directExchange);
                                              /**
                                      		 * ---------------------------------队列绑定交换机---------------------------------------------
                                      		 */
                                              // 将队列和交换机绑定
                                              Binding binding = BindingBuilder.bind(queue).to(directExchange).with(TTL_KEY);
                                              rabbitAdmin.declareBinding(binding);
                                              // 发送消息
                                              rabbitTemplate.convertAndSend(TTL_EXCHANGE,TTL_KEY,params);
                                          }
                                      }
                                      
                                      7.5.2. 生产者
                                      @Autowired
                                      private TtlQueue ttlQueue;
                                      /**
                                      * 发送TTL队列
                                      * @param queueName 队列名称
                                      * @param params 消息内容
                                      * @param expiration 过期时间 毫秒
                                      */
                                      @GetMapping("/test10")
                                      public void topicTest10() {
                                          ttlQueue.sendTtlQueue("ttlQueue","这是消息内容",5000);
                                      }
                                      
                                      7.5.3. 消费者
                                      @RabbitListener(queues = "ttlQueue" )
                                      public void ttlQueue(String message){
                                          System.out.println("message = " + message);
                                      }

                                      7.6. 死信队列

                                      DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器。队列消息变成死信(deadmessage)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

                                      消息变成死信的几种情况:

                                      1.消息被拒绝(basic.reject/ basic.nack)并且requeue=false

                                      2. 消息TTL过期

                                      3. 队列达到最大长度

                                      流程:发送消息,消息过期后进入到另一个队列(这个队列设置持久化,不过期)的过程。

                                      RabbitMq详解

                                      7.6.1. 封装发送死信队列工具类
                                      import org.springframework.amqp.core.Binding;
                                      import org.springframework.amqp.core.BindingBuilder;
                                      import org.springframework.amqp.core.DirectExchange;
                                      import org.springframework.amqp.core.Queue;
                                      import org.springframework.amqp.rabbit.core.RabbitAdmin;
                                      import org.springframework.amqp.rabbit.core.RabbitTemplate;
                                      import org.springframework.beans.factory.annotation.Autowired;
                                      import org.springframework.stereotype.Component;
                                      import javax.annotation.Resource;
                                      import java.util.HashMap;
                                      import java.util.Map;
                                      @Component
                                      public class DLXQueue {
                                          // routingKey
                                          private static final String DEAD_ROUTING_KEY = "dead.routingkey";
                                          private static final String ROUTING_KEY = "routingkey";
                                          private static final String DEAD_EXCHANGE = "dead.exchange";
                                          private static final String EXCHANGE = "common.exchange";
                                          @Autowired
                                          RabbitTemplate rabbitTemplate;
                                          @Resource
                                          RabbitAdmin rabbitAdmin;
                                          /**
                                      	 * 发送死信队列,过期后进入死信交换机,进入死信队列
                                      	 * @param queueName 队列名称
                                      	 * @param deadQueueName 死信队列名称
                                      	 * @param params 消息内容
                                      	 * @param expiration 过期时间 毫秒
                                      	 */
                                          public void sendDLXQueue(String queueName, String deadQueueName,Object params, Integer expiration){
                                              /**
                                      		 * ----------------------------------先创建一个ttl队列和死信队列--------------------------------------------
                                      		 */
                                              Map map = new HashMap();
                                              // 队列设置存活时间,单位ms,必须是整形数据。
                                              map.put("x-message-ttl",expiration);
                                              // 设置死信交换机
                                              map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
                                              // 设置死信交换器路由键
                                              map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
                                              /*参数1:队列名称  参数2:持久化  参数3:是否排他 参数4:自动删除队列  参数5:队列参数*/
                                              Queue queue = new Queue(queueName,true,false,false,map);
                                              rabbitAdmin.declareQueue(queue);
                                              /**
                                      		 * ---------------------------------创建交换机---------------------------------------------
                                      		 */
                                              DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false);
                                              rabbitAdmin.declareExchange(directExchange);
                                              /**
                                      		 * ---------------------------------队列绑定交换机---------------------------------------------
                                      		 */
                                              Binding binding = BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
                                              rabbitAdmin.declareBinding(binding);
                                              /**
                                      		 * ---------------------------------在创建一个死信交换机和队列,接收死信队列---------------------------------------------
                                      		 */
                                              DirectExchange deadExchange = new DirectExchange(DEAD_EXCHANGE, true, false);
                                              rabbitAdmin.declareExchange(deadExchange);
                                              Queue deadQueue = new Queue(deadQueueName,true,false,false);
                                              rabbitAdmin.declareQueue(deadQueue);
                                              /**
                                      		 * ---------------------------------队列绑定死信交换机---------------------------------------------
                                      		 */
                                              // 将队列和交换机绑定
                                              Binding deadbinding = BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
                                              rabbitAdmin.declareBinding(deadbinding);
                                              // 发送消息
                                              rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,params);
                                          }
                                      }
                                      
                                      7.6.2. 生产者
                                      @Autowired
                                      private DLXQueue dlxQueue;
                                      /**
                                      	 * 发送死信队列,过期后进入死信交换机,进入死信队列
                                      	 * @param queueName 队列名称
                                      	 * @param deadQueueName 死信队列名称
                                      	 * @param params 消息内容
                                      	 * @param expiration 过期时间 毫秒
                                      	 */
                                      @GetMapping("/test11")
                                      public void topicTest11() {
                                          dlxQueue.sendDLXQueue("queue","deadQueue","这是消息内容",5000);
                                      }
                                      7.6.3. 消费者
                                      // 接收转移后的队列消息
                                      @RabbitListener(queuesToDeclare = @Queue(value = "deadQueue",durable = "true"))
                                      public void ttlQueue(String message){
                                          System.out.println("message = " + message);
                                      }

                                      7.7. 消息确认

                                      7.7.1. 发送消息确认机制

                                      为确保消息发送有真的发送出去,设置发布时确认,确认消息是否到达 Broker 服务器

                                      spring:
                                        rabbitmq:
                                          host: 47.99.110.29
                                          port: 5672
                                          username: guest
                                          password: guest
                                          virtual-host: /
                                          listener:
                                            simple:
                                              prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
                                          publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)
                                          publisher-returns: true  #确认消息已发送到队列(Queue)

                                      如果有使用rabbitAdmin配置的话,那里也需要加配置

                                      7.7.1.1. 修改RabbitAdmin配置
                                      import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
                                      import org.springframework.amqp.rabbit.connection.ConnectionFactory;
                                      import org.springframework.amqp.rabbit.core.RabbitAdmin;
                                      import org.springframework.beans.factory.annotation.Value;
                                      import org.springframework.context.annotation.Bean;
                                      import org.springframework.context.annotation.Configuration;
                                      @Configuration
                                      public class RabbitAdminConfig {
                                          @Value("${spring.rabbitmq.host}")
                                          private String host;
                                          @Value("${spring.rabbitmq.username}")
                                          private String username;
                                          @Value("${spring.rabbitmq.password}")
                                          private String password;
                                          @Value("${spring.rabbitmq.virtualhost}")
                                          private String virtualhost;
                                          @Bean
                                          public ConnectionFactory connectionFactory(){
                                              CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
                                              connectionFactory.setAddresses(host);
                                              connectionFactory.setUsername(username);
                                              connectionFactory.setPassword(password);
                                              connectionFactory.setVirtualHost(virtualhost);
                                              // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
                                              connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
                                              connectionFactory.setPublisherReturns(true);
                                              return connectionFactory;
                                          }
                                          @Bean
                                          public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
                                              RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
                                              rabbitAdmin.setAutoStartup(true);
                                              return rabbitAdmin;
                                          }
                                      }
                                      7.7.1.2. 实现发送消息确认接口

                                      消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。

                                      /**
                                       * 消息发送确认配置
                                       */
                                      @Component
                                      public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback{
                                          @Autowired
                                          private RabbitTemplate rabbitTemplate;
                                          @PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执
                                          public void init(){
                                              rabbitTemplate.setConfirmCallback(this);
                                          }
                                          /**
                                      	 * 交换机不管是否收到消息的一个回调方法
                                      	 * @param correlationData 消息相关数据
                                      	 * @param ack 交换机是否收到消息
                                      	 * @param cause 失败原因
                                      	 */
                                          @Override
                                          public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                                              if (ack){ // 消息投递到broker 的状态,true表示成功
                                                  System.out.println("消息发送成功!");
                                              }else { // 发送异常
                                                  System.out.println("发送异常原因 = " + cause);
                                              }
                                          }
                                      }
                                      
                                      7.7.1.3. 实现发送消息回调接口

                                      如果消息未能投递到目标queue里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

                                      @Component
                                      public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
                                      	@Autowired
                                      	private RabbitTemplate rabbitTemplate;
                                      	@PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执
                                      	public void init(){
                                      		rabbitTemplate.setReturnsCallback(this);
                                      	}
                                      	@Override
                                      	public void returnedMessage(ReturnedMessage returnedMessage) {
                                      		System.out.println("消息"+returnedMessage.getMessage().toString()+"被交换机"+returnedMessage.getExchange()+"回退!"
                                      				+"退回原因为:"+returnedMessage.getReplyText());
                                      		// 回退了所有的信息,可做补偿机制
                                      	}
                                      }
                                      
                                      7.7.2. 消费者消息确认机制

                                      为确保消息消费成功,需设置消费者消息确认机制,如果消费失败或异常了,可做补偿机制。

                                      7.7.2.1. 配置
                                      spring:
                                        rabbitmq:
                                          host: 47.99.110.29
                                          port: 5672
                                          username: guest
                                          password: guest
                                          virtual-host: /
                                          # 消费者配置
                                          listener:
                                            simple:
                                              prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
                                              acknowledge-mode: manual # 设置消费端手动ack确认
                                              retry:
                                                enabled: true # 是否支持重试
                                          # 生产者配置      
                                          publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)
                                          publisher-returns: true  #确认消息已发送到队列(Queue)
                                      7.7.2.2. channel.basicAck消息确认

                                      消费者修改,利用消费者参数Channel 进行消息确认操作

                                      @RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
                                      public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
                                          // 消息
                                          System.out.println("msg = " + msg);
                                          /**
                                      		 * 确认
                                      		 * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
                                      		 * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
                                      		 */ 
                                          channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                                      }
                                      7.7.2.3. channel.basicNack消息回退

                                      将消息重返队列

                                      @RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
                                      public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
                                          try {
                                              // 消息
                                              System.out.println("msg = " + msg);
                                              throw new RuntimeException("来个异常");
                                          } catch (Exception e) {
                                              e.printStackTrace();
                                              System.out.println("消息消费异常,重回队列");
                                              /**
                                      			 * deliveryTag:表示消息投递序号。
                                      			 * multiple:是否批量确认。
                                      			 * requeue:值为 true 消息将重新入队列。
                                      			 */
                                              channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                                          }
                                          // 确认
                                          /**
                                          * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
                                          * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
                                          */
                                          channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                                      }
                                      7.7.2.4. channel.basicReject消息拒绝

                                      拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

                                      /**
                                       * 消息拒绝
                                       * deliveryTag:表示消息投递序号。
                                       * requeue:值为 true 消息将重新入队列。
                                       */
                                      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
                                      7.7.3. 封装消息确认处理类

                                      我们在消费mq的时候都会做消息确认机制,消息要是消费过程中发送异常,将消息回退。

                                      7.7.3.1. 正常代码
                                      @RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
                                      public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
                                          try {
                                              // 消息消费
                                              System.out.println("msg = " + msg);
                                              // 消息消费。。。。。。
                                          } catch (Exception e) {
                                              e.printStackTrace();
                                              System.out.println("消息消费异常,重回队列");
                                              // 回退
                                              channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                                          }
                                          // 确认
                                          channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                                      }

                                      但是这样会有一个缺点,要是我们有很多个消费者,重复的代码太多。

                                      7.7.3.2. 封装后的代码

                                      封装后我们只需要处理消息内容

                                      @RabbitListener(queues = "rabbitListener")
                                      public void nonTicketUploadHandler(String json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
                                          super.onMessage(json, deliveryTag, channel, new MqListener() {
                                              @Override
                                              public void handler(String msg, Channel channel) throws IOException {
                                                  // 消息消费
                                                  System.out.println("msg = " + msg);
                                                  // 消息消费。。。。。。
                                              }
                                          });
                                      }

                                      封装消息确认

                                      import com.rabbitmq.client.Channel;
                                      import lombok.extern.slf4j.Slf4j;
                                      import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
                                      import java.io.IOException;
                                      @Slf4j
                                      public class BaseRabbiMqHandler {
                                          public void onMessage(T t, Long deliveryTag, Channel channel, MqListener mqListener) {
                                              try {
                                                  // 通过实现这个接口,处理我们的业务代码
                                                  mqListener.handler(t, channel);
                                                  /**
                                                   * 手动确认消息
                                                   * deliveryTag:该消息的index
                                                   *  false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
                                                    */
                                                  channel.basicAck(deliveryTag, false);
                                              }
                                              catch (Exception e) { log.error("接收消息失败,重新放回队列,异常原因:{}",e.getMessage());
                                                                    try {
                                                                        /**
                                                       * 重回队列
                                                       * deliveryTag:该消息的index
                                                       * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
                                                       * requeue:被拒绝的是否重新入队列
                                                       */
                                                                        //                channel.basicAck(deliveryTag, false);
                                                                        channel.basicNack(deliveryTag, false, true);
                                                                    } catch (IOException ex) {
                                                                        ex.printStackTrace();
                                                                    }
                                                                  }
                                          }
                                      }
                                      

                                      mqListener接口

                                      业务通过实现这个接口处理

                                      import com.rabbitmq.client.Channel;
                                      import java.io.IOException;
                                      public interface MqListener {
                                          // 业务通过实现这个接口处理
                                          default void handler(T map, Channel channel) throws IOException {
                                          }
                                      }

                                      mq消费者

                                      消费者代码示例

                                      import com.rabbitmq.client.Channel;
                                      import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
                                      import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
                                      import org.springframework.amqp.rabbit.annotation.RabbitListener;
                                      import org.springframework.amqp.support.AmqpHeaders;
                                      import org.springframework.messaging.handler.annotation.Header;
                                      import java.io.IOException;
                                      public class TestListener extends BaseRabbiMqHandler { // 继承封装的消息确认类
                                      	@RabbitListener(queues = "rabbitListener")
                                      	public void nonTicketUploadHandler(String json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
                                      		super.onMessage(json, deliveryTag, channel, new MqListener() { // 实现父类的消息方法里的接口
                                      			@Override 
                                      			public void handler(String msg, Channel channel) throws IOException {
                                      				// 业务代码写这里,try/catch不用再写了
                                      				System.out.println("msg = " + msg);
                                      				// 消息消费。。。。。。
                                      			}
                                      		});
                                      	}
                                      }

                                      8. rabbitmq集群搭建

                                      8.1. 普通集群

                                      RabbitMq详解

                                      1、新建三个docker容器

                                      docker run -d --hostname rabbit1 --name myrabbit1  -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15671:15672 -p 5671:5672 rabbitmq
                                      docker run -d --hostname rabbit2 --name myrabbit2  -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 --link myrabbit1:rabbit1 rabbitmq
                                      docker run -d --hostname rabbit3 --name myrabbit3  -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 --link myrabbit1:rabbit1 --link myrabbit2:rabbit2 rabbitmq
                                      

                                      2、三个都进入容器下载可视化工具

                                      RabbitMq详解

                                      3、进入第一个mq容器重启

                                      docker exec -it ef4a1f0fade7 /bin/bash
                                      rabbitmqctl stop_app
                                      rabbitmqctl reset
                                      rabbitmqctl start_app
                                      exit

                                      4、进入第二个 和 第三个 mq容器执行

                                      docker exec -it e36d94d40008 /bin/bash
                                      rabbitmqctl stop_app
                                      rabbitmqctl reset
                                      rabbitmqctl join_cluster --ram rabbit@rabbit1   //如遇到报错再执行上句、再继续执行
                                      rabbitmqctl start_app
                                      exit

                                      5、进去mq可视化界面,overview面板中的Nodes可查看到节点信息。

                                      RabbitMq详解

                                      6、测试,在mq上新建交换机、其余两个也出现新建的交换机

                                      RabbitMq详解

                                      此时普通集群已构建完成

                                      1、此种集群主节点down掉后,消费者也无法消费从节点的消息,不能做故障转移,只能当作备份。

                                      2、主节点正常,从节点则可以消费消息

                                      8.2. 镜像集群(高可用)

                                      这种集群弥补第一种的缺陷,需在普通集群的基础下搭建(确保第一种集群可用)

                                      镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升mq集群的高可用性。

                                      RabbitMq详解

                                      1、配置集群架构

                                      RabbitMq详解

                                      2、进入任意节点配置策略

                                      docker exec -it ef4a1f0fade7 /bin/bash
                                      rabbitmqctl set_policy ha-all "^rabbitmq" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

                                      3、测试,新建一个rabbitmq开头的队列

                                      RabbitMq详解

                                      此时某个节点down掉(包括主节点),其余节点也能消费

                                      将主节点down掉,节点自动切换

                                      RabbitMq详解

                                      4、清除策略

                                      rabbitmqctl clear_policy ha-all

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码