一个Windows C++的线程池的实现

by admin on 2020年1月21日

#endif //
!defined(AFX_SOCKETMAP_H__0EBB7372_60BB_42AF_8AF4_0E961630C7C7__INCLUDED_)

 

 

机器:

 

// Socketmap.h: interface for the CSocketmap class.
//
//////////////////////////////////////////////////////////////////////

  1. #include “ThreadPoolExecutor.h”  
  2.   
  3. CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask) :   
  4. m_pThreadPool(pThreadPool),  
  5. m_pFirstTask(pFirstTask),  
  6. m_bRun(true)  
  7. {  
  8.       
  9. }  
  10.   
  11. CThreadPoolExecutor::CWorker::~CWorker()  
  12. {  
  13. }  
  14.   
  15. /** 
  16.   执行任务的工作线程。 
  17.   当前没有任务时, 
  18.   如果当前线程数量大于最小线程数量,减少线程, 
  19.   否则,执行清理程序,将线程类给释放掉 
  20. **/  
  21. void CThreadPoolExecutor::CWorker::Run()  
  22. {  
  23.     Runnable * pTask = NULL;  
  24.     while(m_bRun)  
  25.     {  
  26.         if(NULL == m_pFirstTask)  
  27.         {  
  28.             pTask = m_pThreadPool->GetTask();  
  29.         }  
  30.         else  
  31.         {  
  32.             pTask = m_pFirstTask;  
  33.             m_pFirstTask = NULL;  
  34.         }  
  35.   
  36.         if(NULL == pTask)  
  37.         {  
  38.             EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));  
  39.             if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)  
  40.             {  
  41.                 ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);  
  42.                 if(itr != m_pThreadPool->m_ThreadPool.end())  
  43.                 {  
  44.                     m_pThreadPool->m_ThreadPool.erase(itr);  
  45.                     m_pThreadPool->m_TrashThread.insert(this);  
  46.                 }  
  47.                 m_bRun = false;  
  48.             }  
  49.             else  
  50.             {  
  51.                 ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();  
  52.                 while(itr != m_pThreadPool->m_TrashThread.end())  
  53.                 {  
  54.                     (*itr)->Join();  
  55.                     delete (*itr);  
  56.                     m_pThreadPool->m_TrashThread.erase(itr);  
  57.                     itr = m_pThreadPool->m_TrashThread.begin();  
  58.                 }  
  59.             }  
  60.             LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));  
  61.             continue;  
  62.         }  
  63.         else  
  64.         {  
  65.             pTask->Run();  
  66.             pTask = NULL;  
  67.         }  
  68.     }  
  69. }  
  70.   
  71. /////////////////////////////////////////////////////////////////////////////////////////////  
  72.   
  73. CThreadPoolExecutor::CThreadPoolExecutor(void) :   
  74. m_bRun(false),  
  75. m_bEnableInsertTask(false)  
  76. {  
  77.     InitializeCriticalSection(&m_csTasksLock);  
  78.     InitializeCriticalSection(&m_csThreadPoolLock);  
  79. }  
  80.   
  81. CThreadPoolExecutor::~CThreadPoolExecutor(void)  
  82. {  
  83.     Terminate();  
  84.     DeleteCriticalSection(&m_csTasksLock);  
  85.     DeleteCriticalSection(&m_csThreadPoolLock);  
  86. }  
  87.   
  88. bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)  
  89. {  
  90.     if(minThreads == 0)  
  91.     {  
  92.         return false;  
  93.     }  
  94.     if(maxThreads < minThreads)  
  95.     {  
  96.         return false;  
  97.     }  
  98.     m_minThreads = minThreads;  
  99.     m_maxThreads = maxThreads;  
  100.     m_maxPendingTasks = maxPendingTasks;  
  101.     unsigned int i = m_ThreadPool.size();  
  102.     for(; i<minThreads; i++)  
  103.     {  
  104.         //创建线程  
  105.         CWorker * pWorker = new CWorker(this);  
  106.         if(NULL == pWorker)  
  107.         {  
  108.             return false;  
  109.         }  
  110.         EnterCriticalSection(&m_csThreadPoolLock);  
  111.         m_ThreadPool.insert(pWorker);  
  112.         LeaveCriticalSection(&m_csThreadPoolLock);  
  113.         pWorker->Start();  
  114.     }  
  115.     m_bRun = true;  
  116.     m_bEnableInsertTask = true;  
  117.     return true;  
  118. }  
  119.   
  120. bool CThreadPoolExecutor::Execute(Runnable * pRunnable)  
  121. {  
  122.     if(!m_bEnableInsertTask)  
  123.     {  
  124.         return false;  
  125.     }  
  126.     if(NULL == pRunnable)  
  127.     {  
  128.         return false;  
  129.     }  
  130.     if(m_Tasks.size() >= m_maxPendingTasks)  
  131.     {  
  132.         if(m_ThreadPool.size() < m_maxThreads)  
  133.         {  
  134.             CWorker * pWorker = new CWorker(this, pRunnable);  
  135.             if(NULL == pWorker)  
  136.             {  
  137.                 return false;  
  138.             }  
  139.             EnterCriticalSection(&m_csThreadPoolLock);  
  140.             m_ThreadPool.insert(pWorker);  
  141.             LeaveCriticalSection(&m_csThreadPoolLock);  
  142.             pWorker->Start();  
  143.         }  
  144.         else  
  145.         {  
  146.             return false;  
  147.         }  
  148.     }  
  149.     else  
  150.     {  
  151.         EnterCriticalSection(&m_csTasksLock);  
  152.         m_Tasks.push_back(pRunnable);  
  153.         LeaveCriticalSection(&m_csTasksLock);  
  154.     }  
  155.     return true;  
  156. }  
  157.   
  158. Runnable * CThreadPoolExecutor::GetTask()  
  159. {  
  160.     Runnable * Task = NULL;  
  161.     EnterCriticalSection(&m_csTasksLock);  
  162.     if(!m_Tasks.empty())  
  163.     {  
  164.         Task = m_Tasks.front();  
  165.         m_Tasks.pop_front();  
  166.     }  
  167.     LeaveCriticalSection(&m_csTasksLock);  
  168.     return Task;  
  169. }  
  170.   
  171. unsigned int CThreadPoolExecutor::GetThreadPoolSize()  
  172. {  
  173.     return m_ThreadPool.size();  
  174. }  
  175.   
  176. void CThreadPoolExecutor::Terminate()  
  177. {  
  178.     m_bEnableInsertTask = false;  
  179.     while(m_Tasks.size() > 0)  
  180.     {  
  181.         Sleep(1);  
  182.     }  
  183.     m_bRun = false;  
  184.     m_minThreads = 0;  
  185.     m_maxThreads = 0;  
  186.     m_maxPendingTasks = 0;  
  187.     while(m_ThreadPool.size() > 0)  
  188.     {  
  189.         Sleep(1);  
  190.     }  
  191.     EnterCriticalSection(&m_csThreadPoolLock);  
  192.     ThreadPoolItr itr = m_TrashThread.begin();  
  193.     while(itr != m_TrashThread.end())  
  194.     {  
  195.         (*itr)->Join();  
  196.         delete (*itr);  
  197.         m_TrashThread.erase(itr);  
  198.         itr = m_TrashThread.begin();  
  199.     }  
  200.     LeaveCriticalSection(&m_csThreadPoolLock);  
  201. }  

此线程池所依赖的线程类,请参看《一个Windows C++的线程类实现》:

template<class VALUE>
class CSocketQueue 
{
public:
 CSocketQueue();
 ~CSocketQueue();
 int GetCount(){return m_nCount;};
 bool InitQueue(int nCount);
 void DestroyQueue();
 VALUE* AllocItem();
 bool FreeItem(VALUE *pItem);
 bool GetItem(VALUE **pItem);//取得第一个项,并不释放
 VALUE& operator[](int index)const;
 LPCRITICAL_SECTION GetSync(){return &m_Sync;};
protected:
 int  m_nCount;
 VALUE* m_ValueArray;
 VALUE** m_ValueQueue;
 unsigned int m_iHeadIndex;
 //unsigned int m_iGetIndex;
 unsigned int m_iTailIndex;
 int m_iFreeCount;
 CRITICAL_SECTION m_Sync;
};
template<class VALUE>
CSocketQueue<VALUE>::CSocketQueue()
{
 m_nCount = 0;
 m_ValueArray = NULL;
 m_ValueQueue = NULL;
 m_iHeadIndex = 0;
 m_iTailIndex = 0;
 m_iFreeCount = 0;
 //m_iGetIndex = 0;
}
template<class VALUE>
CSocketQueue<VALUE>::~CSocketQueue()
{
 DestroyQueue();
}
template<class VALUE>
VALUE& CSocketQueue<VALUE>::operator[](int index)const
{
 return m_ValueArray[index];
}
template<class VALUE>
bool CSocketQueue<VALUE>::InitQueue(int nCount)
{
 m_ValueArray =
(VALUE*)LocalAlloc(LMEM_FIXED|LMEM_ZEROINIT,sizeof(VALUE)*nCount);
 m_ValueQueue =
(VALUE**)LocalAlloc(LMEM_FIXED|LMEM_ZEROINIT,sizeof(VALUE*)*nCount);
 if(m_ValueArray == NULL || m_ValueQueue == NULL)
 {
  if(m_ValueArray != NULL)
   LocalFree(m_ValueArray);
  if(m_ValueQueue != NULL)
   LocalFree(m_ValueQueue);
  m_ValueArray = NULL;
  m_ValueQueue = NULL;
  return false;
 }
 InitializeCriticalSection(&m_Sync);
 m_nCount = nCount;
 for(int i = 0; i < m_nCount; i++)
 {
  m_ValueQueue[i] = &m_ValueArray[i];
 }
 m_iHeadIndex = 0;
 m_iTailIndex = 0;
// m_iGetIndex = 0;
 m_iFreeCount = m_nCount;
 return true;
}
template<class VALUE>
void CSocketQueue<VALUE>::DestroyQueue()
{
 if(m_nCount == 0)return;
 EnterCriticalSection(&m_Sync);
 __try
 {
  LocalFree(m_ValueArray);
  LocalFree(m_ValueQueue);
  m_ValueArray = NULL;
  m_ValueQueue = NULL;
  m_nCount = 0;
  m_iHeadIndex = m_iTailIndex = 0;
 }
 __finally
 {
  LeaveCriticalSection(&m_Sync);
 }
 DeleteCriticalSection(&m_Sync);
}
template<class VALUE>
VALUE* CSocketQueue<VALUE>::AllocItem()
{
 VALUE* pItem = NULL;
 EnterCriticalSection(&m_Sync);
 __try
 {
  if(m_iFreeCount == 0)
   __leave;
  pItem = m_ValueQueue[m_iHeadIndex ++];
  m_iHeadIndex %= m_nCount;
  m_iFreeCount –;
 }
 __finally
 {
  LeaveCriticalSection(&m_Sync);
 }
 return pItem;
}
template<class VALUE>
bool CSocketQueue<VALUE>::FreeItem(VALUE *pItem)
{
 EnterCriticalSection(&m_Sync);
 __try
 {
  if(m_iFreeCount == m_nCount )
   __leave;
  unsigned int index = 0;
  //先判断释放的项是否存在
  for(index=m_iTailIndex;index<m_iTailIndex+m_nCount-m_iFreeCount;index++)
  {
   if(m_ValueQueue[index%m_nCount] == pItem)
    break;
  }
  //如果不存在直接返回
  if(index == m_iTailIndex+m_nCount-m_iFreeCount)
   __leave;
  VALUE* pTemp = m_ValueQueue[m_iTailIndex];
  m_ValueQueue[m_iTailIndex] = pItem;
  m_ValueQueue[index%m_nCount] = pTemp;
  m_iTailIndex ++;
  m_iTailIndex %= m_nCount;
  m_iFreeCount ++;
 }
 __finally
 {
  LeaveCriticalSection(&m_Sync);
 }
 return true;
}

5个线程的线程池耗时:1166时间片

  1. #define __SYSTEM_THREAD_POOL__  
  2. #include “Thread.h”  
  3. #include <list>  
  4. #include <windows.h>  
  5. class CThreadPoolExecutor  
  6. {  
  7. public:  
  8.     CThreadPoolExecutor(void);  
  9.     ~CThreadPoolExecutor(void);  
  10.     /** 
  11.       初始化线程池,创建minThreads个线程 
  12.     **/  
  13.     bool Init(unsigned int maxTaskse);  
  14.     /** 
  15.       执行任务,若当前任务列表没有满,将此任务插入到任务列表,返回true 
  16.       否则返回false 
  17.     **/  
  18.     bool Execute(Runnable * pRunnable);  
  19.     /** 
  20.       终止线程池,先制止塞入任务, 
  21.       然后等待直到任务列表为空, 
  22.       然后设置最小线程数量为0, 
  23.       等待直到线程数量为空, 
  24.       清空垃圾堆中的任务 
  25.     **/  
  26.     void Terminate();  
  27.     /** 
  28.       返回线程池中当前的线程数量 
  29.     **/  
  30.     unsigned int GetThreadPoolSize();  
  31. private:  
  32.     static unsigned int WINAPI StaticThreadFunc(void * arg);  
  33. private:  
  34.     typedef std::list<Runnable *> Tasks;  
  35.     typedef Tasks::iterator TasksItr;  
  36.     Tasks m_Tasks;  
  37.     CRITICAL_SECTION m_csTasksLock;  
  38.     volatile bool m_bRun;  
  39.     volatile bool m_bEnableInsertTask;  
  40.     volatile unsigned int m_maxTasks;  
  41. };  
  42. #endif  

template<class VALUE>
bool CSocketQueue<VALUE>::GetItem(VALUE **pItem)
{
 bool bResult = true;
 EnterCriticalSection(&m_Sync);
 __try
 {
  if(m_iTailIndex == m_iHeadIndex )
  {
   *pItem = NULL;
   bResult = false;
   __leave;
  }
  *pItem = m_ValueQueue[m_iTailIndex];
 }
 __finally
 {
  LeaveCriticalSection(&m_Sync);
 }
 return bResult;
}

ThreadPoolExecutor.h

 

#ifndef _SOCKETMAP_H_
#define _SOCKETMAP_H_

单线程池执行耗时:2219时间片

Intel(R) Core(TM)2 Duo CPU

用法:

 

#include “Thread.h”
#include “ThreadPoolExecutor.h”

#include “Thread.h”
#include “SystemThreadPool.h”

ThreadPoolExecutor.cpp

 

 

对于100个任务并且每个任务包含10000000个循环,任务中无等待:

对于100个任务并且每个任务包含10000000个循环,任务中无等待:

[cpp] view
plaincopy

测试结果:

 

 

用法:

class R : public Runnable
{
public:
    ~R()
    {
    }
    void Run()
    {
        printf(“Hello World/n”);
    }
};

from:

2G内存

 

测试结果:

 

from:

 

E8400 @ 3.00GHz

 

  1. #include “SystemThreadPool.h”  
  2. CThreadPoolExecutor::CThreadPoolExecutor(void) :   
  3. m_bRun(false),  
  4. m_bEnableInsertTask(false)  
  5. {  
  6.     InitializeCriticalSection(&m_csTasksLock);  
  7. }  
  8. CThreadPoolExecutor::~CThreadPoolExecutor(void)  
  9. {  
  10.     Terminate();  
  11.     DeleteCriticalSection(&m_csTasksLock);  
  12. }  
  13. bool CThreadPoolExecutor::Init(unsigned int maxTasks)  
  14. {  
  15.     if(maxTasks == 0)  
  16.     {  
  17.         return false;  
  18.     }  
  19.     m_maxTasks = maxTasks;  
  20.     m_bRun = true;  
  21.     m_bEnableInsertTask = true;  
  22.     return true;  
  23. }  
  24. bool CThreadPoolExecutor::Execute(Runnable * pRunnable)  
  25. {  
  26.     if(!m_bEnableInsertTask)  
  27.     {  
  28.         return false;  
  29.     }  
  30.     if(NULL == pRunnable)  
  31.     {  
  32.         return false;  
  33.     }  
  34.     EnterCriticalSection(&m_csTasksLock);  
  35.     if(m_Tasks.size() >= m_maxTasks)  
  36.     {  
  37.         LeaveCriticalSection(&m_csTasksLock);  
  38.         return false;  
  39.     }  
  40.     m_Tasks.push_back(pRunnable);  
  41.     LeaveCriticalSection(&m_csTasksLock);  
  42.     bool ret = QueueUserWorkItem((LPTHREAD_START_ROUTINE)StaticThreadFunc, this, WT_EXECUTEINPERSISTENTIOTHREAD);  
  43.     if(!ret)  
  44.     {  
  45.         EnterCriticalSection(&m_csTasksLock);  
  46.         m_Tasks.remove(pRunnable);  
  47.         LeaveCriticalSection(&m_csTasksLock);  
  48.     }  
  49.     return ret;  
  50. }  
  51. unsigned int CThreadPoolExecutor::GetThreadPoolSize()  
  52. {  
  53.     return m_Tasks.size();  
  54. }  
  55. void CThreadPoolExecutor::Terminate()  
  56. {  
  57.     m_bEnableInsertTask = false;  
  58.     m_bRun = false;  
  59.     while(m_Tasks.size() != 0)  
  60.     {  
  61.         Sleep(1);  
  62.     }  
  63. }  
  64. unsigned int WINAPI CThreadPoolExecutor::StaticThreadFunc(void * arg)  
  65. {  
  66.     CThreadPoolExecutor * pThreadPool = (CThreadPoolExecutor *)arg;  
  67.     Runnable * pRunnable = NULL;  
  68.     EnterCriticalSection(&pThreadPool->m_csTasksLock);  
  69.     pRunnable = pThreadPool->m_Tasks.front();  
  70.     if(NULL != pRunnable)  
  71.     {  
  72.         pThreadPool->m_Tasks.pop_front();  
  73.     }  
  74.     LeaveCriticalSection(&pThreadPool->m_csTasksLock);  
  75.     if(NULL != pRunnable)  
  76.     {  
  77.         pRunnable->Run();  
  78.     }  
  79.     return 0;  
  80. }  
  1. #ifndef __THREAD_POOL_EXECUTOR__  
  2. #define __THREAD_POOL_EXECUTOR__  
  3.   
  4. #include “Thread.h”  
  5. #include <set>  
  6. #include <list>  
  7. #include <windows.h>  
  8.   
  9. class CThreadPoolExecutor  
  10. {  
  11. public:  
  12.     CThreadPoolExecutor(void);  
  13.     ~CThreadPoolExecutor(void);  
  14.   
  15.     /** 
  16.       初始化线程池,创建minThreads个线程 
  17.     **/  
  18.     bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTaskse);  
  19.   
  20.     /** 
  21.       执行任务,若当前任务列表没有满,将此任务插入到任务列表,返回true 
  22.       若当前任务列表满了,但当前线程数量小于最大线程数,将创建新线程执行此任务,返回true 
  23.       若当前任务列表满了,但当前线程数量等于最大线程数,将丢弃此任务,返回false 
  24.     **/  
  25.     bool Execute(Runnable * pRunnable);  
  26.   
  27.     /** 
  28.       终止线程池,先制止塞入任务, 
  29.       然后等待直到任务列表为空, 
  30.       然后设置最小线程数量为0, 
  31.       等待直到线程数量为空, 
  32.       清空垃圾堆中的任务 
  33.     **/  
  34.     void Terminate();  
  35.   
  36.     /** 
  37.       返回线程池中当前的线程数量 
  38.     **/  
  39.     unsigned int GetThreadPoolSize();  
  40.   
  41. private:  
  42.     /** 
  43.       获取任务列表中的任务,若任务列表为空,返回NULL 
  44.     **/  
  45.     Runnable * GetTask();  
  46.   
  47.     static unsigned int WINAPI StaticThreadFunc(void * arg);  
  48.   
  49. private:  
  50.     class CWorker : public CThread  
  51.     {  
  52.     public:  
  53.         CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL);  
  54.         ~CWorker();  
  55.         void Run();  
  56.   
  57.     private:  
  58.         CThreadPoolExecutor * m_pThreadPool;  
  59.         Runnable * m_pFirstTask;  
  60.         volatile bool m_bRun;  
  61.     };  
  62.   
  63.     typedef std::set<CWorker *> ThreadPool;  
  64.     typedef std::list<Runnable *> Tasks;  
  65.     typedef Tasks::iterator TasksItr;  
  66.     typedef ThreadPool::iterator ThreadPoolItr;  
  67.   
  68.     ThreadPool m_ThreadPool;  
  69.     ThreadPool m_TrashThread;  
  70.     Tasks m_Tasks;  
  71.   
  72.     CRITICAL_SECTION m_csTasksLock;  
  73.     CRITICAL_SECTION m_csThreadPoolLock;  
  74.   
  75.     volatile bool m_bRun;  
  76.     volatile bool m_bEnableInsertTask;  
  77.     volatile unsigned int m_minThreads;  
  78.     volatile unsigned int m_maxThreads;  
  79.     volatile unsigned int m_maxPendingTasks;  
  80. };  
  81.   
  82. #endif  

SytemThreadPool.cpp

此线程池所依赖的线程类,请参看《一个Windows C++的线程类实现》:

class R : public Runnable
{
public:
    ~R()
    {
    }
    void Run()
    {
        printf(“Hello World/n”);
    }
};

2个线程的线程池耗时:1156时间片

 

100个线程的线程池耗时:1177时间片

int _tmain(int argc, _TCHAR* argv[])
{
    CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor();
    pExecutor->Init(50);
    R r;
    for(int i=0;i<100;i++)
    {
        while(!pExecutor->Execute(&r))
        {
        }
    }
    pExecutor->Terminate();
    delete pExecutor;
    getchar();
    return 0;
}

int _tmain(int argc, _TCHAR* argv[])
{
    CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor();
    pExecutor->Init(1, 10, 50);
    R r;
    for(int i=0;i<100;i++)
    {
        while(!pExecutor->Execute(&r))
        {
        }
    }
    pExecutor->Terminate();
    delete pExecutor;
    getchar();
    return 0;
}

 

 

 

单线程执行耗时:2281时间片

 

 

机器:

10个线程的线程池耗时:1157时间片

SystemThreadPool.h

E8400 @ 3.00GHz

[cpp] view
plaincopy

Intel(R) Core(TM)2 Duo CPU

线程池耗时:2203时间片

[cpp] view
plaincopy

 

2G内存

 

 

[cpp] view
plaincopy

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图