手写线程池——C和C++版本

发布时间 2023-11-24 17:27:00作者: Beasts777

内容参考:爱编程的大丙 (subingwen.cn)

C语言版

大致思路

  • 采用生产者——消费者模型:
    • 生产者:用户向任务队列添加任务,是生产者。
    • 消费者:线程池里面的线程从任务队列中取出任务是,是消费者。
  • 任务队列:
    • 单个任务结构:使用结构体封装,其中包含一个函数指针,用于指向要处理的具体任务函数。另一个是参数指针,指向向任务函数传递的参数。
    • 任务队列结构:使用环形队列,从队列头取数据,从队列尾存放数据。
  • 线程池结构:
    • 管理者线程:用于定时检测线程池状态,根据任务数量和存货线程数量决定创建或销毁线程。
    • 消费者线程:使用数组结构。承载所有用于工作的子线程。
    • 任务队列:用于存储任务,尾部存,头部取。
    • 锁和条件变量:用于维持线程同步。
    • 线程池销毁标志:用于标志线程池状态,同时也间接参与线程的销毁。
    • 其它变量:如任务队列容量、最多同时存在的线程数等。

头文件

 // 线程池头文件
#ifndef _THREADPOOL_H
#define _THREADPOOL_H

typedef struct ThreadPool ThreadPool;    // 使用前的预定义,具体定义在源代码内
// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueCapacity);
// 销毁线程池
void threadPoolDestroy(ThreadPool *pool);
// 添加工作
void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg);
// 获取当前正在工作的线程数
int threadPoolBusyNum(ThreadPool *pool);
// 获取当前存活的线程数量
int threadPoolAliveNum(ThreadPool *pool);
// 线程池工作函数 
void* work(void *arg);
// 管理线程的工作函数
void* manage(void *arg);
// 退出当前线程,并将其在线程池内的ID值转化为0
void threadExit(ThreadPool *pool);
#endif

源代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>    // C语言线程池头文件
#include "threadpool.h"

// 任务结构体
typedef struct Task{
    void (*function) (void *);    // 函数指针
    void *arg;	
}Task;
// 每次新增、删除线程的最大数量
const int NUMBER = 2;

// 线程池结构体
typedef struct ThreadPool{
    // 任务队列
    Task* taskQ;
    int queueCapacity;    // 容量
    int queueSize;        // 当前队列内存在的任务
    int queueFront;       // 队头。用于取出数据
    int queueRear;        // 队尾。用于存放数据
    // 线程
    pthread_t manageID;   // 管理者线程的ID
    pthread_t* threadIDs; // 生产者线程的ID,指向一个数组
    int minNum;           // 最小的线程数量
    int maxNum;           // 最大线程数量
    int busyNum;          // 正在工作的线程的数量(变量最频繁的变量)
    int liveNum;          // 当前存活的线程的数量
    int exitNum;          // 需要销毁的线程数量(根据实际场景计算)
    pthread_mutex_t mutexPool;    // 互斥锁,锁住线程池 
    pthread_mutex_t mutexBusy;    // 互斥锁,锁住busyNum变量
	pthread_cond_t full;       // 任务队列是否已经满了
	pthread_cond_t empty;      // 任务队列是不是空的 
    int shutdown;                 // 是否销毁线程池
}ThreadPool;

// 退出线程的函数
void threadExit(ThreadPool *pool) {
    pthread_t nowThread = pthread_self();
    for (int i = 0; i < pool->maxNum; i++){
        if(pool->threadIDs[i] == nowThread) {
            pool->threadIDs[i] = 0;
            printf("consumer thread %ld is finished\n", nowThread);
            pthread_exit(NULL);
            break;
        }
    }
    return ;
}

// 管理线程的函数:主要负责新增线程、销毁线程
void *manage(void *arg){
    ThreadPool *pool = (ThreadPool *)arg;
    while (!pool->shutdown) {       // 当线程池没有被关闭时
        // 三秒检测一次线程池
        sleep(3);                   
        // 访问只有读操作的内存中存储的参数,无需使用锁
        int maxNum = pool->maxNum;          // 最大线程数
        int minNum = pool->minNum;          // 最小线程数
        int queueCapacity = pool->queueCapacity;    //  任务队列容量
        // 对临界资源进行操作 
        pthread_mutex_lock(&pool->mutexPool);
        int liveNum = pool->liveNum;        // 当前存活的线程数
        int queueSize = pool->queueSize;    // 当前任务数量
        // 检查是否需要新增线程:当任务数量>存活线程数 && 存活线程数<最大线程数
        if(queueSize > liveNum && liveNum < maxNum) {
            int var = 0;
            // 遍历线程数组,找出目前还未初始化的位置
            for (int i = 0; i < maxNum; i++) {
                if (pool->threadIDs[i] == 0){
                    pthread_t newThread;
                    pthread_create(&newThread, NULL, work, pool);
                    pool->threadIDs[i] = newThread;
                    pool->liveNum++;
                    if(++var >= NUMBER){
                        break;
                    }
                }
            }
        }
        // 检查是否需要销毁线程:当任务数量*2<存活线程数 && 存活线程数>最小线程数
        if (queueSize * 2 < liveNum && liveNum > minNum) {
            int var = 0;
            // 通过设置exitNum并且释放至少一个被empty阻塞的子线程,让子线程自己死亡
            for (int i = 0; i < NUMBER; i++) {
                pool->exitNum++;
                pthread_cond_signal(&pool->empty);
                var++;
                if (liveNum - var <= minNum) {
                    break;
                }
            }
        }
        // 释放线程池锁
        pthread_mutex_unlock(&pool->mutexPool);
    }
    return NULL;
}
// 消费者线程的工作函数
void *work(void *arg) {
    ThreadPool *pool = (ThreadPool *)arg;
    while (1) {
        // 需要对线程池的绝大部分内存进行操作,因此直接锁住整个线程池
        pthread_mutex_lock(&pool->mutexPool);
        // 如果当前任务队列为空且线程池还未被销毁,那么阻塞所有线程
        while( pool->queueSize == 0 && !pool->shutdown ){
            pthread_cond_wait(&pool->empty, &pool->mutexPool); 
            // 判断是否需要销毁子线程
            if(pool->exitNum > 0){
                pool->exitNum--;
                if (pool->liveNum > pool->minNum) {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool);
                    threadExit(pool);
                }
            }
        }
        // 如果此时线程池已经被摧毁,那么释放锁,退出线程
        if(pool->shutdown){
            pthread_mutex_unlock(&pool->mutexPool);
            printf("consumer thread %ld is destroyed\n", pthread_self());
            pthread_exit(NULL);
        }
        // 此时线程走通了,那么从队列的头部取出任务并执行
        Task *task = &pool->taskQ[pool->queueFront];
        pool->queueFront = (++pool->queueFront) % pool->queueCapacity;  // 维护一个环形数组
        pool->queueSize--;
        // 唤醒因为任务队列已满而被阻塞的生产者
        pthread_cond_signal(&pool->full);
        // 此时就可以释放整个线程池的锁。将线程工作的内容放在整体锁的外面,提升效率
        pthread_mutex_unlock(&pool->mutexPool);
        // 线程开始工作,修改正在工作的线程的数量
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++; 
        pthread_mutex_unlock(&pool->mutexBusy);
        printf("thread %ld begin work\n", pthread_self());
        task->function(task->arg);     // 通过指针函数执行任务
        printf("thread %ld finish work\n", pthread_self());
        // 线程工作完毕
        free(task->arg);         // 任务参数建议使用堆内存,避免了生命周期的问题 
        task->arg = NULL;
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;        // 工作线程数-- 
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}
// 初始化线程池
ThreadPool * threadPoolCreate(int min, int max, int queueCapacity){
	ThreadPool *threadpool = NULL;
    do {     	
         // 初始化线程池
         threadpool = (ThreadPool *)malloc(sizeof(ThreadPool));
         if(threadpool == NULL){
	          printf("malloc threadpool fail...\n");
			  break;
         }
	     // 初始化生产者线程数组L:这里使用calloc,是为了将所有线程ID初始化为0,便于后续判断
	      threadpool->threadIDs= (pthread_t *)calloc(max, sizeof(pthread_t));
         if(threadpool->threadIDs == NULL){
              printf("malloc threadIDs fail...\n");
			  break;
	     }
         // 初始化变量
	     threadpool->minNum = min;
         threadpool->maxNum = max;
         threadpool->liveNum = min;
         threadpool->busyNum = 0;
         threadpool->exitNum = 0;
         // 调用函数初始化互斥锁和条件变量
	     if((pthread_mutex_init(&threadpool->mutexPool, NULL) != 0)||
	     	(pthread_mutex_init(&threadpool->mutexBusy, NULL) != 0) || 
	     	(pthread_cond_init(&threadpool->empty, NULL) != 0) || 
	     	(pthread_cond_init(&threadpool->full, NULL) != 0)){
              printf("mutex or condition init fail...\n");
			  break;
	     }
         // 对任务队列进行初始化
         threadpool->taskQ = (Task *)malloc(sizeof(Task) * queueCapacity);
         threadpool->queueCapacity = queueCapacity;
         threadpool->queueSize = 0;
         threadpool->queueFront = 0;
         threadpool->queueRear = 0;
         // 创建管理者线程
	     pthread_create(&threadpool->manageID, NULL, manage, threadpool);
         // 创建消费者线程
         for (int i = 0; i < min; ++i) {
              pthread_create(&threadpool->threadIDs[i], NULL, work, threadpool);	
	     }	
        threadpool->shutdown = 0;
        printf("threadpool is successfully created!\n");
        return threadpool;
	}while(0);     // 通过while,从而使用break,避免在if内进行多次内存释放
    if(!threadpool && !threadpool->taskQ){
         free(threadpool->taskQ);	
	}	
    if(!threadpool && threadpool->threadIDs){
         free(threadpool->threadIDs);	
	}
    if(threadpool){
         free(threadpool);	
	}	
    return threadpool; 
}

// 生产者函数:添加任务
void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg){
    // 首先判读用户传入的pool是否已经初始化
    if(pool == NULL){
        return;
    }
    // 抢线程池锁
    pthread_mutex_lock(&pool->mutexPool);
    // 判断任务队列是否已经满了
    while (pool->queueSize == pool->queueCapacity && !pool->shutdown){
        pthread_cond_wait(&pool->full, &pool->mutexPool);
    }
    if(pool->shutdown){
        pthread_mutex_unlock(&pool->mutexPool);
        return;
    }
    // 在任务队列的队尾添加任务
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg = arg;
    pool->queueSize++;
    pool->queueRear = (++pool->queueRear) % pool->queueCapacity;
    // 唤醒被empty阻塞的消费者线程
    pthread_cond_signal(&pool->empty);
    // 释放线程池锁
    pthread_mutex_unlock(&pool->mutexPool);
    return;
}
// 销毁线程池
void threadPoolDestroy(ThreadPool *pool){
    if (pool == NULL) {
        return;
    }
    // 如果任务还没有完成,返回提示
    if(pool->queueSize > 0){
        printf("there are still unfinished tasks!\n");
    }
    // 修改删除标志符,修改这个标识符后,子线程函数内判断,并自动结束,管理线程这里要手动回收
    pool->shutdown = 1;
    // 唤醒阻塞的消费者线程,让其自动死亡
    for(int i = 0; i < pool->liveNum; ++i) {
        pthread_cond_signal(&pool->empty);
    } 
    // 手动回收管理线程
    printf("manage thread %ld is destroyed\n", pool->manageID);
    pthread_join(pool->manageID, NULL);
    // 释放消费者线程占用的内存
    if(pool->threadIDs) {
        printf("consumer thread memory is freed\n");
        free(pool->threadIDs);
    }
    // 释放任务占用的内存
    if(pool->taskQ) {
        printf("task queue memory is freed\n");
        free(pool->taskQ);
    }
    // 释放互斥锁和条件变量
    printf("mutexex and condtion variables are destroyed\n");
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_mutex_destroy(&pool->mutexPool);
    pthread_cond_destroy(&pool->empty);
    pthread_cond_destroy(&pool->full);
    // 释放线程池占用的内存
    printf("threadpool memory is freed\n");
    free(pool);
    return;
}

// 获取当前存活的线程数量
int threadPoolAliveNum(ThreadPool *pool){
    if (pool == NULL) {
        return -1;
    }
    pthread_mutex_lock(&pool->mutexPool);
    int liveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexPool);
    return liveNum;
}

// 获取当前正在工作的线程数量 
int threadPoolBusyNum(ThreadPool *pool){
    if (pool == NULL) {
        return -1;
    }
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    return busyNum;
}