我们来看一个稍微复杂一点的例子。
在 C++11 之前,由于 C++98/03 本身缺乏对线程和线程同步原语的支持,我们要写一个生产者消费者逻辑要这么写。
代码
/** * RecvMsgTask.h */ class CRecvMsgTask : public CThreadPoolTask { public: CRecvMsgTask(void); ~CRecvMsgTask(void); public: virtual int Run(); virtual int Stop(); virtual void TaskFinish(); BOOL AddMsgData(CBuffer* lpMsgData); private: BOOL HandleMsg(CBuffer* lpMsg); private: HANDLE m_hEvent; CRITICAL_SECTION m_csItem; HANDLE m_hSemaphore; std::vector<CBuffer*> m_arrItem; }; /** * RecvMsgTask.cpp */ CRecvMsgTask::CRecvMsgTask(void) { ::InitializeCriticalSection(&m_csItem); m_hSemaphore = ::CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL); m_hEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL); } CRecvMsgTask::~CRecvMsgTask(void) { ::DeleteCriticalSection(&m_csItem); if (m_hSemaphore != NULL) { ::CloseHandle(m_hSemaphore); m_hSemaphore = NULL; } if (m_hEvent != NULL) { ::CloseHandle(m_hEvent); m_hEvent = NULL; } } int CRecvMsgTask::Run() { HANDLE hWaitEvent[2]; DWORD dwIndex; CBuffer * lpMsg; hWaitEvent[0] = m_hEvent; hWaitEvent[1] = m_hSemaphore; while (1) { dwIndex = ::WaitForMultipleObjects(2, hWaitEvent, FALSE, INFINITE); if (dwIndex == WAIT_OBJECT_0) break; lpMsg = NULL; ::EnterCriticalSection(&m_csItem); if (m_arrItem.size() > 0) { //消费者从队列m_arrItem中取出任务执行 lpMsg = m_arrItem[0]; m_arrItem.erase(m_arrItem.begin() + 0); } ::LeaveCriticalSection(&m_csItem); if (NULL == lpMsg) continue; //处理任务 HandleMsg(lpMsg); delete lpMsg; } return 0; } int CRecvMsgTask::Stop() { m_HttpClient.SetCancalEvent(); ::SetEvent(m_hEvent); return 0; } void CRecvMsgTask::TaskFinish() { } //生产者调用这个方法将Task放入队列m_arrItem中 BOOL CRecvMsgTask::AddMsgData(CBuffer * lpMsgData) { if (NULL == lpMsgData) return FALSE; ::EnterCriticalSection(&m_csItem); m_arrItem.push_back(lpMsgData); ::LeaveCriticalSection(&m_csItem); ::ReleaseSemaphore(m_hSemaphore, 1, NULL); return TRUE; }
代码
#include <pthread.h> #include <errno.h> #include <unistd.h> #include <list> #include <semaphore.h> #include <iostream> class Task { public: Task(int taskID) { this->taskID = taskID; } void doTask() { std::cout << "handle a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl; } private: int taskID; }; pthread_mutex_t mymutex; std::list<Task*> tasks; pthread_cond_t mycv; void* consumer_thread(void* param) { Task* pTask = NULL; while (true) { pthread_mutex_lock(&mymutex); while (tasks.empty()) { //如果获得了互斥锁,但是条件不合适的话,pthread_cond_wait会释放锁,不往下执行。 //当发生变化后,条件合适,pthread_cond_wait将直接获得锁。 pthread_cond_wait(&mycv, &mymutex); } pTask = tasks.front(); tasks.pop_front(); pthread_mutex_unlock(&mymutex); if (pTask == NULL) continue; pTask->doTask(); delete pTask; pTask = NULL; } return NULL; } void* producer_thread(void* param) { int taskID = 0; Task* pTask = NULL; while (true) { pTask = new Task(taskID); pthread_mutex_lock(&mymutex); tasks.push_back(pTask); std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl; pthread_mutex_unlock(&mymutex); //释放信号量,通知消费者线程 pthread_cond_signal(&mycv); taskID ++; //休眠1秒 sleep(1); } return NULL; } int main() { pthread_mutex_init(&mymutex, NULL); pthread_cond_init(&mycv, NULL); //创建5个消费者线程 pthread_t consumerThreadID[5]; for (int i = 0; i < 5; ++i) pthread_create(&consumerThreadID[i], NULL, consumer_thread, NULL); //创建一个生产者线程 pthread_t producerThreadID; pthread_create(&producerThreadID, NULL, producer_thread, NULL); pthread_join(producerThreadID, NULL); for (int i = 0; i < 5; ++i) pthread_join(consumerThreadID[i], NULL); pthread_cond_destroy(&mycv); pthread_mutex_destroy(&mymutex); return 0; }
怎么样?上述代码如果对于新手来说,望而却步。
为了实现这样的功能在 Windows 上你需要掌握线程如何创建、线程同步对象 CriticalSection、Event、Semaphore、WaitForSingleObject/WaitForMultipleObjects 等操作系统对象和 API。
在 Linux 上需要掌握线程创建,你需要了解线程创建、互斥体、条件变量。
对于需要支持多个平台的开发,需要开发者同时熟悉上述原理并编写多套适用不同平台的代码。
C++11 的线程库改变了这个现状,现在你只需要掌握 std::thread、std::mutex、std::condition_variable 少数几个线程同步对象即可,同时使用这些对象编写出来的代码也可以跨平台。示例如下:
代码
#include <thread> #include <mutex> #include <condition_variable> #include <list> #include <iostream> class Task { public: Task(int taskID) { this->taskID = taskID; } void doTask() { std::cout << "handle a task, taskID: " << taskID << ", threadID: " << std::this_thread::get_id() << std::endl; } private: int taskID; }; std::mutex mymutex; std::list<Task*> tasks; std::condition_variable mycv; void* consumer_thread() { Task* pTask = NULL; while (true) { std::unique_lock<std::mutex> guard(mymutex); while (tasks.empty()) { //如果获得了互斥锁,但是条件不合适的话,pthread_cond_wait会释放锁,不往下执行。 //当发生变化后,条件合适,pthread_cond_wait将直接获得锁。 mycv.wait(guard); } pTask = tasks.front(); tasks.pop_front(); if (pTask == NULL) continue; pTask->doTask(); delete pTask; pTask = NULL; } return NULL; } void* producer_thread() { int taskID = 0; Task* pTask = NULL; while (true) { pTask = new Task(taskID); //使用括号减小guard锁的作用范围 { std::lock_guard<std::mutex> guard(mymutex); tasks.push_back(pTask); std::cout << "produce a task, taskID: " << taskID << ", threadID: " << std::this_thread::get_id() << std::endl; } //释放信号量,通知消费者线程 mycv.notify_one(); taskID ++; //休眠1秒 std::this_thread::sleep_for(std::chrono::seconds(1)); } return NULL; } int main() { //创建5个消费者线程 std::thread consumer1(consumer_thread); std::thread consumer2(consumer_thread); std::thread consumer3(consumer_thread); std::thread consumer4(consumer_thread); std::thread consumer5(consumer_thread); //创建一个生产者线程 std::thread producer(producer_thread); producer.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join(); consumer5.join(); return 0; }
C++并发编程实战
参考