文章

C++线程池

C++线程池

该文介绍 C++11 通用线程池。

C++线程池

1. 基础概念

线程池: 当进行并行的任务作业操作时,线程的建立与销毁的开销是,阻碍性能进步的关键,因此线程池,由此产生。使用多个线程,无限制循环等待队列,进行计算和操作。帮助快速降低和减少性能损耗。

2. 线程池的组成

  1. 线程池管理器:初始化和创建线程,启动和停止线程,调配任务;管理线程池。
  2. 工作线程:线程池中等待并执行分配的任务。
  3. 任务接口:添加任务的接口,以提供工作线程调度任务的执行。
  4. 任务队列:用于存放没有处理的任务,提供一种缓冲机制,同时具有调度功能,高优先级的任务放在队列前面。

3. C++线程池实现1

添加中文注释及解释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

class ThreadPool {
public:
    // 构造函数
    ThreadPool(size_t);

    // 添加任务
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;

    // 析构函数
    ~ThreadPool();

private:
    std::vector<std::thread>          workers;     // 线程数组,需要跟踪线程,以便我们可以结束它们
    std::queue<std::function<void()>> tasks;       // 任务队列
    std::mutex                        queue_mutex; // 同步,任务队列互斥量
    std::condition_variable           condition;   // 同步,条件变量
    bool                              stop;        // 结束标志
};

// 构造函数,指定启动线程数量,推荐 (std::thread::hardware_concurrency() - 1)
inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.emplace_back(
            [this] {
                for (;;)
                {
                    std::function<void()> task;
                    // 大括号用于自动销毁 lock,作用域和生命周期
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        // 等待信号,判断队列非空或线程池被终止
                        this->condition.wait(lock,
                                             [this] { return this->stop || !this->tasks.empty(); });

                        // 若线程池被终止,并且没有待处理任务,则该线程终止
                        if (this->stop && this->tasks.empty())
                            return;

                        // 从队列中获取首部任务并弹出
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    // 执行任务
                    task();
                }
            });
}

// 添加一个新的任务到线程池
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
{
    // return_type 是该任务的返回类型
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    // 用于存放任务结果的 std::future 对象
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // 终止线程池之后不允许添加任务
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task]() { (*task)(); });
    }
    // 通知一个线程来处理
    condition.notify_one();
    return res;
}

// 析构函数,结束所有线程
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers)
        worker.join();
}

#endif
大括号的作用是为了创建一个局部作用域,用于控制变量的生命周期和范围,并确保在作用域结束时执行必要的清理操作。

3.1. 构造函数

构造函数负责创建所有线程,每个线程反复地从任务队列中获取一个任务并执行。

  • [this]{...}是一个 lambda 表达式,中括号定义了捕获列表,大括号定义了函数体,从函数体内可以访问被捕获的对象。
  • std::unique_lock<std::mutex> 是对 std::mutex 的封装。std::unique_lock<std::mutex> lock(mutex) 会让 lock 获得 mutex(获得之前线程会被阻塞),此外 lock 会在销毁时自动释放mutex(这就是大括号的作用)。
  • std::condition_variablewait(lock, stop_waiting) 函数首先会释放 lock,随后在 stop_waiting 条件被满足之前,循环地获得 lock(获得之前线程会被阻塞)。

std::condition_variablewait 函数用于使当前线程等待在条件变量上,直到被通知为止。下面是 wait 函数的大致运行过程:

  1. 当前线程调用 wait 函数时,会持有一个 std::unique_lock<std::mutex> 锁,该锁用于保护条件变量和等待队列。
  2. wait 函数会释放锁,并将当前线程加入到条件变量的等待队列中,然后进入阻塞状态,等待被通知。
  3. 当其他线程调用 notify_onenotify_all 函数时,条件变量会从等待队列中选择一个或多个线程,然后唤醒它们。
  4. 被唤醒的线程会重新尝试获取之前释放的锁,并重新检查条件。如果条件满足,线程将继续执行;如果条件不满足,则线程会重新加入等待队列中,等待下一次被通知。

3.2. 添加任务

template <class F, class... Args>说明这个函数是个模板函数。auto function -> result_type的作用是定义函数的返回类型,等价于result_type functiontypename std::result_of<F(Args...)>::type是函数F(Args...)的返回类型。std::future用于获得异步操作的结果。整体看来,enqueue的作用是,接受用户提供的函数f和参数args,将该任务添加到队列中,给用户返回一个std::future对象,供用户获得结果。

enqueue函数向队列中添加一个任务,并通知一个线程来处理该任务。

3.3. 析构函数

析构函数将所有被阻塞的线程唤醒,并结束这些线程。

代码适用于 C++11C++17 及后续版本因为 std::result_of 被弃用,所以不适用,适配版本可以查看 A cpp threadpool for c++11 c++14 c++17 c++20

参考

本文由作者按照 CC BY 4.0 进行授权