diff options
author | Rider Linden <rider@lindenlab.com> | 2015-07-28 15:29:51 -0700 |
---|---|---|
committer | Rider Linden <rider@lindenlab.com> | 2015-07-28 15:29:51 -0700 |
commit | b57b0d97bb2fb880084cbcca1b915f8e67b442a5 (patch) | |
tree | 787fe3b51aad8fe2b083efcf20fd05bf346bae1a | |
parent | 7882396811fdf8b297f6d0c92d8e1e37859fde9d (diff) |
Named pools of coroutines.
-rwxr-xr-x | indra/newview/app_settings/settings.xml | 18 | ||||
-rw-r--r-- | indra/newview/llcoproceduremanager.cpp | 288 | ||||
-rw-r--r-- | indra/newview/llcoproceduremanager.h | 56 | ||||
-rw-r--r-- | indra/newview/llviewerassetupload.cpp | 3 |
4 files changed, 291 insertions, 74 deletions
diff --git a/indra/newview/app_settings/settings.xml b/indra/newview/app_settings/settings.xml index 2180a7f1a1..b4a4e41884 100755 --- a/indra/newview/app_settings/settings.xml +++ b/indra/newview/app_settings/settings.xml @@ -14340,6 +14340,24 @@ <key>Value</key> <integer>1</integer> </map> + <key>PoolSizeAIS</key> + <map> + <key>Comment</key> + <string>Coroutine Pool size for AIS</string> + <key>Type</key> + <string>U32</string> + <key>Value</key> + <integer>25</integer> + </map> + <key>PoolSizeUpload</key> + <map> + <key>Comment</key> + <string>Coroutine Pool size for Upload</string> + <key>Type</key> + <string>U32</string> + <key>Value</key> + <real>1</real> + </map> <!-- Settings below are for back compatibility only. They are not used in current viewer anymore. But they can't be removed to avoid diff --git a/indra/newview/llcoproceduremanager.cpp b/indra/newview/llcoproceduremanager.cpp index d3168985f8..e22a8b8013 100644 --- a/indra/newview/llcoproceduremanager.cpp +++ b/indra/newview/llcoproceduremanager.cpp @@ -1,5 +1,5 @@ /** -* @file llcoproceduremanager.cpp +* @file LLCoprocedurePool.cpp * @author Rider Linden * @brief Singleton class for managing asset uploads to the sim. * @@ -33,43 +33,270 @@ #include "llcoproceduremanager.h" //========================================================================= -#define COROCOUNT 1 +// Map of pool sizes for known pools +static std::map<std::string, U32> DefaultPoolSizes; + +// *TODO$: When C++11 this can be initialized here as follows: +// = {{"AIS", 25}, {"Upload", 1}} + +#define DEFAULT_POOL_SIZE 5 + +//========================================================================= +class LLCoprocedurePool: private boost::noncopyable +{ +public: + typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t; + + LLCoprocedurePool(const std::string &name, size_t size); + virtual ~LLCoprocedurePool(); + + /// Places the coprocedure on the queue for processing. + /// + /// @param name Is used for debugging and should identify this coroutine. + /// @param proc Is a bound function to be executed + /// + /// @return This method returns a UUID that can be used later to cancel execution. + LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc); + + /// Cancel a coprocedure. If the coprocedure is already being actively executed + /// this method calls cancelYieldingOperation() on the associated HttpAdapter + /// If it has not yet been dequeued it is simply removed from the queue. + bool cancelCoprocedure(const LLUUID &id); + + /// Requests a shutdown of the upload manager. Passing 'true' will perform + /// an immediate kill on the upload coroutine. + void shutdown(bool hardShutdown = false); + + /// Returns the number of coprocedures in the queue awaiting processing. + /// + inline size_t countPending() const + { + return mPendingCoprocs.size(); + } + + /// Returns the number of coprocedures actively being processed. + /// + inline size_t countActive() const + { + return mActiveCoprocs.size(); + } + + /// Returns the total number of coprocedures either queued or in active processing. + /// + inline size_t count() const + { + return countPending() + countActive(); + } + +private: + struct QueuedCoproc + { + typedef boost::shared_ptr<QueuedCoproc> ptr_t; + + QueuedCoproc(const std::string &name, const LLUUID &id, CoProcedure_t proc) : + mName(name), + mId(id), + mProc(proc) + {} + + std::string mName; + LLUUID mId; + CoProcedure_t mProc; + }; + + // we use a deque here rather than std::queue since we want to be able to + // iterate through the queue and potentially erase an entry from the middle. + typedef std::deque<QueuedCoproc::ptr_t> CoprocQueue_t; + typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t; + + std::string mPoolName; + size_t mPoolSize; + CoprocQueue_t mPendingCoprocs; + ActiveCoproc_t mActiveCoprocs; + bool mShutdown; + LLEventStream mWakeupTrigger; + + typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t; + LLCore::HttpRequest::policy_t mHTTPPolicy; + + CoroAdapterMap_t mCoroMapping; + + void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); + +}; //========================================================================= -LLCoprocedureManager::LLCoprocedureManager(): - LLSingleton<LLCoprocedureManager>(), +LLCoprocedureManager::LLCoprocedureManager() +{ + DefaultPoolSizes.insert(std::map<std::string, U32>::value_type("Upload", 1)); + DefaultPoolSizes.insert(std::map<std::string, U32>::value_type("AIS", 25)); +} + +LLCoprocedureManager::~LLCoprocedureManager() +{ + +} + +LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::string &poolName) +{ + // *TODO: Retrieve the actual number of concurrent coroutines fro gSavedSettings and + // clamp to a "reasonable" number. + std::string keyName = "PoolSize" + poolName; + int size = 5; + + size = gSavedSettings.getU32(keyName); + if (size == 0) + { + std::map<std::string, U32>::iterator it = DefaultPoolSizes.find(poolName); + if (it == DefaultPoolSizes.end()) + size = DEFAULT_POOL_SIZE; + else + size = (*it).second; + gSavedSettings.declareU32(keyName, size, "Coroutine Pool size for " + poolName, LLControlVariable::PERSIST_ALWAYS); + LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; + } + + poolPtr_t pool = poolPtr_t(new LLCoprocedurePool(poolName, size)); + mPoolMap.insert(poolMap_t::value_type(poolName, pool)); + + return pool; +} + +//------------------------------------------------------------------------- +LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const std::string &name, CoProcedure_t proc) +{ + poolPtr_t targetPool; + poolMap_t::iterator it = mPoolMap.find(pool); + + if (it == mPoolMap.end()) + { + targetPool = initializePool(pool); + } + else + { + targetPool = (*it).second; + } + + if (!targetPool) + { + LL_WARNS() << "LLCoprocedureManager unable to create coprocedure pool named \"" << pool << "\"" << LL_ENDL; + return LLUUID::null; + } + + return targetPool->enqueueCoprocedure(name, proc); +} + +void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) +{ + for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) + { + if ((*it).second->cancelCoprocedure(id)) + return; + } + LL_INFOS() << "Coprocedure not found." << LL_ENDL; +} + +void LLCoprocedureManager::shutdown(bool hardShutdown) +{ + for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) + { + (*it).second->shutdown(hardShutdown); + } + mPoolMap.clear(); +} + +//------------------------------------------------------------------------- +size_t LLCoprocedureManager::countPending() const +{ + size_t count = 0; + for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) + { + count += (*it).second->countPending(); + } + return count; +} + +size_t LLCoprocedureManager::countPending(const std::string &pool) const +{ + poolMap_t::const_iterator it = mPoolMap.find(pool); + + if (it == mPoolMap.end()) + return 0; + return (*it).second->countPending(); +} + +size_t LLCoprocedureManager::countActive() const +{ + size_t count = 0; + for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) + { + count += (*it).second->countActive(); + } + return count; +} + +size_t LLCoprocedureManager::countActive(const std::string &pool) const +{ + poolMap_t::const_iterator it = mPoolMap.find(pool); + + if (it == mPoolMap.end()) + return 0; + return (*it).second->countActive(); +} + +size_t LLCoprocedureManager::count() const +{ + size_t count = 0; + for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) + { + count += (*it).second->count(); + } + return count; +} + +size_t LLCoprocedureManager::count(const std::string &pool) const +{ + poolMap_t::const_iterator it = mPoolMap.find(pool); + + if (it == mPoolMap.end()) + return 0; + return (*it).second->count(); +} + +//========================================================================= +LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): + mPoolName(poolName), + mPoolSize(size), mPendingCoprocs(), mShutdown(false), - mWakeupTrigger("CoprocedureManager", true), + mWakeupTrigger("CoprocedurePool" + poolName, true), mCoroMapping(), mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID) { - - // *TODO: Retrieve the actual number of concurrent coroutines fro gSavedSettings and - // clamp to a "reasonable" number. - for (int count = 0; count < COROCOUNT; ++count) + for (size_t count = 0; count < mPoolSize; ++count) { LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter = LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t( - new LLCoreHttpUtil::HttpCoroutineAdapter("uploadPostAdapter", mHTTPPolicy)); + new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy)); - std::string uploadCoro = LLCoros::instance().launch("LLCoprocedureManager::coprocedureInvokerCoro", - boost::bind(&LLCoprocedureManager::coprocedureInvokerCoro, this, httpAdapter)); + std::string uploadCoro = LLCoros::instance().launch("LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", + boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, httpAdapter)); mCoroMapping.insert(CoroAdapterMap_t::value_type(uploadCoro, httpAdapter)); } + LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; + mWakeupTrigger.post(LLSD()); } -LLCoprocedureManager::~LLCoprocedureManager() +LLCoprocedurePool::~LLCoprocedurePool() { shutdown(); } -//========================================================================= - -void LLCoprocedureManager::shutdown(bool hardShutdown) +//------------------------------------------------------------------------- +void LLCoprocedurePool::shutdown(bool hardShutdown) { CoroAdapterMap_t::iterator it; @@ -93,46 +320,47 @@ void LLCoprocedureManager::shutdown(bool hardShutdown) mPendingCoprocs.clear(); } -//========================================================================= -LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &name, LLCoprocedureManager::CoProcedure_t proc) +//------------------------------------------------------------------------- +LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoprocedurePool::CoProcedure_t proc) { LLUUID id(LLUUID::generateNewID()); mPendingCoprocs.push_back(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); - LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << LL_ENDL; + LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; mWakeupTrigger.post(LLSD()); return id; } -void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) +bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id) { // first check the active coroutines. If there, remove it and return. ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id); if (itActive != mActiveCoprocs.end()) { - LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << LL_ENDL; + LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; (*itActive).second->cancelYieldingOperation(); mActiveCoprocs.erase(itActive); - return; + return true; } for (CoprocQueue_t::iterator it = mPendingCoprocs.begin(); it != mPendingCoprocs.end(); ++it) { if ((*it)->mId == id) { - LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << LL_ENDL; + LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; mPendingCoprocs.erase(it); - return; + return true; } } - LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found." << LL_ENDL; + LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found." << " in pool \"" << mPoolName << "\"" << LL_ENDL; + return false; } -//========================================================================= -void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) +//------------------------------------------------------------------------- +void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) { LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest); @@ -148,7 +376,7 @@ void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineA mPendingCoprocs.pop_front(); mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)); - LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << LL_ENDL; + LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; try { @@ -161,10 +389,10 @@ void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineA } catch (...) { - LL_WARNS() << "A non std::exception was thrown from " << coproc->mName << " with id=" << coproc->mId << "." << LL_ENDL; + LL_WARNS() << "A non std::exception was thrown from " << coproc->mName << " with id=" << coproc->mId << "." << " in pool \"" << mPoolName << "\"" << LL_ENDL; } - LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << LL_ENDL; + LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(coproc->mId); if (itActive != mActiveCoprocs.end()) diff --git a/indra/newview/llcoproceduremanager.h b/indra/newview/llcoproceduremanager.h index 6ba3891e87..d7f74af76b 100644 --- a/indra/newview/llcoproceduremanager.h +++ b/indra/newview/llcoproceduremanager.h @@ -33,6 +33,8 @@ #include "llcorehttputil.h" #include "lluuid.h" +class LLCoprocedurePool; + class LLCoprocedureManager : public LLSingleton < LLCoprocedureManager > { public: @@ -47,7 +49,7 @@ public: /// @param proc Is a bound function to be executed /// /// @return This method returns a UUID that can be used later to cancel execution. - LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc); + LLUUID enqueueCoprocedure(const std::string &pool, const std::string &name, CoProcedure_t proc); /// Cancel a coprocedure. If the coprocedure is already being actively executed /// this method calls cancelYieldingOperation() on the associated HttpAdapter @@ -60,58 +62,26 @@ public: /// Returns the number of coprocedures in the queue awaiting processing. /// - inline size_t countPending() const - { - return mPendingCoprocs.size(); - } + size_t countPending() const; + size_t countPending(const std::string &pool) const; /// Returns the number of coprocedures actively being processed. /// - inline size_t countActive() const - { - return mActiveCoprocs.size(); - } + size_t countActive() const; + size_t countActive(const std::string &pool) const; /// Returns the total number of coprocedures either queued or in active processing. /// - inline size_t count() const - { - return countPending() + countActive(); - } + size_t count() const; + size_t count(const std::string &pool) const; private: - struct QueuedCoproc - { - typedef boost::shared_ptr<QueuedCoproc> ptr_t; - - QueuedCoproc(const std::string &name, const LLUUID &id, CoProcedure_t proc): - mName(name), - mId(id), - mProc(proc) - {} - - std::string mName; - LLUUID mId; - CoProcedure_t mProc; - }; - - // we use a deque here rather than std::queue since we want to be able to - // iterate through the queue and potentially erase an entry from the middle. - typedef std::deque<QueuedCoproc::ptr_t> CoprocQueue_t; - typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t; - - CoprocQueue_t mPendingCoprocs; - ActiveCoproc_t mActiveCoprocs; - bool mShutdown; - LLEventStream mWakeupTrigger; - - - typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t; - LLCore::HttpRequest::policy_t mHTTPPolicy; + typedef boost::shared_ptr<LLCoprocedurePool> poolPtr_t; + typedef std::map<std::string, poolPtr_t> poolMap_t; - CoroAdapterMap_t mCoroMapping; + poolMap_t mPoolMap; - void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); + poolPtr_t initializePool(const std::string &poolName); }; #endif diff --git a/indra/newview/llviewerassetupload.cpp b/indra/newview/llviewerassetupload.cpp index 4ef398d314..6c6d3a4f33 100644 --- a/indra/newview/llviewerassetupload.cpp +++ b/indra/newview/llviewerassetupload.cpp @@ -666,7 +666,8 @@ LLUUID LLViewerAssetUpload::EnqueueInventoryUpload(const std::string &url, const { std::string procName("LLViewerAssetUpload::AssetInventoryUploadCoproc("); - LLUUID queueId = LLCoprocedureManager::getInstance()->enqueueCoprocedure(procName + LLAssetType::lookup(uploadInfo->getAssetType()) + ")", + LLUUID queueId = LLCoprocedureManager::getInstance()->enqueueCoprocedure("Upload", + procName + LLAssetType::lookup(uploadInfo->getAssetType()) + ")", boost::bind(&LLViewerAssetUpload::AssetInventoryUploadCoproc, _1, _2, url, uploadInfo)); return queueId; |