乐于分享
好东西不私藏

Github开源项目源码阅读(progschjThreadPool)

Github开源项目源码阅读(progschjThreadPool)

项目地址:https://github.com/progschj/ThreadPool

项目源码:

#ifndef THREAD_POOL_H

#define THREAD_POOL_H

include <vector>

include <queue>

include <memory>

include <thread>

include <mutex>

include <condition_variable>

include <future>

include <functional>

include <stdexcept>

class ThreadPool {

public:

ThreadPool(size_t);

template<class F, class… Args>

auto enqueue(F&& f, Args&&… args)

-> std::future<typename std::result_of<F(Args…)>::type>;

~ThreadPool();

private:

// need to keep track of threads so we can join them

std::vector< std::thread > workers;

// the task queue

std::queue< std::function<void()> > tasks;

// synchronization

std::mutex queue_mutex;

std::condition_variable condition;

bool stop;

};

// the constructor just launches some amount of workers

inline ThreadPool::ThreadPool(size_t threads)

:   stop(false)

{

for(size_t i = 0;i<threads;++i)

workers.emplace_back(

[this]

{

for(;😉

{

std::function<void()> task;

                {

                    std::unique_lock&lt;std::mutex&gt; lock(this-&gt;queue_mutex);

                    this-&gt;condition.wait(lock,

                        [this]{ return this-&gt;stop || !this-&gt;tasks.empty(); });

                    if(this-&gt;stop &amp;&amp; this-&gt;tasks.empty())

                        return;

                    task = std::move(this-&gt;tasks.front());

                    this-&gt;tasks.pop();

                }

                task();

            }

        }

    );

}

// add new work item to the pool

template<class F, class… Args>

auto ThreadPool::enqueue(F&& f, Args&&… args)

-> std::future<typename std::result_of<F(Args…)>::type>

{

using return_type = typename std::result_of<F(Args…)>::type;

auto task = std::make_shared&lt; std::packaged_task&lt;return_type()&gt; &gt;(

        std::bind(std::forward&lt;F&gt;(f), std::forward&lt;Args&gt;(args)…)

    );

std::future&lt;return_type&gt; res = task-&gt;get_future();

{

    std::unique_lock&lt;std::mutex&gt; lock(queue_mutex);

    // don’t allow enqueueing after stopping the pool

    if(stop)

        throw std::runtime_error(&quot;enqueue on stopped ThreadPool&quot;);

    tasks.emplace([task](){ (*task)(); });

}

condition.notify_one();

return res;

}

// the destructor joins all threads

inline ThreadPool::~ThreadPool()

{

{

std::unique_lock<std::mutex> lock(queue_mutex);

stop = true;

}

condition.notify_all();

for(std::thread &worker: workers)

worker.join();

}

endif

// synchronization

std::mutex queue_mutex;

std::condition_variable condition;

bool stop;

                {

                    std::unique_lock&lt;std::mutex&gt; lock(this-&gt;queue_mutex);

                    this-&gt;condition.wait(lock,

                        [this]{ return this-&gt;stop || !this-&gt;tasks.empty(); });

                    if(this-&gt;stop &amp;&amp; this-&gt;tasks.empty())

                        return;

                    task = std::move(this-&gt;tasks.front());

                    this-&gt;tasks.pop();

                }

                task();

            }

        }

    );

auto task = std::make_shared&lt; std::packaged_task&lt;return_type()&gt; &gt;(

        std::bind(std::forward&lt;F&gt;(f), std::forward&lt;Args&gt;(args)…)

    );

std::future&lt;return_type&gt; res = task-&gt;get_future();

{

    std::unique_lock&lt;std::mutex&gt; lock(queue_mutex);

    // don’t allow enqueueing after stopping the pool

    if(stop)

        throw std::runtime_error(&quot;enqueue on stopped ThreadPool&quot;);

    tasks.emplace([task](){ (*task)(); });

}

condition.notify_one();

return res;

用法示例:

#include <iostream>

#include <vector>

#include <chrono>

include “ThreadPool.h”

int main()

{

ThreadPool pool(4);

std::vector&lt; std::future&lt;int&gt; &gt; results;

for(int i = 0; i &lt; 8; ++i) {

    results.emplace_back(

        pool.enqueue([i] {

            std::cout &lt;&lt; &quot;hello &quot; &lt;&lt; i &lt;&lt; std::endl;

            std::this_thread::sleep_for(std::chrono::seconds(1));

            std::cout &lt;&lt; &quot;world &quot; &lt;&lt; i &lt;&lt; std::endl;

            return i*i;

        })

    );

}

for(auto &amp;&amp; result: results)

    std::cout &lt;&lt; result.get() &lt;&lt; ‘ ‘;

std::cout &lt;&lt; std::endl;

return 0;

ThreadPool pool(4);

std::vector&lt; std::future&lt;int&gt; &gt; results;

for(int i = 0; i &lt; 8; ++i) {

    results.emplace_back(

        pool.enqueue([i] {

            std::cout &lt;&lt; &quot;hello &quot; &lt;&lt; i &lt;&lt; std::endl;

            std::this_thread::sleep_for(std::chrono::seconds(1));

            std::cout &lt;&lt; &quot;world &quot; &lt;&lt; i &lt;&lt; std::endl;

            return i*i;

        })

    );

}

for(auto &amp;&amp; result: results)

    std::cout &lt;&lt; result.get() &lt;&lt; ‘ ‘;

std::cout &lt;&lt; std::endl;

return 0;

}

类成员变量

// need to keep track of threads so we can join them

std::vector< std::thread > workers;

// the task queue

std::queue< std::function<void()> > tasks;

// synchronization

std::mutex queue_mutex;

std::condition_variable condition;

bool stop;

    workers:存储启动的线程对象

    tasks:存储提交到线程池的任务

    queue_mutex:互斥锁,保证任务队列操作的原子性

    condition:配合互斥锁,实现线程通信

    stop:线程池是否停止

构造函数

inline ThreadPool::ThreadPool(size_t threads)

    :   stop(false)

{

    for(size_t i = 0;i<threads;++i)

        workers.emplace_back(

            [this]

            {

                for(;;)

                {

                    std::function<void()> task;

                {

                    std::unique_lock&lt;std::mutex&gt; lock(this-&gt;queue_mutex);

                    this-&gt;condition.wait(lock,

                        [this]{ return this-&gt;stop || !this-&gt;tasks.empty(); });

                    if(this-&gt;stop &amp;&amp; this-&gt;tasks.empty())

                        return;

                    task = std::move(this-&gt;tasks.front());

                    this-&gt;tasks.pop();

                }

                task();

            }

        }

    );

                {

                    std::unique_lock&lt;std::mutex&gt; lock(this-&gt;queue_mutex);

                    this-&gt;condition.wait(lock,

                        [this]{ return this-&gt;stop || !this-&gt;tasks.empty(); });

                    if(this-&gt;stop &amp;&amp; this-&gt;tasks.empty())

                        return;

                    task = std::move(this-&gt;tasks.front());

                    this-&gt;tasks.pop();

                }

                task();

            }

        }

    );

}

构造函数干了这几件事:

    根据构造线程池对象时传入的参数,初始化线程数量threads

    将stop变量初始化为false

    根据threads,创建相应数量的线程对象,存储到workers中。C++中的线程是创建即启动,所以这些线程也同时开始运行。运行时行为模式为以下三种:

        无任务线程池未终止:阻塞

        无任务线程池已终止:返回。线程函数返回后,线程结束,资源被释放。

        有任务:执行任务。

任务入队函数

template<class F, class… Args>

auto ThreadPool::enqueue(F&& f, Args&&… args) 

    -> std::future<typename std::result_of<F(Args…)>::type>

{

    using return_type = typename std::result_of<F(Args…)>::type;

auto task = std::make_shared&lt; std::packaged_task&lt;return_type()&gt; &gt;(

        std::bind(std::forward&lt;F&gt;(f), std::forward&lt;Args&gt;(args)…)

    );

std::future&lt;return_type&gt; res = task-&gt;get_future();

{

    std::unique_lock&lt;std::mutex&gt; lock(queue_mutex);

    // don’t allow enqueueing after stopping the pool

    if(stop)

        throw std::runtime_error(&quot;enqueue on stopped ThreadPool&quot;);

    tasks.emplace([task](){ (*task)(); });

}

condition.notify_one();

return res;

auto task = std::make_shared&lt; std::packaged_task&lt;return_type()&gt; &gt;(

        std::bind(std::forward&lt;F&gt;(f), std::forward&lt;Args&gt;(args)…)

    );

std::future&lt;return_type&gt; res = task-&gt;get_future();

{

    std::unique_lock&lt;std::mutex&gt; lock(queue_mutex);

    // don’t allow enqueueing after stopping the pool

    if(stop)

        throw std::runtime_error(&quot;enqueue on stopped ThreadPool&quot;);

    tasks.emplace([task](){ (*task)(); });

}

condition.notify_one();

return res;

}

该函数将任务提交到tasks中。

template<class F, class… Args>

auto ThreadPool::enqueue(F&& f, Args&&… args) 

    -> std::future<typename std::result_of<F(Args…)>::type>

接收任务,也就是传入的可调用对象的类型、可调用对象、参数类型列表和参数列表。

​std::result_of<F(Args…)>::type​推导出可调用对象的返回值类型。

auto task = std::make_shared< std::packaged_task<return_type()> >(

            std::bind(std::forward<F>(f), std::forward<Args>(args)…)

        );

std::future<return_type> res = task->get_future();

    将可调用对象及实参使用绑定器绑定,返回一个无参的可调用对象;

    将绑定器返回的无参的可调用对象包装成一个packaged_task​对象;

    使用packaged_task​对象创建智能指针shared_ptr​。

    ​packaged_task​对象获取一个std::future​对象

补充知识:

    ​​std::packaged_task​​

​std::packaged_task​ 是 C++ 标准库中用于封装可调用对象的类模板。它允许将一个函数(或其他可调用对象)及其参数包装起来,并与一个 std::future​ 对象关联。

    模板参数:std::packaged_task<R(Args)>std::packaged_task​接收的模版参数是函数签名R(Args)​,如同std::function<R(Args)>​一样。

    ​std::future​​

是 C++ 标准库中用于异步操作的工具。它表示某个异步任务完成后的返回值。这里std::future​用于接收std::package_task​的返回值。

{

        std::unique_lock<std::mutex> lock(queue_mutex);

    // don’t allow enqueueing after stopping the pool

    if(stop)

        throw std::runtime_error(&quot;enqueue on stopped ThreadPool&quot;);

    tasks.emplace([task](){ (*task)(); });

    // don’t allow enqueueing after stopping the pool

    if(stop)

        throw std::runtime_error(&quot;enqueue on stopped ThreadPool&quot;);

    tasks.emplace([task](){ (*task)(); });

}

花括号是为了让互斥锁出作用域时自动释放。如果线程池终止,就抛出异常,否则将任务入队。

condition.notify_one();

return res;

最后,通知阻塞的线程执行任务,返回。

析构函数

inline ThreadPool::~ThreadPool()

{

    {

        std::unique_lock<std::mutex> lock(queue_mutex);

        stop = true;

    }

    condition.notify_all();

    for(std::thread &worker: workers)

        worker.join();

}

将线程池运行状态转为终止。唤醒所有阻塞的线程,阻塞的线程被唤醒后,处于线程池终止无任务状态,会被回收。正在执行任务的线程执行完任务后,也会进入线程池终止无任务状态,被回收。worker.join();​使得主线程等待所有子线程执行完毕。

小结

该线程池只实现了固定线程数量的模式。可以考虑增加动态增减线程的功能。

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » Github开源项目源码阅读(progschjThreadPool)

评论 抢沙发

3 + 2 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮