一个独立的线程函数示例
#ifndef __PUSH_CACHE_THREAD_H__ #define __PUSH_CACHE_THREAD_H__ #include "util/tc_singleton.h" #include "util/tc_thread.h" #include "util/tc_config.h" //taf::TC_Config #include "util/tc_thread_rwlock.h" #include <vector> #include <string> #include "PushInterface.h" #include "PushCache.h" class CPushCacheThread :public taf::TC_Thread , public taf::TC_Singleton<CPushCacheThread> { public: CPushCacheThread(); ~CPushCacheThread(); bool initialize(const taf::TC_Config &conf); void run(); void terminate(); public: void pushToCache(const HQExtend::HPushUserMsgReq& oneMsg); private: std::queue<HQExtend::HPushUserMsgReq> m_msgQue; taf::TC_ThreadLock m_ThreadLock;//线程锁 taf::TC_ThreadRWLocker m_rwLock;//读写锁 HQExtend::PushCachePrx m_pushCachePrx; std::vector<std::string> m_vObjName; }; #endif
#include "pushCacheThread.h"
#include "CountTimeApp.h" //TELL_TIME_COST_THIS
#include "servant/Application.h" //ServerConfig::LocalIp
#include "servant/taf_logger.h" //FDLOG
#include "Pusher.h"

#include <exception>
#include <queue>
#include <string>
#include <thread> // std::this_thread::get_id

using namespace std;

/// Logs the constructing thread's id; no other initialization happens here.
CPushCacheThread::CPushCacheThread()
{
    FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << std::this_thread::get_id() << endl;
}

/// Logs and then asks the worker loop to stop.
/// NOTE(review): terminate() only clears the flag and notifies; nothing here
/// waits for run() to return before the members are destroyed. Confirm the
/// owner joins the thread before this object goes away.
CPushCacheThread::~CPushCacheThread()
{
    FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << std::this_thread::get_id() << endl;
    terminate();
}

/**
 * @brief Creates the push-cache proxy from the configured object name.
 * @param conf Configuration; the object name is read from "/conf/push<pushCache>".
 * @return true when the name is non-empty and the proxy was created; false
 *         when the name is empty or any exception was raised (it is logged
 *         and swallowed).
 */
bool CPushCacheThread::initialize(const taf::TC_Config &conf)
{
    bool bRet = false;
    try
    {
        const string pushCacheObj = conf.get("/conf/push<pushCache>");
        if (!pushCacheObj.empty())
        {
            // Interface of the alert-history cache service.
            m_pushCachePrx = Communicator::getInstance()->stringToProxy<HQExtend::PushCachePrx>(pushCacheObj);
            FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << "Push Cache prx init succ " << endl;
            bRet = true;
        }
    }
    catch (const std::exception &ex)
    {
        LOG_ERROR << "Exception: " << ex.what() << endl;
    }
    catch (...)
    {
        LOG_ERROR << "Unkonwn exception" << endl;
    }
    return bRet;
}

/**
 * @brief Worker loop: forwards queued messages until the running flag is cleared.
 *
 * Each iteration swaps the shared queue into a local one (so producers are
 * blocked only for the swap), sends every message asynchronously, then waits
 * on the monitor for up to 5000 (presumably milliseconds) or until
 * terminate() notifies.
 *
 * If sending throws, the exception is logged, the rest of the local batch is
 * discarded along with the local queue, and the loop continues.
 */
void CPushCacheThread::run()
{
    FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << " =========== run START =========== " << std::this_thread::get_id() << endl;

    while (_running)
    {
        try
        {
            // Take ownership of everything queued so far.
            std::queue<HQExtend::HPushUserMsgReq> tempMsgQue;
            {
                taf::TC_ThreadWLock wlock(m_rwLock);
                tempMsgQue.swap(m_msgQue);
            }

            while (!tempMsgQue.empty())
            {
                HQExtend::PushCachePrxCallbackPtr callback(new PushCacheCallback());
                // Send straight from the queue slot; the element stays alive
                // until pop() below, so no per-message copy is needed.
                m_pushCachePrx->async_pushUserMsg(callback, tempMsgQue.front());
                tempMsgQue.pop();
            }

            {
                TC_ThreadLock::Lock lock(m_ThreadLock);
                // Re-check the flag while holding the monitor: terminate()
                // clears it before taking this lock and notifying, so a
                // notification sent before we get here would otherwise be
                // missed and shutdown would stall for the whole timeout.
                if (_running)
                {
                    m_ThreadLock.timedWait(5000);
                }
            }
        }
        catch (const std::exception &ex)
        {
            LOG_ERROR << "Exception:" << ex.what() << endl;
        }
        catch (...)
        {
            LOG_ERROR << "Unkown exception." << endl;
        }
    }

    FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << " =========== run END =========== " << std::this_thread::get_id() << endl;
}

/**
 * @brief Requests the worker loop to stop.
 *
 * Does nothing beyond logging when the running flag is already false.
 * Otherwise clears the flag and notifies the monitor so a waiting run()
 * wakes up. Exceptions raised while notifying are logged and swallowed.
 */
void CPushCacheThread::terminate()
{
    FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << " =========== terminate =========== " << std::this_thread::get_id() << endl;

    if (_running)
    {
        _running = false;
        try
        {
            TC_ThreadLock::Lock lock(m_ThreadLock);
            m_ThreadLock.notifyAll();
            LOG_DEBUG << __FILE__ << " CPushCacheThread::terminate " << endl;
        }
        catch (const std::exception &ex)
        {
            LOG_ERROR << __FILE__ << "Exception:" << ex.what() << endl;
        }
        catch (...)
        {
            LOG_ERROR << __FILE__ << "Unknown exception." << endl;
        }
    }
}

/**
 * @brief Appends a copy of @p oneMsg to the pending queue under the write lock.
 *
 * The worker is not notified here; the message is picked up the next time
 * run() swaps the queue.
 */
void CPushCacheThread::pushToCache(const HQExtend::HPushUserMsgReq &oneMsg)
{
    taf::TC_ThreadWLock wlock(m_rwLock);
    m_msgQue.push(oneMsg);
}