默契之舞 之 生产者消费者模式(RabbitMQ)
Hi~!这里是奋斗的明志,很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~~
🌱🌱个人主页:奋斗的明志
🌱🌱所属专栏:RabbitMQ
📚本系列文章为个人学习笔记,在这里撰写成文一为巩固知识,二为展示我的学习过程及理解。文笔、排版拙劣,望见谅。
生产者消费者
- 一、RabbitMQ 快速入门
- 1、创建一个空的项目
- 2、引入依赖
- 二、编写生产者代码
- 三、编写消费者代码
- 四、常见报错异常类型
- 1、关闭顺序调换
- 2、队列不存在
- 3、端口号错误
- 4、检查账号密码是否错误
- 5、检查虚拟机
- 五、小结
一、RabbitMQ 快速入门
1、创建一个空的项目
2、引入依赖
com.rabbitmq amqp-client 5.20.0
二、编写生产者代码
建立连接需要信息
- ip
- 端口号
- 账号
- 密码
- 虚拟主机
生产者和消费者创建的 channel 并不是同⼀个
//2.开启信道 Channel channel = connection.createChannel(); //3.生产者需要声明一个交换机 //rabbitmq 在创建的时候就有默认的交换机 //使用内置的交换机
- 声明队列
- 例如: 如果有⼀个名为 “hello” 的队列, 生产者可以直接发送消息到 “hello” 队列, 而消费者可以从
“hello” 队列中接收消息, 而不需要关心交换机的存在. 这种模式非常适合简单的应用场景,其中生产者和消费者之间的通信是⼀对⼀的.
//4.声明队列 /** * AMQP.Queue.DeclareOk queueDeclare * (String var1, boolean var2, boolean var3, boolean var4, Map var5) * throws IOException; * * 参数说明: * var1 :队列名称(queue) * var2 :可持久化 (durable) * var3:是否独占(exclusive) * var4:是否自动删除(autoDelete) * var5: 参数 */ channel.queueDeclare("hello",true,false,false,null);
- 生产者完整代码
package rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 生产者 */ public class ProducerDemo { public static void main(String[] args) throws IOException, TimeoutException { //1.建立连接 //创建一个连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置ip 此时 rabbitmq 在云服务器上,我们通过本地去运行的,需要公网IP,需要提前开放端口号 connectionFactory.setHost("123.57.16.61"); //设置端口号 默认的端口号 5672 connectionFactory.setPort(5672); //设置账号 connectionFactory.setUsername("study"); //设置密码 connectionFactory.setPassword("study"); //设置虚拟机 connectionFactory.setVirtualHost("bite"); //在这个工厂里面拿到一个连接 Connection connection = connectionFactory.newConnection(); //2.开启信道 Channel channel = connection.createChannel(); //3.生产者需要声明一个交换机 //rabbitmq 在创建的时候就有默认的交换机 //使用内置的交换机 //4.声明队列 /** * AMQP.Queue.DeclareOk queueDeclare * (String var1, boolean var2, boolean var3, boolean var4, Map var5) * throws IOException; * * 参数说明: * var1 :队列名称(queue) * var2 :可持久化 (durable) * var3:是否独占(exclusive) * var4:是否自动删除(autoDelete) * var5: 参数 */ channel.queueDeclare("hello",true,false,false,null); //交换机与队列之间的绑定关系 //内置交换机都有自己的绑定关系 //5.发送消息 //依然用 channel 进行发送 /** * void basicPublish * (String var1, String var2, AMQP.BasicProperties var3, byte[] var4) * throws IOException; * 参数说明: * var1:交换机的名称 (exchane) * var2:内置交换机(routingKey)和队列名称保持一致, * var3:属性配置 * var4:消息 */ String mag = "hello rabbitmq~"; channel.basicPublish("","hello",null,mag.getBytes()); //6.资源的释放 channel.close(); connection.close(); } }
- 观察界面(发送信息成功)
如果在代码中注掉资源释放的代码, 在Connections和Channels也可以看到相关信息
三、编写消费者代码
消费者代码和⽣产者前3步都是⼀样的, 第4步改为消费当前队列
- 创建连接
- 创建Channel
- 声明⼀个队列Queue
- 消费消息
- 释放资源
消费当前队列
basicConsume:
/* 2 basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue: 队列名称 2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认 3. callback: 回调对象 */ String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
Consumer:
Consumer ⽤于定义消息消费者的⾏为. 当我们需要从RabbitMQ接收消息时, 需要提供⼀个实现了**Consumer** 接⼝的对象.
DefaultConsumer 是 **RabbitMQ**提供的⼀个默认消费者, 实现了Consumer 接⼝.
核心方法:
handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) : 从队列接收到消息时, 会自动动调用该方法.
在这个方法中, 我们可以定义如何处理接收到的消息, 例如打印消息内容, 处理业务逻辑或者将消息存储到数据库等.
参数说明如下:
▪ consumerTag : 消费者标签, 通常是消费者在订阅队列时指定的.
▪ envelope : 包含消息的封包信息,如队列名称, 交换机等.
▪ properties : ⼀些配置信息
▪ body : 消息的具体内容
//6. 接收消息, 并消费 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue: 队列名称 2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认 3. callback: 回调对象 */ DefaultConsumer consumer = new DefaultConsumer(channel) { /* 回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法 1. consumerTag: 标识 2. envelope: 获取⼀些信息, 交换机, 路由key 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息: " + new String(body)); } }; channel.basicConsume("hello", true, consumer);
运行代码, 观察结果
- 完整代码
package rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerDemo { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //创建一个工厂连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("123.57.16.61");//设置公网ip connectionFactory.setPort(5672); connectionFactory.setUsername("study"); connectionFactory.setPassword("study"); connectionFactory.setVirtualHost("bite");//设置虚拟机 Connection connection = connectionFactory.newConnection(); //创建一个通信 Channel channel = connection.createChannel(); //声明一个队列(可以省略) channel.queueDeclare("hello",true,false,false,null); //4. 消费消息 /** * basicConsume(String queue, boolean autoAck, Consumer callback) * 参数说明: * queue: 队列名称 * autoAck: 是否自动确认 * callback: 接收到消息后, 执行的逻辑 */ DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:"+ new String(body)); } }; channel.basicConsume("hello", true, consumer); //等待程序执行完成 // Thread.sleep(2000); //5. 释放资源 // channel.close(); // connection.close(); } }
四、常见报错异常类型
1、关闭顺序调换
2、队列不存在
3、端口号错误
4、检查账号密码是否错误
5、检查虚拟机
五、小结
生产者-消费者模型是一种常见的消息队列应用场景,尤其在异步处理、解耦和提高系统可扩展性方面非常有效。使用 RabbitMQ 实现生产者-消费者模式可以让我们在不同的服务或系统之间解耦,提高系统的可靠性和性能。
- 观察界面(发送信息成功)
- 生产者完整代码