C++服务开发入门指南
  • 序言
  • 前言
  • 一、一个简单的服务
    • 1 什么是服务
    • 2 服务可以用来做什么
    • 3 简单服务框架
  • 二、网络通信服务框架
    • 1 网络服务的基本概念
    • 2 增加监听端口
    • 3 处理客户端会话
    • 小结
  • 三、添加基础模块
    • 1 日志模块
    • 2 定时器
    • 3 事件机制
    • 4 线程池
    • 5 线程安全
    • 小结
  • 四、一个聊天服务
    • 1 需求描述及分析
    • 2 概要设计
    • 3 创建服务项目
    • 4 ClientUser实现
    • 5 RoomMgr实现
    • 6 ChatRoom实现
    • 7 RoomIDMgr实现
    • 小结
  • 五、测试、迭代及重构
    • 1 测试
    • 2 迭代
    • 3 重构
    • 4 版本号
  • 六、架构设计
    • 1 单点服务
    • 2 分布式服务
  • 七、部署及发布
    • 1 部署环境
    • 2 编译环境
    • 3 部署服务
    • 4 发布服务
  • 八、线上问题处理
    • 1 线上问题
    • 2 问题处理
  • 九、程序员的职业规划
    • 职业规划
Powered by GitBook
On this page
  • 1、什么是线程池
  • 2、线程池实现
  • 3、线程池使用
  1. 三、添加基础模块

4 线程池

前面定时器和事件的回调函数,都是在定时器线程,以及事件管理线程直接回调的。

如果回调函数里面执行的逻辑是耗时逻辑,则可能导致后续定时器和事件的处理延后。因此,将回调函数放入一个新的线程中执行是一个较好的方案。

如果为每一个定时或事件回调创建一个新的线程,则线程创建、销毁及调度的开销较大,局部或整体性能未必能够达到一个理想的状态,这种情况下,使用线程池是一个较好的方案。

1、什么是线程池

线程池是一种多线程使用的模式。

为了避免频繁创建和销毁线程带来的开销,我们可以预先创建一些线程,使这些线程处于等待状态。

线程池中维护一个任务列表,当有任务添加到列表中,线程池唤醒其中的一个线程处理任务,处理完成后继续等待下一个任务的到来。

面试时,经常会被问到的一个问题是:线程池创建多少个线程是比较合适的?

先说答案:这个和具体的业务场景相关,一般会选择机器CPU核心数+2比较合适。

线程池中的线程数量并不是越多越好,首先线程开辟会有一定的内存开销,每个线程会占用大约1M左右的内存,无限制的创建新线程,可能会导致内存不足,进而出现异常。

其次,线程数量过多,内核调度线程需要进行频繁的切换,不断的唤醒、挂起线程,以便尽量使每个线程都有执行的机会。线程数量过多时,这个开销很大,会影响性能。

为什么说和业务场景相关?

CPU计算密集型的任务,线程数量不宜太多,因为始终要占用CPU的计算资源;IO密集型任务,线程数量就可以多设置些,因为主要耗时在IO操作,CPU性能是过剩的。

此外,任务的优先级,耗时长短,对其他系统资源是否有依赖,都可以作为考虑的因素。

和核心数量有什么关系?

现在的机器一般都是多核的,4核或8核的机器很常见,每个核心可以独立运行一个线程。如果线程数和核心数一致,就不用担心调度切换引起的性能损耗。而+2是为了在某些资源竞争或io耗时的时候,能够最大限度的使用CPU资源。

2、线程池实现

下面实现一个简单的线程池,初始化时传入线程数量。

添加任务后,线程池选择一个线程执行任务。

#pragma once
/************************************************************************/
/*
 * 通用线程池
 * 线程池初始化N个线程,通过AddTask的方式加入任务,线程池会分配线程处理任务
 * 任务函数类型是无参void返回的函数
*/
/************************************************************************/
#include <thread>
#include <vector>
#include <mutex>
#include <functional>
#include <queue>
using Mutex = std::mutex;
using AutoLock = std::lock_guard<Mutex>;
using UniqueLock = std::unique_lock<Mutex>;
using Task = std::function<void()>;
const int MAX_TASK_COUNT = 8000;

class CThreadPool
{
public:
	CThreadPool();
	~CThreadPool();

	CThreadPool(const CThreadPool& other) = delete;
	CThreadPool& operator=(const CThreadPool& other) = delete;

public:
	bool init(int nThreadCount);

	void Stop();

	void AddTask(Task task);

private:
	void Process(unsigned nId);

	bool IsTaskFull();

	Task PopTask();

public:
	int m_nMaxThread;
	int m_nMaxTask;
	std::vector<std::unique_ptr<std::thread>> m_vecpThread;

	Mutex m_lock;

	bool m_bRunning;

	std::deque<Task> m_taskList;

	std::condition_variable m_condNotEmpty;
	std::condition_variable m_condNotFull;
};

m_vecpThread用于储存线程,主要用于线程销毁回收。

m_taskList存储放入线程池的任务,任务上限为8000,超出上限后,会等待任务队列减少后再加入。

这里面用C++的条件变量控制任务的上限,以及通知队列中有任务需要处理。

对外接口很简单,Init函数传入线程数量。使用时调用AddTask添加任务即可。

#include "ThreadPool.h"

CThreadPool::CThreadPool()
	: m_nMaxThread(0)
	, m_nMaxTask(MAX_TASK_COUNT)
	, m_bRunning(false)
{
}

CThreadPool::~CThreadPool()
{
	if (m_bRunning)
	{
		Stop();
	}
}

bool CThreadPool::init(int nThreadCount)
{
	if (m_vecpThread.size() != 0) return false;
	if (nThreadCount < 1) return false;

	m_nMaxThread = nThreadCount;
	m_vecpThread.reserve(m_nMaxThread);
	for (auto i = 0; i < m_nMaxThread; ++i)
	{
		m_vecpThread.emplace_back(new std::thread(&CThreadPool::Process, this, i));
	}
	m_bRunning = true;
	return true;
}

void CThreadPool::Process(unsigned nId)
{
	try
	{
		while (m_bRunning)
		{
			Task task(PopTask());
			if (task)
			{
				printf("this is thread %d\n", nId);
				task();
			}
		}
	}
	catch (...)
	{
		// TODO
		throw;
	}
}

bool CThreadPool::IsTaskFull()
{
	return m_nMaxTask > 0 && m_taskList.size() >= m_nMaxTask;
}

void CThreadPool::AddTask(Task task)
{
	if (m_vecpThread.empty())
	{
		task();
	}
	else
	{
		UniqueLock lock(m_lock);
		while (IsTaskFull())
		{
			m_condNotFull.wait(lock); // 等待队列变得不满
		}

		m_taskList.push_back(std::move(task));
		m_condNotEmpty.notify_one(); // 通知现在有任务了
	}
}

Task CThreadPool::PopTask()
{
	UniqueLock lock(m_lock);
	while (m_taskList.empty() && m_bRunning)
	{
		m_condNotEmpty.wait(lock);
	}

	Task task;
	if (!m_taskList.empty())
	{
		task = m_taskList.front();
		m_taskList.pop_front();
		if (m_nMaxTask > 0)
		{
			m_condNotFull.notify_one();
		}
	}
	return task;
}

void CThreadPool::Stop()
{
	{
		AutoLock lk(m_lock);
		m_bRunning = false;
		m_condNotEmpty.notify_all();
	}

	for (auto& th : m_vecpThread)
	{
		th->join();
	}
}

实现比较简单,直接看代码逻辑即可。

3、线程池使用

现在我们可以使用线程池,改造上一节实现事件实现。

先定义成员变量

CThreadPool m_pool;

构造函数进行初始化

EventMgr::EventMgr()
{
    m_pool.init(4);

    std::thread th(&EventMgr::WorkThreadFunc, this);
	th.detach();
}

修改OnEvent函数

 bool EventMgr::OnEvent(CHYEvent* pEvent)
 {
    AutoLock lock(m_lockEventMap);
    if ( m_mapEvent.count(pEvent->nType) == 0 ) return false;

    auto & map = m_mapEvent[pEvent->nType];

    m_pool.AddTask([=](){
        for (auto itr : map)
        {
            itr.second(pEvent);
        }
        char*e = (char*)pEvent;
        delete []e;
        return true;
    });
    
 }

我们观察日志变化

12/02/22 16:16:41 001D0600 [Info]ServiceBase Start ...
12/02/22 16:16:41 001D0600 [Info]this is thread postEvent
12/02/22 16:16:41 03C06000 [Info]this is EventMgr WorkThreadFunc
12/02/22 16:16:41 03C06000 [Info]Service On event 2
12/02/22 16:16:41 03C06000 [Info]this is Service event=s 
12/02/22 16:16:54 038F4000 [Info]this is thread postEvent
12/02/22 16:16:54 03C06000 [Info]Service On event 2
12/02/22 16:16:54 03C06000 [Info]this is Service event=s 

日志中第三列记录了线程id,可以看到Event的线程是03C06000,两次回调OnEvent都是在这个线程中执行的。

主线程是001D0600,第一次PostEvent是主线程中执行的,第二次PostEvent是在定时器回调线程(038F4000)中发出的。

12/02/22 16:19:00 001DC600 [Info]ServiceBase Start ...
12/02/22 16:19:00 02F79000 [Info]this is EventMgr WorkThreadFunc
12/02/22 16:19:00 001DC600 [Info]this is thread postEvent
12/02/22 16:19:00 02DF0000 [Info]Service On event 2
12/02/22 16:19:00 02DF0000 [Info]this is Service event=s 
12/02/22 16:19:13 02BE4000 [Info]this is thread postEvent
12/02/22 16:19:13 02E73000 [Info]Service On event 2
12/02/22 16:19:13 02E73000 [Info]this is Service event=s 

使用了线程池后,Event的线程是02F79000,第一次OnEvent回调时是线程02DF0000,第二次回调时是线程02E73000执行的。两次Event回调各不相同,且与Event所在线程不相同,线程池已经生效。

Previous3 事件机制Next5 线程安全

Last updated 2 years ago