RabbitMQ的高级特性

06-01 1130阅读

文章目录

  • 1、消息确认
    • 原生API
    • Spring.AMQP
    • 2、发送方确认(Spring)
      • Confirm确认模式
      • Retrurn回退:
      • 3、持久化
        • 交换机持久化
        • 队列持久化
        • 消息持久化
        • 4、重试机制
        • 5、TTL(过期时间)
          • 消息TTL
          • 队列TTL
          • 6、死信队列
          • 7、延迟队列
            • TTL+DLQ
            • 延迟队列插件
              • 1. [安装教程](https://www.cnblogs.com/isunsine/p/11572457.html)
              • 2. 代码实现
              • 8、事务
                • 概念理解
                • 代码实现
                • 9、消息分发
                  • 代码实现

                    1、消息确认

                    消息确认机制作用于队列和消费者之间,当消费者接收到消息,会执行回调函数handleDelivery,发送一个回调信息给到队列,告诉队列它已经正确接收到了消息,以此保证消息的可靠传递。

                    原生API

                    RabbitMQ支持自动确认和手动确认(autoAck)。

                    • 自动确认:

                      autoAck=true,发送完该消息,该消息就会从队列中移除,不论消费者是否真正消费。适合可靠性要求底的场景。

                    • 手动确认:

                      autoAck=false,消息发送后,需要消费者发送确认信号,才能从队列中移除该消息,否则一直存在于队列中。

                      String basicConsume(String queue, boolean autoAck, Consumer callback) throws
                      IOException;
                      

                      callback用于指定回调方式:

                      DefaultConsumer consumer = new DefaultConsumer(channel) {
                      //回调函数
                       @Override
                       public void handleDelivery(String consumerTag, Envelope envelope,
                      AMQP.BasicProperties properties, byte[] body) throws IOException {
                      	//如果autoAck=false,需要指定消息确认方式
                       System.out.println("接收到消息: " + new String(body));
                       
                       }
                      };
                      //消费消息
                      channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
                      

                      RabbitMQ原生API中消息确认方式有三种:

                      1. 肯定确认: Channel.basicAck(long deliveryTag, boolean multiple)

                        通过信道告知队列消费者已经处理了该消息,队列会主动把该消息从磁盘中删除

                        参数说明:

                        1)deliveryTag是每个信道独自维护的自增唯一ID用于区分不同消息。

                        2)multiple用于标识是否批量确认接收到的消息。

                      2. 否定确认: Channel.basicReject(long deliveryTag, boolean requeue)

                      3. 否定确认Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

                      💡TIPS

                      1. requeue的含义是如果否定确认是否让队列重新发送消息。
                      2. basicReject和basicReject都是用于否定确认的,唯一的区别在于basicReject一次性只能确认一条消息,basicNack可以批量否定(multiple)

                      Spring.AMQP

                      Spring 框架中的消息确认机制(通常通过 Spring AMQP 提供)是对 RabbitMQ 原生 API 的封装。Spring AMQP 提供了一种更高层次的抽象,简化了与 RabbitMQ 的交互,底层使用的还是RabbitMQ原生API。

                      Spring提供了三种确认模式:

                      spring:
                        application:
                          name:
                            extensions
                        rabbitmq:
                          addresses: amqp://admin:password@ip:port/virtualhost_name
                          listener:
                            simple:
                            #	acknowledge-mode: none  # 不做处理
                            	acknowledge-mode: auto  # 自动模式 spring默认配置
                            # acknowledge-mode: manual  # 手动模式 
                      
                      • none

                        • 队列发送完消息,不论消费者是否正确处理,RabbitMQ自动ACK消息,从队列移除该消息,因此如果消费者没有正确处理消息,消息可能丢失。
                        • auto

                          • 消息正确处理,自动确认消息,移除队列;如果处理失败,消息不会被移除,RabbitMQ会一直重发该消息直到处理成功(每次重试DeliveryTag会自增);如果一直处理失败,消息就会一直处于UnAck状态,导致消息积压。
                          • manual

                            • 消息发送后,不会自动确认,需要消费端使用channel.basicAck(deliveryTag, multiple) 或channel.basicNack(deliveryTag, multiple,requeue),又或者channel.basicReject(deliveryTag, requeue)进行确认。如果不进行确认,消息会处于UnAck状态,requeue为true表示重发消息,反之不会。

                              💡

                              basicNack和basicReject的唯一区别是前者可以批量否定确认,后者只能单个消息否定确认。

                              代码演示(manual模式为例):

                              spring:
                                application:
                                  name:
                                    extensions
                                rabbitmq:
                                  addresses: amqp://账号:密码@113.44.150.39:5672/extension
                                  listener:
                                    simple:
                                      acknowledge-mode: manual  # 消息接收确认模式
                              
                              //绑定关系
                              @Configuration
                              public class RabbitMQConfig {
                                  @Bean("ackQueue")
                                  public Queue ackQueue(){
                                      return QueueBuilder.durable(Constants.ACK_QUEUE).build();
                                  }
                                  @Bean("ackExchange")
                                  public DirectExchange ackExchange(){
                                      return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
                                  }
                                  @Bean("ackBind")
                                  public Binding ackBind(@Qualifier("ackExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue ackQueue) {
                                      return BindingBuilder.bind(ackQueue).to(directExchange).with("ack");
                                  }
                              }
                              
                                  @RequestMapping("ack")
                                  public String ack() {
                                      //默认情况下,发送持久化消息
                                      rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消息发送了,用的Direct类型交换机,绑定的ack key");
                                      return "消息发送成功";
                                  }
                              
                              @Component
                              public class AckListener {
                                  @RabbitListener(queues = Constants.ACK_QUEUE)
                                  public void handerMessage(Message message, Channel channel) throws Exception {
                                      try {
                                          System.out.println("消费者接收到消息:"
                                                  + new String(message.getBody(), "UTF-8") +
                                                  message.getMessageProperties().getDeliveryTag());
                              //        //业务逻辑处理,此处一定会抛出异常,进行reject
                                          int num = 3 / 0;
                                          System.out.println("业务处理完成");
                                      } catch (Exception e) {
                                          channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
                                      channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
                                      }
                                  }
                              }
                              

                              此时我们发送消息给RabbitMQ服务器,就会不断重试(requeue=true):

                              RabbitMQ的高级特性


                              2、发送方确认(Spring)

                              发送方确认作用与生产者和RabbitMQ服务器之间,也是为了保证数据传递的可靠性。发送方确认分为两个模式:Confirm确认和Retrurn回退。

                              Confirm确认模式

                              1. 配置:
                              spring:
                               rabbitmq:
                                 addresses: amqp://账号:密码@110.41.51.65:15673/bite
                                 listener:
                                   simple:
                               	   acknowledge-mode: manual #消息接收确认
                               	 publisher-confirm-type: correlated #消息发送确认
                              
                              1. 定义消息确认的回调逻辑,并发送消息

                                配置yml之后,Spring会自动创建connectionFactory

                                @Bean("confirmRabbitTemplate")
                                public RabbitTemplate confirmRabbitTemplate(ConnectionFactory
                                     connectionFactory) {
                                    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
                                    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                                        @Override
                                        public void confirm(CorrelationData correlationData, boolean ack,
                                                            String cause) {
                                            System.out.printf("");
                                            if (ack) {
                                                System.out.printf("消息接收成功, id:%s \n",
                                                        correlationData.getId());//这个id用来区分不同的消息
                                            } else {
                                                System.out.printf("消息接收失败, id:%s, cause: %s",
                                                        correlationData.getId(), cause);
                                            }
                                        }
                                    });
                                    return rabbitTemplate;
                                }
                              
                                  @Resource(name = "confirmRabbitTemplate")
                                  private RabbitTemplate confirmRabbitTemplate;
                                  @RequestMapping("/confirm")
                                  public String confirm() throws InterruptedException {
                                      CorrelationData correlationData1 = new CorrelationData("1");
                                      confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,
                                              "confirm", "confirm test...", correlationData1);
                                      return "确认成功";
                                  }
                              

                              说明:

                              	@FunctionalInterface
                              	public interface ConfirmCallback {
                              		/**
                              		 * Confirmation callback.
                              		 * @param correlationData 发送消息时的附加消息,识别特定消息
                              		 * @param ack 消息被exchange确认,为true,否则false
                              		 * @param cause 消息确认失败时,存储的出错原因
                              		 */
                              		void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
                              	}
                              

                              💡RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别

                              1. 前者作用域生产者和exchange之间,后者作用与队列和消费者之间
                              2. RabbitTemplate.ConfirmCallback是Spring.AMQP实现,只有一个需要重写的方法confirm()用于确认回调。
                              3. ConfirmListener来自于RabbitMQ原生API,内含handleAck 和handleNack, ⽤于处理消息确认和否定确认。

                              Retrurn回退:

                              confirm确认保证的是生产者和exchange之间的可靠性,Return回退保证的是exchange能够正确路由到指定队列,如果消息没有被路由到任何队列,把消息回退给生产者。

                              1. Spring的yml配置和confirm确认一致。
                              2. 定义回退逻辑并发送消息
                                  @Bean
                                  public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {
                                      RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
                                           
                                      //消息被退回时,回调下面的方法
                                      rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                                          @Override
                                          public void returnedMessage(ReturnedMessage returned) {
                                              System.out.println("消息退回:" + returned);
                                          }
                                      });
                                      return rabbitTemplate;
                                  }
                              
                                @RequestMapping("/returns")
                                  public String returns() {
                                      CorrelationData correlationData = new CorrelationData("5");
                                      //回退模式必须加上这个设置
                                      confirmRabbitTemplate.setMandatory(true);
                                      confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE
                                              , "confirm***"
                                              , "return test..."
                                              , correlationData);
                                      return "回退模式,消息发送了";
                                  }
                              

                              💡TIPS

                              1. setMandatory

                                设置为true告诉RabbitMQ,如果exchange没有把该消息成功给到任何队列,执行回退,因此setMandatory必须加。

                              2. ReturnedMessage参数中的信息
                              public class ReturnedMessage {
                                  //返回的消息对象,包含了消息体和消息属性
                                  private final Message message;
                                  //由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义 .
                                  private final int replyCode;
                                  //⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.
                                  private final String replyText;
                                  //消息被发送到的交换机名称
                                  private final String exchange;
                                  //消息的路由键,即发送消息时指定的键
                                  private final String routingKey;
                              }
                              

                              3、持久化

                              如果RabbitMQ服务器宕机,某些重要数据遗失可能造成严重后果,RabbbitMQ提供持久化功能,保证消息不会丢失。

                              RabbitMQ的持久化分为三个部分:

                              1. 交换器的持久化
                              2. 队列的持久化
                              3. 消息的持久化

                              交换机持久化

                              当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机, 交换机会⾃动建⽴,相当于⼀直存在。如果是一个需要长期使用的交换机,建议进行持久化。

                                  @Bean("ackExchange")
                                  public DirectExchange ackExchange(){
                                      return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
                                  }
                              

                              上面这种方式创建交换机,默认是持久化的,如果想要设置非持久化,这样设置:

                                 @Bean("ackExchange")
                                  public DirectExchange ackExchange(){
                                      return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(false).build();
                                  }
                              

                              队列持久化

                              如果队列不设置持久化,RabbitMQ重启,队列也会跟着消息,队列中的消息也会丢失。

                              队列设置持久化方式和Exchange有所不同:

                                  @Bean("ackQueue")
                                  public Queue ackQueue(){
                                      return QueueBuilder.durable(Constants.ACK_QUEUE).build();
                                  }
                              

                              设置非持久化:

                                @Bean("ackQueue")
                                  public Queue ackQueue(){
                                      return QueueBuilder.nonDurable(Constants.ACK_EXCHANGE).build();
                                  }
                              

                              消息持久化

                              如果队列是持久化的,消息不是持久化的,RabbitMQ重启,消息也会丢失。换言之,队列和消息共同持久化才能保证消息在重启之后不会丢失。

                              设置消息持久化需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是 MessageDeliveryMode.PERSISTENT

                              public enum MessageDeliveryMode {
                               		NON_PERSISTENT,//⾮持久化
                               		PERSISTENT;//持久化
                               }
                              

                              RabbitMQ原生API设置持久化:

                              RabbitMQ的高级特性

                              💡TIP

                              PERSISTENT_TEXT_PLAIN中设置了deliveryMode =2,进而实现的持久化:RabbitMQ的高级特性

                              Spring.AMQP设置持久化:

                              RabbitMQ的高级特性


                              4、重试机制

                              在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息

                              处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.

                              但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的。启用下方配置重试机制将会生效;

                              配置:

                              spring:
                                application:
                                  name: rabbit-extensions-demo
                                rabbitmq:
                                  addresses: amqp://账号:密码@47.108.157.13:5672/extension
                                  listener:
                                    simple:
                                       acknowledge-mode: auto
                                      retry:
                                        enabled: true # 开启消费者失败重试
                                        initial-interval: 5000ms # 初始失败等待时长为5秒
                                        max-attempts: 5 # 最大重试次数
                              

                              这里我们故意设置一个异常,看看运行结果:

                              @Component
                              public class RetryListener {
                                  @RabbitListener(queues=Constants.RETRY_QUEUE)
                                  public void retryQueue(Message message) throws UnsupportedEncodingException {
                                      System.out.println("重试队列消费者auto模式 接收到消息 deliveryTag是:"+message.getMessageProperties().getDeliveryTag());
                                      int a=10/0;
                                      System.out.println("业务处理完成");
                                  }
                              }
                              

                              运行结果:

                              RabbitMQ的高级特性

                              💡 1、重试机制只在manual模式下生效!

                              1. none模式消息一旦发送rabbitmq就会把消息从队列移除。
                              2. 而manual模式下不进行手动确认,只会导致消息积压
                              3. manual模式进行手动确认且requeue=false,那么消息只会被发送一次,因为设置了不重新入队
                              4. manual模式进行手动确认且requeue=true,会一直进行重试,因为每次发送确认消息给rabbitmq,都会告诉他重新入队,发送消息过来。

                              💡2、为什么deliveryTag一直是1,而不像第二点中auto模式自动重发消息deliveryTag一直递增?

                              • 重试机制,retry=enable,每次重试deliveryTag相同,是因为这个重试是在消费者内部执行的
                              • 而auto模式下,出现异常,不断重试则是RabbitMQ服务器重发消息给到消费者,所以deliveryTag自然是递增的。

                                5、TTL(过期时间)

                                当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除。目前设置TTL方式有两种:

                                1. 设置队列的TTL, 队列中所有消息都有相同的过期时间.
                                2. 对消息本⾝进⾏单独设置, 每条消息的TTL可以不同.

                                如果两种⽅法⼀起使⽤, 则消息的TTL以两者之间较⼩的那个数值为准.

                                消息TTL

                                  @RequestMapping("/ttl")
                                    public String ttl() {
                                        System.out.println("ttl....");
                                  rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test...");
                                        //也可以这样设置过期时间
                                        rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...", message -> {
                                            message.getMessageProperties().setExpiration("10000");
                                            return message;
                                        });
                                        return "ttl测试";
                                    }
                                

                                队列TTL

                                    public Queue ttlQueue() {
                                        return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20000).build();
                                

                                两者区别:

                                1. 设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除
                                2. 设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定

                                💡为什么这两种⽅法处理的⽅式不⼀样?

                                因为设置队列过期时间, 队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可.⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可.


                                6、死信队列

                                死信队列(DLQ)和死信交换机(DLX)与普通队列、普通交换机本质上没有任何区别。

                                只不过DLX绑定的是一个个队列,接收的消息都是普通队列丢弃的消息。

                                DLX和DLQ协同分工,把普通队列中的死信发送给指定消费者进行特殊处理。

                                RabbitMQ的高级特性

                                示例代码:

                                    //普通队列
                                    @Bean("normalQueue")
                                    public Queue normalQueue() {
                                        /*
                                        * 重点代码:*/
                                        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
                                                .deadLetterExchange(Constants.DL_EXCHANGE)//正常队列需要绑定指定死信交换机(根据名字)
                                                .deadLetterRoutingKey("key2")//根据dl routingKey指定路由到哪一个死信队列
                                                .build();
                                    }
                                    //普通交换机
                                    @Bean("normalExchange")
                                    public DirectExchange normalExchange() {
                                        return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
                                    }
                                    //交换机队列进行绑定
                                    @Bean("bindingNormal")
                                    public Binding bindingNormal(@Qualifier("normalQueue") Queue normalQueue,@Qualifier("normalExchange") DirectExchange normalExchange) {
                                        return BindingBuilder.bind(normalQueue).to(normalExchange).with("key1");
                                    }
                                    //死信队列
                                    @Bean("DLQueue")
                                    public Queue DLQueue() {
                                        return QueueBuilder.durable(Constants.DL_QUEUE).build();
                                    }
                                    //死信交换机
                                    @Bean("DLExchange")
                                    public DirectExchange DLExchange() {
                                        return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
                                    }
                                    //死信队列和死信交换机的绑定
                                    @Bean("DLBinding")
                                    public Binding DLBinding(@Qualifier("DLQueue") Queue DLQueue,@Qualifier("DLExchange") DirectExchange DLExchange){
                                        return BindingBuilder.bind(DLQueue).to(DLExchange).with("key2");
                                    }
                                

                                💡TIPS

                                • 由于normalQueue设置了 deadLetterExchange 和deadLetterRoutingKey ,所以normalQueue中的死信会被路由到指定的DLQ(死信队列),给到指定消费者执行,(当然代码中并没有编写消费者代码)
                                • 消息变成死信,主要有这几种情况:
                                  1. Basic.Reject/Basic.Nack,拒绝了消息,并且requeue=false
                                  2. 消息TTL过期
                                  3. 队列已经到达了最大长度

                                  7、延迟队列

                                  延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费。

                                  比如只能家具的定时任务、会议时间提前提醒、会员注册一定时间后的用户满意度调查等。

                                  坏消息是RabbitMQ不直接支持延迟队列。好消息是RabbitMQ可以通过其他方式实现延迟队列:

                                  1. TTL+DLQ
                                  2. 延迟队列插件

                                  TTL+DLQ

                                  我们可以设置一个TTL队列,队列中消息一旦过期,产生死信,把该死信路由到指定的死信队列,然后消费者直接去监听该死信队列即可实现一个延迟队列的功能。

                                  我们在第6节死信队列代码基础上进行编写:

                                  • 交换机和队列相关配置
                                    @Configuration
                                    public class DLConfig {
                                        //普通队列
                                        @Bean("normalQueue")
                                        public Queue normalQueue() {
                                            /*
                                            * 重点代码:*/
                                            return QueueBuilder.durable(Constants.NORMAL_QUEUE)
                                                    .deadLetterExchange(Constants.DL_EXCHANGE)//正常队列需要绑定指定死信交换机(根据名字)
                                                    .deadLetterRoutingKey("key2")//根据dl routingKey指定路由到哪一个死信队列
                                                    .build();
                                        }
                                        //普通交换机
                                        @Bean("normalExchange")
                                        public DirectExchange normalExchange() {
                                            return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
                                        }
                                        //交换机队列进行绑定
                                        @Bean("bindingNormal")
                                        public Binding bindingNormal(@Qualifier("normalQueue") Queue normalQueue,@Qualifier("normalExchange") DirectExchange normalExchange) {
                                            return BindingBuilder.bind(normalQueue).to(normalExchange).with("key1");
                                        }
                                        //死信队列
                                        @Bean("DLQueue")
                                        public Queue DLQueue() {
                                            return QueueBuilder.durable(Constants.DL_QUEUE).build();
                                        }
                                        //死信交换机
                                        @Bean("DLExchange")
                                        public DirectExchange DLExchange() {
                                            return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
                                        }
                                        //死信队列和死信交换机的绑定
                                        @Bean("DLBinding")
                                        public Binding DLBinding(@Qualifier("DLQueue") Queue DLQueue,@Qualifier("DLExchange") DirectExchange DLExchange){
                                            return BindingBuilder.bind(DLQueue).to(DLExchange).with("key2");
                                        }
                                    }
                                    
                                    • 监听死信队列
                                      @Component
                                      public class DelayListener {
                                          @RabbitListener(queues =Constants.DL_QUEUE)
                                          public void handleMessage(Message message){
                                              long end=System.currentTimeMillis();//获取接收到消息时的时间
                                              String msg=new String(message.getBody());
                                              String[] msgs=msg.split(":");
                                              long start=Long.parseLong(msgs[1]);
                                              System.out.println("写收到延迟消息,时间:"+(end-start)/1000);
                                          }
                                      }
                                      
                                      • 发送延迟消息
                                            @RequestMapping("/delay")
                                            public String delay() {
                                                rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "key1",("当前消息需要延迟10秒发送:" + System.currentTimeMillis()).getBytes(),message -> {
                                                    message.getMessageProperties().setExpiration("10000");  //单位: 毫秒, 过期时间为30s
                                                    return message;
                                                });
                                                return "延迟队列消息已经执行";
                                            }
                                        

                                        运行结果:在这里插入图片描述

                                        消息首先在normal.queue上等待10s,然后发送到dl.queue消费(我设置的auto模式,所以消息已经ack,没有显示任何消息):

                                        RabbitMQ的高级特性

                                        💡TTL+DLQ有一个缺陷

                                        如果如果在队列中:消息m1延迟时间大于消息m2的延迟时间,并且m1优先于m2出队列,这样种情况消息m2只能等待m1过期才能被发送,导致m2消息在发送的时候必定过期。

                                        因此rabbitmq提供了一个插件,来解决这个问题

                                        延迟队列插件

                                        1. 安装教程

                                        注意:如果启动完插件之后没有生效,需要使用这个命令重启rabbitmq服务:

                                        service rabbitmq-server restart
                                        

                                        重启完之后,就会增加一个交换机类型:

                                        RabbitMQ的高级特性

                                        这个交换机可以认为是一个“高级的延迟队列”,它可以临时存储延迟消息,所有存储在该交换机的消息,都会在延迟时间结束后发送给绑定了该交换机的队列,解决了TTL+DLQ中的缺陷。

                                        2. 代码实现

                                        @Configuration
                                        public class DLConfig {
                                            //声明队列方式和之前没有差别
                                            @Bean("delayQueue")
                                            public Queue delayQueue() {
                                                return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
                                            }
                                            //与之前不同的是,这里需要用。delays声明使用延迟队列插件
                                            @Bean("delayExchange")
                                            public Exchange delayExchange() {
                                                return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();
                                            }
                                            //和其他绑定没有区别
                                            @Bean("delayBinding")
                                            public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayExchange") Exchange delayExchange) {
                                                return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();
                                            }
                                        }
                                        
                                        @Component
                                        public class DelayListener {
                                            @RabbitListener(queues =Constants.DELAY_QUEUE)
                                            public void handleMessage(Message message){
                                                long end=System.currentTimeMillis();//获取接收到消息时的时间
                                                String msg=new String(message.getBody());
                                                String[] msgs=msg.split(":");
                                                long start=Long.parseLong(msgs[1]);
                                                System.out.println("写收到延迟消息,时间:"+(end-start)/1000);
                                            }
                                        }
                                        
                                         @RequestMapping("/delayPlug")
                                            public String delayPlug() {
                                                rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay"
                                                        ,("当前消息需要延迟10秒发送:" + System.currentTimeMillis()).getBytes(),messagePostProcessor -> {
                                                    messagePostProcessor.getMessageProperties().setDelayLong(10000L);  //单位: 毫秒 延迟时间
                                                    return messagePostProcessor;
                                                });
                                                rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay"
                                                        ,("当前消息需要延迟5秒发送:" + System.currentTimeMillis()).getBytes(),messagePostProcessor -> {
                                                    messagePostProcessor.getMessageProperties().setDelayLong(5000L);  //单位: 毫秒 延迟时间
                                                    return messagePostProcessor;
                                                });
                                                return "延迟队列消息已经执行";
                                            }
                                        

                                        运行结果:

                                        RabbitMQ的高级特性

                                        可以看到即使先发送10s延迟的数据,在发送5s延迟的数据,消息仍然按照正常顺序接收,解决的消息乱序问题。


                                        8、事务

                                        概念理解

                                        RabbitMQ支持消息的发送和接收是原子性的,要么所有消息全部成功,要么全部失败。

                                        例如这个场景,发送两个消息,由于过程中出现异常,正常情况下,队列会接收到第一条消息,第二条消息不会接收到:

                                            @RequestMapping("/trans")
                                            public String trans() {
                                                System.out.println("trans test...");
                                              rabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 1...");
                                                int num = 5/0;
                                               rabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 2...");
                                                return "消息发送成功";
                                            }
                                        

                                        控制台只显示接收到一条消息:

                                        RabbitMQ的高级特性

                                        如果我们希望,出现异常时,回滚所有发送的消息(出现错误一条消息也不发送),那么就可以启用事务。

                                        代码实现

                                        配置代码,为了简便使用默认交换机:

                                        @Configuration
                                        public class TransactionMQConfig {
                                            @Bean("transationQueue")
                                            public Queue transationQueue() {
                                                return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
                                            }
                                        	//1、建立事务管理器
                                            @Bean
                                            public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
                                                    return new RabbitTransactionManager(connectionFactory);
                                            }
                                        	//2、开启事务通道
                                            @Bean("transRabbitTemplate")
                                            public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
                                                RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
                                                rabbitTemplate.setChannelTransacted(true);
                                                return rabbitTemplate;
                                            }
                                        }
                                        

                                        发送消息:

                                           @Transactional//3、声明开启事务
                                            @RequestMapping("/trans")
                                            public String trans() {
                                                System.out.println("trans test...");
                                                transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 1...");
                                                int num = 5/0;
                                                transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 2...");
                                                return "消息发送成功";
                                            }
                                        

                                        发送消息后运行结果:

                                        程序抛出异常:

                                        RabbitMQ的高级特性

                                        但是控制台,队列没有接收到任何消息:

                                        RabbitMQ的高级特性


                                        9、消息分发

                                        RabbitMQ中,消费者获取消息既可以使用推模式(队列把消息发送给消费者),也可以使用拉模式(消费者主要向队列获取)。虽然两者都支持,但是主要还是使用推模式。使用推模式,会出现一个问题,就是默认情况下RabbitMQ采用轮询方式,把消息发送给指定消费者,这种情况下如果消费者本身负载较重,继续接收消息,可能会出现严重后果。为此,RabbitMQ提供了消息分发这样一个机制。每个消费者都可以设置最大接收消息数prefetchCount,如果消费者目前获取消息数等于prefetchCount,RabbittMQ就不会推送消息给这个消费者,这是RabbitMQ中消息分发的核心概念。通过这样一个机制可以实现限流、负载均衡等功能。

                                        代码实现

                                        RabbitMQ 的SDK通过这个方法可以设置预取数量:

                                        RabbitMQ的高级特性

                                        Spring.AMQP可以直接通过yml配置:

                                        spring:
                                          application:
                                            name:
                                              applicationName
                                          rabbitmq:
                                            addresses: amqp://账号:密码@113.44.150.39:5672/extension
                                            listener:
                                              simple:
                                                acknowledge-mode: manual # 必须开启手动模式,prefetch才会生效
                                                prefetch: 5
                                        

                                        💡TIPS

                                        1. prefectch设置为0表示没有获取上限。
                                        2. 在拉模式中,设置channel.basicQos(prefetchCount)是无效的,因为拉模式,一次只能从队列拉去一个消息。

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码