SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

06-01 1006阅读

前言

在现代分布式系统中,消息队列是实现服务解耦和异步处理的关键组件。Spring框架提供了强大的支持,使得与消息队列(如RabbitMQ、Kafka等)的集成变得更加便捷和灵活。本文将深入探讨如何利用Spring的注解驱动方式来配置和管理队列、交换机、消息转换器等组件,从而实现一个高效且可扩展的消息处理架构。

在本博客中,我们将重点介绍:

如何使用Spring的注解方式配置RabbitMQ的队列和交换机。

如何配置消息转换器(如Jackson2JsonMessageConverter)来处理不同格式的消息。

如何根据业务需求对现有代码进行改造,将消息队列引入到系统中,从而实现消息的异步处理与解耦。

通过这篇文章,您将了解如何使用Spring框架的注解配置简化消息队列的管理,同时提升系统的可扩展性和维护性。


基于注解的声明队列交换机

利用SpringAMQP声明DirectExchange并与队列绑定

需求如下:

  1. 在consumer服务中,声明队列direct.queue1和direct.queue2
  2. 在consumer服务中,声明交换机hmall.direct,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

基于Bean声明队列和交换机代码如下:

package com.itheima.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfiguration {
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("hmall.direct")
    }
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queuue1");
    }
    @Bean
    public Binding directQueue1bindingRed( Queue directQueue1, DirectExchange directExchange ){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    @Bean
    public Binding directQueue1bindingBlue( Queue directQueue1, DirectExchange directExchange ){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queuue2");
    }
    @Bean
    public Binding directQueue2bindingRed( Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    @Bean
    public Binding directQueue2bindingYellow( Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

SpringAMOP还提供了基于@RabbitListener注解来声明队列和交换机的方式

@RabbitListener(bindings =@QueueBinding(
	value = @Queue(name =direct.queue1),
	exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
	key = {"red","blue"}
))
public void listenDirectQueuel(string msg){
	System.out.println("消费者1接收到Direct消息:【+msg+"】");
}

接收者代码如下:

	@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1",durable = "true"),
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listenDirectQueue1(String message)throws Exception {
        log.info("消费者1监听到direct.queue2的消息,["+message+"]");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2",durable = "true"),
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))

消息转换器

消息转换器

需求:测试利用SpringAMQP发送对象类型的消息

  • 声明一个队列,名为object.queue
  • 编写单元测试,向队列中直接发送一条消息,消息类型为Map
  • 在控制台查看消息,总结你能发现的问题
    // 准备消息
    Mapmsg = new HashMap();
    msg.put("name","Jack");
    msg.put("age",21);
    

    创建队列object.queue

    SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

    测试代码如下:

    	@Test
        public void TestSendObject(){
            Map msg = new HashMap();
            msg.put("name", "Jack");
            msg.put("age", 18);
            //3.发送消息 参数分别是:交换机名称、RoutingKey(暂时为空)、消息
            rabbitTemplate.convertAndSend("object.queue",msg);
        }
    

    在控制台上找到object.queue中得到消息

    SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

    Spring的对消息对象的处理是由org.springframework.amgp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于IDK的ObjectOutputStream完成序列化。存在下列问题:

    • JDK的序列化有安全风险
    • JDK序列化的消息太大
    • JDK序列化的消息可读性差

      建议采用JSON序列化代替默认的JDK序列化,要做两件事情:

      在publisher和consumer中都要引入jackson依赖:

      	com.fasterxml.jackson.core
      	jackson-databind
      
      

      在publisher和consumer中都要配置Messageconverter:

      @Bean
      public MessageConverter messageConverter(){
      	return new Jackson2JsonMessageConverter();
      }
      

      SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

      SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

      消费者代码:

      	@RabbitListener(queues = "object.queue")
          public void listenObjectQueue(Map msg)throws Exception {
              log.info("消费者监听到pbject.queue的消息,["+msg+"]");
          }
      

      SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

      运行结果如下:

      SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

      业务改造

      需求:改造余额支付功能,不再同步调用交易服务的0penFeign接口,而是采用异步MO通知交易服务更新订单状态。

      SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

      在trade-service微服务消费者配置和pay-service微服务发送者都配置MQ依赖

      	
        
            org.springframework.boot
            spring-boot-starter-amqp
        
      

      在trade-service微服务和pay-service微服务添加上RabbitMQ配置信息

      spring:
        rabbitmq:
          host: 192.168.244.136
          port: 5672
          virtual-host: /hmall
          username: hmall
          password: 1234
      

      因为消费者和发送者都需要消息转换器,故直接将代码写到hm-common服务中,在config包中创建MqConfig类

      package com.hmall.common.config;
      import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
      import org.springframework.amqp.support.converter.MessageConverter;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      @Configuration
      public class MqConfig {
          @Bean
          public MessageConverter messageConverter() {
              return new Jackson2JsonMessageConverter();
          }
      }
      

      同时trade-service微服务和pay-service微服务是无法自动扫描到该类,采用SpringBoot自动装配的原理,在resource文件夹下的META-INF文件夹下的spring.factories文件中添加类路径:

      SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

      在接收者trade-service微服务中创建PayStatusListener

      package com.hmall.trade.listener;
      import com.hmall.trade.service.IOrderService;
      import lombok.RequiredArgsConstructor;
      import org.springframework.amqp.rabbit.annotation.Exchange;
      import org.springframework.amqp.rabbit.annotation.Queue;
      import org.springframework.amqp.rabbit.annotation.QueueBinding;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      @Component
      @RequiredArgsConstructor
      public class PayStatusListener {
          private final IOrderService orderService;
          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue("trade.pay.success.queue"),
                  exchange = @Exchange(value = "pay.direct"),
                  key = "pay.success"
          ))
          public void ListenPaySuccess(Long orderId) {
              orderService.markOrderPaySuccess(orderId);
          }
      }
      

      修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:

      @Service
      @RequiredArgsConstructor
      @Slf4j
      public class PayOrderServiceImpl extends ServiceImpl implements IPayOrderService {
      	private final RabbitTemplate rabbitTemplate;
      	...
      	@Override
      	@Transactional
      	public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {
          	// 1.查询支付单
         		PayOrder po = getById(payOrderDTO.getId());
         		// 2.判断状态
          	if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){
              	// 订单不是未支付,状态异常
              	throw new BizIllegalException("交易已支付或关闭!");
          	}
          	// 3.尝试扣减余额
          	userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());
          	// 4.修改支付单状态
          	boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());
          	if (!success) {
              	throw new BizIllegalException("交易已支付或关闭!");
          	}
          	// 5.修改订单状态
          	// tradeClient.markOrderPaySuccess(po.getBizOrderNo());
          	try {
              	rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
          	} catch (Exception e) {
              	log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);
          	}
      	}
      }
      

      SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造

      SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造


      总结

      本文介绍了基于Spring框架的注解方式来配置消息队列、交换机以及消息转换器的实现方法。通过注解配置,开发者可以更轻松地创建和管理RabbitMQ等消息队列的组件,而无需过多的 XML 配置或繁琐的手动配置。具体来说,我们探讨了如何:

      使用 @RabbitListener 和 @EnableRabbit 注解配置消息监听器和消息队列。

      配置消息转换器,特别是如何通过 Jackson2JsonMessageConverter 将消息转换为JSON格式,从而实现数据的序列化与反序列化。

      结合业务需求,讲解如何对现有系统进行改造,集成消息队列,实现异步处理和服务解耦。

      通过这些配置和改造,系统的消息处理能力得到了增强,性能和可扩展性也得到了显著提升。消息队列的使用不仅能够减少服务之间的紧耦合,还能够通过异步方式提高系统的响应速度和吞吐量。

      希望本博客能够帮助您理解Spring在消息队列方面的强大功能,并为您的业务应用提供参考。随着系统复杂度的增加,合理的使用消息队列将成为构建高可用、高性能系统的关键之一。

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码