1、介绍
一个线程池由三部分组成:
(1)管理者线程,用于实时判断线程池中的任务个数、线程个数;并根据两者之间的变化来创建或销毁线程,与任务队列达到平衡。对应于示例中的thread_manage()函数。
(2)工作线程,线程池中线程,个数不定,在没有任务时处于等待状态,可以循环的执行任务。对应于示例中的thread_work()函数。
(3)任务队列,用于存放没有处理的任务,链表、数组等结构存储。
2、示例
代码参考:
手写线程池 - C语言版 | 爱编程的大丙
C语言实现简单的线程池项目(附完整工程文件)_线程池项目文档-CSDN博客
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>typedef struct TaskiLists
{void (*function)(void* arg);void* arg;
}TaskLists;struct ThreadPool
{// 任务队列TaskLists* task_list;// 任务队列的容量int task_list_cap;// 当前任务队列中任务的数量int task_list_size;// 任务队列队头数据int task_list_front;// 任务队列队尾数据int task_list_rear;// 线程池中线程队列pthread_t* threads_id;// 线程池中线程的最大数量int threads_max;// 线程池中线程的最小数量int threads_min;// 工作的线程个数int threads_busy_num;// 存活的线程个数int threads_live_num;// 要销毁的线程个数int threads_exit_num;// 管理者线程pthread_t thread_manage;// 锁 锁住整个线程池pthread_mutex_t lock_pool;// 锁 锁住threads_busy_num变量pthread_mutex_t lock_threads_busy_num;// 条件变量 task_list是不是达到最大容量pthread_cond_t task_list_full;// 条件变量 task_list是不是为空pthread_cond_t task_list_empty;// 是否销毁线程,销毁=1;不销毁=0int shut_down;
};typedef struct ThreadPool ThreadPool;//############################################################################################################
//############################################################################################################// 给线程池添加任务
void thread_pool_add(ThreadPool* pool, void(*func)(void*), void* arg)
{pthread_mutex_lock(&(pool->lock_pool));// 如果任务列表已满,添加进程阻塞while (pool->task_list_size == pool->task_list_cap && pool->shut_down == 0){pthread_cond_wait(&(pool->task_list_full), &(pool->lock_pool));}// 如果线程池要关闭了,直接返回if (pool->shut_down == 1){pthread_mutex_unlock(&(pool->lock_pool));return;}// 添加任务pool->task_list[pool->task_list_rear].function = func;pool->task_list[pool->task_list_rear].arg = arg;pool->task_list_size += 1;pool->task_list_rear = (pool->task_list_rear + 1) % pool->task_list_cap;printf("--------- add %d %ld\n", pool->task_list_size, pthread_self());// 通过条件变量通知任务开始处理pthread_cond_signal(&(pool->task_list_empty));pthread_mutex_unlock(&(pool->lock_pool));return;
}// 销毁一个线程
void thread_exit(ThreadPool* pool)
{pthread_t tid = pthread_self();for (int i = 0; i < pool->threads_max; i++){if (pool->threads_id[i] == tid){pool->threads_id[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);
}void thread_work(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;// 工作线程要一直在执行,时时执行while (1){sleep(1.3);pthread_mutex_lock(&(pool->lock_pool));// 如果任务列表为空,添加进程阻塞while (pool->task_list_size == 0 && pool->shut_down == 0){pthread_cond_wait(&(pool->task_list_empty), &(pool->lock_pool));if (pool->threads_exit_num > 0){pool->threads_exit_num--;if (pool->threads_live_num > pool->threads_min){pool->threads_live_num--;pthread_mutex_unlock(&(pool->lock_pool));// 销毁该线程thread_exit(pool);}}}// 如果线程池要关闭了,直接返回if (pool->shut_down == 1){pthread_mutex_unlock(&(pool->lock_pool));thread_exit(pool);}// 取任务参数TaskLists tmp;tmp.function = pool->task_list[pool->task_list_front].function;tmp.arg = pool->task_list[pool->task_list_front].arg;pool->task_list_size--;pool->task_list_front = (pool->task_list_front + 1) % pool->task_list_cap; // 向后偏移// 条件变量,通知pthread_cond_signal(&pool->task_list_full);pthread_mutex_unlock(&(pool->lock_pool));// 处理任务printf("thread %ld start working...\n", pthread_self());pthread_mutex_lock(&(pool->lock_threads_busy_num));pool->threads_busy_num += 1;pthread_mutex_unlock(&(pool->lock_threads_busy_num));tmp.function(tmp.arg);free(tmp.arg);tmp.arg = NULL;printf("thread %ld end working...\n", pthread_self());pthread_mutex_lock(&(pool->lock_threads_busy_num));pool->threads_busy_num--;pthread_mutex_unlock(&(pool->lock_threads_busy_num));}return;
}// 管理者线程 管理代码处理
void thread_manage(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (pool->shut_down == 0){printf("===========================\n");// 每3s检测一次pool的状态sleep(3);// 取出线程池中任务的数量和当前线程的数量pthread_mutex_lock(&(pool->lock_pool));int task_list_size = pool->task_list_size;int threads_live_num = pool->threads_live_num;pthread_mutex_unlock(&pool->lock_pool);// 取出工作的线程的数量pthread_mutex_lock(&pool->lock_threads_busy_num);int threads_busy_num = pool->threads_busy_num;pthread_mutex_unlock(&pool->lock_threads_busy_num);// 添加线程// 任务的个数 > 存活的线程个数 && 存活的线程数 < 最大线程数if (task_list_size > threads_live_num && threads_live_num < pool->threads_max){pthread_mutex_lock(&(pool->lock_pool));int cnt = 0;for (int i = 0; i < pool->threads_max && cnt < task_list_size && pool->threads_live_num < pool->threads_max; i++){if (pool->threads_id[i] == 0){pthread_create(&(pool->threads_id[i]), NULL, thread_work, pool);cnt += 1;pool->threads_live_num += 1;}}pthread_mutex_unlock(&pool->lock_pool);}// 销毁线程// 忙的线程*2 < 存活的线程数 && 存活的线程 > 最小线程数if (threads_busy_num * 2 < threads_live_num && threads_live_num > pool->threads_min){pthread_mutex_lock(&(pool->lock_pool));pool->threads_exit_num = threads_busy_num > pool->threads_min ? threads_busy_num : pool->threads_min;pthread_mutex_unlock(&(pool->lock_pool));// 让工作的线程自杀for (int i = 0; i < pool->threads_exit_num; i++){pthread_cond_signal(&(pool->task_list_empty));}}}return;
}// 创建线程池
ThreadPool* thread_pool_create(int max, int min, int task_list_cap)
{// 申请线程池结构体内容存放的地址ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));if (pool == NULL){printf("the pool malloc failed...\n");return NULL;}// 申请线程数组pool->threads_id = (pthread_t*)malloc(sizeof(pthread_t) * max);if (pool->threads_id == NULL) {printf("malloc threadIDs fail...\n");return NULL;}memset(pool->threads_id, 0, sizeof(pthread_t) * max);pool->threads_max = max;pool->threads_min = min;pool->threads_busy_num = 0;pool->threads_live_num = min; // 线程池中最少有min线程在启动,但是有可能不再工作pool->threads_exit_num = 0; // 需要销毁的线程个数;任务少的时候,大多数线程空闲时,需要销毁// 创建线程for (int i = 0; i < min; i++){pthread_create(&(pool->threads_id[i]), NULL, thread_work, pool);}// 管理者线程pool->thread_manage = (pthread_t*)malloc(sizeof(pthread_t));if (pool->thread_manage == NULL) {printf("malloc manage thread fail...\n");return NULL;}pthread_create(&(pool->thread_manage), NULL, thread_work, pool);// 任务队列初始化pool->task_list = (TaskLists*)malloc(sizeof(TaskLists) * task_list_cap);if (pool->task_list == NULL){printf("the task list init failed...\n");return NULL;}pool->task_list_cap = task_list_cap;pool->task_list_front = 0;pool->task_list_rear = 0;// 初始化锁和条件变量if (pthread_mutex_init(&(pool->lock_pool), NULL) != 0 || pthread_mutex_init(&(pool->lock_threads_busy_num), NULL) != 0 ||pthread_cond_init(&(pool->task_list_full), NULL) != 0 || pthread_cond_init(&(pool->task_list_empty), NULL) != 0){printf("mutex or condition init fail...\n");return;}pool->shut_down = 0;printf("----------------------- init success.\n");return pool;
}// 销毁线程池
void thread_pool_destory(ThreadPool* pool)
{if (pool == NULL){return;}// 关闭线程池pool->shut_down = 1;// 阻塞回收管理者线程pthread_join(&(pool->thread_manage), NULL);// 唤醒阻塞的消费者线程for (int i = 0; i < pool->threads_live_num; i++){pthread_cond_signal(&(pool->task_list_empty));}// 释放内存,申请的时候是一次申请出来的(一个数组),释放的时候也一样if (pool->threads_id != NULL){free(pool->threads_id);}// 释放内存,申请的时候是一次申请出来的,释放的时候也一样if (pool->task_list != NULL){free(pool->task_list);}pthread_mutex_lock(&(pool->lock_pool)); /*先锁住再销毁*/pthread_mutex_destroy(&(pool->lock_pool));pthread_mutex_lock(&(pool->lock_threads_busy_num));pthread_mutex_destroy(&(pool->lock_threads_busy_num));pthread_cond_destroy(&(pool->task_list_full));pthread_cond_destroy(&(pool->task_list_empty));// 释放内存free(pool);return;
}// 实际的处理函数
void taskFunc(void* arg)
{int num = *(int*)arg;printf("thread %ld is working, number = %d\n", pthread_self(), num);
}int main()
{ThreadPool* pool = thread_pool_create(10, 3, 100);for (int i = 0; i < 50; i++){int* num = (int*)malloc(sizeof(int));*num = i + 100;thread_pool_add(pool, taskFunc, num);sleep(2);}sleep(30);thread_pool_destory(pool);return 0;
}
结果:
----------------------- init success.
--------- add 1 140196491261696
--------- add 2 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 100
thread 140196491253504 start working...
thread 140196491253504 is working, number = 101
thread 140196491253504 end working...
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 102
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 103
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 104
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 105
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 106
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 107
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 108
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 109
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 110
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 111
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 112
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 113
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 114
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 115
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 116
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 117
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 118
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 119
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 120
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 121
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 122
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 123
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 124
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 125
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 126
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 127
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 128
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 129
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 130
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 131
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 132
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 133
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 134
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 135
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 136
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 137
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 138
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 139
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 140
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 141
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 142
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 143
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 144
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 145
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 146
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 147
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 148
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 149
thread 140196491253504 end working...
threadExit() called, 140196474468096 exiting...