基于C++、JsonCpp、Muduo库实现的分布式RPC通信框架

06-01 1778阅读
基于C++、JsonCpp、Muduo库实现的分布式RPC通信框架 ⭐️个人主页:@小羊 ⭐️所属专栏:项目 很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~

基于C++、JsonCpp、Muduo库实现的分布式RPC通信框架

目录

    • 项目介绍
    • JsonCpp库简单介绍
    • Muduo库简单介绍
    • C++11异步操作——std::future
      • 1. 使用 std::async 关联异步任务
      • 2. std::packaged_task 配合 std::future
      • 3. std::promise 配合 std::future
      • 项目设计
        • 理解项目功能
        • 服务端模块划分
          • Network:网络通信模块
          • Protocol:应用层通信协议模块
          • Dispatcher:消息分发处理模块
          • RpcRouter:远端调用路由功能模块
          • Publish-Subscribe:发布订阅功能模块
          • Registry-Discovery:服务注册/发现/上线/下线功能模块
          • Server:基于以上模块整合而出的服务端模块
          • 客户端模块划分
            • Requestor:请求管理模块
            • RpcCaller:远端调用功能模块
            • Publish-Subscribe:发布订阅功能模块
            • Registry-Discovery:服务注册/发现/上线/下线功能模块
            • Client:基于以上模块整合而出的客户端模块
            • 框架设计
              • 抽象层
              • 具象层
              • 业务层
              • 项目源码

                项目介绍

                RPC(远程过程调用 )允许程序调用远程计算机上的服务或函数,而无需显式编写网络通信代码,就像调用本地函数一样方便地调用远程服务的函数。

                本项目将基于C++、JsonCpp、muduo网络库实现一个简单、易用的RPC通信框架,它将实现同步调用、异步回调、异步futrue调用、服务注册/发现,服务上线/下线及发布订阅等功能。

                我们将实现一个远程调用接口call,然后通过传入函数名参数来调用RPC接口。


                JsonCpp库简单介绍

                Json是一种数据交换格式,它使用完全独立于编程语言的文本格式来存储和表示数据。

                例如表示张三同学的信息:

                char *name = "张三";
                int age = 18;
                double score[3] = {88.8, 99.9, 66.6};
                

                Json格式表示为:

                {
                    "姓名" : "张三", 
                    "年龄" : 18,
                    "成绩" : [88.8, 99.9, 66.6],
                    "爱好" : {
                        "运行" : "打乒乓球",
                        "文学" : "红楼梦"
                    }
                }
                

                Json 的数据类型包括对象,数组,字符串,数字。

                • 对象:使用花括号 {} 括起来表示一个对象;
                • 数组:使用中括号 [] 括起来表示一个数组;
                • 字符串:使用常规双引号 “” 括起来表示一个字符串;
                • 数字:包括整形和浮点型,直接使用。

                  Jsoncpp 库主要是用于实现Json 格式数据的序列化和反序列化,它实现了将多个数据对象组织成为Json格式字符串,以及将Json 格式字符串解析得到多个数据对象的功能。

                  Jsoncpp 库主要借助以下三个类以及其对应的少量成员函数完成序列化及反序列化。

                  • Json::Value 类:中间数据存储类
                    • 如果要将数据对象进行序列化,需要先存储到 Json::Value 对象中;
                    • 如果要将数据进行反序列化,需要将解析后的数据存到 Json::Value 对象中;
                    • Json::StreamWriter 类:序列化类
                      • Json::StreamWriter::write():序列化函数;
                      • Json::StreamWriterBuilder类:工厂类,用于生产Json::StreamWriter对象;
                      • Json::CharReader类:反序列化类
                        • Json::CharReader::parse():反序列化函数;
                        • Json::CharReaderBuilder类:工厂类,用于生产Json::CharReader对象

                          Json数据对象类:

                          class Json::Value
                          {
                              // Value重载了[]和=,因此所有的赋值和获取数据都可以通过
                              // 简单的⽅式完成val["name"] = "xx";
                              Value &operator=(const Value &other); 
                              Value& operator[](const std::string& key); 
                              Value& operator[](const char* key);
                              Value removeMember(const char* key); //移除元素
                              const Value& operator[](ArrayIndex index) const; //val["score"][0]
                              Value& append(const Value& value); //添加数组元素val["score"].append(88);  
                              ArrayIndex size() const; //获取数组元素个数val["score"].size(); 
                              std::string asString() const; //转string string name = val["name"].asString();
                              const char* asCString() const; //转char* char *name = val["name"].asCString();
                              int asInt() const; //转int int age = val["age"].asInt(); 
                              float asFloat() const; //转float float weight = val["weight"].asFloat(); 
                              bool asBool() const; //转bool bool ok = val["ok"].asBool(); 
                          };
                          

                          序列化接口:

                          class JSON_API StreamWriter 
                          {
                          	virtual int write(Value const& root, std::ostream* sout) = 0;
                          }
                          class JSON_API StreamWriterBuilder : public StreamWriter::Factory 
                          {
                          	virtual StreamWriter* newStreamWriter() const;
                          }
                          

                          反序列化接口:

                          class JSON_API CharReader 
                          {
                          	virtual bool parse(char const* beginDoc, char const* endDoc, 
                          					   Value* root, std::string* errs) = 0;
                          }
                          class JSON_API CharReaderBuilder : public CharReader::Factory 
                          {
                          	virtual CharReader* newCharReader() const;
                          }
                          

                          Json序列化实践测试:

                          #include 
                          #include 
                          #include 
                          #include 
                          #include 
                          // 实现数据的序列化
                          bool Serialize(const Json::Value& val, std::string& body)
                          {
                              std::stringstream ss;
                              // 先实例化一个工厂类对象
                              Json::StreamWriterBuilder swb;
                              // 通过工厂类对象来生产派生类对象
                              std::unique_ptr sw(swb.newStreamWriter());
                              // 开始序列化
                              bool ret = sw->write(val, &ss);
                              if (ret != 0)
                              {
                                  std::cout 
                              // 实例化一个工厂类对象
                              Json::CharReaderBuilder crb;
                              // 生产派生类对象
                              std::unique_ptr
                                  std::cout 
                              const char* name = "小明";
                              int age = 18;
                              const char* sex = "男";
                              float score[3] = {88, 77.7f, 66};
                              Json::Value student;
                              student["姓名"] = name;
                              student["年龄"] = age;
                              student["性别"] = sex;
                              student["成绩"].append(score[0]);
                              student["成绩"].append(score[1]);
                              student["成绩"].append(score[2]);
                              Json::Value fav;
                              fav["书籍"] = "红楼梦";
                              fav["运动"] = "乒乓球";
                              student["爱好"] = fav;
                              std::string body;
                              if (Serialize(student, body))
                                  std::cout 
                                  std::cout 
                                      std::cout 
                              void start(); // 启动服务器
                              void setConnectionCallback(); // 设置连接建立/关闭时的回调函数
                              void setMessageCallback(); // 设置消息处理回调函数
                          }
                          EventLoop
                          {
                              void loop(); // 开始事件循环监控
                              void quit(); // 停止循环
                              Timerld runAfter(delay, cb); // 定时任务
                          }
                          TcpConnection
                          {
                              void send(std::string &msg); // 发送数据
                              bool connected(); // 判断当前连接是否正常
                              void shutdown(); // 关闭连接
                          };
                          Buffer 
                          {
                              size_t readableBytes(); // 获取缓冲区中可读数据大小
                              const char* peek(); // 获取缓冲区中数据的起始地址
                              int32_t peekInt32(); // 尝试从缓冲区中获取4字节数据,
                                                   // 进行网络字节序的转换,但不从缓冲区中删除 
                              void retrieveInt32(); // 数据读取位置向后偏移4字节,本质就是删除前4字节数据
                              int32_t readInt32(); // peekInt32() + retrieveInt32()
                              string retrieveAllAsString(); // 从缓冲区中取出所有数据,作为字符串返回,并删除
                              string retrieveAsString(size_t len); // 从缓冲区中取出len长度的数据,并删除 
                          }
                          /*
                          需要注意的是,因为muduo库不管是服务端还是客⼾端都是异步操作,
                          对于客⼾端来说如果我们在连接还没有完全建⽴成功的时候发送数据,这是不被允许的。 
                          因此我们可以使⽤内置的CountDownLatch类进⾏同步控制
                          */
                          TcpClient
                          {
                              void connect(); // 连接服务器
                              void disconnect(); // 关闭连接
                              TcpConnectionPtr connection(); // 获取客户端对应的TcpConnection连接
                              // muduo库的客户端也是通过EventLoop进行IO事件监控处理的
                              setConnectionCallback(); // 设置连接建立/关闭时的回调函数
                              setMessageCallback(); // 设置消息处理回调函数
                          }
                          // 做计数同步
                          CountDownLatch
                          {
                              void wait(); // 计数大于0则阻塞
                              void countDown(); // 计数--
                          }
                          
                          public:
                              DictServer(int port)
                                  : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),
                                            "DictServer", muduo::net::TcpServer::kReusePort)
                              {
                                  // 设置连接事件的回调
                                  _server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));
                                  // 设置连接消息的回调
                                  _server.setMessageCallback(std::bind(&DictServer::onMessage, this, std::placeholders::_1, 
                                                              std::placeholders::_2, std::placeholders::_3));
                              }
                              void Start()
                              {
                                  _server.start();// 先开始监听
                                  _baseloop.loop();// 开始死循环事件监控
                              }
                              ~DictServer()
                              {}
                          private:
                              void onConnection(const muduo::net::TcpConnectionPtr &conn)
                              {
                                  if (conn-connected())
                                  {
                                      std::cout 
                                      std::cout 
                                  static std::unordered_map
                                      {"hello", "你好"},
                                      {"world", "世界"},
                                      {"apple", "苹果"},
                                      {"banana", "香蕉"}
                                  };
                                  std::string msg = buf-retrieveAllAsString();
                                  std::string res;
                                  auto iter = dict_map.find(msg);
                                  if (iter != dict_map.end())
                                  {
                                      res = iter-second;
                                  }
                                  else{
                                      res = "未知单词";
                                  }
                              }
                          private:
                              muduo::net::EventLoop _baseloop; // 事件循环监控
                              muduo::net::TcpServer _server; // 通信连接管理
                          };
                          int main()
                          {
                              DictServer ds(8888);
                              ds.Start();
                              return 0;
                          }
                          
                          public:
                              DictClient(const std::string &sip, int port)
                                  : _baseloop(_loopthread.startLoop())
                                  , _downlatch(1) // 初始化计数器为1,为0时唤醒
                                  , _client(_baseloop, muduo::net::InetAddress(sip, port), "DictClient")
                              {
                                  // 设置连接事件的回调
                                  _client.setConnectionCallback(std::bind(&DictClient::onConnection, this, std::placeholders::_1));
                                  // 设置连接消息的回调
                                  _client.setMessageCallback(std::bind(&DictClient::onMessage, this, std::placeholders::_1, 
                                                              std::placeholders::_2, std::placeholders::_3));
                                  // 连接服务器
                                  _client.connect();
                                  _downlatch.wait();
                              }
                              bool Send(const std::string &msg)
                              {
                                  if (_conn-connected() == false)
                                  {
                                      std::cout 
                                  if (conn-connected())
                                  {
                                      std::cout 
                                      std::cout 
                                  std::string res = buf-retrieveAllAsString();
                                  std::cout 
                              DictClient client("127.0.0.1", 9090);
                              while (1)
                              {
                                  std::string s;
                                  std::cin  s;
                                  client.Send(s);
                              }
                              return 0;
                          }
                          
                              using Handler = std::function
                                  handlers[type] = handler;
                              }
                              // 分发消息
                              void dispatch(const Message& msg)
                              {
                                  auto it = handlers.find(msg.type());
                                  if (it != handlers.end()) 
                                  {
                                      it-second(msg); // 执行回调
                                  } 
                                  else 
                                  {
                                      // 默认处理
                                  }
                              }
                          };
                          
                          	"method" : "Add",
                          	"parameters" : {
                          		"num1" : 11,
                          		"num2" : 22
                          	}
                          }
                          //RPC-response
                          {
                          	"rcode" : OK,
                          	"result": 33
                          }
                          {
                          	"rcode" : ERROR_INVALID_PARAMETERS
                          }
                          
                           	"method": "代收快递",
                           	"parameters": {
                              	"快递单号": "YT123456",
                              	"收件人姓名": "张三"
                           	 }
                          }
                          
                              string method_name;
                              vector
                          	"key" : "music", //主题名称 
                          	// 主题操作类型 
                          	"optype":
                          	TOPIC_CRAETE/TOPIC_REMOVE/TOPIC_SUBSCRIBE/TOPIC_CANCEL/TOPIC_PUBLISH,
                          	//只有TOPIC_PUBLISH请求才会包含有message字段 
                          	"message" : "Hello World"
                          }
                          //Topic-response
                          {
                          	"rcode" : OK,
                          }
                          {
                          	"rcode" : ERROR_INVALID_PARAMETERS,
                          }
                          
                          	//SERVICE_REGISTRY-Rpc-provider进⾏服务注册 
                          	//SERVICE_DISCOVERY - Rpc-caller进⾏服务发现
                          	//SERVICE_ONLINE/SERVICE_OFFLINE 在provider下线后对caller进⾏服务上下线通知 
                          	"optype" :
                          	SERVICE_REGISTRY/SERVICE_DISCOVERY/SERVICE_ONLINE/SERVICE_OFFLINE,
                          	"method" : "Add",
                          	//服务注册/上线/下线有host字段,发现则⽆host字段 
                          	"host" : {
                          	"ip" : "127.0.0.1",
                          	"port" : 9090
                          	}
                          }
                          //Registry/Online/Offline-response
                          {
                          	"rcode" : OK,
                          }
                          //error-response
                          {
                          	"rcode" : ERROR_INVALID_PARAMETERS,
                          }
                          //Discovery-response
                          {
                          	"method" : "Add",
                          	"host" : [
                          		{"ip" : "127.0.0.1","port" : 9090},
                          		{"ip" : "127.0.0.2","port" : 8080}
                          	]
                          }
                          
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码