C++线程池
C++线程池
该文介绍 C++11
通用线程池。
C++线程池
1. 基础概念
线程池: 当进行并行的任务作业操作时,线程的建立与销毁的开销是,阻碍性能进步的关键,因此线程池,由此产生。使用多个线程,无限制循环等待队列,进行计算和操作。帮助快速降低和减少性能损耗。
2. 线程池的组成
- 线程池管理器:初始化和创建线程,启动和停止线程,调配任务;管理线程池。
- 工作线程:线程池中等待并执行分配的任务。
- 任务接口:添加任务的接口,以提供工作线程调度任务的执行。
- 任务队列:用于存放没有处理的任务,提供一种缓冲机制,同时具有调度功能,高优先级的任务放在队列前面。
3. C++线程池实现1
添加中文注释及解释。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
class ThreadPool {
public:
// 构造函数
ThreadPool(size_t);
// 添加任务
template <class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
// 析构函数
~ThreadPool();
private:
std::vector<std::thread> workers; // 线程数组,需要跟踪线程,以便我们可以结束它们
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex; // 同步,任务队列互斥量
std::condition_variable condition; // 同步,条件变量
bool stop; // 结束标志
};
// 构造函数,指定启动线程数量,推荐 (std::thread::hardware_concurrency() - 1)
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for (size_t i = 0; i < threads; ++i)
workers.emplace_back(
[this] {
for (;;)
{
std::function<void()> task;
// 大括号用于自动销毁 lock,作用域和生命周期
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
// 等待信号,判断队列非空或线程池被终止
this->condition.wait(lock,
[this] { return this->stop || !this->tasks.empty(); });
// 若线程池被终止,并且没有待处理任务,则该线程终止
if (this->stop && this->tasks.empty())
return;
// 从队列中获取首部任务并弹出
task = std::move(this->tasks.front());
this->tasks.pop();
}
// 执行任务
task();
}
});
}
// 添加一个新的任务到线程池
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
{
// return_type 是该任务的返回类型
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
// 用于存放任务结果的 std::future 对象
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// 终止线程池之后不允许添加任务
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() { (*task)(); });
}
// 通知一个线程来处理
condition.notify_one();
return res;
}
// 析构函数,结束所有线程
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)
worker.join();
}
#endif
3.1. 构造函数
构造函数负责创建所有线程,每个线程反复地从任务队列中获取一个任务并执行。
[this]{...}
是一个lambda
表达式,中括号定义了捕获列表,大括号定义了函数体,从函数体内可以访问被捕获的对象。std::unique_lock<std::mutex>
是对std::mutex
的封装。std::unique_lock<std::mutex> lock(mutex)
会让lock
获得mutex
(获得之前线程会被阻塞),此外lock
会在销毁时自动释放mutex
(这就是大括号的作用)。std::condition_variable
的wait(lock, stop_waiting)
函数首先会释放lock
,随后在stop_waiting
条件被满足之前,循环地获得lock
(获得之前线程会被阻塞)。
std::condition_variable
的 wait
函数用于使当前线程等待在条件变量上,直到被通知为止。下面是 wait
函数的大致运行过程:
- 当前线程调用
wait
函数时,会持有一个std::unique_lock<std::mutex>
锁,该锁用于保护条件变量和等待队列。 wait
函数会释放锁,并将当前线程加入到条件变量的等待队列中,然后进入阻塞状态,等待被通知。- 当其他线程调用
notify_one
或notify_all
函数时,条件变量会从等待队列中选择一个或多个线程,然后唤醒它们。 - 被唤醒的线程会重新尝试获取之前释放的锁,并重新检查条件。如果条件满足,线程将继续执行;如果条件不满足,则线程会重新加入等待队列中,等待下一次被通知。
3.2. 添加任务
template <class F, class... Args>
说明这个函数是个模板函数。auto function -> result_type
的作用是定义函数的返回类型,等价于result_type function
。typename std::result_of<F(Args...)>::type
是函数F(Args...)
的返回类型。std::future
用于获得异步操作的结果。整体看来,enqueue
的作用是,接受用户提供的函数f
和参数args
,将该任务添加到队列中,给用户返回一个std::future
对象,供用户获得结果。
enqueue
函数向队列中添加一个任务,并通知一个线程来处理该任务。
3.3. 析构函数
析构函数将所有被阻塞的线程唤醒,并结束这些线程。
代码适用于
C++11
,C++17
及后续版本因为std::result_of
被弃用,所以不适用,适配版本可以查看 A cpp threadpool for c++11 c++14 c++17 c++20
参考
本文由作者按照 CC BY 4.0 进行授权