Spring Boot集成RocketMQ
目录
1.依赖
2.配置
3.同步消息
4.异步消息
5.单向消息
6.顺序消息
7.事务消息
8.报错处理
sendDefaultImpl call timeout
延时消息不生效的天坑
1.依赖
org.apache.rocketmq rocketmq-spring-boot-starter 2.2.3
2.配置
不管是生产者还是消费者端,都要配置好nameserver的地址,这样才找的到broker。消费者端还要配置好当前消费者属于哪个消费者组,rocketmq中同一个messagequeue,一个消费者组只能消费一次,这是默认的规则。
# application.yml rocketmq: name-server: 127.0.0.1:9876 #nameserver地址,支持用逗号分隔多个 producer: group: my-producer-group #消费者组
3.同步消息
Sprong Boot中有自己的一套标准,去操作诸如数据库、缓存、中间件之类的第三方组件都喜欢通过一个XXXTemplate去操作,因此第三方组件自己封装starter的时候也会按照这种方式去封装,将API操作都通过一个XXXTemplate来,操作MQ也不例外,操作rocketmq是通过RocketMQTemplate去。
@RestController public class SyncProducerController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/sendSync") public String sendSyncMessage() { Message message = MessageBuilder.withPayload("同步消息内容") .setHeader(MessageConst.PROPERTY_KEYS, "ORDER_20231109001") .build(); SendResult sendResult = rocketMQTemplate.syncSend("SYNC_TOPIC", message); return "发送成功,消息ID:" + sendResult.getMsgId(); } }
消费者都是通过Listener来消费消息
@Service @RocketMQMessageListener( topic = "SYNC_TOPIC", consumerGroup = "sync-consumer-group", selectorType = SelectorType.TAG, selectorExpression = "*" ) public class SyncConsumer implements RocketMQListener { @Override public void onMessage(String message) { System.out.println("收到同步消息: " + message); } }
4.异步消息
异步消息因为涉及要接收一个异步响应,所以要注册一个回调函数,异步响应回来之后触发这个异步响应:
public class AsyncProducerService { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendAsync() { Message message = MessageBuilder.withPayload("异步消息内容") .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 延迟级别 .build(); rocketMQTemplate.asyncSend("ASYNC_TOPIC", message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("异步发送成功: " + sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.err.println("异步发送失败: " + e.getMessage()); } }); } }
5.单向消息
@RestController public class OnewayController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/sendOneway") public String sendOneway() { rocketMQTemplate.sendOneWay("ONEWAY_TOPIC", MessageBuilder.withPayload("日志消息").build()); return "单向消息已发送"; } }
6.顺序消息
顺序消息说白了只要是同一个messagequeue中的消息都是顺序的,所以要做成顺序消息也就是走同一个queueId的messagequeue中,也就是在发送消息的时候指定queueId。
// 订单服务生产者 public class OrderProducer { @Autowired private RocketMQTemplate rocketMQTemplate; // 发送顺序消息(以订单ID为路由键) public void sendOrderMessage(String orderId, String operation) { // 构建消息体 Message message = MessageBuilder.withPayload(operation) .setHeader("orderId", orderId) .build(); // 关键代码:发送顺序消息 rocketMQTemplate.syncSendOrderly( "ORDER_TOPIC", message, orderId, // 使用订单ID作为队列选择键 3000 // 超时时间 ); } }
消费者也可以指定queueId的messagequeue中去消费消息。
@Service @RocketMQMessageListener( topic = "ORDER_TOPIC", consumerGroup = "order_consumer_group", consumeMode = ConsumeMode.ORDERLY, // 必须设置为顺序模式 messageModel = MessageModel.CLUSTERING, selectorExpression = "*", consumeThreadNumber = 4 // 线程数必须 ≤ 队列数 ) public class OrderConsumer implements RocketMQListener { private final ConcurrentHashMap processingOrders = new ConcurrentHashMap(); @Override public void onMessage(MessageExt message) { String orderId = message.getProperty("orderId"); String body = new String(message.getBody(), StandardCharsets.UTF_8); try { // 1. 检查订单是否正在处理(防并发) if (processingOrders.putIfAbsent(orderId, true) != null) { throw new RuntimeException("存在并发的订单处理: " + orderId); } // 2. 顺序消费逻辑 processOrder(orderId, body); } catch (Exception e) { // 3. 消费失败时挂起当前队列 throw new RuntimeException("消费失败,触发队列重试", e); } finally { // 4. 移除处理标记 processingOrders.remove(orderId); } } private void processOrder(String orderId, String operation) { // 实际业务处理(必须幂等) System.out.printf("处理订单 %s 的操作: %s%n", orderId, operation); } }
7.事务消息
// 事务消息生产者 public class TransactionProducer { private final RocketMQTemplate rocketMQTemplate; public void sendTransactionMessage(String messageBody) { // 发送半事务消息 Message message = MessageBuilder.withPayload(messageBody) .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()) .build(); rocketMQTemplate.sendMessageInTransaction( "tx_topic", message, null ); } }
// 事务监听器(核心实现) @Slf4j @RocketMQTransactionListener public class TransactionListenerImpl implements RocketMQLocalTransactionListener { // 执行本地事务 @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 这里执行本地数据库操作等业务逻辑 // 如果执行成功,返回COMMIT return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事务执行失败", e); return RocketMQLocalTransactionState.ROLLBACK; } } // 事务回查 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 根据业务状态检查事务是否成功 // 例如:查询数据库判断事务是否完成 return RocketMQLocalTransactionState.COMMIT; } }
8.报错处理
sendDefaultImpl call timeout
RocketMq提示RemotingTooMuchRequestException: sendDefaultImpl call timeout:
延时消息不生效的天坑
只能用下面这种方式延迟消息才会延迟投递:
用下面这种设置Header的方式不会生效!!延时消息会被立即投递!!!
Message delayedMessage = MessageBuilder.withPayload(order) .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "16") .build();
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。