【RabbitMq C++】消息队列组件

06-02 604阅读

RabbitMq 消息队列组件

  • 1. RabbitMq介绍
  • 2. 安装RabbitMQ
  • 3. 安装 RabbitMQ 的 C++客户端库
  • 4. AMQP-CPP 库的简单使用
    • 4.1 使用
      • 4.1.1 TCP 模式
      • 4.1.2 扩展模式
      • 4.2 常用类与接口介绍
        • 4.2.1 Channel
        • 4.3.2 ev
        • 5. RabbitMQ样例编写
          • 5.1 发布消息
          • 5.2 订阅消息

            1. RabbitMq介绍

            RabbitMq - 消息队列组件:实现两个客户端主机之间消息传输的功能(发布&订阅)。

            一端发布消息,一端订阅消息,消息就会被推送到订阅消息那一端然后进行处理。

            RabbitMq遵守AMQP协议(标准的高级消息队列协议)

            AMQP协议核心概念:交换机(交换机类型)、队列,绑定,消息。

            两个客户端之间进行消息传输,一端产生消息另一端接收消息然后处理。按照以前的思想就是两个客户端直接进行网络通信socket,通过网络消息将一条消息发送给对方让对方进行处理,这是一种最基础数据传输过程。

            【RabbitMq C++】消息队列组件

            但是这种消息传输是存在缺陷的!如果有一端连接断开了,那另一端消息到底还发不发,是等,还是将这条消息丢弃掉。如果一直等,新产生的消息又该怎么办,总不能一直存着。所以这种安全性是很低的。而且一对一这种客户端里面,通常数据的产生和数据的处理所消耗的时间是不成正比的。通常消息的处理消耗时间更多。

            基于两端消息进行安全传输的需求,所以高级消息队列组件就产生了。两端不直接进行消息传输了。而是通过消息队列服务器来进行一个中间的数据转发功能。发布消息客户端将信息发布到服务器上,服务器在将这条消息推送给订阅消息队列客户端让它来进行处理。

            【RabbitMq C++】消息队列组件

            但是针对一个高级消息队列设计的话,单纯一个只是做中间数据转发其实是不够的。我们希望它能在做中间数据转发更加灵活,在不同场景提供不同的功能。这个时候就有了AMQP的核心概念(交换机、队列、绑定、消息)。

            消息队列服务器里面首先有一个交换机,它是用来处理数据转发逻辑功能模块。然后还有队列。订阅客户端连接服务器告诉服务器订阅那个队列。发布客户端进行消息发布并不是直接把消息发布到某个队列中,而是把信息发布到交换机,由交换机来决定把这条消息放到那个队列。决定了这条消息推送到那个订阅客户端哪里去进行处理。

            【RabbitMq C++】消息队列组件

            交换机该把消息放到那一个队列中呢?这个时候就有了不同的交换机类型:

            1. 广播交换:当交换机收到消息,则将消息发布到所有绑定的队列中

            交换机和队列都创建好了之后,会把交换机和队列进行关系绑定,也就是交换机和队列建立一个关联关系。而且会设置一个routing key(路由密钥:一定规则的字符串)用来标识这是一个放置什么类型消息的队列。

            【RabbitMq C++】消息队列组件

            1. 直接交换:根据消息中的binding_key与绑定的routing_key对比,一致则放到队列中

            【RabbitMq C++】消息队列组件

            1. 主题交换:使用binding_key与绑定的routing_key进行规则匹配,成功则放入队列

            【RabbitMq C++】消息队列组件

            2. 安装RabbitMQ

            sudo apt install rabbitmq-server
            
            # 启动服务
            sudo systemctl start rabbitmq-server.service
            # 查看服务状态
            sudo systemctl status rabbitmq-server.service
            # 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个
            # administrator 用户,才可以做为远程登录和发表订阅消息:
             #添加用户
            sudo rabbitmqctl add_user root 123456
             #设置用户 tag
            sudo rabbitmqctl set_user_tags root administrator
             #设置用户权限
            sudo rabbitmqctl set_permissions -p / root "." "." ".*"
            # RabbitMQ 自带了 web 管理界面,执行下面命令开启
            sudo rabbitmq-plugins enable rabbitmq_management
            

            访问 webUI 界面, 默认端口为 15672

            至此RabbitMQ安装成功。

            3. 安装 RabbitMQ 的 C++客户端库

            我们这里使用 AMQP-CPP 库来编写客户端程序。

            先安装libev网络通信库。在搭建RabbitMQ客户端的时候需要进行一个网络通信的事件监控。事件监控我们可以自己写poll,epoll但是太麻烦了。这里我们使用第三方网络通信框架。RabbitMQ对libevent、libev等等这些都支持。这里我们选择的是libvev。

            sudo apt install libev-dev #libev 网络库组件
            
            git clone https://gitee.com/iOceanPlus_Forked/AMQP-CPP.git
            cd AMQP-CPP/
            make 
            make install
            

            至此可以通过 AMQP-CPP 来操作 rabbitmq

            4. AMQP-CPP 库的简单使用

            AMQP-CPP 是用于与 RabbitMq 消息中间件通信的 c++库。它能解析从 RabbitMq

            服务发送来的数据,也可以生成发向 RabbitMq 的数据包。AMQP-CPP 库不会向

            RabbitMq 建立网络连接,所有的网络I/O由用户完成。

            • 当然,AMQP-CPP 提供了可选的网络层接口,它预定义了 TCP 模块,用户就不用自己实现网络IO,我们也可以选择 libevent、libev、libuv、asio 等异步通信组件,需要手动安装对应的组件。
            • AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能

              应用中。

            • 注意:它需要 c++17 的支持

              4.1 使用

              AMQP-CPP 的使用有两种模式:

              • 使用默认的 TCP 模块进行网络通信
              • 使用扩展的 libevent、libev、libuv、asio 异步通信组件进行通信

                4.1.1 TCP 模式

                • 实现一个类继承自 AMQP::TcpHandler 类, 它负责网络层的 TCP 连接
                • 重写相关函数, 其中必须重写 monitor 函数
                • 在 monitor 函数中需要实现的是将 fd 放入 eventloop(select、epoll)中监控, 当 fd可写可读就绪之后, 调用 AMQP-CPP 的 connection->process(fd, flags)方法
                  #include 
                  #include 
                  class MyTcpHandler : public AMQP::TcpHandler
                  {
                      /**
                       *AMQP 库在创建新连接时调用的方法
                       *与处理程序相关联。这是对处理程序的第一次调用
                       *@param connection 附加到处理程序的连接
                       */
                      virtual void onAttached(AMQP::TcpConnection *connection) override
                      {
                          //@todo
                          // 添加您自己的实现,例如初始化事物
                          // 以处理连接。
                      }
                      /**
                       *当 TCP 连接时由 AMQP 库调用的方法
                       *已经建立。调用此方法后,库
                       *仍然需要设置可选的 TLS 层和
                       *在 TCP 层的顶部建立 AMQP 连接。,这种方法
                       *总是与稍后对 onLost()的调用配对。
                       *@param connection 现在可以使用的连接
                       */
                      virtual void onConnected(AMQP::TcpConnection *connection) override
                      {
                          //@todo
                          // 添加您自己的实现(可能不需要)
                      }
                      /**
                      *在建立安全 TLS 连接时调用的方法。
                      *这只对 amqps://连接调用。它允许您检查连接是否足够安全,以满足
                     您的喜好
                      *(例如,您可以检查服务器证书)。AMQP 协议仍然需要启动。
                      *@param connection 已被保护的连接
                      *@param ssl 来自 openssl 库的 ssl 结构
                      *@return bool 如果可以使用连接,则为 True
                      */
                      virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
                      {
                          //@todo
                          // 添加您自己的实现,例如读取证书并检查它是否确实是您的
                          return true;
                      }
                      /**
                      *当登录尝试成功时由 AMQP 库调用的方法。在此之后,连接就可以使用
                     了。
                      *@param connection 现在可以使用的连接
                      */
                      virtual void onReady(AMQP::TcpConnection *connection) override
                      {
                          //@todo
                          // 添加您自己的实现,例如通过创建一个通道实例,然后开始发布或使用
                      }
                      /**
                      *该方法在服务器尝试协商检测信号间隔时调用,
                      *并被覆盖以摆脱默认实现(否决建议的检测信号间隔),转而接受该间
                     隔。
                      *@param connection 发生错误的连接
                      *@param interval 建议的间隔(秒)
                      */
                      virtual uint16_t onNegotiate(AMQP::TcpConnection *connection,uint16_t interval)
                      {
                          // 我们接受服务器的建议,但如果间隔小于一分钟,我们将使用一分钟的间隔
                          if (interval heartbeat()。
                          // 返回我们要使用的间隔
                          return interval;
                      }
                      /**
                       * *发生致命错误时由 AMQP 库调用的方法
                  例如,因为无法识别从 RabbitMQ 接收的数据,或者基础连接丢失。
                  此调用之后通常会调用 onLost()(如果错误发生在 TCP 连接建立之
                  后)和 onDetached()。
                  *@param connection 发生错误的连接
                  *@param message 一条人类可读的错误消息
                  */
                      virtual void onError(AMQP::TcpConnection *connection, const char *message) override
                      {
                          //@todo
                          // 添加您自己的实现,例如,通过向程序的用户报告错误并记录错误
                      }
                      /**
                      *该方法在 AMQP 协议结束时调用的方法。这是调用 connection.close
                     ()以正常关闭连接的计数器部分。请注意,TCP 连接此时仍处于活动状态,您还
                     将收到对 onLost()和 onDetached()的调用
                      @param connection AMQP 协议结束的连接
                      */
                      virtual void onClosed(AMQP::TcpConnection *connection) override
                      {
                          //@todo
                          // 添加您自己的实现, 可能没有必要,
                          // 但如果您想在 amqp 连接结束后立即执行某些操作,
                          // 又不想等待 tcp 连接关闭,则这可能会很有用
                      }
                      /**
                       *当 TCP 连接关闭或丢失时调用的方法。
                       *如果同时调用了 onConnected(),则始终调用此方法
                       *@param connection 已关闭但现在无法使用的连接
                       */
                      virtual void onLost(AMQP::TcpConnection *connection) override
                      {
                          //@todo
                          // 添加您自己的实现(可能没有必要)
                      }
                      /**
                      *调用的最终方法。这表示将不再对处理程序进行有关连接的进一步调
                     用。
                     *@param connection 可以被破坏的连接
                  */
                      virtual void onDetached(AMQP::TcpConnection *connection) override
                      {
                          //@todo
                          // 添加您自己的实现,如清理资源或退出应用程序
                      }
                      /**
                      *当 AMQP-CPP 库想要与主事件循环交互时,它会调用该方法。
                      *AMQP-CPP 库是完全不阻塞的,
                      *并且只有在事先知道这些调用不会阻塞时才进行“write()”或“read()”系统
                     调用。
                      *要在事件循环中注册文件描述符,它会调用这个“monitor()”方法,
                      *该方法带有一个文件描述符和指示是否该检查文件描述符的可读性或可写性
                     的标志。
                      *
                      *@param connection 想要与事件循环交互的连接
                      *@param fd 应该检查的文件描述符
                      *@param 标记位或 AMQP::可读和/或 AMQP::可写
                      */
                      virtual void monitor(AMQP::TcpConnection *connection, int fd,int flags) override
                      {
                          //@todo
                          // 添加您自己的实现,
                          // 例如将文件描述符添加到主应用程序事件循环(如 select()或 poll()循环)。
                  		 // 当事件循环报告描述符变为可读和或可写时,
                  		 // 由您通过调用 connection->process(fd,flags)方法
                  		 // 通知 AMQP-CPP 库文件描述符处于活动状态。
                      }
                  };
                  

                  4.1.2 扩展模式

                  以 libev 为例, 我们不必要自己实现 monitor 函数, 可以直接使用AMQP::LibEvHandler

                  4.2 常用类与接口介绍

                  4.2.1 Channel

                  channel(信道类) 是一个虚拟连接,大佬认为一个socket只用于一个连接太浪费了,所有在socket之上又做了封装,一个连接上可以建立多个信道。每个信道都可以支持一个客户端和服务器进行通信。并且所有的 RabbitMq 指令都是通过 channel 传输,所以连接建立后的第一步,就是建立 channel。因为所有操作是异步的,所以在 channel 上执行指令的返回值并不能作为操作执行结果,实际上它返回的是 Deferred 类,可以使用它安装处理函数。

                  namespace AMQP
                  {
                      /**
                       * Generic callbacks that are used by many deferred objects
                       */
                      using SuccessCallback = std::function;
                      using ErrorCallback = std::function;
                      using FinalizeCallback = std::function;
                      /**
                       * Declaring and deleting a queue
                       */
                      using QueueCallback = std::function;
                      using DeleteCallback = std::function;
                      using MessageCallback = std::function;
                      // 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用
                      AckCallback using AckCallback = std::function;
                          
                      // 使用确认包裹通道时,当消息被 ack/nacked 时,会调用这些回调
                      using PublishAckCallback = std::function;
                      using PublishNackCallback = std::function;
                      using PublishLostCallback = std::function;
                      class Channel
                      {
                      	//构造函数
                          Channel(Connection *connection);
                          //判断是否连接成功
                          bool connected();
                          
                          /**
                            *声明交换机,交换机已经存在就ok,不存在就创建
                            *如果提供了一个空名称,则服务器将分配一个名称。
                            *以下 flags 可用于交换机:
                            *
                            *-durable 持久化,重启后交换机依然有效
                            *-autodelete 删除所有连接的队列后,自动删除交换
                            *-passive 仅被动检查交换机是否存在
                            *-internal 创建内部交换
                            *
                            *@param name 交换机的名称
                            *@param-type 交换类型
                            enum ExchangeType
                            {
                                fanout, 广播交换,绑定的队列都能拿到消息
                                direct, 直接交换,只将消息交给 routingkey 一致的队列
                                topic, 主题交换,将消息交给符合 bindingkey 规则的队列
                                headers,
                                consistent_hash,
                                message_deduplication
                            };
                            *@param flags 交换机标志
                            *@param arguments 其他参数
                            *
                            *此函数返回一个延迟处理程序Deferred类。可以设置回调函数
                            using onSuccess(), onError() and onFinalize() methods.
                            */
                          Deferred &declareExchange(
                              const std::string_view &name,
                              ExchangeType type,
                              int flags,
                              const Table &arguments)
                                
                            /**
                            *声明队列
                            *如果不提供名称,服务器将分配一个名称。
                            *flags 可以是以下值的组合:
                            *
                            *-durable 持久队列在代理重新启动后仍然有效
                            *-autodelete 当所有连接的使用者都离开时,自动删除队列
                            *-passive 仅被动检查队列是否存在
                            *-exclusive 队列仅存在于此连接,并且在连接断开时自动删除
                            *
                            *@param name 队列的名称
                            *@param flags 标志组合
                            *@param arguments 可选参数
                            *
                            *此函数返回一个延迟处理程序DeferredQueue类。可以设置回调函数
                            *使用 onSuccess()、onError()和 onFinalize()方法。
                            *
                            Deferred &onError(const char *message)
                            *
                            *可以安装的 onSuccess()回调应该具有以下签名:
                            void myCallback(const std::string &name,
                            uint32_t messageCount,
                            uint32_t consumerCount);
                            例如:
                            channel.declareQueue("myqueue").onSuccess(
                            [](const std::string &name,
                            uint32_t messageCount,
                            uint32_t consumerCount) {
                            std::cout 
                          /*
                  	        注册一个回调函数,该函数在消费者启动时被调用。
                  	        void onSuccess(const std::string &consumertag)
                          */
                          DeferredConsumer &onSuccess(const ConsumeCallback &callback)
                          /*
                  	        注册回调函数,用于接收到一个完整消息的时候被调用
                  	        void MessageCallback(const AMQP::Message &message,
                  	        uint64_t deliveryTag, bool redelivered)
                          */
                          DeferredConsumer &onReceived(const MessageCallback &callback)
                          /* Alias for onReceived() */
                          DeferredConsumer &onMessage(const MessageCallback &callback)
                          /*
                  	        注册要在服务器取消消费者时调用的函数
                  	        void CancelCallback(const std::string &tag)
                          */
                          DeferredConsumer &onCancelled(const CancelCallback &callback)
                      } 
                      
                      class Message : public Envelope
                      {
                          const std::string &exchange()
                              const std::string &routingkey():q
                      } 
                      class Envelope : public MetaData
                      {
                          const char *body()
                              uint64_t bodySize()
                      }
                  }
                  
                      EV_WATCHER(ev_async)
                      EV_ATOMIC_T sent; /* private */
                  } ev_async;
                  // break type
                  enum
                  {
                      EVBREAK_CANCEL = 0, /* undo unloop */
                      EVBREAK_ONE = 1,    /* unloop once */
                      EVBREAK_ALL = 2     /* unloop all loops */
                  };
                  //实例化并获取I/O事件监控结构句柄
                  struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0))
                  #define EV_DEFAULT ev_default_loop(0)(使用宏获取上面结构)
                  //开始运行I/O事件监控,这是一个阻塞接口(创建一个线程执行该接口)
                  int ev_run(struct ev_loop *loop);
                  /* break out of the loop */
                  //结束I/O监控
                  void ev_break(struct ev_loop *loop, int32_t break_type);
                  void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents) 
                  //如果在当前线程进行ev_run则可以直接调用,如果在其他线程中进行ev_run需要通过异步通知进行
                  void ev_async_init(ev_async *w, callback cb);//初始化异步事件结构,并设置回调函数
                  void ev_async_start(struct ev_loop *loop, ev_async *w);//启动事件监控循环中的异步任务处理
                  void ev_async_send(struct ev_loop *loop, ev_async *w);//发送当前异步事件到异步线程中执行
                  
                      //1. 实例化底层网络通信框架的I/O事件监控句柄
                      auto *loop = EV_DEFAULT;
                      //2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来
                      AMQP::LibEvHandler handler(loop);
                      //3. 实例化连接对象
                      AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
                      AMQP::TcpConnection connection(&handler, address);
                      //4. 实例化信道对象
                      AMQP::TcpChannel channel(&connection);
                      //5. 声明交换机
                      channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
                          .onError([](const char *message) {
                              std::cout 
                              std::cout 
                              std::cout 
                              std::cout 
                              std::cout 
                              std::cout 
                          std::string msg = "Hello Bite-" + std::to_string(i);
                          bool ret = channel.publish("test-exchange", "test-queue-key", msg);
                          if (ret == false) {
                              std::cout 
                      std::string msg;
                      msg.assign(message.body(), message.bodySize());
                      std::cout 
                      //1. 实例化底层网络通信框架的I/O事件监控句柄
                      auto *loop = EV_DEFAULT;
                      //2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来
                      AMQP::LibEvHandler handler(loop);
                      //2.5. 实例化连接对象
                      AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
                      AMQP::TcpConnection connection(&handler, address);
                      //3. 实例化信道对象
                      AMQP::TcpChannel channel(&connection);
                      //4. 声明交换机
                      channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
                          .onError([](const char *message) {
                              std::cout 
                              std::cout 
                              std::cout 
                              std::cout 
                              std::cout 
                              std::cout 
                              std::cout 
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

相关阅读

目录[+]

取消
微信二维码
微信二维码
支付宝二维码