[C++][第三方库][RabbitMq]详细讲解

06-02 1406阅读

目录

  • 1.介绍
  • 2.安装
    • 1.RabbitMq
    • 2.客户端库
    • 3.AMQP-CPP 简单使用
      • 1.介绍
      • 2.使用
      • 4.类与接口
        • 1.Channel
        • 2.ev
        • 5.使用
          • 1.publish.cc
          • 2.consume.cc
          • 3.makefile

            1.介绍

            • RabbitMQ:消息队列组件,实现两个客户端主机之间消息传输的功能(发布&订阅)
            • 核心概念:交换机、队列、绑定、消息
            • 交换机类型:
              • 广播交换:当交换机收到消息,则将消息发布到所有绑定的队列中
              • 直接交换:根据消息中的bkey与绑定的rkey对比,一致则放入队列
              • 主题交换:使用bkey与绑定的rkey进行规则匹配,成功则放入队列

                2.安装

                1.RabbitMq

                • 安装:sudo apt install rabbitmq-server
                • 简单使用:
                  # 安装完成的时候默认有个用户guest,但是权限不够,要创建一个administrator用户,才可以做为远程登录和发表订阅消息
                  #添加用户 
                  sudo rabbitmqctl add_user root 
                  #设置用户tag 
                  sudo rabbitmqctl set_user_tags root administrator 
                  #设置用户权限 
                  sudo rabbitmqctl set_permissions -p / root "." "." ".*" 
                  # RabbitMQ自带了web管理界面, 执行下面命令开启, 默认端口15672
                  sudo rabbitmq-plugins enable rabbitmq_management 
                  

                  2.客户端库

                  • C语言库
                  • C++库
                    sudo apt install libev-dev #libev 网络库组件
                    git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
                    cd AMQP-CPP/
                    make
                    make install
                    
                  • 如果安装时出现以下报错,则表示ssl版本出现问题
                    /usr/include/openssl/macros.h:147:4: error: #error 
                    "OPENSSL_API_COMPAT expresses an impossible API compatibility 
                    level" 
                      147 | #  error "OPENSSL_API_COMPAT expresses an impossible API 
                    compatibility level" 
                          |    ^~~~~ 
                    In file included from /usr/include/openssl/ssl.h:18, 
                                     from linux_tcp/openssl.h:20, 
                                     from linux_tcp/openssl.cpp:12: 
                    /usr/include/openssl/bio.h:687:1: error: expected constructor, 
                    destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’ 
                      687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, 
                    unsigned short *port_ptr))
                    
                  • 解决方案:卸载当前的ssl库,重新进行修复安装
                    dpkg -l | grep ssl
                    sudo dpkg -P --force-all libevent-openssl-2.1-7
                    sudo dpkg -P --force-all openssl
                    sudo dpkg -P --force-all libssl-dev
                    sudo apt --fix-broken install
                    

                    3.AMQP-CPP 简单使用

                    1.介绍

                    • AMQP-CPP是用于与RabbitMq消息中间件通信的C++库
                      • 它能解析从RabbitMq服务发送来的数据,也可以生成发向RabbitMq的数据包
                      • AMQP-CPP库不会向RabbitMq建立网络连接,所有的网络IO由用户完成
                      • AMQP-CPP提供了可选的网络层接口,它预定义了TCP模块,用户就不用自己实现网络IO,
                        • 也可以选择libevent、libev、libuv、asio等异步通信组件, 需要手动安装对应的组件
                        • AMQP-CPP完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中
                        • 注意:它需要C++17的支持

                          2.使用

                          • AMQP-CPP的使用有两种模式:
                            • 使用默认的TCP模块进行网络通信
                            • 使用扩展的libevent、libev、libuv、asio异步通信组件进行通信
                            • 此处以libev为例,不需要自己实现monitor函数,可以直接使用AMQP::LibEvHandler

                              4.类与接口

                              1.Channel

                              • channel是一个虚拟连接,一个连接上可以建立多个通道
                                • 并且所有的RabbitMq指令都是通过channel传输
                                  • 所以连接建立后的第一步,就是建立channel
                                  • 因为所有操作是异步的,所以在channel上执行指令的返回值并不能作为操作执行结果
                                    • 实际上它返回的是Deferred类,可以使用它安装处理函数
                                      namespace AMQP 
                                      { 
                                          /** 
                                           *  Generic callbacks that are used by many deferred objects 
                                           */ 
                                          using SuccessCallback = std::function; 
                                          using ErrorCallback = std::function; 
                                          using FinalizeCallback = std::function;
                                          
                                          /** 
                                           *  Declaring and deleting a queue 
                                           */ 
                                          using QueueCallback = std::function;
                                          using DeleteCallback = std::function; 
                                          using MessageCallback = std::function; 
                                          // 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallback 
                                          using AckCallback = std::function;
                                          // 使用确认包裹通道时,当消息被ack/nacked时,会调用这些回调 
                                          using PublishAckCallback = std::function; 
                                          using PublishNackCallback = std::function; 
                                          using PublishLostCallback = std::function; 
                                      	// 信道类
                                          class Channel 
                                          { 
                                              Channel(Connection *connection); 
                                              bool connected();
                                              /** 
                                                  *声明交换机 
                                                  *如果提供了一个空名称,则服务器将分配一个名称。 
                                                  *以下flags可用于交换机: 
                                                  * 
                                                  *-durable     持久化,重启后交换机依然有效 
                                                  *-autodelete  删除所有连接的队列后,自动删除交换 
                                                  *-passive     仅被动检查交换机是否存在 
                                                  *-internal    创建内部交换 
                                                  * 
                                                  *@param name    交换机的名称 
                                                  *@param-type    交换类型 
                                                      enum ExchangeType 
                                                      { 
                                                          fanout,  广播交换,绑定的队列都能拿到消息 
                                                          direct,  直接交换,只将消息交给routingkey一致的队列 
                                                          topic,   主题交换,将消息交给符合bindingkey规则的队列 
                                                          headers, 
                                                          consistent_hash, 
                                                          message_deduplication 
                                                      }; 
                                                  *@param flags    交换机标志 
                                                  *@param arguments其他参数 
                                                  * 
                                                  *此函数返回一个延迟处理程序。可以安装回调 
                                                  using onSuccess(), onError() and onFinalize() methods. 
                                              */ 
                                              Deferred &declareExchange(const std::string_view &name,
                                                                        ExchangeType type,
                                                                        int flags,
                                                                        const Table &arguments);
                                              /** 
                                                  *声明队列 
                                                  *如果不提供名称,服务器将分配一个名称。 
                                                  *flags可以是以下值的组合: 
                                                  * 
                                                  *-durable 持久队列在代理重新启动后仍然有效 
                                                  *-autodelete 当所有连接的使用者都离开时,自动删除队列 
                                                  *-passive 仅被动检查队列是否存在 
                                                  *-exclusive 队列仅存在于此连接,并且在连接断开时自动删除 
                                                  * 
                                                  *@param name        队列的名称 
                                                  *@param flags       标志组合 
                                                  *@param arguments  可选参数 
                                                  * 
                                                  *此函数返回一个延迟处理程序。可以安装回调 
                                                  *使用onSuccess()、onError()和onFinalize()方法。 
                                                  * 
                                                  Deferred &onError(const char *message) 
                                                  * 
                                                  *可以安装的onSuccess()回调应该具有以下签名: 
                                                  void myCallback(const std::string &name,  
                                                      uint32_t messageCount,  
                                                      uint32_t consumerCount); 
                                                  例如: 
                                                  channel.declareQueue("myqueue").onSuccess( 
                                                      [](const std::string &name,  
                                                          uint32_t messageCount, 
                                                          uint32_t consumerCount) { 
                                                             std::cout  
                                              /* 
                                                  注册一个回调函数,该函数在消费者启动时被调用
                                                  void onSuccess(const std::string &consumertag) 
                                              */ 
                                              DeferredConsumer &onSuccess(const ConsumeCallback& callback);
                                              /* 
                                                  注册回调函数,用于接收到一个完整消息的时候被调用 
                                                  void MessageCallback(const AMQP::Message &message,  
                                                      uint64_t deliveryTag, bool redelivered) 
                                              */ 
                                              DeferredConsumer &onReceived(const MessageCallback& callback);
                                              /* Alias for onReceived() */ 
                                              DeferredConsumer &onMessage(const MessageCallback& callback);
                                              /* 
                                                  注册要在服务器取消消费者时调用的函数 
                                                  void CancelCallback(const std::string &tag) 
                                              */ 
                                              DeferredConsumer &onCancelled(const CancelCallback& callback);
                                          };
                                          class Message : public Envelope
                                          { 
                                              const std::string &exchange();
                                              const std::string &routingkey();
                                          };
                                          
                                          class Envelope : public MetaData
                                          { 
                                              const char *body();  // 获取消息正文
                                              uint64_t bodySize(); // 获取消息正文大小
                                          };
                                      }
                                       
                                          EV_WATCHER (ev_async);
                                          EV_ATOMIC_T sent; /* private */ 
                                      }ev_async; 
                                       
                                      //break type 
                                      enum 
                                      { 
                                          EVBREAK_CANCEL = 0, /* undo unloop */ 
                                          EVBREAK_ONE    = 1, /* unloop once */ 
                                          EVBREAK_ALL    = 2  /* unloop all loops */ 
                                      }; 
                                      // 实例化并获取IO事件监控接口句柄
                                      struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0));
                                      # define EV_DEFAULT  ev_default_loop (0) 
                                       
                                      // 开始运行IO事件监控, 这是一个阻塞接口
                                      int  ev_run (struct ev_loop *loop);
                                      /* break out of the loop */
                                      // 结束IO监控
                                      // 如果在主线程进行ev_run(), 则可以直接调用,
                                      // 如果在其他线程中进行ev_run(), 需要通过异步通知进行
                                      void ev_break (struct ev_loop *loop, int32_t break_type) ;  
                                       
                                      void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
                                       
                                      // 初始化异步事件结构, 并设置回调函数
                                      void ev_async_init(ev_async *w, callback cb);
                                      // 启动事件监控循环中的异步任务处理
                                      void ev_async_start(struct ev_loop *loop, ev_async *w); 
                                      // 发送当前异步事件到异步线程中执行
                                      void ev_async_send(struct ev_loop *loop, ev_async *w);
                                      
                                          // 1.实例化底层网络通信框架的IO事件监控句柄
                                          auto *loop = EV_DEFAULT;
                                          // 2.实例化libEvHandler句柄 - 将AMQP框架与事件监控关联起来
                                          AMQP::LibEvHandler handler(loop);
                                          // 3.实例化连接对象
                                          AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");
                                          AMQP::TcpConnection connection(&handler, address);
                                          // 4.实例化信道对象
                                          AMQP::TcpChannel channel(&connection);
                                          // 5.声明交换机
                                          channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
                                              .onError([](const char *message)
                                                      { std::cout  std::cout  std::cout  std::cout  std::cout  std::cout 
                                              std::string msg = "Hello SnowK-" + std::to_string(i);
                                              if(channel.publish("test-exchange", "test-queue-key", msg) == false)
                                              {
                                                  std::cout 
                                          std::string msg;
                                          msg.assign(message.body(), message.bodySize());
                                          
                                          // 不能这样使用, AMQP::Message后面没有存'\0'
                                          // std::cout 
                                          // 1.实例化底层网络通信框架的IO事件监控句柄
                                          auto *loop = EV_DEFAULT;
                                          // 2.实例化libEvHandler句柄 - 将AMQP框架与事件监控关联起来
                                          AMQP::LibEvHandler handler(loop);
                                          // 3.实例化连接对象
                                          AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");
                                          AMQP::TcpConnection connection(&handler, address);
                                          // 4.实例化信道对象
                                          AMQP::TcpChannel channel(&connection);
                                          // 5.声明交换机
                                          channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
                                              .onError([](const char *message)
                                                       { std::cout  std::cout  std::cout  std::cout  std::cout  std::cout  
                                                  std::cout 
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

相关阅读

目录[+]

取消
微信二维码
微信二维码
支付宝二维码