内容参考:爱编程的大丙 (subingwen.cn)
C语言版
大致思路
- 采用
生产者——消费者模型:
- 生产者:用户向任务队列添加任务,是生产者。
- 消费者:线程池里面的线程从任务队列中取出任务是,是消费者。
- 任务队列:
- 单个任务结构:使用结构体封装,其中包含一个函数指针,用于指向要处理的具体任务函数。另一个是参数指针,指向向任务函数传递的参数。
- 任务队列结构:使用环形队列,从队列头取数据,从队列尾存放数据。
- 线程池结构:
- 管理者线程:用于定时检测线程池状态,根据任务数量和存货线程数量决定创建或销毁线程。
- 消费者线程:使用数组结构。承载所有用于工作的子线程。
- 任务队列:用于存储任务,尾部存,头部取。
- 锁和条件变量:用于维持线程同步。
- 线程池销毁标志:用于标志线程池状态,同时也间接参与线程的销毁。
- 其它变量:如任务队列容量、最多同时存在的线程数等。
头文件
// 线程池头文件
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool; // 使用前的预定义,具体定义在源代码内
// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueCapacity);
// 销毁线程池
void threadPoolDestroy(ThreadPool *pool);
// 添加工作
void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg);
// 获取当前正在工作的线程数
int threadPoolBusyNum(ThreadPool *pool);
// 获取当前存活的线程数量
int threadPoolAliveNum(ThreadPool *pool);
// 线程池工作函数
void* work(void *arg);
// 管理线程的工作函数
void* manage(void *arg);
// 退出当前线程,并将其在线程池内的ID值转化为0
void threadExit(ThreadPool *pool);
#endif
源代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h> // C语言线程池头文件
#include "threadpool.h"
// 任务结构体
typedef struct Task{
void (*function) (void *); // 函数指针
void *arg;
}Task;
// 每次新增、删除线程的最大数量
const int NUMBER = 2;
// 线程池结构体
typedef struct ThreadPool{
// 任务队列
Task* taskQ;
int queueCapacity; // 容量
int queueSize; // 当前队列内存在的任务
int queueFront; // 队头。用于取出数据
int queueRear; // 队尾。用于存放数据
// 线程
pthread_t manageID; // 管理者线程的ID
pthread_t* threadIDs; // 生产者线程的ID,指向一个数组
int minNum; // 最小的线程数量
int maxNum; // 最大线程数量
int busyNum; // 正在工作的线程的数量(变量最频繁的变量)
int liveNum; // 当前存活的线程的数量
int exitNum; // 需要销毁的线程数量(根据实际场景计算)
pthread_mutex_t mutexPool; // 互斥锁,锁住线程池
pthread_mutex_t mutexBusy; // 互斥锁,锁住busyNum变量
pthread_cond_t full; // 任务队列是否已经满了
pthread_cond_t empty; // 任务队列是不是空的
int shutdown; // 是否销毁线程池
}ThreadPool;
// 退出线程的函数
void threadExit(ThreadPool *pool) {
pthread_t nowThread = pthread_self();
for (int i = 0; i < pool->maxNum; i++){
if(pool->threadIDs[i] == nowThread) {
pool->threadIDs[i] = 0;
printf("consumer thread %ld is finished\n", nowThread);
pthread_exit(NULL);
break;
}
}
return ;
}
// 管理线程的函数:主要负责新增线程、销毁线程
void *manage(void *arg){
ThreadPool *pool = (ThreadPool *)arg;
while (!pool->shutdown) { // 当线程池没有被关闭时
// 三秒检测一次线程池
sleep(3);
// 访问只有读操作的内存中存储的参数,无需使用锁
int maxNum = pool->maxNum; // 最大线程数
int minNum = pool->minNum; // 最小线程数
int queueCapacity = pool->queueCapacity; // 任务队列容量
// 对临界资源进行操作
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum; // 当前存活的线程数
int queueSize = pool->queueSize; // 当前任务数量
// 检查是否需要新增线程:当任务数量>存活线程数 && 存活线程数<最大线程数
if(queueSize > liveNum && liveNum < maxNum) {
int var = 0;
// 遍历线程数组,找出目前还未初始化的位置
for (int i = 0; i < maxNum; i++) {
if (pool->threadIDs[i] == 0){
pthread_t newThread;
pthread_create(&newThread, NULL, work, pool);
pool->threadIDs[i] = newThread;
pool->liveNum++;
if(++var >= NUMBER){
break;
}
}
}
}
// 检查是否需要销毁线程:当任务数量*2<存活线程数 && 存活线程数>最小线程数
if (queueSize * 2 < liveNum && liveNum > minNum) {
int var = 0;
// 通过设置exitNum并且释放至少一个被empty阻塞的子线程,让子线程自己死亡
for (int i = 0; i < NUMBER; i++) {
pool->exitNum++;
pthread_cond_signal(&pool->empty);
var++;
if (liveNum - var <= minNum) {
break;
}
}
}
// 释放线程池锁
pthread_mutex_unlock(&pool->mutexPool);
}
return NULL;
}
// 消费者线程的工作函数
void *work(void *arg) {
ThreadPool *pool = (ThreadPool *)arg;
while (1) {
// 需要对线程池的绝大部分内存进行操作,因此直接锁住整个线程池
pthread_mutex_lock(&pool->mutexPool);
// 如果当前任务队列为空且线程池还未被销毁,那么阻塞所有线程
while( pool->queueSize == 0 && !pool->shutdown ){
pthread_cond_wait(&pool->empty, &pool->mutexPool);
// 判断是否需要销毁子线程
if(pool->exitNum > 0){
pool->exitNum--;
if (pool->liveNum > pool->minNum) {
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
// 如果此时线程池已经被摧毁,那么释放锁,退出线程
if(pool->shutdown){
pthread_mutex_unlock(&pool->mutexPool);
printf("consumer thread %ld is destroyed\n", pthread_self());
pthread_exit(NULL);
}
// 此时线程走通了,那么从队列的头部取出任务并执行
Task *task = &pool->taskQ[pool->queueFront];
pool->queueFront = (++pool->queueFront) % pool->queueCapacity; // 维护一个环形数组
pool->queueSize--;
// 唤醒因为任务队列已满而被阻塞的生产者
pthread_cond_signal(&pool->full);
// 此时就可以释放整个线程池的锁。将线程工作的内容放在整体锁的外面,提升效率
pthread_mutex_unlock(&pool->mutexPool);
// 线程开始工作,修改正在工作的线程的数量
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
printf("thread %ld begin work\n", pthread_self());
task->function(task->arg); // 通过指针函数执行任务
printf("thread %ld finish work\n", pthread_self());
// 线程工作完毕
free(task->arg); // 任务参数建议使用堆内存,避免了生命周期的问题
task->arg = NULL;
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--; // 工作线程数--
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
// 初始化线程池
ThreadPool * threadPoolCreate(int min, int max, int queueCapacity){
ThreadPool *threadpool = NULL;
do {
// 初始化线程池
threadpool = (ThreadPool *)malloc(sizeof(ThreadPool));
if(threadpool == NULL){
printf("malloc threadpool fail...\n");
break;
}
// 初始化生产者线程数组L:这里使用calloc,是为了将所有线程ID初始化为0,便于后续判断
threadpool->threadIDs= (pthread_t *)calloc(max, sizeof(pthread_t));
if(threadpool->threadIDs == NULL){
printf("malloc threadIDs fail...\n");
break;
}
// 初始化变量
threadpool->minNum = min;
threadpool->maxNum = max;
threadpool->liveNum = min;
threadpool->busyNum = 0;
threadpool->exitNum = 0;
// 调用函数初始化互斥锁和条件变量
if((pthread_mutex_init(&threadpool->mutexPool, NULL) != 0)||
(pthread_mutex_init(&threadpool->mutexBusy, NULL) != 0) ||
(pthread_cond_init(&threadpool->empty, NULL) != 0) ||
(pthread_cond_init(&threadpool->full, NULL) != 0)){
printf("mutex or condition init fail...\n");
break;
}
// 对任务队列进行初始化
threadpool->taskQ = (Task *)malloc(sizeof(Task) * queueCapacity);
threadpool->queueCapacity = queueCapacity;
threadpool->queueSize = 0;
threadpool->queueFront = 0;
threadpool->queueRear = 0;
// 创建管理者线程
pthread_create(&threadpool->manageID, NULL, manage, threadpool);
// 创建消费者线程
for (int i = 0; i < min; ++i) {
pthread_create(&threadpool->threadIDs[i], NULL, work, threadpool);
}
threadpool->shutdown = 0;
printf("threadpool is successfully created!\n");
return threadpool;
}while(0); // 通过while,从而使用break,避免在if内进行多次内存释放
if(!threadpool && !threadpool->taskQ){
free(threadpool->taskQ);
}
if(!threadpool && threadpool->threadIDs){
free(threadpool->threadIDs);
}
if(threadpool){
free(threadpool);
}
return threadpool;
}
// 生产者函数:添加任务
void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg){
// 首先判读用户传入的pool是否已经初始化
if(pool == NULL){
return;
}
// 抢线程池锁
pthread_mutex_lock(&pool->mutexPool);
// 判断任务队列是否已经满了
while (pool->queueSize == pool->queueCapacity && !pool->shutdown){
pthread_cond_wait(&pool->full, &pool->mutexPool);
}
if(pool->shutdown){
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 在任务队列的队尾添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueSize++;
pool->queueRear = (++pool->queueRear) % pool->queueCapacity;
// 唤醒被empty阻塞的消费者线程
pthread_cond_signal(&pool->empty);
// 释放线程池锁
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 销毁线程池
void threadPoolDestroy(ThreadPool *pool){
if (pool == NULL) {
return;
}
// 如果任务还没有完成,返回提示
if(pool->queueSize > 0){
printf("there are still unfinished tasks!\n");
}
// 修改删除标志符,修改这个标识符后,子线程函数内判断,并自动结束,管理线程这里要手动回收
pool->shutdown = 1;
// 唤醒阻塞的消费者线程,让其自动死亡
for(int i = 0; i < pool->liveNum; ++i) {
pthread_cond_signal(&pool->empty);
}
// 手动回收管理线程
printf("manage thread %ld is destroyed\n", pool->manageID);
pthread_join(pool->manageID, NULL);
// 释放消费者线程占用的内存
if(pool->threadIDs) {
printf("consumer thread memory is freed\n");
free(pool->threadIDs);
}
// 释放任务占用的内存
if(pool->taskQ) {
printf("task queue memory is freed\n");
free(pool->taskQ);
}
// 释放互斥锁和条件变量
printf("mutexex and condtion variables are destroyed\n");
pthread_mutex_destroy(&pool->mutexBusy);
pthread_mutex_destroy(&pool->mutexPool);
pthread_cond_destroy(&pool->empty);
pthread_cond_destroy(&pool->full);
// 释放线程池占用的内存
printf("threadpool memory is freed\n");
free(pool);
return;
}
// 获取当前存活的线程数量
int threadPoolAliveNum(ThreadPool *pool){
if (pool == NULL) {
return -1;
}
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}
// 获取当前正在工作的线程数量
int threadPoolBusyNum(ThreadPool *pool){
if (pool == NULL) {
return -1;
}
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}