详解RabbitMQ工作模式之RPC通信模式

06-02 1067阅读

详解RabbitMQ工作模式之RPC通信模式

目录

RPC通信模式

概述

工作流程

特点

应用场景

代码案例

引入依赖

常量类

编写客户端代码

编写服务端代码

运行程序(先运行客户端,再运行服务端)


RPC通信模式

概述

在RabbitMQ中,RPC模式通过消息队列实现远程调用功能。客户端(生产者)发送消息到消费队列,服务端(消费者)进行消息消费并执行相应的程序,然后将结果发送到回调队列供客户端使用。这是一种双向的生产消费模式,其中客户端既是生产者又是消费者,服务端则专注于处理消息并生成响应。

详解RabbitMQ工作模式之RPC通信模式

在RPC通信的过程中, 没有⽣产者和消费者, ⽐较像咱们RPC远程调⽤, ⼤概就是通过两个队列实现了⼀个可回调的过程.

工作流程

详解RabbitMQ工作模式之RPC通信模式

1.客户端发送请求:

客户端连接到RabbitMQ服务器。

客户端声明一个用于发送RPC请求的队列(通常是固定的,如rpc_queue)。

客户端创建一个临时的回调队列,并在发送请求时,将回调队列的名称作为消息属性(reply_to)发送给交换机。

客户端为每个请求生成一个唯一的correlation_id,并将其作为消息属性发送,以便在接收响应时能够匹配请求与响应。

2.交换机路由请求:

交换机接收到RPC请求后,根据路由键将请求路由到服务端监听的队列。

3.服务端处理请求:

服务端(消费者)从队列中接收请求。

服务端处理请求,并生成响应。

服务端将响应发送到客户端指定的回调队列,并在消息属性中设置相同的correlation_id。

4.客户端接收响应:

客户端监听其回调队列以接收响应。

当接收到响应时,客户端检查correlation_id以确定响应是否与之前的请求匹配。

如果匹配,客户端处理响应;如果不匹配,客户端可能丢弃该响应。

特点

1.解耦:客户端和服务端之间不需要直接通信,降低了系统间的耦合度。

2.灵活性:支持多种语言和平台之间的远程调用。

3.可扩展性:通过增加服务端(消费者)的数量,可以轻松扩展RPC服务。

4.性能开销:由于涉及到网络传输和消息队列的处理,RPC调用的性能通常低于本地调用。

5.复杂性:需要处理消息队列的可靠性、持久性、消息确认等复杂问题。

6.安全性:远程调用可能面临更多的安全风险,如消息篡改、中间人攻击等。

应用场景

RabbitMQ的RPC通信模式适用于需要远程调用服务的场景,如分布式系统中的服务调用、微服务架构中的服务通信等。通过RabbitMQ的消息队列机制,可以实现跨系统、跨语言的远程调用,提高系统的灵活性和可扩展性。

代码案例
引入依赖

    com.rabbitmq
    amqp-client
    5.21.0
常量类
public class Constants {
    public static final String HOST = "47.98.109.138";
    public static final int PORT = 5672;
    public static final String USER_NAME = "study";
    public static final String PASSWORD = "study";
    public static final String VIRTUAL_HOST = "aaa";
    //rpc 模式
    public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
}
编写客户端代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
 * rpc 客户端
 * 1. 发送请求
 * 2. 接收响应
 */
public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        //3. 发送请求
        String msg = "hello rpc...";
        //设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
        //4. 接收响应
        //使用阻塞队列, 来存储响应信息
        final BlockingQueue response = new ArrayBlockingQueue(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    //如果correlationID校验一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:"+ result);
    }
}
编写服务端代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * RPC server
 * 1. 接收请求
 * 2. 发送响应
 */
public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 接收请求
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body,"UTF-8");
                System.out.println("接收到请求:"+ request);
                String response = "针对request:"+ request +", 响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
    }
}
运行程序(先运行客户端,再运行服务端)

可以在管理界面看到其中一个队列中有1条消息

详解RabbitMQ工作模式之RPC通信模式

详解RabbitMQ工作模式之RPC通信模式

详解RabbitMQ工作模式之RPC通信模式

我们可以看到,服务端接收到了消息并给客户端发送了响应,与预期符合。

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码