本文共 6011 字,大约阅读时间需要 20 分钟。
epoll不需要多线程的情况下可以实现多个socket交互;
使用了epoll前置服务器访问共享内存不会存在数据安全问题,但是因为单个线程单个进程操作必然会出现,多个socket同时操作,CPU随机轮片执行原则造成例如1000个客户端同时操作,单位之间只允许一个socket执行,起于999个要等待第一个socket完成以后才可以进行执行。 综上所述:开发服务器线程是必须的,线程解决的是多个socket读取到客户端的业务请求,防止数据到共享内存。 1、所谓线程池即存储多(N)个线程 2、服务器使用多少条线程是未知数,使用容器存储线程 3、线程池中分工作队列 a.忙碌队列 b.空闲队列 线程池一开始所有线程必须处于等待状态,等到任务数据唤醒线程。频繁开辟线程和销毁线程会调用内存,频繁使用内存,所以我们想着把线程执行完毕之后保留,用到下一个任务,达到不用重新创建线程和销毁线程。 4、任务队列 要实现线程池就是要实现队列,进行任务的移入和移出。 线程池是预先创建线程的一种技术,线程池在任务还没有到来之前,创建一定数量(N)的线程,放入空闲队列中,这些线程都是出于阻塞状态,不消耗CPU,但是占用较小的内存空间。当新任务到来时,缓冲区选择一个空闲线程,把任务传入此线程中运行,如果缓冲池已经没有空闲线程,则新建若干个线程,当系统比较空闲是,就将大部分线程都一直处于暂停状态,线程池自动销毁一大部分线程,回收系统资源。线程池类
① 维护工作者线程队列(包括空闲和忙碌队列) ② 维护一个任务队列 ③ 维护一个线程池调度器指针 线程池调度器(本身也是一个线程),负责线程调度,负责任务分配。 工作者线程类(线程池中的线程类的封装) 任务队列 任务接口(实际的业务逻辑都继承自该接口)线程池类至少提供三个接口,初始化线程池,销毁线程池,添加任务接口
① 初始化线程池 开启线程池调度器线程 预先创建N个线程(由线程调度池器类负责创建工作者线程),放入空闲线程队列 指定最大的忙碌状态的线程数 ② 销毁线程池 释放空闲队列中的线程与工作状态中的线程 释放调度器线程 ③ 添加任务 添加一实际任务,但是并没有立刻运行该任务,只是放入任务队列,由线程池调度器从任务队列获取该任务,并从线程池中获得一个线程来运行该任务,这里实际上是一种生产者消费者模型。 线程池调度器包含创建空闲线程、销毁空闲线程接口线程池调度器本身也是一个线程,主要负责任务调度与线程调度,其工作过程大致如下:
① 从任务队列获取任务,如果队列为空,阻塞等待新任务到来 ② 队列不为空,取出该任务,从空闲线程队列取一线程,如果为空,判断工作者线程数是否达到上限,如果没有,则创建若个空闲线程,否则等待某一任务执行完毕,并且该任务对应的线程归还给线程池 ③ 获得空闲工作者线程,将任务交给工作者线程来处理,工作者线程维护一任务指针,这里只要该指针指向任务,并且唤醒线程 ④ 判断空闲工作者线程数是否超过最大工作者线程数,如果超过,销毁(空闲线程数-允许最大空闲线程数)个线程 A、任务接口是一个抽象类,只有一个虚函数run方法执行的是实际的业务逻辑 B、工作者线程维护一任务指针,工作者线程的任务主要是运行任务对象的run方法。 C、当线程池调度器调度一个工作者线程后,就唤醒工作者线程,并调用run方法来执行实际的业务逻辑,当run方法执行完毕,即业务逻辑处理结束,将工作者线程归还到空闲线程池队列,而不是销毁。这样线程池调度器下一次就有机会调度到该工作者线程。 代码: 先封装任务: CTask.h:#pragma onceclass CTask{ protected: char* Taskname;//任务名称 void* Data;//任务数据public: CTask(char* Taskname); ~CTask(); void setData(void* data); virtual int Run()=0;};
CTask.cpp:
#include "CTask.h"#includeCTask::CTask(char* Taskname){ strcpy(this->Taskname, Taskname); this->Data = NULL;}CTask::~CTask(){ }void CTask::setData(void* data){ this->Data = data;}
具体的业务:继承任务
CWorkTask.h:#pragma once#include "CTask.h"class CWorkTask : public CTask{ public: CWorkTask(char * Taskname); ~CWorkTask(); int Run();};
CWorkTask.cpp:
#include "CWorkTask.h"#include#include #include using namespace std;CWorkTask::CWorkTask(char* Taskname):CTask(Taskname){ }CWorkTask::~CWorkTask(){ }//子类实现具体的业务逻辑int CWorkTask::Run(){ cout << (char*)this->Data << endl; sleep(10); return 0;}
封装线程池:
Cthreadpoll.h:#pragma once#include "CTask.h"#include#include #include using namespace std;class Cthreadpoll{ public: //初始化线程池的时候,初始化线程池中的线程的数量 Cthreadpoll(int threadNum);//线程池对象构造,传入线程数量 ~Cthreadpoll(); int addTask(CTask *task);//增加新任务,将新任务添加到任务队列中 int StopAll();//线程池停止protected: static int moveToIdle(pthread_t tid);//线程任务执行结束后,将线程加入空闲队列 static int moveTobusy(pthread_t tid);//如果任务,将线程加入到忙碌队列 static void* ThreadFunction(void* threadData); int Create();//创建所有线程private: int threadNum;//初始化线程池的时候,初始化线程池中的线程数量 vector TaskList;//任务队列 static vector idleThread;//空闲队列 static vector busyThread;//忙碌队列 static pthread_mutex_t ptheadmutex;//同步的锁对象 static pthread_cond_t pthreadcond;//线程条件变量,等待/唤醒 状态切换};
Cthreadpoll.cpp:
#include "Cthreadpoll.h"//空闲队列vectorCthreadpoll::idleThread;//忙碌队列vector Cthreadpoll::busyThread;//同步的锁对象pthread_mutex_t Cthreadpoll::ptheadmutex = PTHREAD_MUTEX_INITIALIZER;//线程条件变量,等待/唤醒 状态切换pthread_cond_t Cthreadpoll::pthreadcond = PTHREAD_COND_INITIALIZER;/** 线程池对象构造,传入线程数量*/Cthreadpoll::Cthreadpoll(int threadNum){ this->threadNum = threadNum; Create();}Cthreadpoll::~Cthreadpoll(){ }/** 线程任务执行结束后,将线程加入空闲队列*/int Cthreadpoll::moveToIdle(pthread_t tid){ //遍历忙碌队列,找到该线程 vector ::iterator busyIter = busyThread.begin(); while (busyIter != busyThread.end()) { if (*busyIter == tid) { break; } busyIter++; } //从忙碌队列中移除 busyThread.erase(busyIter); //添加到空闲队列 idleThread.push_back(tid); return 0;}/** 如果有任务,将线程加入到忙碌队列*/int Cthreadpoll::moveTobusy(pthread_t tid){ //遍历空闲队列,找到该线程 vector ::iterator idleIter = idleThread.begin(); while (idleIter != idleThread.end()) { if (*idleIter == tid) { break; } idleIter++; } //从空闲队列中移除 idleThread.erase(idleIter); //添加到忙碌队列 busyThread.push_back(tid); return 0;}/** 线程执行函数 参数为任务队列*/void* Cthreadpoll::ThreadFunction(void* threadData){ pthread_t tid = pthread_self();//获取线程自己的id while (1) { pthread_mutex_lock(&ptheadmutex); //等待放在线程锁后面 pthread_cond_wait(&pthreadcond,&ptheadmutex); cout << "线程 tid:" << tid <<"run"< * taskList = (vector *)threadData; vector ::iterator iter = taskList->begin(); while (iter != taskList->end()) { moveTobusy(tid); break; } CTask* task = *iter; taskList->erase(iter); pthread_mutex_unlock(&ptheadmutex); cout << "空闲线程数量:" << Cthreadpoll::idleThread.size()<< endl; cout << "忙碌线程数:" << Cthreadpoll::busyThread.size() << endl; cout << "任务队列数:" << taskList->size() << endl; //执行具体的任务 task->Run(); cout << "任务执行完......" << endl; cout << "tid:" < <<"空闲l"<< endl; } return nullptr;}/** 创建所有的线程 启动*/int Cthreadpoll::Create(){ //循环创建线程 for (int i = 0; i < this->threadNum; i++) { pthread_t tid = 0; //创建形成,加入到标识符,将任务队列丢到线程中去 pthread_create(&tid, NULL, ThreadFunction,&TaskList); //将线程加入到空闲队列 idleThread.push_back(tid); } return 0;}/** 将新任务添加到任务队列中*/int Cthreadpoll::addTask(CTask* task){ //添加任务都任务队列 this->TaskList.push_back(task); //通知线程可以做事情了 唤醒满足条件的 pthread_cond_signal(&pthreadcond); return 0;}/** 线程池停止*/int Cthreadpoll::StopAll(){ vector ::iterator iter = idleThread.begin(); while( iter != idleThread.end() ) { //通知系统,该线程可以结束 pthread_cancel(*iter); //等待线程逻辑被执行完毕之后才结束 pthread_join(*iter, NULL); iter++; } iter = busyThread.begin(); while (iter != idleThread.end()) { //通知系统,该线程可以结束 pthread_cancel(*iter); //等待线程逻辑被执行完毕之后才结束 pthread_join(*iter, NULL); iter++; } return 0;}
转载地址:http://ssxzi.baihongyu.com/