RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

06-01 1066阅读

Hi~!这里是奋斗的明志,很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~~

🌱🌱个人主页:奋斗的明志

🌱🌱所属专栏:RabbitMQ

📚本系列文章为个人学习笔记,在这里撰写成文一为巩固知识,二为展示我的学习过程及理解。文笔、排版拙劣,望见谅。

RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

发布订阅模式、路由模式、通配符模式

  • 一、Publish/Subscribe(发布/订阅)
    • 1、引入依赖
    • 2、编写配置类
    • 3、编写生产者代码
    • 4、编写消费者代码
    • 二、Routing (路由模式)
      • 1、引入依赖
      • 2、编写配置类
      • 3、编写生产者代码
      • 4、编写消费者代码
      • 三、Topics (通配符模式)
        • 1、引入依赖
        • 2、编写配置类
        • 3、编写生产者代码
        • 4、编写消费者代码

          一、Publish/Subscribe(发布/订阅)

          在发布/订阅模型中,多了一个Exchange角色.

          Exchange 常见有三种类型, 分别代表不同的路由规则

          a) Fanout:广播,将消息交给所有绑定到交换机的队列 (Publish/Subscribe模式)

          b) Direct:定向,把消息交给符合指定routing key的队列(Routing模式)

          c) Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)

          也就分别对应不同的工作模式

          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

          1、引入依赖


                  
                  
                      com.rabbitmq
                      amqp-client
                      5.20.0
                  
          
          

          2、编写配置类

          package rabbitmq.constant;
          public class Constants {
              public static final String HOST = "123.57.16.61";
              public static final Integer PORT = 5672;
              public static final String USERNAME = "study";
              public static final String PASSWORD = "study";
              public static final String VIRTUAL_HOST = "bite";
              //发布订阅模式
              public static final String FANOUT_EXCHANGE = "fanout.exchange";
              public static final String FANOUT_QUEUE1 = "fanout.queue1";
              public static final String FANOUT_QUEUE2 = "fanout.queue2";
          }
          

          3、编写生产者代码

          package rabbitmq.fanout;
          import com.rabbitmq.client.BuiltinExchangeType;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import rabbitmq.constant.Constants;
          import java.io.IOException;
          import java.util.concurrent.TimeoutException;
          public class Producer {
              public static void main(String[] args) throws IOException, TimeoutException {
                  ConnectionFactory connectionFactory = new ConnectionFactory();
                  connectionFactory.setHost(Constants.HOST);
                  connectionFactory.setPort(Constants.PORT);
                  connectionFactory.setUsername(Constants.USERNAME);
                  connectionFactory.setPassword(Constants.PASSWORD);
                  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
                  Connection connection = connectionFactory.newConnection();
                  Channel channel = connection.createChannel();
                  //声明交换机
                  /**
                   * 交换机名称,交换机类型,开启可持久化(关机数据不会丢失)
                   */
                  channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
                  //声明队列
                  //queueDeclare 队列声明
                  channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
                  channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
                  //交换机和队列进行绑定
                  channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
                  channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
                  //发布消息
                  String msg = "hello fanout...";
                  //basicPublish (基础发布)
                  channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());
                  System.out.println("消息发送成功!!!");
                  //关闭资源
                  channel.close();
                  connection.close();
              }
          }
          

          点击运行:

          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)


          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)


          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

          4、编写消费者代码

          消费者1:

          package rabbitmq.fanout;
          import com.rabbitmq.client.*;
          import rabbitmq.constant.Constants;
          import java.io.IOException;
          import java.util.concurrent.TimeoutException;
          public class Consumer1 {
              public static void main(String[] args) throws IOException, TimeoutException {
                  ConnectionFactory connectionFactory = new ConnectionFactory();
                  connectionFactory.setHost(Constants.HOST);
                  connectionFactory.setPort(Constants.PORT);
                  connectionFactory.setUsername(Constants.USERNAME);
                  connectionFactory.setPassword(Constants.PASSWORD);
                  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
                  Connection connection = connectionFactory.newConnection();
                  Channel channel = connection.createChannel();
                  //声明队列
                  //queueDeclare 队列声明 (也可以省略)
                  channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
                  //消费消息
                  DefaultConsumer consumer = new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("接收到消息:" + new String(body));
                      }
                  };
                  channel.basicConsume(Constants.FANOUT_QUEUE1,true,consumer);
              }
          }
          

          消费者2:

          package rabbitmq.fanout;
          import com.rabbitmq.client.*;
          import rabbitmq.constant.Constants;
          import java.io.IOException;
          import java.util.concurrent.TimeoutException;
          public class Consumer2 {
              public static void main(String[] args) throws IOException, TimeoutException {
                  ConnectionFactory connectionFactory = new ConnectionFactory();
                  connectionFactory.setHost(Constants.HOST);
                  connectionFactory.setPort(Constants.PORT);
                  connectionFactory.setUsername(Constants.USERNAME);
                  connectionFactory.setPassword(Constants.PASSWORD);
                  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
                  Connection connection = connectionFactory.newConnection();
                  Channel channel = connection.createChannel();
                  //声明队列
                  //queueDeclare 队列声明 (也可以省略)
                  channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
                  //消费消息
                  DefaultConsumer consumer = new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("接收到消息:" + new String(body));
                      }
                  };
                  channel.basicConsume(Constants.FANOUT_QUEUE2,true,consumer);
              }
          }
          

          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

          二、Routing (路由模式)

          队列和交换机的绑定, 不能是任意的绑定了, 而是要指定⼀个BindingKey(RoutingKey的⼀种)

          消息的发送方在向 Exchange 发送消息时, 也需要指定消息的 RoutingKey

          Exchange也不再把消息交给每⼀个绑定的key, 而是根据消息的RoutingKey进行判断, 只有队列绑定时的BindingKey和发送消息的RoutingKey 完全⼀致, 才会接收到消息

          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

          1、引入依赖


                  
                  
                      com.rabbitmq
                      amqp-client
                      5.20.0
                  
          
          

          2、编写配置类

          package rabbitmq.constant;
          public class Constants {
              public static final String HOST = "123.57.16.61";
              public static final Integer PORT = 5672;
              public static final String USERNAME = "study";
              public static final String PASSWORD = "study";
              public static final String VIRTUAL_HOST = "bite";
              //路由模式
              public static final String DIRECT_EXCHANGE = "direct.exchange";
              public static final String DIRECT_QUEUE1 = "direct.queue1";
              public static final String DIRECT_QUEUE2 = "direct.queue2";
          }
          

          3、编写生产者代码

          package rabbitmq.direct;
          import com.rabbitmq.client.BuiltinExchangeType;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import rabbitmq.constant.Constants;
          import java.io.IOException;
          import java.util.concurrent.TimeoutException;
          public class Producer {
              public static void main(String[] args) throws IOException, TimeoutException {
                  ConnectionFactory connectionFactory = new ConnectionFactory();
                  connectionFactory.setHost(Constants.HOST);
                  connectionFactory.setPort(Constants.PORT);
                  connectionFactory.setUsername(Constants.USERNAME);
                  connectionFactory.setPassword(Constants.PASSWORD);
                  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
                  Connection connection = connectionFactory.newConnection();
                  Channel channel = connection.createChannel();
                  //声明交换机
                  /**
                   * 交换机名称,交换机类型,开启可持久化(关机数据不会丢失)
                   */
                  channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
                  //声明队列
                  //queueDeclare 队列声明
                  channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
                  channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
                  //交换机和队列进行绑定
                  channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
                  channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
                  channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
                  channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
                  //发布消息
                  String msg_a = "hello direct, my routingkey is a...";
                  //basicPublish (基础发布)
                  channel.basicPublish(Constants.DIRECT_EXCHANGE, "", null, msg_a.getBytes());
                  String msg_b = "hello direct, my routingkey is b...";
                  channel.basicPublish(Constants.DIRECT_EXCHANGE, "", null, msg_b.getBytes());
                  String msg_c = "hello direct, my routingkey is c...";
                  channel.basicPublish(Constants.DIRECT_EXCHANGE, "", null, msg_c.getBytes());
                  System.out.println("消息发送成功!!!");
                  //关闭资源
                  channel.close();
                  connection.close();
              }
          }
          

          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)


          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)


          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)


          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)


          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

          4、编写消费者代码

          消费者1:

          package rabbitmq.direct;
          import com.rabbitmq.client.*;
          import rabbitmq.constant.Constants;
          import java.io.IOException;
          import java.util.concurrent.TimeoutException;
          public class Consumer1 {
              public static void main(String[] args) throws IOException, TimeoutException {
                  ConnectionFactory connectionFactory = new ConnectionFactory();
                  connectionFactory.setHost(Constants.HOST);
                  connectionFactory.setPort(Constants.PORT);
                  connectionFactory.setUsername(Constants.USERNAME);
                  connectionFactory.setPassword(Constants.PASSWORD);
                  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
                  Connection connection = connectionFactory.newConnection();
                  Channel channel = connection.createChannel();
                  //声明队列
                  //queueDeclare 队列声明 (也可以省略)
                  channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
                  //消费消息
                  DefaultConsumer consumer = new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("接收到消息:" + new String(body));
                      }
                  };
                  channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);
              }
          }
          

          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

          消费者2:

          package rabbitmq.direct;
          import com.rabbitmq.client.*;
          import rabbitmq.constant.Constants;
          import java.io.IOException;
          import java.util.concurrent.TimeoutException;
          public class Consumer2 {
              public static void main(String[] args) throws IOException, TimeoutException {
                  ConnectionFactory connectionFactory = new ConnectionFactory();
                  connectionFactory.setHost(Constants.HOST);
                  connectionFactory.setPort(Constants.PORT);
                  connectionFactory.setUsername(Constants.USERNAME);
                  connectionFactory.setPassword(Constants.PASSWORD);
                  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
                  Connection connection = connectionFactory.newConnection();
                  Channel channel = connection.createChannel();
                  //声明队列
                  //queueDeclare 队列声明 (也可以省略)
                  channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
                  //消费消息
                  DefaultConsumer consumer = new DefaultConsumer(channel){
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.out.println("接收到消息:" + new String(body));
                      }
                  };
                  channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);
              }
          }
          

          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

          三、Topics (通配符模式)

          Topics 和Routing模式的区别是:

          1. topics 模式使用的交换机类型为topic(Routing模式使用的交换机类型为direct)
          2. topic 类型的交换机在匹配规则上进行了扩展, Binding Key⽀持通配符匹配(direct类型的交换机路由规则是BindingKey和RoutingKey完全匹配)

          RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

          在topic类型的交换机在匹配规则上, 有些要求:

          1. RoutingKey 是⼀系列由点( . )分隔的单词, ⽐如 " stock.usd.nyse ", " nyse.vmw ",

            " quick.orange.rabbit "

          2. BindingKey 和RoutingKey⼀样, 也是点( . )分割的字符串.
          3. Binding Key中可以存在两种特殊字符串, 用于模糊匹配
          • * 表⽰⼀个单词 # 表⽰多个单词(0-N个)
            

            比如:

            • Binding Key 为"d.a.b" 会同时路由到Q1 和Q2

            • Binding Key 为"d.a.f" 会路由到Q1

            • Binding Key 为"c.e.f" 会路由到Q2

            • Binding Key 为"d.b.f" 会被丢弃, 或者返回给⽣产者(需要设置mandatory参数)

            1、引入依赖


                    
                    
                        com.rabbitmq
                        amqp-client
                        5.20.0
                    
            
            

            2、编写配置类

            package rabbitmq.constant;
            public class Constants {
                public static final String HOST = "123.57.16.61";
                public static final Integer PORT = 5672;
                public static final String USERNAME = "study";
                public static final String PASSWORD = "study";
                public static final String VIRTUAL_HOST = "bite";
                //通配符模式
                public static final String TOPIC_EXCHANGE = "topic.exchange";
                public static final String TOPIC_QUEUE1 = "topic.queue1";
                public static final String TOPIC_QUEUE2 = "topic.queue2";
            

            3、编写生产者代码

            package rabbitmq.topic;
            import com.rabbitmq.client.BuiltinExchangeType;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            import rabbitmq.constant.Constants;
            import java.io.IOException;
            import java.util.concurrent.TimeoutException;
            public class Producer {
                public static void main(String[] args) throws IOException, TimeoutException {
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost(Constants.HOST);
                    connectionFactory.setPort(Constants.PORT);
                    connectionFactory.setUsername(Constants.USERNAME);
                    connectionFactory.setPassword(Constants.PASSWORD);
                    connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
                    Connection connection = connectionFactory.newConnection();
                    Channel channel = connection.createChannel();
                    //声明交换机
                    /**
                     * 交换机名称,交换机类型,开启可持久化(关机数据不会丢失)
                     */
                    channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
                    //声明队列
                    //queueDeclare 队列声明
                    channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
                    channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
                    //交换机和队列进行绑定
                    channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
                    channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
                    channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
                    //发布消息
                    String msg_a = "hello topic, my routingkey is ae.a.f...";
                    //basicPublish (基础发布)
                    channel.basicPublish(Constants.TOPIC_EXCHANGE, "ae.a.f", null, msg_a.getBytes());
                    String msg_b = "hello topic, my routingkey is ef.a.b...";
                    channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes());
                    String msg_c = "hello topic, my routingkey is c.ef.b...";
                    channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.b", null, msg_c.getBytes());
                    System.out.println("消息发送成功!!!");
                    //关闭资源
                    channel.close();
                    connection.close();
                }
            }
            

            4、编写消费者代码

            消费者1:

            package rabbitmq.topic;
            import com.rabbitmq.client.*;
            import rabbitmq.constant.Constants;
            import java.io.IOException;
            import java.util.concurrent.TimeoutException;
            public class Consumer1 {
                public static void main(String[] args) throws IOException, TimeoutException {
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost(Constants.HOST);
                    connectionFactory.setPort(Constants.PORT);
                    connectionFactory.setUsername(Constants.USERNAME);
                    connectionFactory.setPassword(Constants.PASSWORD);
                    connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
                    Connection connection = connectionFactory.newConnection();
                    Channel channel = connection.createChannel();
                    //声明队列
                    //queueDeclare 队列声明 (也可以省略)
                    channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
                    //消费消息
                    DefaultConsumer consumer = new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                            System.out.println("接收到消息:" + new String(body));
                        }
                    };
                    channel.basicConsume(Constants.TOPIC_QUEUE1,true,consumer);
                }
            }
            

            RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

            消费者2:

            package rabbitmq.topic;
            import com.rabbitmq.client.*;
            import rabbitmq.constant.Constants;
            import java.io.IOException;
            import java.util.concurrent.TimeoutException;
            public class Consumer2 {
                public static void main(String[] args) throws IOException, TimeoutException {
                    ConnectionFactory connectionFactory = new ConnectionFactory();
                    connectionFactory.setHost(Constants.HOST);
                    connectionFactory.setPort(Constants.PORT);
                    connectionFactory.setUsername(Constants.USERNAME);
                    connectionFactory.setPassword(Constants.PASSWORD);
                    connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
                    Connection connection = connectionFactory.newConnection();
                    Channel channel = connection.createChannel();
                    //声明队列
                    //queueDeclare 队列声明 (也可以省略)
                    channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
                    //消费消息
                    DefaultConsumer consumer = new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                            System.out.println("接收到消息:" + new String(body));
                        }
                    };
                    channel.basicConsume(Constants.TOPIC_QUEUE2,true,consumer);
                }
            }
            

            RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

            RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

            RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码