diff options
Diffstat (limited to 'indra/llmessage/llcoproceduremanager.cpp')
-rw-r--r-- | indra/llmessage/llcoproceduremanager.cpp | 342 |
1 files changed, 175 insertions, 167 deletions
diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 74cdff2b00..a7bd836c4d 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -25,23 +25,29 @@ * $/LicenseInfo$ */ -#include "linden_common.h" +#include "llwin32headers.h" + +#include "linden_common.h" + #include "llcoproceduremanager.h" + +#include <chrono> + +#include <boost/fiber/buffered_channel.hpp> + #include "llexception.h" #include "stringize.h" -#include <boost/assign.hpp> //========================================================================= // Map of pool sizes for known pools -// *TODO$: When C++11 this can be initialized here as follows: -// = {{"AIS", 25}, {"Upload", 1}} -static std::map<std::string, U32> DefaultPoolSizes = - boost::assign::map_list_of - (std::string("Upload"), 1) - (std::string("AIS"), 1); - // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync. +static const std::map<std::string, U32> DefaultPoolSizes{ + {std::string("Upload"), 1}, + {std::string("AIS"), 1}, + // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync. +}; -#define DEFAULT_POOL_SIZE 5 +static const U32 DEFAULT_POOL_SIZE = 5; +static const U32 DEFAULT_QUEUE_SIZE = 4096; //========================================================================= class LLCoprocedurePool: private boost::noncopyable @@ -50,7 +56,7 @@ public: typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t; LLCoprocedurePool(const std::string &name, size_t size); - virtual ~LLCoprocedurePool(); + ~LLCoprocedurePool(); /// Places the coprocedure on the queue for processing. /// @@ -60,20 +66,11 @@ public: /// @return This method returns a UUID that can be used later to cancel execution. LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc); - /// Cancel a coprocedure. If the coprocedure is already being actively executed - /// this method calls cancelSuspendedOperation() on the associated HttpAdapter - /// If it has not yet been dequeued it is simply removed from the queue. - bool cancelCoprocedure(const LLUUID &id); - - /// Requests a shutdown of the upload manager. Passing 'true' will perform - /// an immediate kill on the upload coroutine. - void shutdown(bool hardShutdown = false); - /// Returns the number of coprocedures in the queue awaiting processing. /// inline size_t countPending() const { - return mPendingCoprocs.size(); + return mPending; } /// Returns the number of coprocedures actively being processed. @@ -90,6 +87,8 @@ public: return countPending() + countActive(); } + void close(); + private: struct QueuedCoproc { @@ -106,25 +105,29 @@ private: CoProcedure_t mProc; }; - // we use a deque here rather than std::queue since we want to be able to - // iterate through the queue and potentially erase an entry from the middle. - typedef std::deque<QueuedCoproc::ptr_t> CoprocQueue_t; + // we use a buffered_channel here rather than unbuffered_channel since we want to be able to + // push values without blocking,even if there's currently no one calling a pop operation (due to + // fiber running right now) + typedef boost::fibers::buffered_channel<QueuedCoproc::ptr_t> CoprocQueue_t; + // Use shared_ptr to control the lifespan of our CoprocQueue_t instance + // because the consuming coroutine might outlive this LLCoprocedurePool + // instance. + typedef boost::shared_ptr<CoprocQueue_t> CoprocQueuePtr; typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t; std::string mPoolName; - size_t mPoolSize; - CoprocQueue_t mPendingCoprocs; + size_t mPoolSize, mPending{0}; + CoprocQueuePtr mPendingCoprocs; ActiveCoproc_t mActiveCoprocs; - bool mShutdown; - LLEventStream mWakeupTrigger; + LLTempBoundListener mStatusListener; typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t; LLCore::HttpRequest::policy_t mHTTPPolicy; CoroAdapterMap_t mCoroMapping; - void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); - + void coprocedureInvokerCoro(CoprocQueuePtr pendingCoprocs, + LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); }; //========================================================================= @@ -134,7 +137,7 @@ LLCoprocedureManager::LLCoprocedureManager() LLCoprocedureManager::~LLCoprocedureManager() { - + close(); } LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::string &poolName) @@ -143,33 +146,34 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std:: std::string keyName = "PoolSize" + poolName; int size = 0; - if (poolName.empty()) - LL_ERRS("CoprocedureManager") << "Poolname must not be empty" << LL_ENDL; + LL_ERRS_IF(poolName.empty(), "CoprocedureManager") << "Poolname must not be empty" << LL_ENDL; - if (mPropertyQueryFn && !mPropertyQueryFn.empty()) + if (mPropertyQueryFn) { size = mPropertyQueryFn(keyName); } if (size == 0) - { // if not found grab the know default... if there is no known + { + // if not found grab the know default... if there is no known // default use a reasonable number like 5. - std::map<std::string, U32>::iterator it = DefaultPoolSizes.find(poolName); - if (it == DefaultPoolSizes.end()) - size = DEFAULT_POOL_SIZE; - else - size = (*it).second; + auto it = DefaultPoolSizes.find(poolName); + size = (it != DefaultPoolSizes.end()) ? it->second : DEFAULT_POOL_SIZE; - if (mPropertyDefineFn && !mPropertyDefineFn.empty()) + if (mPropertyDefineFn) + { mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName); - LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; + } + + LL_WARNS("CoProcMgr") << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; } poolPtr_t pool(new LLCoprocedurePool(poolName, size)); - mPoolMap.insert(poolMap_t::value_type(poolName, pool)); + LL_ERRS_IF(!pool, "CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL; + + bool inserted = mPoolMap.emplace(poolName, pool).second; + LL_ERRS_IF(!inserted, "CoprocedureManager") << "Unable to add pool named \"" << poolName << "\" to map. FATAL!" << LL_ENDL; - if (!pool) - LL_ERRS("CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL; return pool; } @@ -178,40 +182,13 @@ LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const s { // Attempt to find the pool and enqueue the procedure. If the pool does // not exist, create it. - poolPtr_t targetPool; poolMap_t::iterator it = mPoolMap.find(pool); - if (it == mPoolMap.end()) - { - targetPool = initializePool(pool); - } - else - { - targetPool = (*it).second; - } + poolPtr_t targetPool = (it != mPoolMap.end()) ? it->second : initializePool(pool); return targetPool->enqueueCoprocedure(name, proc); } -void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) -{ - for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) - { - if ((*it).second->cancelCoprocedure(id)) - return; - } - LL_INFOS() << "Coprocedure not found." << LL_ENDL; -} - -void LLCoprocedureManager::shutdown(bool hardShutdown) -{ - for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) - { - (*it).second->shutdown(hardShutdown); - } - mPoolMap.clear(); -} - void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpdate_t updatefn) { mPropertyQueryFn = queryfn; @@ -222,9 +199,9 @@ void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpd size_t LLCoprocedureManager::countPending() const { size_t count = 0; - for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) + for (const auto& pair : mPoolMap) { - count += (*it).second->countPending(); + count += pair.second->countPending(); } return count; } @@ -235,7 +212,7 @@ size_t LLCoprocedureManager::countPending(const std::string &pool) const if (it == mPoolMap.end()) return 0; - return (*it).second->countPending(); + return it->second->countPending(); } size_t LLCoprocedureManager::countActive() const @@ -243,7 +220,7 @@ size_t LLCoprocedureManager::countActive() const size_t count = 0; for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) { - count += (*it).second->countActive(); + count += it->second->countActive(); } return count; } @@ -253,16 +230,18 @@ size_t LLCoprocedureManager::countActive(const std::string &pool) const poolMap_t::const_iterator it = mPoolMap.find(pool); if (it == mPoolMap.end()) + { return 0; - return (*it).second->countActive(); + } + return it->second->countActive(); } size_t LLCoprocedureManager::count() const { size_t count = 0; - for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) + for (const auto& pair : mPoolMap) { - count += (*it).second->count(); + count += pair.second->count(); } return count; } @@ -273,59 +252,70 @@ size_t LLCoprocedureManager::count(const std::string &pool) const if (it == mPoolMap.end()) return 0; - return (*it).second->count(); + return it->second->count(); +} + +void LLCoprocedureManager::close() +{ + for(auto & poolEntry : mPoolMap) + { + poolEntry.second->close(); + } +} + +void LLCoprocedureManager::close(const std::string &pool) +{ + poolMap_t::iterator it = mPoolMap.find(pool); + if (it != mPoolMap.end()) + { + it->second->close(); + } } //========================================================================= LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPoolName(poolName), mPoolSize(size), - mPendingCoprocs(), - mShutdown(false), - mWakeupTrigger("CoprocedurePool" + poolName, true), - mCoroMapping(), - mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID) + mPendingCoprocs(boost::make_shared<CoprocQueue_t>(DEFAULT_QUEUE_SIZE)), + mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), + mCoroMapping() { + // store in our LLTempBoundListener so that when the LLCoprocedurePool is + // destroyed, we implicitly disconnect from this LLEventPump + mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( + poolName, + [pendingCoprocs=mPendingCoprocs, poolName](const LLSD& status) + { + auto& statsd = status["status"]; + if (statsd.asString() != "running") + { + LL_INFOS("CoProcMgr") << "Pool " << poolName + << " closing queue because status " << statsd + << LL_ENDL; + // This should ensure that all waiting coprocedures in this + // pool will wake up and terminate. + pendingCoprocs->close(); + } + return false; + }); + for (size_t count = 0; count < mPoolSize; ++count) { LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy)); - std::string pooledCoro = LLCoros::instance().launch("LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", - boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, httpAdapter)); + std::string pooledCoro = LLCoros::instance().launch( + "LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", + boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, + mPendingCoprocs, httpAdapter)); mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter)); } - LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; - - mWakeupTrigger.post(LLSD()); + LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items, queue max " << DEFAULT_QUEUE_SIZE << LL_ENDL; } LLCoprocedurePool::~LLCoprocedurePool() { - shutdown(); -} - -//------------------------------------------------------------------------- -void LLCoprocedurePool::shutdown(bool hardShutdown) -{ - CoroAdapterMap_t::iterator it; - - for (it = mCoroMapping.begin(); it != mCoroMapping.end(); ++it) - { - if (hardShutdown) - { - LLCoros::instance().kill((*it).first); - } - if ((*it).second) - { - (*it).second->cancelSuspendedOperation(); - } - } - - mShutdown = true; - mCoroMapping.clear(); - mPendingCoprocs.clear(); } //------------------------------------------------------------------------- @@ -333,76 +323,94 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced { LLUUID id(LLUUID::generateNewID()); - mPendingCoprocs.push_back(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); - LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; - - mWakeupTrigger.post(LLSD()); - - return id; -} - -bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id) -{ - // first check the active coroutines. If there, remove it and return. - ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id); - if (itActive != mActiveCoprocs.end()) + LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL; + auto pushed = mPendingCoprocs->try_push(boost::make_shared<QueuedCoproc>(name, id, proc)); + if (pushed == boost::fibers::channel_op_status::success) { - LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; - (*itActive).second->cancelSuspendedOperation(); - mActiveCoprocs.erase(itActive); - return true; + ++mPending; + return id; } - for (CoprocQueue_t::iterator it = mPendingCoprocs.begin(); it != mPendingCoprocs.end(); ++it) + // Here we didn't succeed in pushing. Shutdown could be the reason. + if (pushed == boost::fibers::channel_op_status::closed) { - if ((*it)->mId == id) - { - LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; - mPendingCoprocs.erase(it); - return true; - } + LL_WARNS("CoProcMgr") << "Discarding coprocedure '" << name << "' because shutdown" << LL_ENDL; + return {}; } - LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found in pool \"" << mPoolName << "\"" << LL_ENDL; - return false; + // The queue should never fill up. + LL_ERRS("CoProcMgr") << "Enqueue failed (" << unsigned(pushed) << ")" << LL_ENDL; + return {}; // never executed, pacify the compiler } //------------------------------------------------------------------------- -void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) +void LLCoprocedurePool::coprocedureInvokerCoro( + CoprocQueuePtr pendingCoprocs, + LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) { - LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest); - - while (!mShutdown) + for (;;) { - llcoro::suspendUntilEventOn(mWakeupTrigger); - if (mShutdown) + // It is VERY IMPORTANT that we instantiate a new ptr_t just before + // the pop_wait_for() call below. When this ptr_t was declared at + // function scope (outside the for loop), NickyD correctly diagnosed a + // mysterious hang condition due to: + // - the second time through the loop, the ptr_t held the last pointer + // to the previous QueuedCoproc, which indirectly held the last + // LLPointer to an LLInventoryCallback instance + // - while holding the lock on pendingCoprocs, pop_wait_for() assigned + // the popped value to the ptr_t variable + // - assignment destroyed the previous value of that variable, which + // indirectly destroyed the LLInventoryCallback + // - whose destructor called ~LLRequestServerAppearanceUpdateOnDestroy() + // - which called LLAppearanceMgr::requestServerAppearanceUpdate() + // - which called enqueueCoprocedure() + // - which tried to acquire the lock on pendingCoprocs... alas. + // Using a fresh, clean ptr_t ensures that no previous value is + // destroyed during pop_wait_for(). + QueuedCoproc::ptr_t coproc; + boost::fibers::channel_op_status status; + { + LLCoros::TempStatus st("waiting for work for 10s"); + status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10)); + } + if (status == boost::fibers::channel_op_status::closed) + { break; - - while (!mPendingCoprocs.empty()) + } + + if(status == boost::fibers::channel_op_status::timeout) { - QueuedCoproc::ptr_t coproc = mPendingCoprocs.front(); - mPendingCoprocs.pop_front(); - ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; + LL_DEBUGS_ONCE("CoProcMgr") << "pool '" << mPoolName << "' waiting." << LL_ENDL; + continue; + } + // we actually popped an item + --mPending; - LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; - try - { - coproc->mProc(httpAdapter, coproc->mId); - } - catch (...) - { - LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName - << "', id=" << coproc->mId.asString() - << ") in pool '" << mPoolName << "'")); - // must NOT omit this or we deplete the pool - mActiveCoprocs.erase(itActive); - throw; - } - - LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\" (" << mPending << " left)" << LL_ENDL; + try + { + coproc->mProc(httpAdapter, coproc->mId); + } + catch (...) + { + LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName + << "', id=" << coproc->mId.asString() + << ") in pool '" << mPoolName << "'")); + // must NOT omit this or we deplete the pool mActiveCoprocs.erase(itActive); + continue; } + + LL_DEBUGS("CoProcMgr") << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; + + mActiveCoprocs.erase(itActive); } } + +void LLCoprocedurePool::close() +{ + mPendingCoprocs->close(); +} |