Github开源项目源码阅读(progschjThreadPool)
项目地址:https://github.com/progschj/ThreadPool
项目源码:
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
include <vector>
include <queue>
include <memory>
include <thread>
include <mutex>
include <condition_variable>
include <future>
include <functional>
include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class… Args>
auto enqueue(F&& f, Args&&… args)
-> std::future<typename std::result_of<F(Args…)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;😉
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// add new work item to the pool
template<class F, class… Args>
auto ThreadPool::enqueue(F&& f, Args&&… args)
-> std::future<typename std::result_of<F(Args…)>::type>
{
using return_type = typename std::result_of<F(Args…)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)…)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don’t allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
endif
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)…)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don’t allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
用法示例:
#include <iostream>
#include <vector>
#include <chrono>
include “ThreadPool.h”
int main()
{
ThreadPool pool(4);
std::vector< std::future<int> > results;
for(int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i*i;
})
);
}
for(auto && result: results)
std::cout << result.get() << ‘ ‘;
std::cout << std::endl;
return 0;
ThreadPool pool(4);
std::vector< std::future<int> > results;
for(int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i*i;
})
);
}
for(auto && result: results)
std::cout << result.get() << ‘ ‘;
std::cout << std::endl;
return 0;
}
类成员变量
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
workers:存储启动的线程对象
tasks:存储提交到线程池的任务
queue_mutex:互斥锁,保证任务队列操作的原子性
condition:配合互斥锁,实现线程通信
stop:线程池是否停止
构造函数
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
构造函数干了这几件事:
根据构造线程池对象时传入的参数,初始化线程数量threads
将stop变量初始化为false
根据threads,创建相应数量的线程对象,存储到workers中。C++中的线程是创建即启动,所以这些线程也同时开始运行。运行时行为模式为以下三种:
无任务线程池未终止:阻塞
无任务线程池已终止:返回。线程函数返回后,线程结束,资源被释放。
有任务:执行任务。
任务入队函数
template<class F, class… Args>
auto ThreadPool::enqueue(F&& f, Args&&… args)
-> std::future<typename std::result_of<F(Args…)>::type>
{
using return_type = typename std::result_of<F(Args…)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)…)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don’t allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)…)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don’t allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
该函数将任务提交到tasks中。
template<class F, class… Args>
auto ThreadPool::enqueue(F&& f, Args&&… args)
-> std::future<typename std::result_of<F(Args…)>::type>
接收任务,也就是传入的可调用对象的类型、可调用对象、参数类型列表和参数列表。
std::result_of<F(Args…)>::type推导出可调用对象的返回值类型。
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)…)
);
std::future<return_type> res = task->get_future();
将可调用对象及实参使用绑定器绑定,返回一个无参的可调用对象;
将绑定器返回的无参的可调用对象包装成一个packaged_task对象;
使用packaged_task对象创建智能指针shared_ptr。
packaged_task对象获取一个std::future对象
补充知识:
std::packaged_task
std::packaged_task 是 C++ 标准库中用于封装可调用对象的类模板。它允许将一个函数(或其他可调用对象)及其参数包装起来,并与一个 std::future 对象关联。
模板参数:std::packaged_task<R(Args)>std::packaged_task接收的模版参数是函数签名R(Args),如同std::function<R(Args)>一样。
std::future
是 C++ 标准库中用于异步操作的工具。它表示某个异步任务完成后的返回值。这里std::future用于接收std::package_task的返回值。
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don’t allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
// don’t allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
花括号是为了让互斥锁出作用域时自动释放。如果线程池终止,就抛出异常,否则将任务入队。
condition.notify_one();
return res;
最后,通知阻塞的线程执行任务,返回。
析构函数
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
将线程池运行状态转为终止。唤醒所有阻塞的线程,阻塞的线程被唤醒后,处于线程池终止无任务状态,会被回收。正在执行任务的线程执行完任务后,也会进入线程池终止无任务状态,被回收。worker.join();使得主线程等待所有子线程执行完毕。
小结
该线程池只实现了固定线程数量的模式。可以考虑增加动态增减线程的功能。

夜雨聆风
