0.线程池的概念
1.线程池使用步骤
①初始化线程池:确定线程数量,并做好互斥访问;
②启动所有线程
③准备好任务处理基类;
④获取任务接口:通过条件变量阻塞等待任务
2.atomic原子操作
'std:atomic`是C++11标准库中的一个模板类,用于实现多线程环境下的原子操作。它提供了一种线程安全的方式来访问和修改共享变量,可以避免多线程环境中的数据竞争问题,'std:atomic’的使用方式类似于普通的C++变量,但是它的操作是原子性的。也就是说,在多线程环境下,多个线程同时对同一个’std:atomic变量进行操作时,不会出现数据竞争问题。
3.线程池案例
①threadpool.cpp
#include "threadpool.h"void XThreadPool::Init(int num)
{std::unique_lock<std::mutex> lock(__mux__);thread_num = num;std::cout << "Thread pool Init: " << num << std::endl;
}void XThreadPool::Start()
{std::unique_lock<std::mutex> lock(__mux__);if (thread_num <= 0){std::cerr << "Please Init XThreadPool !" << std::endl;return;}if (!threads.empty()){std::cerr << "Thread Pool has start!" << std::endl;return;}for (int i = 0; i < thread_num; i++){auto th = std::make_shared<std::thread>(&XThreadPool::Run, this);threads.push_back(th);}
}void XThreadPool::Run()
{std::cout << "begin XThreadPool Run: " << std::this_thread::get_id() << std::endl;while (true){auto task = GetTask();if (!task){continue;}++__task_run_count__;try{auto re = task->Run();task->Setvalue(re);}catch (...){}--__task_run_count__;}std::cout << "end XThreadPool Run: " << std::this_thread::get_id() << std::endl;
}void XThreadPool::AddTask(XTask *task)
{std::unique_lock<std::mutex> lock(__mux__);tasks.push_back(task);task->is_exit = [this] {return is_exit(); }
}XTask* XThreadPool::GetTask()
{std::unique_lock<std::mutex> lock(__mux__);if (tasks.empty()){__cv__.wait(lock);}auto task = tasks.front();tasks.pop_front();return task;
}void XThreadPool::Stop()
{exit = true;__cv__.notify_all();for (auto &th : threads){th->join();}std::unique_lock<std::mutex> lock(__mux__);threads.clear();
}
②threadpool.h
#pragma once#include <thread>
#include <mutex>
#include <vector>
#include <list>
#include <iostream>
#include <string>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <future>class XTask
{public:virtual int Run() = 0;std::function<bool()> is_exit = nullptr;void Setvalue(int v) { __p__.set_value(v); }auto GetValue() { return __p__.get_future().get(); }private:std::promise<int> __p__;//用来接收返回值
};class XThreadPool
{public:void Init(int num);void Start();//所有线程启动函数void Stop();//线程池退出void AddTask(XTask *task);XTask* GetTask();bool is_exit() { return exit; }int task_run_count() { return __task_run_count__; }private:int thread_num = 0;std::mutex __mux__;void Run();//线程池线程入口函数std::vector<std::shared_ptr<std::thread>> threads;std::list<XTask*> tasks;std::condition_variable __cv__;bool exit = false;std::atomic<int> __task_run_count__ = {0};//正在运行的任务数量
};
③main.cpp
#include "threadpool.h"class MyTask :public XTask
{public:int Run(){std::cout <<"==============================================" << std::endl;std::cout << std::this_thread::get_id() << "-Mytask" << name << std::endl;std::cout << "==============================================" << std::endl;for (int i = 0; i < 10; i++){if(is_exit()){break;}std::cout << "." << std::flush;std::this_thread::sleep_for(std::chrono::microseconds(500));}return 0;}std::string name = "";
};int main()
{XThreadPool pool;pool.Init(16);pool.Start();MyTask task1;task1.name = "test name 001";pool.AddTask(&task1);MyTask task2;task2.name = "test name 002";pool.AddTask(&task2);std::this_thread::sleep_for(std::chrono::seconds(100));std::cout << "task run count =" << pool.task_run_count() << std::endl;MyTask task3;task3.name = "test name 003";pool.AddTask(&task3);MyTask task4;task4.name = "test name 004";pool.AddTask(&task4);std::cout << "task run count = " << pool.task_run_count() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));pool.Stop();std::cout << "task run count =" << pool.task_run_count() << std::endl;getchar();return 0;
}