RabbitMQ的高级特性
文章目录
- 1、消息确认
- 原生API
- Spring.AMQP
- 2、发送方确认(Spring)
- Confirm确认模式
- Retrurn回退:
- 3、持久化
- 交换机持久化
- 队列持久化
- 消息持久化
- 4、重试机制
- 5、TTL(过期时间)
- 消息TTL
- 队列TTL
- 6、死信队列
- 7、延迟队列
- TTL+DLQ
- 延迟队列插件
- 1. [安装教程](https://www.cnblogs.com/isunsine/p/11572457.html)
- 2. 代码实现
- 8、事务
- 概念理解
- 代码实现
- 9、消息分发
- 代码实现
1、消息确认
消息确认机制作用于队列和消费者之间,当消费者接收到消息,会执行回调函数handleDelivery,发送一个回调信息给到队列,告诉队列它已经正确接收到了消息,以此保证消息的可靠传递。
原生API
RabbitMQ支持自动确认和手动确认(autoAck)。
- 自动确认:
autoAck=true,发送完该消息,该消息就会从队列中移除,不论消费者是否真正消费。适合可靠性要求底的场景。
- 手动确认:
autoAck=false,消息发送后,需要消费者发送确认信号,才能从队列中移除该消息,否则一直存在于队列中。
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
callback用于指定回调方式:
DefaultConsumer consumer = new DefaultConsumer(channel) { //回调函数 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //如果autoAck=false,需要指定消息确认方式 System.out.println("接收到消息: " + new String(body)); } }; //消费消息 channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
RabbitMQ原生API中消息确认方式有三种:
-
肯定确认: Channel.basicAck(long deliveryTag, boolean multiple)
通过信道告知队列消费者已经处理了该消息,队列会主动把该消息从磁盘中删除
参数说明:
1)deliveryTag是每个信道独自维护的自增唯一ID用于区分不同消息。
2)multiple用于标识是否批量确认接收到的消息。
-
否定确认: Channel.basicReject(long deliveryTag, boolean requeue)
-
否定确认Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
💡TIPS
- requeue的含义是如果否定确认是否让队列重新发送消息。
- basicReject和basicReject都是用于否定确认的,唯一的区别在于basicReject一次性只能确认一条消息,basicNack可以批量否定(multiple)
Spring.AMQP
Spring 框架中的消息确认机制(通常通过 Spring AMQP 提供)是对 RabbitMQ 原生 API 的封装。Spring AMQP 提供了一种更高层次的抽象,简化了与 RabbitMQ 的交互,底层使用的还是RabbitMQ原生API。
Spring提供了三种确认模式:
spring: application: name: extensions rabbitmq: addresses: amqp://admin:password@ip:port/virtualhost_name listener: simple: # acknowledge-mode: none # 不做处理 acknowledge-mode: auto # 自动模式 spring默认配置 # acknowledge-mode: manual # 手动模式
-
none
- 队列发送完消息,不论消费者是否正确处理,RabbitMQ自动ACK消息,从队列移除该消息,因此如果消费者没有正确处理消息,消息可能丢失。
-
auto
- 消息正确处理,自动确认消息,移除队列;如果处理失败,消息不会被移除,RabbitMQ会一直重发该消息直到处理成功(每次重试DeliveryTag会自增);如果一直处理失败,消息就会一直处于UnAck状态,导致消息积压。
-
manual
- 消息发送后,不会自动确认,需要消费端使用channel.basicAck(deliveryTag, multiple) 或channel.basicNack(deliveryTag, multiple,requeue),又或者channel.basicReject(deliveryTag, requeue)进行确认。如果不进行确认,消息会处于UnAck状态,requeue为true表示重发消息,反之不会。
💡
basicNack和basicReject的唯一区别是前者可以批量否定确认,后者只能单个消息否定确认。
代码演示(manual模式为例):
spring: application: name: extensions rabbitmq: addresses: amqp://账号:密码@113.44.150.39:5672/extension listener: simple: acknowledge-mode: manual # 消息接收确认模式
//绑定关系 @Configuration public class RabbitMQConfig { @Bean("ackQueue") public Queue ackQueue(){ return QueueBuilder.durable(Constants.ACK_QUEUE).build(); } @Bean("ackExchange") public DirectExchange ackExchange(){ return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build(); } @Bean("ackBind") public Binding ackBind(@Qualifier("ackExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue ackQueue) { return BindingBuilder.bind(ackQueue).to(directExchange).with("ack"); } }
@RequestMapping("ack") public String ack() { //默认情况下,发送持久化消息 rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消息发送了,用的Direct类型交换机,绑定的ack key"); return "消息发送成功"; }
@Component public class AckListener { @RabbitListener(queues = Constants.ACK_QUEUE) public void handerMessage(Message message, Channel channel) throws Exception { try { System.out.println("消费者接收到消息:" + new String(message.getBody(), "UTF-8") + message.getMessageProperties().getDeliveryTag()); // //业务逻辑处理,此处一定会抛出异常,进行reject int num = 3 / 0; System.out.println("业务处理完成"); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }
此时我们发送消息给RabbitMQ服务器,就会不断重试(requeue=true):
2、发送方确认(Spring)
发送方确认作用与生产者和RabbitMQ服务器之间,也是为了保证数据传递的可靠性。发送方确认分为两个模式:Confirm确认和Retrurn回退。
Confirm确认模式
- 配置:
spring: rabbitmq: addresses: amqp://账号:密码@110.41.51.65:15673/bite listener: simple: acknowledge-mode: manual #消息接收确认 publisher-confirm-type: correlated #消息发送确认
- 定义消息确认的回调逻辑,并发送消息
配置yml之后,Spring会自动创建connectionFactory
@Bean("confirmRabbitTemplate") public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.printf(""); if (ack) { System.out.printf("消息接收成功, id:%s \n", correlationData.getId());//这个id用来区分不同的消息 } else { System.out.printf("消息接收失败, id:%s, cause: %s", correlationData.getId(), cause); } } }); return rabbitTemplate; }
@Resource(name = "confirmRabbitTemplate") private RabbitTemplate confirmRabbitTemplate; @RequestMapping("/confirm") public String confirm() throws InterruptedException { CorrelationData correlationData1 = new CorrelationData("1"); confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm", "confirm test...", correlationData1); return "确认成功"; }
说明:
@FunctionalInterface public interface ConfirmCallback { /** * Confirmation callback. * @param correlationData 发送消息时的附加消息,识别特定消息 * @param ack 消息被exchange确认,为true,否则false * @param cause 消息确认失败时,存储的出错原因 */ void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause); }
💡RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别
- 前者作用域生产者和exchange之间,后者作用与队列和消费者之间
- RabbitTemplate.ConfirmCallback是Spring.AMQP实现,只有一个需要重写的方法confirm()用于确认回调。
- ConfirmListener来自于RabbitMQ原生API,内含handleAck 和handleNack, ⽤于处理消息确认和否定确认。
Retrurn回退:
confirm确认保证的是生产者和exchange之间的可靠性,Return回退保证的是exchange能够正确路由到指定队列,如果消息没有被路由到任何队列,把消息回退给生产者。
- Spring的yml配置和confirm确认一致。
- 定义回退逻辑并发送消息
@Bean public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(factory); //消息被退回时,回调下面的方法 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("消息退回:" + returned); } }); return rabbitTemplate; }
@RequestMapping("/returns") public String returns() { CorrelationData correlationData = new CorrelationData("5"); //回退模式必须加上这个设置 confirmRabbitTemplate.setMandatory(true); confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE , "confirm***" , "return test..." , correlationData); return "回退模式,消息发送了"; }
💡TIPS
- setMandatory
设置为true告诉RabbitMQ,如果exchange没有把该消息成功给到任何队列,执行回退,因此setMandatory必须加。
- ReturnedMessage参数中的信息
public class ReturnedMessage { //返回的消息对象,包含了消息体和消息属性 private final Message message; //由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义 . private final int replyCode; //⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述. private final String replyText; //消息被发送到的交换机名称 private final String exchange; //消息的路由键,即发送消息时指定的键 private final String routingKey; }
3、持久化
如果RabbitMQ服务器宕机,某些重要数据遗失可能造成严重后果,RabbbitMQ提供持久化功能,保证消息不会丢失。
RabbitMQ的持久化分为三个部分:
- 交换器的持久化
- 队列的持久化
- 消息的持久化
交换机持久化
当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机, 交换机会⾃动建⽴,相当于⼀直存在。如果是一个需要长期使用的交换机,建议进行持久化。
@Bean("ackExchange") public DirectExchange ackExchange(){ return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build(); }
上面这种方式创建交换机,默认是持久化的,如果想要设置非持久化,这样设置:
@Bean("ackExchange") public DirectExchange ackExchange(){ return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(false).build(); }
队列持久化
如果队列不设置持久化,RabbitMQ重启,队列也会跟着消息,队列中的消息也会丢失。
队列设置持久化方式和Exchange有所不同:
@Bean("ackQueue") public Queue ackQueue(){ return QueueBuilder.durable(Constants.ACK_QUEUE).build(); }
设置非持久化:
@Bean("ackQueue") public Queue ackQueue(){ return QueueBuilder.nonDurable(Constants.ACK_EXCHANGE).build(); }
消息持久化
如果队列是持久化的,消息不是持久化的,RabbitMQ重启,消息也会丢失。换言之,队列和消息共同持久化才能保证消息在重启之后不会丢失。
设置消息持久化需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是 MessageDeliveryMode.PERSISTENT
public enum MessageDeliveryMode { NON_PERSISTENT,//⾮持久化 PERSISTENT;//持久化 }
RabbitMQ原生API设置持久化:
💡TIP
PERSISTENT_TEXT_PLAIN中设置了deliveryMode =2,进而实现的持久化:
Spring.AMQP设置持久化:
4、重试机制
在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息
处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的。启用下方配置重试机制将会生效;
配置:
spring: application: name: rabbit-extensions-demo rabbitmq: addresses: amqp://账号:密码@47.108.157.13:5672/extension listener: simple: acknowledge-mode: auto retry: enabled: true # 开启消费者失败重试 initial-interval: 5000ms # 初始失败等待时长为5秒 max-attempts: 5 # 最大重试次数
这里我们故意设置一个异常,看看运行结果:
@Component public class RetryListener { @RabbitListener(queues=Constants.RETRY_QUEUE) public void retryQueue(Message message) throws UnsupportedEncodingException { System.out.println("重试队列消费者auto模式 接收到消息 deliveryTag是:"+message.getMessageProperties().getDeliveryTag()); int a=10/0; System.out.println("业务处理完成"); } }
运行结果:
💡 1、重试机制只在manual模式下生效!
- none模式消息一旦发送rabbitmq就会把消息从队列移除。
- 而manual模式下不进行手动确认,只会导致消息积压
- manual模式进行手动确认且requeue=false,那么消息只会被发送一次,因为设置了不重新入队
- manual模式进行手动确认且requeue=true,会一直进行重试,因为每次发送确认消息给rabbitmq,都会告诉他重新入队,发送消息过来。
💡2、为什么deliveryTag一直是1,而不像第二点中auto模式自动重发消息deliveryTag一直递增?
- 重试机制,retry=enable,每次重试deliveryTag相同,是因为这个重试是在消费者内部执行的
- 而auto模式下,出现异常,不断重试则是RabbitMQ服务器重发消息给到消费者,所以deliveryTag自然是递增的。
5、TTL(过期时间)
当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除。目前设置TTL方式有两种:
- 设置队列的TTL, 队列中所有消息都有相同的过期时间.
- 对消息本⾝进⾏单独设置, 每条消息的TTL可以不同.
如果两种⽅法⼀起使⽤, 则消息的TTL以两者之间较⼩的那个数值为准.
消息TTL
@RequestMapping("/ttl") public String ttl() { System.out.println("ttl...."); rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test..."); //也可以这样设置过期时间 rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...", message -> { message.getMessageProperties().setExpiration("10000"); return message; }); return "ttl测试"; }
队列TTL
public Queue ttlQueue() { return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20000).build();
两者区别:
- 设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除
- 设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定
💡为什么这两种⽅法处理的⽅式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可.⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可.
6、死信队列
死信队列(DLQ)和死信交换机(DLX)与普通队列、普通交换机本质上没有任何区别。
只不过DLX绑定的是一个个队列,接收的消息都是普通队列丢弃的消息。
DLX和DLQ协同分工,把普通队列中的死信发送给指定消费者进行特殊处理。
示例代码:
//普通队列 @Bean("normalQueue") public Queue normalQueue() { /* * 重点代码:*/ return QueueBuilder.durable(Constants.NORMAL_QUEUE) .deadLetterExchange(Constants.DL_EXCHANGE)//正常队列需要绑定指定死信交换机(根据名字) .deadLetterRoutingKey("key2")//根据dl routingKey指定路由到哪一个死信队列 .build(); } //普通交换机 @Bean("normalExchange") public DirectExchange normalExchange() { return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build(); } //交换机队列进行绑定 @Bean("bindingNormal") public Binding bindingNormal(@Qualifier("normalQueue") Queue normalQueue,@Qualifier("normalExchange") DirectExchange normalExchange) { return BindingBuilder.bind(normalQueue).to(normalExchange).with("key1"); } //死信队列 @Bean("DLQueue") public Queue DLQueue() { return QueueBuilder.durable(Constants.DL_QUEUE).build(); } //死信交换机 @Bean("DLExchange") public DirectExchange DLExchange() { return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build(); } //死信队列和死信交换机的绑定 @Bean("DLBinding") public Binding DLBinding(@Qualifier("DLQueue") Queue DLQueue,@Qualifier("DLExchange") DirectExchange DLExchange){ return BindingBuilder.bind(DLQueue).to(DLExchange).with("key2"); }
💡TIPS
- 由于normalQueue设置了 deadLetterExchange 和deadLetterRoutingKey ,所以normalQueue中的死信会被路由到指定的DLQ(死信队列),给到指定消费者执行,(当然代码中并没有编写消费者代码)
- 消息变成死信,主要有这几种情况:
- Basic.Reject/Basic.Nack,拒绝了消息,并且requeue=false
- 消息TTL过期
- 队列已经到达了最大长度
7、延迟队列
延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费。
比如只能家具的定时任务、会议时间提前提醒、会员注册一定时间后的用户满意度调查等。
坏消息是RabbitMQ不直接支持延迟队列。好消息是RabbitMQ可以通过其他方式实现延迟队列:
- TTL+DLQ
- 延迟队列插件
TTL+DLQ
我们可以设置一个TTL队列,队列中消息一旦过期,产生死信,把该死信路由到指定的死信队列,然后消费者直接去监听该死信队列即可实现一个延迟队列的功能。
我们在第6节死信队列代码基础上进行编写:
- 交换机和队列相关配置
@Configuration public class DLConfig { //普通队列 @Bean("normalQueue") public Queue normalQueue() { /* * 重点代码:*/ return QueueBuilder.durable(Constants.NORMAL_QUEUE) .deadLetterExchange(Constants.DL_EXCHANGE)//正常队列需要绑定指定死信交换机(根据名字) .deadLetterRoutingKey("key2")//根据dl routingKey指定路由到哪一个死信队列 .build(); } //普通交换机 @Bean("normalExchange") public DirectExchange normalExchange() { return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build(); } //交换机队列进行绑定 @Bean("bindingNormal") public Binding bindingNormal(@Qualifier("normalQueue") Queue normalQueue,@Qualifier("normalExchange") DirectExchange normalExchange) { return BindingBuilder.bind(normalQueue).to(normalExchange).with("key1"); } //死信队列 @Bean("DLQueue") public Queue DLQueue() { return QueueBuilder.durable(Constants.DL_QUEUE).build(); } //死信交换机 @Bean("DLExchange") public DirectExchange DLExchange() { return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build(); } //死信队列和死信交换机的绑定 @Bean("DLBinding") public Binding DLBinding(@Qualifier("DLQueue") Queue DLQueue,@Qualifier("DLExchange") DirectExchange DLExchange){ return BindingBuilder.bind(DLQueue).to(DLExchange).with("key2"); } }
- 监听死信队列
@Component public class DelayListener { @RabbitListener(queues =Constants.DL_QUEUE) public void handleMessage(Message message){ long end=System.currentTimeMillis();//获取接收到消息时的时间 String msg=new String(message.getBody()); String[] msgs=msg.split(":"); long start=Long.parseLong(msgs[1]); System.out.println("写收到延迟消息,时间:"+(end-start)/1000); } }
- 发送延迟消息
@RequestMapping("/delay") public String delay() { rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "key1",("当前消息需要延迟10秒发送:" + System.currentTimeMillis()).getBytes(),message -> { message.getMessageProperties().setExpiration("10000"); //单位: 毫秒, 过期时间为30s return message; }); return "延迟队列消息已经执行"; }
运行结果:在这里插入图片描述
消息首先在normal.queue上等待10s,然后发送到dl.queue消费(我设置的auto模式,所以消息已经ack,没有显示任何消息):
💡TTL+DLQ有一个缺陷
如果如果在队列中:消息m1延迟时间大于消息m2的延迟时间,并且m1优先于m2出队列,这样种情况消息m2只能等待m1过期才能被发送,导致m2消息在发送的时候必定过期。
因此rabbitmq提供了一个插件,来解决这个问题
延迟队列插件
1. 安装教程
注意:如果启动完插件之后没有生效,需要使用这个命令重启rabbitmq服务:
service rabbitmq-server restart
重启完之后,就会增加一个交换机类型:
这个交换机可以认为是一个“高级的延迟队列”,它可以临时存储延迟消息,所有存储在该交换机的消息,都会在延迟时间结束后发送给绑定了该交换机的队列,解决了TTL+DLQ中的缺陷。
2. 代码实现
@Configuration public class DLConfig { //声明队列方式和之前没有差别 @Bean("delayQueue") public Queue delayQueue() { return QueueBuilder.durable(Constants.DELAY_QUEUE).build(); } //与之前不同的是,这里需要用。delays声明使用延迟队列插件 @Bean("delayExchange") public Exchange delayExchange() { return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build(); } //和其他绑定没有区别 @Bean("delayBinding") public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayExchange") Exchange delayExchange) { return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs(); } }
@Component public class DelayListener { @RabbitListener(queues =Constants.DELAY_QUEUE) public void handleMessage(Message message){ long end=System.currentTimeMillis();//获取接收到消息时的时间 String msg=new String(message.getBody()); String[] msgs=msg.split(":"); long start=Long.parseLong(msgs[1]); System.out.println("写收到延迟消息,时间:"+(end-start)/1000); } }
@RequestMapping("/delayPlug") public String delayPlug() { rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay" ,("当前消息需要延迟10秒发送:" + System.currentTimeMillis()).getBytes(),messagePostProcessor -> { messagePostProcessor.getMessageProperties().setDelayLong(10000L); //单位: 毫秒 延迟时间 return messagePostProcessor; }); rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay" ,("当前消息需要延迟5秒发送:" + System.currentTimeMillis()).getBytes(),messagePostProcessor -> { messagePostProcessor.getMessageProperties().setDelayLong(5000L); //单位: 毫秒 延迟时间 return messagePostProcessor; }); return "延迟队列消息已经执行"; }
运行结果:
可以看到即使先发送10s延迟的数据,在发送5s延迟的数据,消息仍然按照正常顺序接收,解决的消息乱序问题。
8、事务
概念理解
RabbitMQ支持消息的发送和接收是原子性的,要么所有消息全部成功,要么全部失败。
例如这个场景,发送两个消息,由于过程中出现异常,正常情况下,队列会接收到第一条消息,第二条消息不会接收到:
@RequestMapping("/trans") public String trans() { System.out.println("trans test..."); rabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 1..."); int num = 5/0; rabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 2..."); return "消息发送成功"; }
控制台只显示接收到一条消息:
如果我们希望,出现异常时,回滚所有发送的消息(出现错误一条消息也不发送),那么就可以启用事务。
代码实现
配置代码,为了简便使用默认交换机:
@Configuration public class TransactionMQConfig { @Bean("transationQueue") public Queue transationQueue() { return QueueBuilder.durable(Constants.TRANS_QUEUE).build(); } //1、建立事务管理器 @Bean public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } //2、开启事务通道 @Bean("transRabbitTemplate") public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; } }
发送消息:
@Transactional//3、声明开启事务 @RequestMapping("/trans") public String trans() { System.out.println("trans test..."); transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 1..."); int num = 5/0; transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 2..."); return "消息发送成功"; }
发送消息后运行结果:
程序抛出异常:
但是控制台,队列没有接收到任何消息:
9、消息分发
RabbitMQ中,消费者获取消息既可以使用推模式(队列把消息发送给消费者),也可以使用拉模式(消费者主要向队列获取)。虽然两者都支持,但是主要还是使用推模式。使用推模式,会出现一个问题,就是默认情况下RabbitMQ采用轮询方式,把消息发送给指定消费者,这种情况下如果消费者本身负载较重,继续接收消息,可能会出现严重后果。为此,RabbitMQ提供了消息分发这样一个机制。每个消费者都可以设置最大接收消息数prefetchCount,如果消费者目前获取消息数等于prefetchCount,RabbittMQ就不会推送消息给这个消费者,这是RabbitMQ中消息分发的核心概念。通过这样一个机制可以实现限流、负载均衡等功能。
代码实现
RabbitMQ 的SDK通过这个方法可以设置预取数量:
Spring.AMQP可以直接通过yml配置:
spring: application: name: applicationName rabbitmq: addresses: amqp://账号:密码@113.44.150.39:5672/extension listener: simple: acknowledge-mode: manual # 必须开启手动模式,prefetch才会生效 prefetch: 5
💡TIPS
- prefectch设置为0表示没有获取上限。
- 在拉模式中,设置channel.basicQos(prefetchCount)是无效的,因为拉模式,一次只能从队列拉去一个消息。
完
- 发送延迟消息
- 监听死信队列
- 消息发送后,不会自动确认,需要消费端使用channel.basicAck(deliveryTag, multiple) 或channel.basicNack(deliveryTag, multiple,requeue),又或者channel.basicReject(deliveryTag, requeue)进行确认。如果不进行确认,消息会处于UnAck状态,requeue为true表示重发消息,反之不会。
-
- 自动确认:
- 代码实现