前面定时器和事件的回调函数,都是在定时器线程,以及事件管理线程直接回调的。
如果回调函数里面执行的逻辑是耗时逻辑,则可能导致后续定时器和事件的处理延后。因此,将回调函数放入一个新的线程中执行是一个较好的方案。
如果为每一个定时或事件回调创建一个新的线程,则线程创建、销毁及调度的开销较大,局部或整体性能未必能够达到一个理想的状态,这种情况下,使用线程池是一个较好的方案。
1、什么是线程池
线程池是一种多线程使用的模式。
为了避免频繁创建和销毁线程带来的开销,我们可以预先创建一些线程,使这些线程处于等待状态。
线程池中维护一个任务列表,当有任务添加到列表中,线程池唤醒其中的一个线程处理任务,处理完成后继续等待下一个任务的到来。
面试时,经常会被问到的一个问题是:线程池创建多少个线程是比较合适的?
先说答案:这个和具体的业务场景相关,一般会选择机器CPU核心数+2比较合适。
线程池中的线程数量并不是越多越好,首先线程开辟会有一定的内存开销,每个线程会占用大约1M左右的内存,无限制的创建新线程,可能会导致内存不足,进而出现异常。
其次,线程数量过多,内核调度线程需要进行频繁的切换,不断的唤醒、挂起线程,以便尽量使每个线程都有执行的机会。线程数量过多时,这个开销很大,会影响性能。
为什么说和业务场景相关?
CPU计算密集型的任务,线程数量不宜太多,因为始终要占用CPU的计算资源;IO密集型任务,线程数量就可以多设置些,因为主要耗时在IO操作,CPU性能是过剩的。
此外,任务的优先级,耗时长短,对其他系统资源是否有依赖,都可以作为考虑的因素。
和核心数量有什么关系?
现在的机器一般都是多核的,4核或8核的机器很常见,每个核心可以独立运行一个线程。如果线程数和核心数一致,就不用担心调度切换引起的性能损耗。而+2是为了在某些资源竞争或io耗时的时候,能够最大限度的使用CPU资源。
2、线程池实现
下面实现一个简单的线程池,初始化时传入线程数量。
添加任务后,线程池选择一个线程执行任务。
#pragma once
/************************************************************************/
/*
* 通用线程池
* 线程池初始化N个线程,通过AddTask的方式加入任务,线程池会分配线程处理任务
* 任务函数类型是无参void返回的函数
*/
/************************************************************************/
#include <thread>
#include <vector>
#include <mutex>
#include <functional>
#include <queue>
using Mutex = std::mutex;
using AutoLock = std::lock_guard<Mutex>;
using UniqueLock = std::unique_lock<Mutex>;
using Task = std::function<void()>;
const int MAX_TASK_COUNT = 8000;
class CThreadPool
{
public:
CThreadPool();
~CThreadPool();
CThreadPool(const CThreadPool& other) = delete;
CThreadPool& operator=(const CThreadPool& other) = delete;
public:
bool init(int nThreadCount);
void Stop();
void AddTask(Task task);
private:
void Process(unsigned nId);
bool IsTaskFull();
Task PopTask();
public:
int m_nMaxThread;
int m_nMaxTask;
std::vector<std::unique_ptr<std::thread>> m_vecpThread;
Mutex m_lock;
bool m_bRunning;
std::deque<Task> m_taskList;
std::condition_variable m_condNotEmpty;
std::condition_variable m_condNotFull;
};
m_vecpThread用于储存线程,主要用于线程销毁回收。
m_taskList存储放入线程池的任务,任务上限为8000,超出上限后,会等待任务队列减少后再加入。
这里面用C++的条件变量控制任务的上限,以及通知队列中有任务需要处理。
对外接口很简单,Init函数传入线程数量。使用时调用AddTask添加任务即可。
#include "ThreadPool.h"
CThreadPool::CThreadPool()
: m_nMaxThread(0)
, m_nMaxTask(MAX_TASK_COUNT)
, m_bRunning(false)
{
}
CThreadPool::~CThreadPool()
{
if (m_bRunning)
{
Stop();
}
}
bool CThreadPool::init(int nThreadCount)
{
if (m_vecpThread.size() != 0) return false;
if (nThreadCount < 1) return false;
m_nMaxThread = nThreadCount;
m_vecpThread.reserve(m_nMaxThread);
for (auto i = 0; i < m_nMaxThread; ++i)
{
m_vecpThread.emplace_back(new std::thread(&CThreadPool::Process, this, i));
}
m_bRunning = true;
return true;
}
void CThreadPool::Process(unsigned nId)
{
try
{
while (m_bRunning)
{
Task task(PopTask());
if (task)
{
printf("this is thread %d\n", nId);
task();
}
}
}
catch (...)
{
// TODO
throw;
}
}
bool CThreadPool::IsTaskFull()
{
return m_nMaxTask > 0 && m_taskList.size() >= m_nMaxTask;
}
void CThreadPool::AddTask(Task task)
{
if (m_vecpThread.empty())
{
task();
}
else
{
UniqueLock lock(m_lock);
while (IsTaskFull())
{
m_condNotFull.wait(lock); // 等待队列变得不满
}
m_taskList.push_back(std::move(task));
m_condNotEmpty.notify_one(); // 通知现在有任务了
}
}
Task CThreadPool::PopTask()
{
UniqueLock lock(m_lock);
while (m_taskList.empty() && m_bRunning)
{
m_condNotEmpty.wait(lock);
}
Task task;
if (!m_taskList.empty())
{
task = m_taskList.front();
m_taskList.pop_front();
if (m_nMaxTask > 0)
{
m_condNotFull.notify_one();
}
}
return task;
}
void CThreadPool::Stop()
{
{
AutoLock lk(m_lock);
m_bRunning = false;
m_condNotEmpty.notify_all();
}
for (auto& th : m_vecpThread)
{
th->join();
}
}
实现比较简单,直接看代码逻辑即可。
3、线程池使用
现在我们可以使用线程池,改造上一节实现事件实现。
先定义成员变量
构造函数进行初始化
EventMgr::EventMgr()
{
m_pool.init(4);
std::thread th(&EventMgr::WorkThreadFunc, this);
th.detach();
}
修改OnEvent函数
bool EventMgr::OnEvent(CHYEvent* pEvent)
{
AutoLock lock(m_lockEventMap);
if ( m_mapEvent.count(pEvent->nType) == 0 ) return false;
auto & map = m_mapEvent[pEvent->nType];
m_pool.AddTask([=](){
for (auto itr : map)
{
itr.second(pEvent);
}
char*e = (char*)pEvent;
delete []e;
return true;
});
}
我们观察日志变化
12/02/22 16:16:41 001D0600 [Info]ServiceBase Start ...
12/02/22 16:16:41 001D0600 [Info]this is thread postEvent
12/02/22 16:16:41 03C06000 [Info]this is EventMgr WorkThreadFunc
12/02/22 16:16:41 03C06000 [Info]Service On event 2
12/02/22 16:16:41 03C06000 [Info]this is Service event=s
12/02/22 16:16:54 038F4000 [Info]this is thread postEvent
12/02/22 16:16:54 03C06000 [Info]Service On event 2
12/02/22 16:16:54 03C06000 [Info]this is Service event=s
日志中第三列记录了线程id,可以看到Event的线程是03C06000,两次回调OnEvent都是在这个线程中执行的。
主线程是001D0600,第一次PostEvent是主线程中执行的,第二次PostEvent是在定时器回调线程(038F4000)中发出的。
12/02/22 16:19:00 001DC600 [Info]ServiceBase Start ...
12/02/22 16:19:00 02F79000 [Info]this is EventMgr WorkThreadFunc
12/02/22 16:19:00 001DC600 [Info]this is thread postEvent
12/02/22 16:19:00 02DF0000 [Info]Service On event 2
12/02/22 16:19:00 02DF0000 [Info]this is Service event=s
12/02/22 16:19:13 02BE4000 [Info]this is thread postEvent
12/02/22 16:19:13 02E73000 [Info]Service On event 2
12/02/22 16:19:13 02E73000 [Info]this is Service event=s
使用了线程池后,Event的线程是02F79000,第一次OnEvent回调时是线程02DF0000,第二次回调时是线程02E73000执行的。两次Event回调各不相同,且与Event所在线程不相同,线程池已经生效。