介绍
C Linux实现线程池技术
作者第一次编写的线程池,推荐使用的时候修改thread_manager函数中部分逻辑
支持库
#include <stdlib.h> #include <pthread.h> #include <string.h> #include <time.h> #include <unistd.h>
代码
ThreadPool.h
typedef struct MissionAttr { void (*mission)(void *); void *param; time_t createTime; char state; } MissionAttr; typedef struct MissionNode { MissionAttr *mission; struct MissionNode *next; struct MissionNode *back; } MissionNode; typedef struct MissionList { MissionNode *front; MissionNode *final; } MissionList; typedef struct ThreadAttr { pthread_t id; time_t createTime; char state; } ThreadAttr; typedef struct ThreadNode { ThreadAttr *threadAttr; struct ThreadNode *next; struct ThreadNode *back; } ThreadNode; typedef struct ThreadList { ThreadNode *front; ThreadNode *final; } ThreadList; typedef struct ThreadPool { int controlThreadId; ThreadList *threadList; MissionList *missionWaiting; MissionList *missionList; pthread_t managerID; int waitingMissionNumber; int hanldingMissionNumber; int busyNumber; int freeNumber; int needCloseNumber; int maxNumber; int minNumber; int existNumber; char close; pthread_mutex_t managerMutex; pthread_mutex_t missionMutex; pthread_mutex_t threadMutex; pthread_mutex_t workerMutex; pthread_cond_t workerCond; } ThreadPool; typedef struct ThreadArgs { ThreadPool *threadPool; ThreadNode *threadNode; } ThreadArgs; // 主要函数 // 创建线程池 ThreadPool *create_thread_pool(int minNumber, int maxNumber); // 提交任务 void thread_pool_submit(ThreadPool *threadPool, void *func, void *args); // 启动线程池 int thread_pool_run(ThreadPool *threadPool); // 关闭并释放线程池 void thread_shutdown_and_free(ThreadPool *threadPool); // 其他函数 // 管理者线程 void *thread_manager(void *args); // 工作者线程 void *thread_worker(void *args); // 创建工作者线程 int creater_thread_worker(ThreadPool *threadPool, int number); // 获取等待中的任务 MissionNode *get_mission(ThreadPool *threadPool); // 释放完成的任务 void free_mission(ThreadPool *threadPool, MissionNode *missionNode); // 基础函数 // 申请内存修复版 void *fixMalloc(size_t size);
ThreadPool.c
#include "./ThreadPool.h" void *fixMalloc(size_t size) { void *tmp = NULL; while (1) { tmp = malloc(size); if (tmp) { break; } sleep(1); } return tmp; } ThreadPool *create_thread_pool(int minNumber, int maxNumber) { ThreadPool *threadPool = (ThreadPool *)fixMalloc(sizeof(ThreadPool)); memset(threadPool, 0, sizeof(ThreadPool)); threadPool->minNumber = minNumber; threadPool->maxNumber = maxNumber; pthread_mutex_init(&threadPool->missionMutex, NULL); pthread_mutex_init(&threadPool->threadMutex, NULL); pthread_mutex_init(&threadPool->workerMutex, NULL); pthread_cond_init(&threadPool->workerCond, NULL); pthread_mutex_init(&threadPool->managerMutex, NULL); threadPool->threadList = (ThreadList *)fixMalloc(sizeof(ThreadList)); threadPool->missionList = (MissionList *)fixMalloc(sizeof(MissionList)); threadPool->missionWaiting = (MissionList *)fixMalloc(sizeof(MissionList)); memset(threadPool->threadList, 0, sizeof(ThreadList)); memset(threadPool->missionList, 0, sizeof(MissionList)); memset(threadPool->missionWaiting, 0, sizeof(ThreadList)); threadPool->controlThreadId = 0; threadPool->waitingMissionNumber = 0; threadPool->hanldingMissionNumber = 0; threadPool->busyNumber = 0; threadPool->freeNumber = 0; threadPool->needCloseNumber = 0; threadPool->existNumber = 0; return threadPool; } void thread_pool_submit(ThreadPool *threadPool, void *func, void *args) { if (!threadPool->close && func != NULL) { MissionList *missionWaiting = threadPool->missionWaiting; MissionNode *newMissionNode = (MissionNode *)fixMalloc(sizeof(MissionNode)); newMissionNode->mission = (MissionAttr *)fixMalloc(sizeof(MissionAttr)); newMissionNode->mission->createTime = time(NULL); newMissionNode->mission->mission = func; newMissionNode->mission->param = args; newMissionNode->mission->state = 0; newMissionNode->next = NULL; if (missionWaiting->final == NULL) { missionWaiting->front = newMissionNode; } else { newMissionNode->back = missionWaiting->final; missionWaiting->final->next = newMissionNode; } missionWaiting->final = newMissionNode; threadPool->waitingMissionNumber = threadPool->waitingMissionNumber + 1; } } MissionNode *get_mission(ThreadPool *threadPool) { MissionList *missionWaiting = threadPool->missionWaiting; if (missionWaiting->front == NULL) { return NULL; } MissionList *missionList = threadPool->missionList; MissionNode *missionNode = missionWaiting->front; if (missionWaiting->front == missionWaiting->final) { missionWaiting->final = NULL; } missionWaiting->front = missionNode->next; if (missionNode->next) { missionNode->next->back = NULL; } missionNode->next = NULL; if (missionList->front == NULL) { missionList->front = missionNode; } else { missionNode->back = missionList->final; missionList->final->next = missionNode; } missionList->final = missionNode; return missionNode; } void free_mission(ThreadPool *threadPool, MissionNode *missionNode) { MissionList *missionList = threadPool->missionList; if (missionNode->back == NULL) { missionList->front = missionNode->next; } if (missionNode->back) { missionNode->back->next = missionNode->next; } if (missionNode->next) { missionNode->next->back = missionNode->back; } else { missionList->final = missionNode->back; } if (missionNode->mission) free(missionNode->mission); if (missionNode) free(missionNode); } void *thread_worker(void *args) { ThreadArgs *threadArgs = (ThreadArgs *)args; ThreadPool *threadPool = threadArgs->threadPool; ThreadNode *selfNode = threadArgs->threadNode; threadPool->existNumber = threadPool->existNumber + 1; threadPool->freeNumber = threadPool->freeNumber + 1; selfNode->threadAttr->state = 1; pthread_mutex_lock(&threadPool->workerMutex); while (threadPool->needCloseNumber <= 0 && !threadPool->close) { // 等待任务 pthread_cond_wait(&threadPool->workerCond, &threadPool->workerMutex); pthread_mutex_unlock(&threadPool->workerMutex); if (threadPool->waitingMissionNumber > 0) { pthread_mutex_lock(&threadPool->missionMutex); threadPool->freeNumber = threadPool->freeNumber - 1; selfNode->threadAttr->state = 2; if (threadPool->close) break; MissionNode *missionNode = get_mission(threadPool); MissionAttr *missionAttr = NULL; if (missionNode != NULL) { missionAttr = missionNode->mission; } selfNode->threadAttr->state = 3; if (missionAttr != NULL) { missionAttr->state = 1; } threadPool->busyNumber = threadPool->busyNumber + 1; threadPool->waitingMissionNumber = threadPool->waitingMissionNumber - 1; threadPool->hanldingMissionNumber = threadPool->hanldingMissionNumber + 1; if (threadPool->close) break; pthread_mutex_unlock(&threadPool->missionMutex); if (missionAttr != NULL) if (missionAttr->mission) { missionAttr->mission(missionAttr->param); } pthread_mutex_lock(&threadPool->missionMutex); if (threadPool->close) break; if (missionAttr != NULL) if (missionAttr->mission) { free_mission(threadPool, missionNode); } threadPool->hanldingMissionNumber = threadPool->hanldingMissionNumber - 1; threadPool->busyNumber = threadPool->busyNumber - 1; threadPool->freeNumber = threadPool->freeNumber + 1; selfNode->threadAttr->state = 1; pthread_mutex_unlock(&threadPool->missionMutex); } } if (threadPool->close) { pthread_mutex_unlock(&threadPool->workerMutex); pthread_exit(NULL); } // 线程自杀 pthread_mutex_lock(&threadPool->threadMutex); ThreadList *threadList = threadPool->threadList; selfNode->threadAttr->state = -1; if (threadList->final) if (threadList->final->threadAttr) if (threadList->final->threadAttr->id == selfNode->threadAttr->id) { threadList->final = selfNode->back; } if (threadList->front) if (threadList->front->threadAttr) if (threadList->front->threadAttr->id == selfNode->threadAttr->id) { threadList->front = selfNode->next; } if (selfNode->back) if (selfNode->back->next) { selfNode->back->next = selfNode->next; } if (selfNode->next) if (selfNode->next->back) { selfNode->next->back = selfNode->back; } free(selfNode->threadAttr); free(selfNode); threadPool->existNumber = threadPool->existNumber - 1; threadPool->freeNumber = threadPool->freeNumber - 1; threadPool->needCloseNumber = threadPool->needCloseNumber - 1; pthread_mutex_unlock(&threadPool->threadMutex); pthread_mutex_unlock(&threadPool->workerMutex); pthread_exit(NULL); } int creater_thread_worker(ThreadPool *threadPool, int number) { if ((number + threadPool->existNumber) > threadPool->maxNumber) { number = threadPool->maxNumber - threadPool->existNumber; } pthread_mutex_lock(&threadPool->threadMutex); int i, ref; for (i = 0; i < number; i++) { pthread_t threadID = 0; ThreadNode *newThreadNode = (ThreadNode *)fixMalloc(sizeof(ThreadNode)); ThreadArgs *threadArgs = (ThreadArgs *)fixMalloc(sizeof(ThreadArgs)); // 设置Woker线程参数 threadArgs->threadNode = newThreadNode; threadArgs->threadPool = threadPool; // 初始化线程节点 newThreadNode->threadAttr = (ThreadAttr *)fixMalloc(sizeof(ThreadAttr)); newThreadNode->back = NULL; newThreadNode->next = NULL; // 设置线程属性 newThreadNode->threadAttr->createTime = time(NULL); newThreadNode->threadAttr->id = threadID; newThreadNode->threadAttr->state = 0; // 插入线程链表 if (threadPool->existNumber == 0) { threadPool->threadList->front = newThreadNode; } else { ThreadNode *threadNode = threadPool->threadList->final; if (threadNode) { threadNode->next = newThreadNode; } newThreadNode->back = threadNode; } threadPool->threadList->final = newThreadNode; ref = pthread_create(&threadID, NULL, (void *)thread_worker, (void *)threadArgs); if (ref != 0) { return -2; } } pthread_mutex_unlock(&threadPool->threadMutex); return 0; } void *thread_manager(void *args) { // 管理者 ThreadPool *threadPool = (ThreadPool *)args; int minNumber = threadPool->minNumber; int waitingMissionNumber = 0; int existNumber = 0; int needCloseNumber = 0; int busyNumber = 0; int runNumber = 0; int i; pthread_t threadID; creater_thread_worker(threadPool, minNumber); while (!threadPool->close) { pthread_mutex_lock(&threadPool->managerMutex); waitingMissionNumber = threadPool->waitingMissionNumber; existNumber = threadPool->existNumber; needCloseNumber = threadPool->needCloseNumber; busyNumber = threadPool->busyNumber; pthread_mutex_unlock(&threadPool->managerMutex); // 这里的逻辑写的很随意,推荐使用时重写此逻辑 if (waitingMissionNumber > 0) { if (waitingMissionNumber > existNumber) { creater_thread_worker(threadPool, 2); } runNumber = existNumber < waitingMissionNumber ? existNumber : waitingMissionNumber; for (i = 0; i < runNumber; i++) { pthread_cond_signal(&threadPool->workerCond); } } else { threadPool->needCloseNumber = existNumber - threadPool->minNumber; pthread_cond_broadcast(&threadPool->workerCond); } usleep(1000 * 100); } // 唤醒所有线程 pthread_cond_broadcast(&threadPool->workerCond); // 释放Worker线程 ThreadList *threadList = threadPool->threadList; ThreadNode *threadNode = threadList->front; ThreadNode *tmpThreadNode = NULL; while (threadNode != NULL) { threadID = threadNode->threadAttr->id; if (threadID) { pthread_join(threadID, NULL); } free(threadNode->threadAttr); tmpThreadNode = threadNode->next; free(threadNode); threadNode = tmpThreadNode; } free(threadList); MissionList *missionWaiting = threadPool->missionWaiting; MissionNode *missionWaitingNode = missionWaiting->front; MissionNode *tmpMissionWaitingNode = NULL; // 释放等待列队 while (missionWaitingNode != NULL) { free(missionWaitingNode->mission); tmpMissionWaitingNode = missionWaitingNode->next; free(missionWaitingNode); missionWaitingNode = tmpMissionWaitingNode; } free(missionWaiting); // 释放任务列队 MissionList *missionList = threadPool->missionList; MissionNode *missionNode = missionList->front; MissionNode *tmpMissionNode = NULL; while (missionNode != NULL) { free(missionNode->mission); tmpMissionNode = missionNode->next; free(missionNode); missionNode = tmpMissionNode; } free(missionList); pthread_mutex_destroy(&threadPool->missionMutex); pthread_mutex_destroy(&threadPool->workerMutex); pthread_mutex_destroy(&threadPool->threadMutex); pthread_mutex_destroy(&threadPool->managerMutex); pthread_cond_destroy(&threadPool->workerCond); free(threadPool); pthread_exit(NULL); } int thread_pool_run(ThreadPool *threadPool) { int ref = pthread_create(&threadPool->managerID, NULL, thread_manager, (void *)threadPool); return ref; } void thread_shutdown_and_free(ThreadPool *threadPool) { threadPool->close = 1; pthread_join(threadPool->managerID, NULL); }
示例
#include <stdio.h> #include "./ThreadPool.c" struct testData { int a; }; ThreadPool *pool = NULL; int times = 0; void test(void *a) { // 模拟工作 // 处理参数 int *b = (int *)a; times++; sleep(*b); } int main() { // 初始化线程池 pool = create_thread_pool(1, 50); // 启动线程池 thread_pool_run(pool); int i, a = 1; // 提交任务 for (i = 0; i < 110; i++) { // 指针传参(无参数填NULL) thread_pool_submit(pool, &test, &a); } i = 0; while (i < 52) { printf("当前任务数量:%d\n", pool->waitingMissionNumber); printf("当前线程数量:%d\n", pool->existNumber); printf("当前执行线程:%d\n", pool->busyNumber); printf("当前空闲线程:%d\n", pool->freeNumber); printf("当前执行任务:%d\n", pool->hanldingMissionNumber); printf("当前自杀线程:%d\n", pool->needCloseNumber); printf("函数执行次数:%d\n", times); printf("================================\n"); sleep(1); int rands = (rand() / 1000000000.0) * 20 + 10; printf("当前随机数:%d\n", rands); // 模拟不定时提交 if (rands > 40) for (i = 0; i < rands; i++) { thread_pool_submit(pool, &test, &a); } } // 关机并释放 thread_shutdown_and_free(pool); return 0; }
编译命令示例
gcc ./main.c -o ./test.out -lpthread