C++ asio网络编程(8)处理粘包问题
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言 粘包问题
- 一、粘包原因
- 总结:
- 二、如何处理粘包
- 处理方法
- 三、完善消息节点MsgNode
- 代码部分
- 细节详解
- memcpy(_data, &max_len, HEAD_LENGTH);
- _data[_total_len] = '\0'
- 四、完善CSession
- 五、完善接收逻辑
- 代码部分
- 流程图
- 六、客户端修改
- 七、测试粘包
- 总结
前言 粘包问题
提示:这里可以添加本文要记录的大概内容:
今天介绍一下如何处理粘包,粘包问题是服务器收发数据常遇到的一个现象,下面我们介绍一下粘包问题是什么,当客户端发送多个数据包给服务器时,服务器底层的tcp接收缓冲区收到的数据为粘连在一起的,如下图所示
当客户端发送两个Hello World!给服务器,服务器TCP接收缓冲区接收了两次,一次是Hello World!Hello, 第二次是World!
提示:以下是本篇文章正文内容,下面案例可供参考
一、粘包原因
因为TCP底层通信是面向字节流的,TCP只保证发送数据的准确性和顺序性,字节流以字节为单位,客户端每次发送N个字节给服务端,N取决于当前客户端的发送缓冲区是否有数据,比如发送缓冲区总大小为10个字节,当前有5个字节数据(上次要发送的数据比如’loveu’)未发送完,那么此时只有5个字节空闲空间,我们调用发送接口发送hello world!其实就是只能发送Hello给服务器,那么服务器一次性读取到的数据就很可能是loveuhello。而剩余的world!只能留给下一次发送,下一次服务器接收到的就是world!
如下图
这是最好理解的粘包问题的产生原因。还有一些其他的原因比如
1 客户端的发送频率远高于服务器的接收频率,就会导致数据在服务器的tcp接收缓冲区滞留形成粘连,比如客户端1s内连续发送了两个hello world!,服务器过了2s才接收数据,那一次性读出两个hello world!。
2 tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送,可以了解下tcp底层的Nagle算法。
3 再就是我们提到的最简单的情况,发送端缓冲区有上次未发送完的数据或者接收端的缓冲区里有未取出的数据导致数据粘连。
总结:
💡 为什么会出现 TCP 粘包?
- TCP 是面向字节流的协议,没有“包”的概念。
- 客户端每次 send() 实际上是把数据写入 操作系统内核的发送缓冲区。
- 如果缓冲区有部分上次数据还没发完(如 loveu),新的数据(如 hello world)马上又加入。
- 接收端就可能收到合并后的数据(如 loveuhello),这就是“粘包”。
💥 导致粘包的具体原因
● 应用层发送太快,TCP 来不及分开发
● 接收方 read 时读取了多个数据一起到缓存
● TCP 对小数据包做了优化(Nagle 算法) tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送,可以了解下tcp底层的Nagle算法
二、如何处理粘包
处理方法
处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议
(消息id+消息长度+消息内容),如下图
为保证大家容易理解,我们先简化发送的格式,格式变为消息长度+消息内容的方式,之后再完善为tlv格式。
简化后的结构如下图
三、完善消息节点MsgNode
之前我们设计过消息节点的数据结构MsgNode,这里需要完善一下
代码部分
class MsgNode { friend class CSession; public: MsgNode(char* msg,short max_len):_cur_len(0),_total_len(HEAD_LENGTH+max_len) { _data = new char[_total_len+1](); memcpy(_data, &max_len, HEAD_LENGTH); memcpy(_data+ HEAD_LENGTH, msg, max_len); _data[_total_len] = '\0'; } MsgNode(short max_len) :_total_len(max_len), _cur_len(0) { _data = new char[_total_len + 1](); } ~MsgNode() { delete[]_data; } void Clear() { ::memset(_data, 0, _total_len); _cur_len = 0; } private: short _cur_len; short _total_len; char* _data; };
1.两个参数的构造函数做了完善,之前的构造函数通过消息首地址和长度构造节点数据,现在需要在构造节点的同时把长度信息也写入节点,该构造函数主要用来发送数据时构造发送信息的节点。
2. 一个参数的构造函数为较上次新增的,主要根据消息的长度构造消息节点,该构造函数主要是接收对端数据时构造接收节点调用的。
3.新增一个Clear函数清除消息节点的数据,主要是避免多次构造节点造成开销。
细节详解
memcpy(_data, &max_len, HEAD_LENGTH);
这里为什么要用&
什么意思呢,现在解释一下
首先我们要做的格式是消息长度+消息内容
在tlv协议中,这个消息长度需要用二进制来存储
#define HEAD_LENGTH 2 //用2个字节的数据存储消息长度 memcpy(_data, &max_len, HEAD_LENGTH);
✅ 解释与流程:
- memcpy(_data, &max_len, HEAD_LENGTH);
○ 目的:将消息的长度写入消息包的前面作为 “消息头”。
○ &max_len:取 max_len 的地址,即把这个整数以原始内存二进制形式写入 _data 中。
○ HEAD_LENGTH:通常为 2 或 4,代表写入几个字节(例如使用 short 就是 2 字节)。
○ ✅ 效果:在 _data 中的前 2 字节保存了消息的长度,二进制形式。
相当于我们要先把这个5写进去,告诉后面的数据长度为5,记得用二进制!!!所以加&
_data[_total_len] = ‘\0’
为什么最后要加一个结束符号呢??
如果没有 ‘\0’,字符串函数会一直读取内存,直到碰到恰好为 0 的字节,容易造成越界访问、程序崩溃或打印乱码
如果你的有效数据长度刚好覆盖了整个数组(不留多余的 ‘\0’),字符串函数找不到结束符,就会继续往后读,导致越界访问
四、完善CSession
为能够对收到的数据切包处理,需要定义一个消息接收节点,一个bool类型的变量表示头部是否解析完成,以及将处理好的头部先缓存起来的结构
在原先代码加入就行
//收到的消息结构 std::shared_ptr _recv_msg_node; bool _b_head_parse;//头部是否解析完成 //收到的头部结构 std::shared_ptr _recv_head_node;
_recv_msg_node 用来存储接受的消息体信息
_recv_head_node 用来存储接收的头部信息
_b_head_parse 表示是否处理完头部信息
五、完善接收逻辑
我们需要修改HandleRead函数
代码部分
//读的回调函数 void CSession::HandleRead(const boost::system::error_code& ec, size_t bytes_transferred, shared_ptr_self_shared) { if (!ec) { int copy_len = 0;//本次回调读取了多少数据 后续需要偏移 while (bytes_transferred > 0)//接收到了数据 { if (!_b_head_parse)//先解析头部 { //当前传入的数据+头部中的数据 依然没有达到2字节 那就全加入进去 if (bytes_transferred + _recv_head_node->_cur_len _data + _recv_head_node->_cur_len, _data+copy_len, bytes_transferred); _recv_head_node->_cur_len += bytes_transferred; ::memset(_data, 0, max_length); //继续回调 等待数据传入 _socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_shared)); return; } //收到的数据比头部多 可能刚好等于或者大于 //头部剩余未复制的长度 int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len; memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain); //更新已处理的data长度和剩余未处理的长度 copy_len += head_remain; _recv_head_node->_cur_len += head_remain; bytes_transferred -= head_remain; //获取头部数据进行判断 short data_len = 0; memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH); cout std::cout memcpy(_recv_msg_node-_data + _recv_msg_node-_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node-_cur_len += bytes_transferred; ::memset(_data, 0, max_length); _socket.async_read_some(boost::asio::buffer(_data,max_length), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_shared)); //头部处理完成 _b_head_parse = true; return; } //数据大 就只存data_len长度 和头部显示的长度一样 memcpy(_recv_msg_node-_data + _recv_msg_node->_cur_len, _data + copy_len, data_len); _recv_msg_node->_cur_len += data_len; copy_len += data_len; bytes_transferred -= data_len; _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; cout _total_len); //继续轮询剩余未处理数据 _b_head_parse = false; _recv_head_node->Clear(); if (bytes_transferred == 0) //说明后面没有数据了 刚好是一个包 { ::memset(_data, 0, max_length); _socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_shared)); return; } continue;//说明后面还有数据 继续轮询处理 } //头部已经解析完 int data_len = _recv_msg_node->_total_len - _recv_msg_node->_cur_len; //数据小 可以全存入 但后续还需要存入 因为没有达到头部所显示的长度 if (bytes_transferred _data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, max_length); _socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_shared)); return; } //数据大 就只存data_len长度 和头部显示的长度一样 memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len); _recv_msg_node->_cur_len += data_len; bytes_transferred -= data_len; copy_len += data_len; _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; cout _total_len); //继续轮询剩余未处理数据 _b_head_parse = false; _recv_head_node->Clear(); if (bytes_transferred == 0) { ::memset(_data, 0,max_length); _socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_shared)); return; } continue; } } else { std::cout while (1) { try { //创建上下文服务 boost::asio::io_context ios; //创建endpoint boost::asio::ip::tcp::endpoint remote_ep(boost::asio::ip::make_address("127.0.0.1"), 10086); //127.0.0.1是本机的回路地址 因为客户端和服务端在同一地址 //服务器就是本机 boost::asio::ip::tcp::socket sock(ios, remote_ep.protocol()); boost::system::error_code error = boost::asio::error::host_not_found; sock.connect(remote_ep, error); if (error) { std::cout 0 }; memcpy(send_data, &request_length, 2); memcpy(send_data + 2, request, request_length); boost::asio::write(sock, boost::asio::buffer(send_data, request_length+2)); /* char reply[MAX_LENGTH];//存储接收数据 //用read读 size_t reply_length = 0; boost::system::error_code ec; reply_length = sock.read_some(boost::asio::buffer(reply, MAX_LENGTH), ec); if (ec) { std::cerr 0 }; size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen)); std::cout std::cerr stringstream ss; string result = "0x"; for (int i = 0; i