线程池的封装(c/c++)
前言:本文将要封装的线程池相关接口是基于之前一篇文章中的实现(文章链接:Linux多线程编程的艺术:封装线程、锁、条件变量和信号量的工程实践-CSDN博客)。不过无需担心,即使您没有阅读过前文,只要具备线程、互斥锁和条件变量等基础知识,完全可以理解本文内容。
目录
一、理解线程池
二、框架设计
三、成员函数实现
四、单例模式设计
五、测试
六、源码
一、理解线程池
池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后⾃⼰管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较⼤的开销,不如提前申请好了,这样使⽤时就会变得⾮常快捷,⼤⼤提⾼程序运⾏效率。
线程池的意义:
-
降低资源消耗:复用已创建的线程,减少线程创建销毁的开销。
-
提高响应速度:任务到达时可以直接执行,无需等待线程创建。
-
提高线程可管理性:统一分配、调优和监控线程。
二、框架设计
线程池本质就是一个类,它管理了批量的线程和任务,如何管理?先描述,再组织。我们使用数组来存储线程,队列来存储任务(任务本质就是函数,lambda,或仿函数等)。
其次线程之间需要保持互斥,线程和任务队列需要保持同步,所以还需要锁,条件变量。
核心成员变量:
- 数组:用来存储线程。
- 队列:用来存储任务。
- 锁
- 条件变量
核心成员函数
- HandlerTask:从队列中获取并执行任务,让单个线程执行。
- ThreadPool:构造函数,用于创建批量线程,并将HandlerTask函数交给线程。
- Push:外部给线程池派发任务(即入队)。
- Join:线程等待。
如下:
#define NUM 5 // 线程个数 namespace my_threadpool { template //任务类型 class ThreadPool { public: ThreadPool(int num = NUM) {} void HandlerTask() bool Push(const T &task); void Join(); private: vector _threads; queue _taskq; Mutex _lock; Cond _cond; }; }
当然这些只是核心成员,在后面实现这些函数时还会延伸出更多的成员变量和成员函数。
线程池相当于生产者消费者模型中的交易场所和消费者这一部分,关于生产者消费者模型:
多线程编程实战:基于阻塞队列与环形队列的生产者消费者模型详解_环形数组 生产者消费者-CSDN博客
三、成员函数实现
构造函数 ThreadPool(int num = NUM)
循环创建num个线程,用lambda包装HandlerTask()作为线程函数,然后push到数组中。
ThreadPool(int num = NUM) { for (int i = 0; i
这里也可以不使用lambda表达式。
void HandlerTask()
为了代码的健壮性,我们添加一个参数(成员变量_isruning)用来标记线程池是否正在运行,初始化为false,在程序执行到HandlerTask后设为true。这个参数用来防止用户对已经回收的线程池进行任务派发或线程回收。
HandlerTask是单个线程在执行,线程要通过这个函数获取并执行任务,这个逻辑需要重复执行,所以做一个死循环。接下来到队列中取任务,队列是临界资源需要加锁,这里使用RAII模式。其次为了和队列保持同步,当队列为空时需要进行等待。如下:
void HandlerTask() { _isrunning = true; while (true) { T tk; LockGuard lock(_lock); { while (_taskq.empty()) { _cond.Wait(_lock); } tk = _taskq.front(); _taskq.pop(); } tk(); } }
注意:必须选择循环等待,因为可能会有这样的情况:只派发了一个任务但多个线程一起被唤醒,进行循环判断可以解决。
这里HandlerTask函数实现不是最终形态,在后文会根据需求进行改进。
Push实现
向任务队列推送任务,要保证_isrunning为true,其次访问临界资源要加锁,然后向队列里push任务。因为队列有任务了,接下来需要将线程唤醒,这里我们还是优雅一点,不要粗鲁地直接把线程都唤醒。
添加变量_sleepnum来记录等待的线程个数,如果所有线程都在等待再进行唤醒,并且只唤醒一个就行。
_sleepnum在HandlerTask函数中进行维护,即:
while (_taskq.empty()) { _sleepnum++; _cond.Wait(_lock); _sleepnum--; }
封装一个函数用来唤醒一个线程,如下:
void WakeOnceThread() { _cond.Signal(); }
Push实现如下:
bool Push(const T &task) { if (_isrunning) { LockGuard lock(_lock); { _taskq.push(task); if (_sleepnum == _threads.size()) WakeOnceThread();// 唤醒一个线程 } return true; } return false; }
Join实现
线程池的回收,遍历每个线程的join就行,在此之前要满足两个条件:
- 线程池_isrunning为true。
- 保证任务队列里的任务已经做完了,然后让线程执行的函数退出。
对第1点直接用if判断就能解决。
针对第2点,我们可以封装一个Stop函数用来等任务做完后,让线程函数退出。
Stop的实现
Stop函数的主要目的是让线程函数(HandlerTask)退出,所以就需要把全部线程唤醒,让它们往下执行,当然这还不够,我们要让它退出循环判断队空的逻辑,这个时候_isrunning就派上大用处了,把_Stop函数中把isrunning设为false。在HandlerTask函数中如果_isrunning为false则退出函数,当然还要保证把队列任务做完,具体实现如下:
void Stop() { _isrunning = false; WakeAllThread(); cout