默契之舞 之 生产者消费者模式(RabbitMQ)

06-01 1228阅读

Hi~!这里是奋斗的明志,很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~~

🌱🌱个人主页:奋斗的明志

🌱🌱所属专栏:RabbitMQ

📚本系列文章为个人学习笔记,在这里撰写成文一为巩固知识,二为展示我的学习过程及理解。文笔、排版拙劣,望见谅。

默契之舞 之 生产者消费者模式(RabbitMQ)

生产者消费者

  • 一、RabbitMQ 快速入门
    • 1、创建一个空的项目
    • 2、引入依赖
    • 二、编写生产者代码
    • 三、编写消费者代码
    • 四、常见报错异常类型
      • 1、关闭顺序调换
      • 2、队列不存在
      • 3、端口号错误
      • 4、检查账号密码是否错误
      • 5、检查虚拟机
      • 五、小结

        一、RabbitMQ 快速入门

        1、创建一个空的项目

        默契之舞 之 生产者消费者模式(RabbitMQ)


        默契之舞 之 生产者消费者模式(RabbitMQ)

        2、引入依赖

        默契之舞 之 生产者消费者模式(RabbitMQ)


        默契之舞 之 生产者消费者模式(RabbitMQ)


        默契之舞 之 生产者消费者模式(RabbitMQ)


        
            com.rabbitmq
            amqp-client
            5.20.0
        
        

        默契之舞 之 生产者消费者模式(RabbitMQ)

        二、编写生产者代码

        建立连接需要信息

        1. ip
        2. 端口号
        3. 账号
        4. 密码
        5. 虚拟主机

        默契之舞 之 生产者消费者模式(RabbitMQ)


        默契之舞 之 生产者消费者模式(RabbitMQ)


        生产者和消费者创建的 channel 并不是同⼀个

        //2.开启信道
        Channel channel = connection.createChannel();
        //3.生产者需要声明一个交换机
        //rabbitmq 在创建的时候就有默认的交换机
        //使用内置的交换机
        

        默契之舞 之 生产者消费者模式(RabbitMQ)

        • 声明队列
        • 例如: 如果有⼀个名为 “hello” 的队列, 生产者可以直接发送消息到 “hello” 队列, 而消费者可以从

          “hello” 队列中接收消息, 而不需要关心交换机的存在. 这种模式非常适合简单的应用场景,其中生产者和消费者之间的通信是⼀对⼀的.

          默契之舞 之 生产者消费者模式(RabbitMQ)

           //4.声明队列
           /**
            * AMQP.Queue.DeclareOk queueDeclare
            * (String var1, boolean var2, boolean var3, boolean var4, Map var5)
            * throws IOException;
            *
            * 参数说明:
            * var1 :队列名称(queue)
            * var2 :可持久化 (durable)
            * var3:是否独占(exclusive)
            * var4:是否自动删除(autoDelete)
            * var5: 参数
           */
          channel.queueDeclare("hello",true,false,false,null);
          
          • 生产者完整代码
            package rabbitmq;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import java.io.IOException;
            import java.util.concurrent.TimeoutException;
            /**
             * 生产者
             */
            public class ProducerDemo {
                public static void main(String[] args) throws IOException, TimeoutException {
                    //1.建立连接
                    //创建一个连接工厂
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    //设置ip 此时 rabbitmq 在云服务器上,我们通过本地去运行的,需要公网IP,需要提前开放端口号
                    connectionFactory.setHost("123.57.16.61");
                    //设置端口号 默认的端口号 5672
                    connectionFactory.setPort(5672);
                    //设置账号
                    connectionFactory.setUsername("study");
                    //设置密码
                    connectionFactory.setPassword("study");
                    //设置虚拟机
                    connectionFactory.setVirtualHost("bite");
                    //在这个工厂里面拿到一个连接
                    Connection connection = connectionFactory.newConnection();
                    //2.开启信道
                    Channel channel = connection.createChannel();
                    //3.生产者需要声明一个交换机
                    //rabbitmq 在创建的时候就有默认的交换机
                    //使用内置的交换机
                    //4.声明队列
                    /**
                     * AMQP.Queue.DeclareOk queueDeclare
                     * (String var1, boolean var2, boolean var3, boolean var4, Map var5)
                     * throws IOException;
                     *
                     * 参数说明:
                     * var1 :队列名称(queue)
                     * var2 :可持久化 (durable)
                     * var3:是否独占(exclusive)
                     * var4:是否自动删除(autoDelete)
                     * var5: 参数
                     */
                    channel.queueDeclare("hello",true,false,false,null);
                    //交换机与队列之间的绑定关系
                    //内置交换机都有自己的绑定关系
                    //5.发送消息
                    //依然用 channel 进行发送
                    /**
                     * void basicPublish
                     * (String var1, String var2, AMQP.BasicProperties var3, byte[] var4)
                     * throws IOException;
                     * 参数说明:
                     * var1:交换机的名称 (exchane)
                     * var2:内置交换机(routingKey)和队列名称保持一致,
                     * var3:属性配置
                     * var4:消息
                     */
                    String mag = "hello rabbitmq~";
                    channel.basicPublish("","hello",null,mag.getBytes());
                    //6.资源的释放
                    channel.close();
                    connection.close();
                }
            }
            

            默契之舞 之 生产者消费者模式(RabbitMQ)

            • 观察界面(发送信息成功)

              默契之舞 之 生产者消费者模式(RabbitMQ)

              如果在代码中注掉资源释放的代码, 在Connections和Channels也可以看到相关信息

              默契之舞 之 生产者消费者模式(RabbitMQ)


              默契之舞 之 生产者消费者模式(RabbitMQ)


              默契之舞 之 生产者消费者模式(RabbitMQ)


              默契之舞 之 生产者消费者模式(RabbitMQ)

              三、编写消费者代码

              消费者代码和⽣产者前3步都是⼀样的, 第4步改为消费当前队列

              1. 创建连接
              2. 创建Channel
              3. 声明⼀个队列Queue
              4. 消费消息
              5. 释放资源

              消费当前队列

              basicConsume:

              /*
              2 basicConsume(String queue, boolean autoAck, Consumer callback)
              参数:
              1. queue: 队列名称
              2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认
              3. callback: 回调对象
               */
              String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
              

              Consumer:

              Consumer ⽤于定义消息消费者的⾏为. 当我们需要从RabbitMQ接收消息时, 需要提供⼀个实现了**Consumer** 接⼝的对象.

              DefaultConsumer 是 **RabbitMQ**提供的⼀个默认消费者, 实现了Consumer 接⼝.

              核心方法:

              handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) : 从队列接收到消息时, 会自动动调用该方法.

              在这个方法中, 我们可以定义如何处理接收到的消息, 例如打印消息内容, 处理业务逻辑或者将消息存储到数据库等.

              参数说明如下:

              ▪ consumerTag : 消费者标签, 通常是消费者在订阅队列时指定的.

              ▪ envelope : 包含消息的封包信息,如队列名称, 交换机等.

              ▪ properties : ⼀些配置信息

              ▪ body : 消息的具体内容

              //6. 接收消息, 并消费
              /*
              basicConsume(String queue, boolean autoAck, Consumer callback)
              参数:
              1. queue: 队列名称
              2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认
              3. callback: 回调对象
               */
              DefaultConsumer consumer = new DefaultConsumer(channel) {
               /*
               回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法
               1. consumerTag: 标识
               2. envelope: 获取⼀些信息, 交换机, 路由key
              3. properties:配置信息
              4. body:数据
              */
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   System.out.println("接收到消息: " + new String(body));
                 }
              };
              channel.basicConsume("hello", true, consumer);
              

              运行代码, 观察结果

              默契之舞 之 生产者消费者模式(RabbitMQ)


              默契之舞 之 生产者消费者模式(RabbitMQ)

              • 完整代码
                package rabbitmq;
                import com.rabbitmq.client.*;
                import java.io.IOException;
                import java.util.concurrent.TimeoutException;
                public class ConsumerDemo {
                    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
                        //创建一个工厂连接
                        ConnectionFactory connectionFactory = new ConnectionFactory();
                        connectionFactory.setHost("123.57.16.61");//设置公网ip
                        connectionFactory.setPort(5672);
                        connectionFactory.setUsername("study");
                        connectionFactory.setPassword("study");
                        connectionFactory.setVirtualHost("bite");//设置虚拟机
                        Connection connection = connectionFactory.newConnection();
                        //创建一个通信
                        Channel channel = connection.createChannel();
                        //声明一个队列(可以省略)
                        channel.queueDeclare("hello",true,false,false,null);
                        //4. 消费消息
                        /**
                         * basicConsume(String queue, boolean autoAck, Consumer callback)
                         * 参数说明:
                         * queue: 队列名称
                         * autoAck: 是否自动确认
                         * callback: 接收到消息后, 执行的逻辑
                         */
                        DefaultConsumer consumer = new DefaultConsumer(channel){
                            @Override
                            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                System.out.println("接收到消息:"+ new String(body));
                            }
                        };
                        channel.basicConsume("hello", true, consumer);
                        //等待程序执行完成
                //        Thread.sleep(2000);
                        //5. 释放资源
                //        channel.close();
                //        connection.close();
                    }
                }
                

                四、常见报错异常类型

                1、关闭顺序调换

                默契之舞 之 生产者消费者模式(RabbitMQ)

                2、队列不存在

                默契之舞 之 生产者消费者模式(RabbitMQ)

                3、端口号错误

                默契之舞 之 生产者消费者模式(RabbitMQ)

                4、检查账号密码是否错误

                默契之舞 之 生产者消费者模式(RabbitMQ)

                5、检查虚拟机

                默契之舞 之 生产者消费者模式(RabbitMQ)

                五、小结

                生产者-消费者模型是一种常见的消息队列应用场景,尤其在异步处理、解耦和提高系统可扩展性方面非常有效。使用 RabbitMQ 实现生产者-消费者模式可以让我们在不同的服务或系统之间解耦,提高系统的可靠性和性能。

                默契之舞 之 生产者消费者模式(RabbitMQ)

                默契之舞 之 生产者消费者模式(RabbitMQ)

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码