济南东易日盛装饰公司seo优
原文链接:C++实现一个线程池
介绍
线程池是提高CPU利用率的一个非常高效的方法,线程池就是通过预先创建多个线程,当有任务时就执行,无任务时就阻塞.
相比一般的多线程方法,线程池更加简单,模块化,并且效率更高,因为不会重复创建删除线程.
预备知识
异步线程(包括future,packaged_task等对象): 创建异步返回的线程
wrapper装饰器(function+bind): 实现函数封装,泛型编程
condition_variable条件变量: 线程池任务分发和执行的条件控制
shared_ptr智能指针: 优化内存管理
类型推导和后置返回类型: 怎么实现泛型下正确返回未知类型
完美转发: 了解如何实现完美转发
线程池逻辑实现
#include<iostream>
#include<queue>
#include<functional>
#include<condition_variable>
#include<vector>
#include<thread>
#include<future>
#include<atomic>class ThreadPools{
public:ThreadPools(int n_threads): stop(false){for(int i=0;i<n_threads;i++){workers.emplace_back([this](){// 每个线程构造函数while(true){int task;{ //锁的作用域std::unique_lock<std::mutex> lock(this->mtx); //获取队列锁//判断队列非空或者线程池stop则返回.this->cv.wait(lock,[this](){return this->stop||!this->tasks.empty();});if(this->stop&&this->tasks.empty()) return ;// 如果线程池stop且队列空,结束线程.task=this->tasks.front();this->tasks.pop();}std::cout<<"run task:"<< task<<"\n";} });}};~ThreadPools(){{std::unique_lock<std::mutex> lock(mtx);stop=true;}cv.notify_all();//注意必须是 &work引用参数形式, 线程不允许复制构造for(std::thread &work: workers){ work.join();}std::cout<<"thread pools final stop\n";};void enqueue(int task){{std::unique_lock<std::mutex> lock(mtx);if(stop){std::cout<<"push to a stop pools\n";return ;}tasks.push(task);}std::cout<<"push task:"<< task<<"\n";cv.notify_one();return ;};private:std::vector<std::thread> workers;std::queue<int> tasks;std::condition_variable cv;std::mutex mtx;std::atomic<bool> stop;
};int main(){ThreadPools pools(2);for(int i=0;i<10;i++) pools.enqueue(i);return 0;
简单线程池
实现一个线程池,并执行三种任务: 有参无返回值任务,有参有返回值任务 和 有参返回消息结构体任务
#include<thread>
#include<functional>
#include<future>
#include<condition_variable>
#include<memory>
#include<iostream>
#include<mutex>
#include<atomic>
#include<vector>
#include<queue>
#include<type_traits>
#include<cstring>class ThreadPools{
public:ThreadPools(int n_threads): n_threads(n_threads),stop(false){for(int i=0;i<n_threads;i++){workers.emplace_back([this](){while(true){std::packaged_task<void()> task;{std::unique_lock<std::mutex> lock(this->mtx);this->cv.wait(lock,[this](){return this->stop || !this->tasks.empty();});if(this->stop && this->tasks.empty()) return ;task=std::move(this->tasks.front());this->tasks.pop();}task();//std::cout<<"run a task, "<<"current tasks size="<<tasks.size()<<"\n";}});}}~ThreadPools(){{std::unique_lock<std::mutex> lock(mtx);stop=true;}cv.notify_all();for(std::thread &worker : workers){worker.join();}//std::cout<<"thread pools terminated\n";}template<typename F>auto enqueue(F&& f)->std::future<typename std::result_of<F()>::type>;private:int n_threads;std::atomic<bool> stop;std::vector<std::thread> workers;std::queue<std::packaged_task<void()>> tasks;std::condition_variable cv;std::mutex mtx;
};template<typename F>
auto ThreadPools::enqueue(F&& f)->std::future<typename std::result_of<F()>::type>{using return_type=typename std::result_of<F()>::type;auto task=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));std::future<return_type> res=task->get_future();{std::unique_lock<std::mutex> lock(mtx);if(stop){throw std::runtime_error("enqueue on stopped ThreadPool");}tasks.emplace([task](){(*task)();});}cv.notify_one();//std::cout<<"push a task, "<<"current tasks size="<<tasks.size()<<"\n";return res;
}class Message{
public:Message(int fd,int from,int to,std::string& content): fd(fd),from(from),to(to),content(content){size=content.size();}int fd;int from;int to;int size;std::string content;
};int main(){ThreadPools tp(3);// 无返回值任务for(int i=0;i<10;i++){tp.enqueue([i](){std::cout<<"task "<<i<<"\n";});}// 有返回值任务, 正常来讲,返回值是每回轮询,不是等待.std::vector<std::future<int>> results;for(int i=0;i<10;i++){auto f=[](int i) { return i * i; };std::function<int()> bf=std::bind(f,i);results.push_back(tp.enqueue(bf));}for(auto &result: results){std::cout<<"get result="<<result.get()<<"\n";}// 消息结构体作为返回值更常用std::vector<std::future<Message>> msgs;for(int i=0;i<10;i++){auto f=[](int i) { std::string m="a message from:"; m+=std::to_string(i); Message msg(i,i,i,m); return msg;};std::function<Message()> bf=std::bind(f,i);msgs.push_back(tp.enqueue(bf));}for(auto &msg: msgs){Message m(msg.get());std::cout<<"get message:"<<" from:"<<m.from<<" to:"<<m.to<<" size:"<<m.size<<" content:"<<m.content<<"\n";}return 0;
}// task task 1
// task task 3
// 2task 4// task 5
// 0task 6// task 8
// task 9
// get result=task 07// get result=1
// get result=4
// get result=9
// get result=16
// get result=25
// get result=36
// get result=49
// get result=64
// get result=81
// get message: from:0 to:0 size:16 content:a message from:0
// get message: from:1 to:1 size:16 content:a message from:1
// get message: from:2 to:2 size:16 content:a message from:2
// get message: from:3 to:3 size:16 content:a message from:3
// get message: from:4 to:4 size:16 content:a message from:4
// get message: from:5 to:5 size:16 content:a message from:5
// get message: from:6 to:6 size:16 content:a message from:6
// get message: from:7 to:7 size:16 content:a message from:7
// get message: from:8 to:8 size:16 content:a message from:8
// get message: from:9 to:9 size:16 content:a message from:9