From 66981fab0b3c8dcc3a031d50710a2b24ec6b0603 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 10 May 2018 21:46:07 -0400 Subject: SL-793: Use Boost.Fiber instead of the "dcoroutine" library. Longtime fans will remember that the "dcoroutine" library is a Google Summer of Code project by Giovanni P. Deretta. He originally called it "Boost.Coroutine," and we originally added it to our 3p-boost autobuild package as such. But when the official Boost.Coroutine library came along (with a very different API), and we still needed the API of the GSoC project, we renamed the unofficial one "dcoroutine" to allow coexistence. The "dcoroutine" library had an internal low-level API more or less analogous to Boost.Context. We later introduced an implementation of that internal API based on Boost.Context, a step towards eliminating the GSoC code in favor of official, supported Boost code. However, recent versions of Boost.Context no longer support the API on which we built the shim for "dcoroutine." We started down the path of reimplementing that shim using the current Boost.Context API -- then realized that it's time to bite the bullet and replace the "dcoroutine" API with the Boost.Fiber API, which we've been itching to do for literally years now. Naturally, most of the heavy lifting is in llcoros.{h,cpp} and lleventcoro.{h,cpp} -- which is good: the LLCoros layer abstracts away most of the differences between "dcoroutine" and Boost.Fiber. The one feature Boost.Fiber does not provide is the ability to forcibly terminate some other fiber. Accordingly, disable LLCoros::kill() and LLCoprocedureManager::shutdown(). The only known shutdown() call was in LLCoprocedurePool's destructor. We also took the opportunity to remove postAndSuspend2() and its associated machinery: FutureListener2, LLErrorEvent, errorException(), errorLog(), LLCoroEventPumps. All that dual-LLEventPump stuff was introduced at a time when the Responder pattern was king, and we assumed we'd want to listen on one LLEventPump with the success handler and on another with the error handler. We have never actually used that in practice. Remove associated tests, of course. There is one other semantic difference that necessitates patching a number of tests: with "dcoroutine," fulfilling a future IMMEDIATELY resumes the waiting coroutine. With Boost.Fiber, fulfilling a future merely marks the fiber as ready to resume next time the scheduler gets around to it. To observe the test side effects, we've inserted a number of llcoro::suspend() calls -- also in the main loop. For a long time we retained a single unit test exercising the raw "dcoroutine" API. Remove that. Eliminate llcoro_get_id.{h,cpp}, which provided llcoro::get_id(), which was a hack to emulate fiber-local variables. Since Boost.Fiber has an actual API for that, remove the hack. In fact, use (new alias) LLCoros::local_ptr for LLSingleton's dependency tracking in place of llcoro::get_id(). In CMake land, replace BOOST_COROUTINE_LIBRARY with BOOST_FIBER_LIBRARY. We don't actually use the Boost.Coroutine for anything (though there exist plausible use cases). --- indra/llmessage/llcoproceduremanager.cpp | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 74cdff2b00..4c85dd999a 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -203,6 +203,7 @@ void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) LL_INFOS() << "Coprocedure not found." << LL_ENDL; } +/*==========================================================================*| void LLCoprocedureManager::shutdown(bool hardShutdown) { for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) @@ -211,6 +212,7 @@ void LLCoprocedureManager::shutdown(bool hardShutdown) } mPoolMap.clear(); } +|*==========================================================================*/ void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpdate_t updatefn) { @@ -303,10 +305,13 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): LLCoprocedurePool::~LLCoprocedurePool() { +/*==========================================================================*| shutdown(); +|*==========================================================================*/ } //------------------------------------------------------------------------- +/*==========================================================================*| void LLCoprocedurePool::shutdown(bool hardShutdown) { CoroAdapterMap_t::iterator it; @@ -327,6 +332,7 @@ void LLCoprocedurePool::shutdown(bool hardShutdown) mCoroMapping.clear(); mPendingCoprocs.clear(); } +|*==========================================================================*/ //------------------------------------------------------------------------- LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoprocedurePool::CoProcedure_t proc) -- cgit v1.2.3 From 997bdfc88682de36f02931a22d3baa23f00b6ddb Mon Sep 17 00:00:00 2001 From: Brad Kittenbrink Date: Mon, 11 Mar 2019 17:42:39 -0700 Subject: First draft of boost::fibers::unbuffered_channel based implementation of LLCoprocedureManager --- indra/llmessage/llcoproceduremanager.cpp | 301 +++++++++++++++---------------- 1 file changed, 150 insertions(+), 151 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 4c85dd999a..9501181509 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -25,23 +25,23 @@ * $/LicenseInfo$ */ -#include "linden_common.h" +#include "linden_common.h" #include "llcoproceduremanager.h" #include "llexception.h" #include "stringize.h" #include +#include //========================================================================= // Map of pool sizes for known pools -// *TODO$: When C++11 this can be initialized here as follows: -// = {{"AIS", 25}, {"Upload", 1}} -static std::map DefaultPoolSizes = - boost::assign::map_list_of - (std::string("Upload"), 1) - (std::string("AIS"), 1); - // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync. +static const std::map DefaultPoolSizes = +{ + {"Upload", 1}, + {"AIS", 1}, + // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync. +}; -#define DEFAULT_POOL_SIZE 5 +static const U32 DEFAULT_POOL_SIZE = 5; //========================================================================= class LLCoprocedurePool: private boost::noncopyable @@ -50,7 +50,7 @@ public: typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t; LLCoprocedurePool(const std::string &name, size_t size); - virtual ~LLCoprocedurePool(); + ~LLCoprocedurePool(); /// Places the coprocedure on the queue for processing. /// @@ -63,18 +63,18 @@ public: /// Cancel a coprocedure. If the coprocedure is already being actively executed /// this method calls cancelSuspendedOperation() on the associated HttpAdapter /// If it has not yet been dequeued it is simply removed from the queue. - bool cancelCoprocedure(const LLUUID &id); + //bool cancelCoprocedure(const LLUUID &id); /// Requests a shutdown of the upload manager. Passing 'true' will perform /// an immediate kill on the upload coroutine. - void shutdown(bool hardShutdown = false); + //void shutdown(bool hardShutdown = false); - /// Returns the number of coprocedures in the queue awaiting processing. - /// - inline size_t countPending() const - { - return mPendingCoprocs.size(); - } +// /// Returns the number of coprocedures in the queue awaiting processing. +// /// +// inline size_t countPending() const +// { +// return mPendingCoprocs.size(); +// } /// Returns the number of coprocedures actively being processed. /// @@ -83,13 +83,15 @@ public: return mActiveCoprocs.size(); } - /// Returns the total number of coprocedures either queued or in active processing. - /// - inline size_t count() const - { - return countPending() + countActive(); - } +// /// Returns the total number of coprocedures either queued or in active processing. +// /// +// inline size_t count() const +// { +// return countPending() + countActive(); +// } + void close(); + private: struct QueuedCoproc { @@ -108,15 +110,15 @@ private: // we use a deque here rather than std::queue since we want to be able to // iterate through the queue and potentially erase an entry from the middle. - typedef std::deque CoprocQueue_t; + // TODO - make this queue be backed by an unbuffered_channel + typedef boost::fibers::unbuffered_channel CoprocQueue_t; typedef std::map ActiveCoproc_t; std::string mPoolName; size_t mPoolSize; CoprocQueue_t mPendingCoprocs; ActiveCoproc_t mActiveCoprocs; - bool mShutdown; - LLEventStream mWakeupTrigger; + //bool mShutdown; typedef std::map CoroAdapterMap_t; LLCore::HttpRequest::policy_t mHTTPPolicy; @@ -143,8 +145,7 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std:: std::string keyName = "PoolSize" + poolName; int size = 0; - if (poolName.empty()) - LL_ERRS("CoprocedureManager") << "Poolname must not be empty" << LL_ENDL; + LL_ERRS_IF(poolName.empty(), "CoprocedureManager") << "Poolname must not be empty" << LL_ENDL; if (mPropertyQueryFn && !mPropertyQueryFn.empty()) { @@ -152,24 +153,26 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std:: } if (size == 0) - { // if not found grab the know default... if there is no known + { + // if not found grab the know default... if there is no known // default use a reasonable number like 5. - std::map::iterator it = DefaultPoolSizes.find(poolName); - if (it == DefaultPoolSizes.end()) - size = DEFAULT_POOL_SIZE; - else - size = (*it).second; + auto it = DefaultPoolSizes.find(poolName); + size = (it != DefaultPoolSizes.end()) ? it->second : DEFAULT_POOL_SIZE; if (mPropertyDefineFn && !mPropertyDefineFn.empty()) + { mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName); + } + LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; } poolPtr_t pool(new LLCoprocedurePool(poolName, size)); - mPoolMap.insert(poolMap_t::value_type(poolName, pool)); + LL_ERRS_IF(!pool, "CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL; + + bool inserted = mPoolMap.emplace(poolName, pool).second; + LL_ERRS_IF(!inserted, "CoprocedureManager") << "Unable to add pool named \"" << poolName << "\" to map. FATAL!" << LL_ENDL; - if (!pool) - LL_ERRS("CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL; return pool; } @@ -178,30 +181,24 @@ LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const s { // Attempt to find the pool and enqueue the procedure. If the pool does // not exist, create it. - poolPtr_t targetPool; poolMap_t::iterator it = mPoolMap.find(pool); - if (it == mPoolMap.end()) - { - targetPool = initializePool(pool); - } - else - { - targetPool = (*it).second; - } + poolPtr_t targetPool = (it != mPoolMap.end()) ? it->second : initializePool(pool); return targetPool->enqueueCoprocedure(name, proc); } -void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) -{ - for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) - { - if ((*it).second->cancelCoprocedure(id)) - return; - } - LL_INFOS() << "Coprocedure not found." << LL_ENDL; -} +//void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) +//{ +// for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +// { +// if (it->second->cancelCoprocedure(id)) +// { +// return; +// } +// } +// LL_INFOS() << "Coprocedure not found." << LL_ENDL; +//} /*==========================================================================*| void LLCoprocedureManager::shutdown(bool hardShutdown) @@ -220,32 +217,32 @@ void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpd mPropertyDefineFn = updatefn; } -//------------------------------------------------------------------------- -size_t LLCoprocedureManager::countPending() const -{ - size_t count = 0; - for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) - { - count += (*it).second->countPending(); - } - return count; -} - -size_t LLCoprocedureManager::countPending(const std::string &pool) const -{ - poolMap_t::const_iterator it = mPoolMap.find(pool); - - if (it == mPoolMap.end()) - return 0; - return (*it).second->countPending(); -} +////------------------------------------------------------------------------- +//size_t LLCoprocedureManager::countPending() const +//{ +// size_t count = 0; +// for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +// { +// count += (*it).second->countPending(); +// } +// return count; +//} +// +//size_t LLCoprocedureManager::countPending(const std::string &pool) const +//{ +// poolMap_t::const_iterator it = mPoolMap.find(pool); +// +// if (it == mPoolMap.end()) +// return 0; +// return (*it).second->countPending(); +//} size_t LLCoprocedureManager::countActive() const { size_t count = 0; for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) { - count += (*it).second->countActive(); + count += it->second->countActive(); } return count; } @@ -255,27 +252,38 @@ size_t LLCoprocedureManager::countActive(const std::string &pool) const poolMap_t::const_iterator it = mPoolMap.find(pool); if (it == mPoolMap.end()) + { return 0; - return (*it).second->countActive(); + } + return it->second->countActive(); } -size_t LLCoprocedureManager::count() const +//size_t LLCoprocedureManager::count() const +//{ +// size_t count = 0; +// for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +// { +// count += (*it).second->count(); +// } +// return count; +//} +// +//size_t LLCoprocedureManager::count(const std::string &pool) const +//{ +// poolMap_t::const_iterator it = mPoolMap.find(pool); +// +// if (it == mPoolMap.end()) +// return 0; +// return (*it).second->count(); +//} + +void LLCoprocedureManager::close(const std::string &pool) { - size_t count = 0; - for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) + poolMap_t::iterator it = mPoolMap.find(pool); + if (it != mPoolMap.end()) { - count += (*it).second->count(); + it->second->close(); } - return count; -} - -size_t LLCoprocedureManager::count(const std::string &pool) const -{ - poolMap_t::const_iterator it = mPoolMap.find(pool); - - if (it == mPoolMap.end()) - return 0; - return (*it).second->count(); } //========================================================================= @@ -283,8 +291,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPoolName(poolName), mPoolSize(size), mPendingCoprocs(), - mShutdown(false), - mWakeupTrigger("CoprocedurePool" + poolName, true), + //mShutdown(false), mCoroMapping(), mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID) { @@ -299,8 +306,6 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): } LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; - - mWakeupTrigger.post(LLSD()); } LLCoprocedurePool::~LLCoprocedurePool() @@ -339,76 +344,70 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced { LLUUID id(LLUUID::generateNewID()); - mPendingCoprocs.push_back(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); + mPendingCoprocs.push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; - mWakeupTrigger.post(LLSD()); - return id; } -bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id) -{ - // first check the active coroutines. If there, remove it and return. - ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id); - if (itActive != mActiveCoprocs.end()) - { - LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; - (*itActive).second->cancelSuspendedOperation(); - mActiveCoprocs.erase(itActive); - return true; - } - - for (CoprocQueue_t::iterator it = mPendingCoprocs.begin(); it != mPendingCoprocs.end(); ++it) - { - if ((*it)->mId == id) - { - LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; - mPendingCoprocs.erase(it); - return true; - } - } - - LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found in pool \"" << mPoolName << "\"" << LL_ENDL; - return false; -} +//bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id) +//{ +// // first check the active coroutines. If there, remove it and return. +// ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id); +// if (itActive != mActiveCoprocs.end()) +// { +// LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; +// (*itActive).second->cancelSuspendedOperation(); +// mActiveCoprocs.erase(itActive); +// return true; +// } +// +//// for (auto it: mPendingCoprocs) +//// { +//// if ((*it)->mId == id) +//// { +//// LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; +//// mPendingCoprocs.erase(it); +//// return true; +//// } +//// } +// +// LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found in pool \"" << mPoolName << "\"" << LL_ENDL; +// return false; +//} //------------------------------------------------------------------------- void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) { LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest); - while (!mShutdown) + QueuedCoproc::ptr_t coproc; + while (mPendingCoprocs.pop(coproc) != boost::fibers::channel_op_status::closed) { - llcoro::suspendUntilEventOn(mWakeupTrigger); - if (mShutdown) - break; - - while (!mPendingCoprocs.empty()) - { - QueuedCoproc::ptr_t coproc = mPendingCoprocs.front(); - mPendingCoprocs.pop_front(); - ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; - - LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; - - try - { - coproc->mProc(httpAdapter, coproc->mId); - } - catch (...) - { - LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName - << "', id=" << coproc->mId.asString() - << ") in pool '" << mPoolName << "'")); - // must NOT omit this or we deplete the pool - mActiveCoprocs.erase(itActive); - throw; - } - - LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; + ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; + LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + + try + { + coproc->mProc(httpAdapter, coproc->mId); + } + catch (...) + { + LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName + << "', id=" << coproc->mId.asString() + << ") in pool '" << mPoolName << "'")); + // must NOT omit this or we deplete the pool mActiveCoprocs.erase(itActive); + throw; } + + LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; + + mActiveCoprocs.erase(itActive); } } + +void LLCoprocedurePool::close() { + mPendingCoprocs.close(); +} -- cgit v1.2.3 From b09aa6a2bf2f908ff890b920149976e04fd420db Mon Sep 17 00:00:00 2001 From: Brad Kittenbrink Date: Thu, 14 Mar 2019 11:55:42 -0700 Subject: Improved shutdown behavior of LLCoprocedureManager --- indra/llmessage/llcoproceduremanager.cpp | 68 ++++++++++---------------------- 1 file changed, 20 insertions(+), 48 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 9501181509..1b82cb8c99 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -26,12 +26,17 @@ */ #include "linden_common.h" + #include "llcoproceduremanager.h" -#include "llexception.h" -#include "stringize.h" + +#include + #include #include +#include "llexception.h" +#include "stringize.h" + //========================================================================= // Map of pool sizes for known pools static const std::map DefaultPoolSizes = @@ -65,10 +70,6 @@ public: /// If it has not yet been dequeued it is simply removed from the queue. //bool cancelCoprocedure(const LLUUID &id); - /// Requests a shutdown of the upload manager. Passing 'true' will perform - /// an immediate kill on the upload coroutine. - //void shutdown(bool hardShutdown = false); - // /// Returns the number of coprocedures in the queue awaiting processing. // /// // inline size_t countPending() const @@ -136,7 +137,10 @@ LLCoprocedureManager::LLCoprocedureManager() LLCoprocedureManager::~LLCoprocedureManager() { - + for(auto & poolEntry : mPoolMap) + { + poolEntry.second->close(); + } } LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::string &poolName) @@ -200,17 +204,6 @@ LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const s // LL_INFOS() << "Coprocedure not found." << LL_ENDL; //} -/*==========================================================================*| -void LLCoprocedureManager::shutdown(bool hardShutdown) -{ - for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) - { - (*it).second->shutdown(hardShutdown); - } - mPoolMap.clear(); -} -|*==========================================================================*/ - void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpdate_t updatefn) { mPropertyQueryFn = queryfn; @@ -310,35 +303,8 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): LLCoprocedurePool::~LLCoprocedurePool() { -/*==========================================================================*| - shutdown(); -|*==========================================================================*/ } -//------------------------------------------------------------------------- -/*==========================================================================*| -void LLCoprocedurePool::shutdown(bool hardShutdown) -{ - CoroAdapterMap_t::iterator it; - - for (it = mCoroMapping.begin(); it != mCoroMapping.end(); ++it) - { - if (hardShutdown) - { - LLCoros::instance().kill((*it).first); - } - if ((*it).second) - { - (*it).second->cancelSuspendedOperation(); - } - } - - mShutdown = true; - mCoroMapping.clear(); - mPendingCoprocs.clear(); -} -|*==========================================================================*/ - //------------------------------------------------------------------------- LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoprocedurePool::CoProcedure_t proc) { @@ -379,11 +345,17 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced //------------------------------------------------------------------------- void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) { - LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest); - QueuedCoproc::ptr_t coproc; - while (mPendingCoprocs.pop(coproc) != boost::fibers::channel_op_status::closed) + boost::fibers::channel_op_status status; + using namespace std::chrono_literals; + while ((status = mPendingCoprocs.pop_wait_for(coproc, 10s)) != boost::fibers::channel_op_status::closed) { + if(status == boost::fibers::channel_op_status::timeout) + { + LL_INFOS() << "pool '" << mPoolName << "' stalled." << LL_ENDL; + continue; + } + ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; -- cgit v1.2.3 From c26c2bc3f0a4c254aac8ded86162d72eee7dea0a Mon Sep 17 00:00:00 2001 From: Brad Kittenbrink Date: Thu, 18 Apr 2019 15:27:40 -0700 Subject: Improved aggregate init syntax for DefaultPoolSizes map. --- indra/llmessage/llcoproceduremanager.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 1b82cb8c99..46c29d82b7 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -37,12 +37,13 @@ #include "llexception.h" #include "stringize.h" +using namespace std::literals; + //========================================================================= // Map of pool sizes for known pools -static const std::map DefaultPoolSizes = -{ - {"Upload", 1}, - {"AIS", 1}, +static const std::map DefaultPoolSizes{ + {"Upload"s, 1}, + {"AIS"s, 1}, // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync. }; @@ -347,7 +348,6 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap { QueuedCoproc::ptr_t coproc; boost::fibers::channel_op_status status; - using namespace std::chrono_literals; while ((status = mPendingCoprocs.pop_wait_for(coproc, 10s)) != boost::fibers::channel_op_status::closed) { if(status == boost::fibers::channel_op_status::timeout) -- cgit v1.2.3 From 828223bf1b8c7a74af6fea870a6a8620c6d4beb1 Mon Sep 17 00:00:00 2001 From: Brad Kittenbrink Date: Wed, 1 May 2019 15:35:56 -0700 Subject: Implemented some code review suggested cleanups. --- indra/llmessage/llcoproceduremanager.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 46c29d82b7..579ab097e0 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -112,7 +112,6 @@ private: // we use a deque here rather than std::queue since we want to be able to // iterate through the queue and potentially erase an entry from the middle. - // TODO - make this queue be backed by an unbuffered_channel typedef boost::fibers::unbuffered_channel CoprocQueue_t; typedef std::map ActiveCoproc_t; @@ -152,7 +151,7 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std:: LL_ERRS_IF(poolName.empty(), "CoprocedureManager") << "Poolname must not be empty" << LL_ENDL; - if (mPropertyQueryFn && !mPropertyQueryFn.empty()) + if (mPropertyQueryFn) { size = mPropertyQueryFn(keyName); } @@ -164,7 +163,7 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std:: auto it = DefaultPoolSizes.find(poolName); size = (it != DefaultPoolSizes.end()) ? it->second : DEFAULT_POOL_SIZE; - if (mPropertyDefineFn && !mPropertyDefineFn.empty()) + if (mPropertyDefineFn) { mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName); } @@ -352,7 +351,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap { if(status == boost::fibers::channel_op_status::timeout) { - LL_INFOS() << "pool '" << mPoolName << "' stalled." << LL_ENDL; + LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL; continue; } -- cgit v1.2.3 From 16453005bb8373d7228262bf79c5882f311380e9 Mon Sep 17 00:00:00 2001 From: Anchor Date: Thu, 6 Jun 2019 01:51:38 -0700 Subject: [DRTVWR-476] - update cef, fix merge --- indra/llmessage/llcoproceduremanager.cpp | 2 ++ 1 file changed, 2 insertions(+) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 579ab097e0..fa8e9c3ebf 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -25,6 +25,8 @@ * $/LicenseInfo$ */ +#include "llwin32headers.h" + #include "linden_common.h" #include "llcoproceduremanager.h" -- cgit v1.2.3 From dc8d2779ab3fa49fd4ff4495d8a2c642507bde69 Mon Sep 17 00:00:00 2001 From: Nicky Date: Thu, 6 Jun 2019 20:59:54 +0200 Subject: Do not use string/chrono literals, sadly that won't work with GCC (4.9) --- indra/llmessage/llcoproceduremanager.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index fa8e9c3ebf..13ee12b5bb 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -39,13 +39,11 @@ #include "llexception.h" #include "stringize.h" -using namespace std::literals; - //========================================================================= // Map of pool sizes for known pools static const std::map DefaultPoolSizes{ - {"Upload"s, 1}, - {"AIS"s, 1}, + {std::string("Upload"), 1}, + {std::string("AIS"), 1}, // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync. }; @@ -349,7 +347,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap { QueuedCoproc::ptr_t coproc; boost::fibers::channel_op_status status; - while ((status = mPendingCoprocs.pop_wait_for(coproc, 10s)) != boost::fibers::channel_op_status::closed) + while ((status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10))) != boost::fibers::channel_op_status::closed) { if(status == boost::fibers::channel_op_status::timeout) { -- cgit v1.2.3 From a27281591da9d4023d78f06823adaf2a7d51f724 Mon Sep 17 00:00:00 2001 From: Nicky Date: Fri, 7 Jun 2019 11:11:56 +0200 Subject: Replace boost::fibers::unbuffered_channel with boost::fibers::buffered_channel. Using boost::fibers::unbuffered_channel can block the mainthread when calling mPendingCoprocs.push (LLCoprocedurePool::enqueueCoprocedure) From the documentation: - If a fiber attempts to send a value through an unbuffered channel and no fiber is waiting to receive the value, the channel will block the sending fiber. This can happen if LLCoprocedurePool::coprocedureInvokerCoro is running a coroutine and this coroutine calls yield, resuming the viewers main loop. If inside the main loop someone calls LLCoprocedurePool::enqueueCoprocedure now push will block, as there's no one waiting for a result right now. The wait would be in LLCoprocedurePool::coprocedureInvokerCoro at the start of the while loop, but we have not reached that yet again as LLCoprocedurePool::coprocedureInvokerCoro did yield before reaching pop_wait_for. The result is a deadlock. boost::fibers::buffered_channel will not block as long as there's space in the channel. A size of 4096 (DEFAULT_QUEUE_SIZE) should be plenty enough for this. --- indra/llmessage/llcoproceduremanager.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 13ee12b5bb..bc7c982756 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -34,7 +34,7 @@ #include #include -#include +#include #include "llexception.h" #include "stringize.h" @@ -48,6 +48,7 @@ static const std::map DefaultPoolSizes{ }; static const U32 DEFAULT_POOL_SIZE = 5; +static const U32 DEFAULT_QUEUE_SIZE = 4096; //========================================================================= class LLCoprocedurePool: private boost::noncopyable @@ -112,7 +113,7 @@ private: // we use a deque here rather than std::queue since we want to be able to // iterate through the queue and potentially erase an entry from the middle. - typedef boost::fibers::unbuffered_channel CoprocQueue_t; + typedef boost::fibers::buffered_channel CoprocQueue_t; typedef std::map ActiveCoproc_t; std::string mPoolName; @@ -283,7 +284,7 @@ void LLCoprocedureManager::close(const std::string &pool) LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPoolName(poolName), mPoolSize(size), - mPendingCoprocs(), + mPendingCoprocs(DEFAULT_QUEUE_SIZE), //mShutdown(false), mCoroMapping(), mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID) -- cgit v1.2.3 From 96e7e92e2e60094a68f778767e3f4338b5d0ef60 Mon Sep 17 00:00:00 2001 From: Nicky Date: Fri, 7 Jun 2019 11:26:55 +0200 Subject: General cleanup. Delete commented out code. --- indra/llmessage/llcoproceduremanager.cpp | 109 ++----------------------------- 1 file changed, 7 insertions(+), 102 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index bc7c982756..89eb00a2b7 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -67,18 +67,6 @@ public: /// @return This method returns a UUID that can be used later to cancel execution. LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc); - /// Cancel a coprocedure. If the coprocedure is already being actively executed - /// this method calls cancelSuspendedOperation() on the associated HttpAdapter - /// If it has not yet been dequeued it is simply removed from the queue. - //bool cancelCoprocedure(const LLUUID &id); - -// /// Returns the number of coprocedures in the queue awaiting processing. -// /// -// inline size_t countPending() const -// { -// return mPendingCoprocs.size(); -// } - /// Returns the number of coprocedures actively being processed. /// inline size_t countActive() const @@ -86,13 +74,6 @@ public: return mActiveCoprocs.size(); } -// /// Returns the total number of coprocedures either queued or in active processing. -// /// -// inline size_t count() const -// { -// return countPending() + countActive(); -// } - void close(); private: @@ -111,8 +92,9 @@ private: CoProcedure_t mProc; }; - // we use a deque here rather than std::queue since we want to be able to - // iterate through the queue and potentially erase an entry from the middle. + // we use a buffered_channel here rather than unbuffered_channel since we want to be able to + // push values without blocking,even if there's currently no one calling a pop operation (due to + // fibber running right now) typedef boost::fibers::buffered_channel CoprocQueue_t; typedef std::map ActiveCoproc_t; @@ -120,7 +102,6 @@ private: size_t mPoolSize; CoprocQueue_t mPendingCoprocs; ActiveCoproc_t mActiveCoprocs; - //bool mShutdown; typedef std::map CoroAdapterMap_t; LLCore::HttpRequest::policy_t mHTTPPolicy; @@ -128,7 +109,6 @@ private: CoroAdapterMap_t mCoroMapping; void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); - }; //========================================================================= @@ -193,44 +173,12 @@ LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const s return targetPool->enqueueCoprocedure(name, proc); } -//void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) -//{ -// for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) -// { -// if (it->second->cancelCoprocedure(id)) -// { -// return; -// } -// } -// LL_INFOS() << "Coprocedure not found." << LL_ENDL; -//} - void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpdate_t updatefn) { mPropertyQueryFn = queryfn; mPropertyDefineFn = updatefn; } -////------------------------------------------------------------------------- -//size_t LLCoprocedureManager::countPending() const -//{ -// size_t count = 0; -// for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) -// { -// count += (*it).second->countPending(); -// } -// return count; -//} -// -//size_t LLCoprocedureManager::countPending(const std::string &pool) const -//{ -// poolMap_t::const_iterator it = mPoolMap.find(pool); -// -// if (it == mPoolMap.end()) -// return 0; -// return (*it).second->countPending(); -//} - size_t LLCoprocedureManager::countActive() const { size_t count = 0; @@ -252,25 +200,6 @@ size_t LLCoprocedureManager::countActive(const std::string &pool) const return it->second->countActive(); } -//size_t LLCoprocedureManager::count() const -//{ -// size_t count = 0; -// for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) -// { -// count += (*it).second->count(); -// } -// return count; -//} -// -//size_t LLCoprocedureManager::count(const std::string &pool) const -//{ -// poolMap_t::const_iterator it = mPoolMap.find(pool); -// -// if (it == mPoolMap.end()) -// return 0; -// return (*it).second->count(); -//} - void LLCoprocedureManager::close(const std::string &pool) { poolMap_t::iterator it = mPoolMap.find(pool); @@ -285,7 +214,6 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPoolName(poolName), mPoolSize(size), mPendingCoprocs(DEFAULT_QUEUE_SIZE), - //mShutdown(false), mCoroMapping(), mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID) { @@ -317,32 +245,6 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced return id; } -//bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id) -//{ -// // first check the active coroutines. If there, remove it and return. -// ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id); -// if (itActive != mActiveCoprocs.end()) -// { -// LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; -// (*itActive).second->cancelSuspendedOperation(); -// mActiveCoprocs.erase(itActive); -// return true; -// } -// -//// for (auto it: mPendingCoprocs) -//// { -//// if ((*it)->mId == id) -//// { -//// LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; -//// mPendingCoprocs.erase(it); -//// return true; -//// } -//// } -// -// LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found in pool \"" << mPoolName << "\"" << LL_ENDL; -// return false; -//} - //------------------------------------------------------------------------- void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) { @@ -358,6 +260,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; + // Nicky: This is super spammy. Consider using LL_DEBUGS here? LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; try @@ -374,12 +277,14 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap throw; } + // Nicky: This is super spammy. Consider using LL_DEBUGS here? LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; mActiveCoprocs.erase(itActive); } } -void LLCoprocedurePool::close() { +void LLCoprocedurePool::close() +{ mPendingCoprocs.close(); } -- cgit v1.2.3 From 1345a02b21a83bc4ee7ff72efc1858e956f18c53 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 22 Oct 2019 17:14:26 -0400 Subject: DRTVWR-476: Terminate long-lived coroutines to avoid shutdown crash. Add LLCoros::TempStatus instances around known suspension points so printActiveCoroutines() can report what each suspended coroutine is waiting for. Similarly, sprinkle checkStop() calls at known suspension points. Make LLApp::setStatus() post an event to a new LLEventPump "LLApp" with a string corresponding to the status value being set, but only until ~LLEventPumps() -- since setStatus() also gets called very late in the application's lifetime. Make postAndSuspendSetup() (used by postAndSuspend(), suspendUntilEventOn(), postAndSuspendWithTimeout(), suspendUntilEventOnWithTimeout()) add a listener on the new "LLApp" LLEventPump that pushes the new LLCoros::Stopping exception to the coroutine waiting on the LLCoros::Promise. Make it return the new LLBoundListener along with the previous one. Accordingly, make postAndSuspend() and postAndSuspendWithTimeout() store the new LLBoundListener returned by postAndSuspendSetup() in a LLTempBoundListener (as with the previous one) so it will automatically disconnect once the wait is over. Make each LLCoprocedurePool instance listen on "LLApp" with a listener that closes the queue on which new work items are dispatched. Closing the queue causes the waiting dispatch coroutine to terminate. Store the connection in an LLTempBoundListener on the LLCoprocedurePool so it will disconnect automatically on destruction. Refactor the loop in coprocedureInvokerCoro() to instantiate TempStatus around the suspending call. Change a couple spammy LL_INFOS() calls to LL_DEBUGS(). Give all logging calls in that module a "CoProcMgr" tag to make it straightforward to re-enable the LL_DEBUGS() calls as desired. --- indra/llmessage/llcoproceduremanager.cpp | 47 +++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 10 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 89eb00a2b7..a8f6b8aa67 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -102,6 +102,7 @@ private: size_t mPoolSize; CoprocQueue_t mPendingCoprocs; ActiveCoproc_t mActiveCoprocs; + LLTempBoundListener mStatusListener; typedef std::map CoroAdapterMap_t; LLCore::HttpRequest::policy_t mHTTPPolicy; @@ -149,7 +150,7 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std:: mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName); } - LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; + LL_WARNS("CoProcMgr") << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; } poolPtr_t pool(new LLCoprocedurePool(poolName, size)); @@ -214,9 +215,28 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPoolName(poolName), mPoolSize(size), mPendingCoprocs(DEFAULT_QUEUE_SIZE), - mCoroMapping(), - mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID) + mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), + mCoroMapping() { + // store in our LLTempBoundListener so that when the LLCoprocedurePool is + // destroyed, we implicitly disconnect from this LLEventPump + mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( + poolName, + [this, poolName](const LLSD& status) + { + auto& statsd = status["status"]; + if (statsd.asString() != "running") + { + LL_INFOS("CoProcMgr") << "Pool " << poolName + << " closing queue because status " << statsd + << LL_ENDL; + // This should ensure that all waiting coprocedures in this + // pool will wake up and terminate. + mPendingCoprocs.close(); + } + return false; + }); + for (size_t count = 0; count < mPoolSize; ++count) { LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy)); @@ -227,7 +247,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter)); } - LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; + LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; } LLCoprocedurePool::~LLCoprocedurePool() @@ -240,7 +260,7 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced LLUUID id(LLUUID::generateNewID()); mPendingCoprocs.push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); - LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; return id; } @@ -250,8 +270,17 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap { QueuedCoproc::ptr_t coproc; boost::fibers::channel_op_status status; - while ((status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10))) != boost::fibers::channel_op_status::closed) + for (;;) { + { + LLCoros::TempStatus st("waiting for work for 10s"); + status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10)); + } + if (status == boost::fibers::channel_op_status::closed) + { + break; + } + if(status == boost::fibers::channel_op_status::timeout) { LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL; @@ -260,8 +289,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; - // Nicky: This is super spammy. Consider using LL_DEBUGS here? - LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; try { @@ -277,8 +305,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap throw; } - // Nicky: This is super spammy. Consider using LL_DEBUGS here? - LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_DEBUGS("CoProcMgr") << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; mActiveCoprocs.erase(itActive); } -- cgit v1.2.3 From cbf146f2b3fc255bc83f2b01101dc29658bea6ea Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 24 Oct 2019 12:54:38 -0400 Subject: DRTVWR-476: Pump coroutines a few more times when we start quitting. By the time "LLApp" listeners are notified that the app is quitting, the mainloop is no longer running. Even though those listeners do things like close work queues and inject exceptions into pending promises, any coroutines waiting on those resources must regain control before they can notice and shut down properly. Add a final "LLApp" listener that resumes ready coroutines a few more times. Make sure every other "LLApp" listener is positioned before that new one. --- indra/llmessage/llcoproceduremanager.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index a8f6b8aa67..a71a31bfd2 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -218,8 +218,9 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), mCoroMapping() { - // store in our LLTempBoundListener so that when the LLCoprocedurePool is - // destroyed, we implicitly disconnect from this LLEventPump + // Store in our LLTempBoundListener so that when the LLCoprocedurePool is + // destroyed, we implicitly disconnect from this LLEventPump. + // Run this listener before the "final" listener. mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( poolName, [this, poolName](const LLSD& status) @@ -235,7 +236,9 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPendingCoprocs.close(); } return false; - }); + }, + LLEventPump::NameList{}, // after + LLEventPump::NameList{ "final "}); // before for (size_t count = 0; count < mPoolSize; ++count) { -- cgit v1.2.3 From 26c8ccfc06bc9334c9a4d0d027e83ad0b1b92a86 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 24 Oct 2019 16:05:37 -0400 Subject: DRTVWR-476: Back out changeset 40c0c6a8407d ("final" LLApp listener) --- indra/llmessage/llcoproceduremanager.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index a71a31bfd2..a8f6b8aa67 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -218,9 +218,8 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), mCoroMapping() { - // Store in our LLTempBoundListener so that when the LLCoprocedurePool is - // destroyed, we implicitly disconnect from this LLEventPump. - // Run this listener before the "final" listener. + // store in our LLTempBoundListener so that when the LLCoprocedurePool is + // destroyed, we implicitly disconnect from this LLEventPump mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( poolName, [this, poolName](const LLSD& status) @@ -236,9 +235,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPendingCoprocs.close(); } return false; - }, - LLEventPump::NameList{}, // after - LLEventPump::NameList{ "final "}); // before + }); for (size_t count = 0; count < mPoolSize; ++count) { -- cgit v1.2.3 From cc6f1d6195c457dc744ff23ac06ccd3a2d948aca Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 25 Oct 2019 16:10:56 -0400 Subject: DRTVWR-476: Use shared_ptr to manage lifespan of coprocedure queue. Since the consuming coroutine LLCoprocedurePool::coprocedureInvokerCoro() has been observed to outlive the LLCoprocedurePool instance that owns the CoprocQueue_t, closing that queue isn't enough to keep the coroutine from crashing at shutdown: accessing a deleted CoprocQueue_t is fatal whether or not it's been closed. Make LLCoprocedurePool store a shared_ptr to a heap CoprocQueue_t instance, and pass that shared_ptr by value to consuming coroutines. That way the CoprocQueue_t instance is guaranteed to live as long as the last interested party. --- indra/llmessage/llcoproceduremanager.cpp | 33 ++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index a8f6b8aa67..0b161ad2b4 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -94,13 +94,17 @@ private: // we use a buffered_channel here rather than unbuffered_channel since we want to be able to // push values without blocking,even if there's currently no one calling a pop operation (due to - // fibber running right now) + // fiber running right now) typedef boost::fibers::buffered_channel CoprocQueue_t; + // Use shared_ptr to control the lifespan of our CoprocQueue_t instance + // because the consuming coroutine might outlive this LLCoprocedurePool + // instance. + typedef boost::shared_ptr CoprocQueuePtr; typedef std::map ActiveCoproc_t; std::string mPoolName; size_t mPoolSize; - CoprocQueue_t mPendingCoprocs; + CoprocQueuePtr mPendingCoprocs; ActiveCoproc_t mActiveCoprocs; LLTempBoundListener mStatusListener; @@ -109,7 +113,8 @@ private: CoroAdapterMap_t mCoroMapping; - void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); + void coprocedureInvokerCoro(CoprocQueuePtr pendingCoprocs, + LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); }; //========================================================================= @@ -214,7 +219,7 @@ void LLCoprocedureManager::close(const std::string &pool) LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPoolName(poolName), mPoolSize(size), - mPendingCoprocs(DEFAULT_QUEUE_SIZE), + mPendingCoprocs(boost::make_shared(DEFAULT_QUEUE_SIZE)), mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), mCoroMapping() { @@ -222,7 +227,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): // destroyed, we implicitly disconnect from this LLEventPump mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( poolName, - [this, poolName](const LLSD& status) + [pendingCoprocs=mPendingCoprocs, poolName](const LLSD& status) { auto& statsd = status["status"]; if (statsd.asString() != "running") @@ -232,7 +237,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): << LL_ENDL; // This should ensure that all waiting coprocedures in this // pool will wake up and terminate. - mPendingCoprocs.close(); + pendingCoprocs->close(); } return false; }); @@ -241,8 +246,10 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): { LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy)); - std::string pooledCoro = LLCoros::instance().launch("LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", - boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, httpAdapter)); + std::string pooledCoro = LLCoros::instance().launch( + "LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", + boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, + mPendingCoprocs, httpAdapter)); mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter)); } @@ -259,14 +266,16 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced { LLUUID id(LLUUID::generateNewID()); - mPendingCoprocs.push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); + mPendingCoprocs->push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; return id; } //------------------------------------------------------------------------- -void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) +void LLCoprocedurePool::coprocedureInvokerCoro( + CoprocQueuePtr pendingCoprocs, + LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) { QueuedCoproc::ptr_t coproc; boost::fibers::channel_op_status status; @@ -274,7 +283,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap { { LLCoros::TempStatus st("waiting for work for 10s"); - status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10)); + status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10)); } if (status == boost::fibers::channel_op_status::closed) { @@ -313,5 +322,5 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap void LLCoprocedurePool::close() { - mPendingCoprocs.close(); + mPendingCoprocs->close(); } -- cgit v1.2.3 From b461b5dcefb753c908af5c62fb21049dc9f594b8 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 14 Nov 2019 15:40:06 -0500 Subject: DRTVWR-476: Manually count items in LLCoprocedurePool's pending queue. Reinstate LLCoprocedureManager::countPending() and count() methods. These were removed because boost::fibers::buffered_channel has no size() method, but since all users run within a single thread, it works to increment and decrement a simple counter. Add count information and max queue size to log messages. --- indra/llmessage/llcoproceduremanager.cpp | 74 +++++++++++++++++++++++++++++--- 1 file changed, 68 insertions(+), 6 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 0b161ad2b4..1c925b7eea 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -33,7 +33,6 @@ #include -#include #include #include "llexception.h" @@ -67,6 +66,13 @@ public: /// @return This method returns a UUID that can be used later to cancel execution. LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc); + /// Returns the number of coprocedures in the queue awaiting processing. + /// + inline size_t countPending() const + { + return mPending; + } + /// Returns the number of coprocedures actively being processed. /// inline size_t countActive() const @@ -74,6 +80,13 @@ public: return mActiveCoprocs.size(); } + /// Returns the total number of coprocedures either queued or in active processing. + /// + inline size_t count() const + { + return countPending() + countActive(); + } + void close(); private: @@ -103,7 +116,7 @@ private: typedef std::map ActiveCoproc_t; std::string mPoolName; - size_t mPoolSize; + size_t mPoolSize, mPending{0}; CoprocQueuePtr mPendingCoprocs; ActiveCoproc_t mActiveCoprocs; LLTempBoundListener mStatusListener; @@ -185,6 +198,26 @@ void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpd mPropertyDefineFn = updatefn; } +//------------------------------------------------------------------------- +size_t LLCoprocedureManager::countPending() const +{ + size_t count = 0; + for (const auto& pair : mPoolMap) + { + count += pair.second->countPending(); + } + return count; +} + +size_t LLCoprocedureManager::countPending(const std::string &pool) const +{ + poolMap_t::const_iterator it = mPoolMap.find(pool); + + if (it == mPoolMap.end()) + return 0; + return it->second->countPending(); +} + size_t LLCoprocedureManager::countActive() const { size_t count = 0; @@ -206,6 +239,25 @@ size_t LLCoprocedureManager::countActive(const std::string &pool) const return it->second->countActive(); } +size_t LLCoprocedureManager::count() const +{ + size_t count = 0; + for (const auto& pair : mPoolMap) + { + count += pair.second->count(); + } + return count; +} + +size_t LLCoprocedureManager::count(const std::string &pool) const +{ + poolMap_t::const_iterator it = mPoolMap.find(pool); + + if (it == mPoolMap.end()) + return 0; + return it->second->count(); +} + void LLCoprocedureManager::close(const std::string &pool) { poolMap_t::iterator it = mPoolMap.find(pool); @@ -254,7 +306,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter)); } - LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; + LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items, queue max " << DEFAULT_QUEUE_SIZE << LL_ENDL; } LLCoprocedurePool::~LLCoprocedurePool() @@ -266,8 +318,16 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced { LLUUID id(LLUUID::generateNewID()); - mPendingCoprocs->push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); - LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL; + auto pushed = mPendingCoprocs->try_push(boost::make_shared(name, id, proc)); + // We don't really have a lot of good options if try_push() failed, + // perhaps because the consuming coroutine is gummed up or something. This + // method is probably called from code called by mainloop. If we toss an + // llcoro::suspend() call here, we'll circle back for another mainloop + // iteration, possibly resulting in being re-entered here. Let's avoid that. + LL_ERRS_IF(pushed != boost::fibers::channel_op_status::success, "CoProcMgr") + << "Enqueue failed because queue is " << int(pushed) << LL_ENDL; + ++mPending; return id; } @@ -295,10 +355,12 @@ void LLCoprocedurePool::coprocedureInvokerCoro( LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL; continue; } + // we actually popped an item + --mPending; ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; - LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\" (" << mPending << " left)" << LL_ENDL; try { -- cgit v1.2.3 From bf8aea5059f127dcce2fdf613d62c253bb3fa8fd Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 14 Nov 2019 16:45:39 -0500 Subject: DRTVWR-476: Use LLThreadSafeQueue, not boost::fibers::buffered_channel. We've observed buffered_channel::try_push() hanging, which seems very odd. Try our own LLThreadSafeQueue instead. --- indra/llmessage/llcoproceduremanager.cpp | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 1c925b7eea..712cab5b19 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -33,7 +33,7 @@ #include -#include +#include "llthreadsafequeue.h" #include "llexception.h" #include "stringize.h" @@ -105,10 +105,7 @@ private: CoProcedure_t mProc; }; - // we use a buffered_channel here rather than unbuffered_channel since we want to be able to - // push values without blocking,even if there's currently no one calling a pop operation (due to - // fiber running right now) - typedef boost::fibers::buffered_channel CoprocQueue_t; + typedef LLThreadSafeQueue CoprocQueue_t; // Use shared_ptr to control the lifespan of our CoprocQueue_t instance // because the consuming coroutine might outlive this LLCoprocedurePool // instance. @@ -289,7 +286,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): << LL_ENDL; // This should ensure that all waiting coprocedures in this // pool will wake up and terminate. - pendingCoprocs->close(); + pendingCoprocs->pushFront({}); } return false; }); @@ -319,14 +316,13 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced LLUUID id(LLUUID::generateNewID()); LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL; - auto pushed = mPendingCoprocs->try_push(boost::make_shared(name, id, proc)); - // We don't really have a lot of good options if try_push() failed, + auto pushed = mPendingCoprocs->tryPushFront(boost::make_shared(name, id, proc)); + // We don't really have a lot of good options if tryPushFront() failed, // perhaps because the consuming coroutine is gummed up or something. This // method is probably called from code called by mainloop. If we toss an // llcoro::suspend() call here, we'll circle back for another mainloop // iteration, possibly resulting in being re-entered here. Let's avoid that. - LL_ERRS_IF(pushed != boost::fibers::channel_op_status::success, "CoProcMgr") - << "Enqueue failed because queue is " << int(pushed) << LL_ENDL; + LL_ERRS_IF(! pushed, "CoProcMgr") << "Enqueue failed" << LL_ENDL; ++mPending; return id; @@ -338,23 +334,18 @@ void LLCoprocedurePool::coprocedureInvokerCoro( LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) { QueuedCoproc::ptr_t coproc; - boost::fibers::channel_op_status status; for (;;) { { - LLCoros::TempStatus st("waiting for work for 10s"); - status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10)); + LLCoros::TempStatus st("waiting for work"); + coproc = pendingCoprocs->popBack(); } - if (status == boost::fibers::channel_op_status::closed) + if (! coproc) { + // close() pushes an empty pointer to signal done break; } - if(status == boost::fibers::channel_op_status::timeout) - { - LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL; - continue; - } // we actually popped an item --mPending; @@ -384,5 +375,5 @@ void LLCoprocedurePool::coprocedureInvokerCoro( void LLCoprocedurePool::close() { - mPendingCoprocs->close(); + mPendingCoprocs->pushFront({}); } -- cgit v1.2.3 From fc2437fb5d349a094c1c64631ba6a5fd5675ddcc Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 15 Nov 2019 08:17:04 -0500 Subject: DRTVWR-476: Introduce LLCoprocedureManager::close(). Use in tests. The new close(void) method simply acquires the logic from ~LLCoprocedureManager() (which now calls close()). It's useful, even if only in test programs, to be able to shut down all existing LLCoprocedurePools without having to name them individually -- and without having to destroy the LLCoprocedureManager singleton instance. Deleting an LLSingleton should be done only once per process, whereas test programs want to reset the LLCoprocedureManager after each test. --- indra/llmessage/llcoproceduremanager.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 712cab5b19..c1e53ea278 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -134,10 +134,7 @@ LLCoprocedureManager::LLCoprocedureManager() LLCoprocedureManager::~LLCoprocedureManager() { - for(auto & poolEntry : mPoolMap) - { - poolEntry.second->close(); - } + close(); } LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::string &poolName) @@ -255,6 +252,14 @@ size_t LLCoprocedureManager::count(const std::string &pool) const return it->second->count(); } +void LLCoprocedureManager::close() +{ + for(auto & poolEntry : mPoolMap) + { + poolEntry.second->close(); + } +} + void LLCoprocedureManager::close(const std::string &pool) { poolMap_t::iterator it = mPoolMap.find(pool); -- cgit v1.2.3 From ce36ef8242ce4af423832ced90f724615b5b3140 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 19 Dec 2019 11:50:52 -0500 Subject: DRTVWR-476: Use LLThreadSafeQueue::close() to shut down coprocs. The tactic of pushing an empty QueuedCoproc::ptr_t to signal coprocedure close only works for LLCoprocedurePools with a single coprocedure (e.g. "Upload" and "AIS"). Only one coprocedureInvokerCoro() coroutine will pop that empty pointer and shut down properly -- the rest will continue waiting indefinitely. Rather than pushing some number of empty pointers, hopefully enough to notify all consumer coroutines, close() the queue. That will notify as many consumers as there may be. That means catching LLThreadSafeQueueInterrupt from popBack(), instead of detecting empty pointer. Also, if a queued coprocedure throws an exception, coprocedureInvokerCoro() logs it as before -- but instead of rethrowing it, the coroutine now loops back to wait for more work. Otherwise, the number of coroutines servicing the queue dwindles. --- indra/llmessage/llcoproceduremanager.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index c1e53ea278..d252c0e4b0 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -291,7 +291,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): << LL_ENDL; // This should ensure that all waiting coprocedures in this // pool will wake up and terminate. - pendingCoprocs->pushFront({}); + pendingCoprocs->close(); } return false; }); @@ -323,7 +323,7 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL; auto pushed = mPendingCoprocs->tryPushFront(boost::make_shared(name, id, proc)); // We don't really have a lot of good options if tryPushFront() failed, - // perhaps because the consuming coroutine is gummed up or something. This + // perhaps because the consuming coroutines are gummed up or something. This // method is probably called from code called by mainloop. If we toss an // llcoro::suspend() call here, we'll circle back for another mainloop // iteration, possibly resulting in being re-entered here. Let's avoid that. @@ -341,13 +341,14 @@ void LLCoprocedurePool::coprocedureInvokerCoro( QueuedCoproc::ptr_t coproc; for (;;) { + try { LLCoros::TempStatus st("waiting for work"); coproc = pendingCoprocs->popBack(); } - if (! coproc) + catch (const LLThreadSafeQueueError&) { - // close() pushes an empty pointer to signal done + // queue is closed break; } @@ -369,7 +370,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro( << ") in pool '" << mPoolName << "'")); // must NOT omit this or we deplete the pool mActiveCoprocs.erase(itActive); - throw; + continue; } LL_DEBUGS("CoProcMgr") << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; @@ -380,5 +381,5 @@ void LLCoprocedurePool::coprocedureInvokerCoro( void LLCoprocedurePool::close() { - mPendingCoprocs->pushFront({}); + mPendingCoprocs->close(); } -- cgit v1.2.3 From 9d428662f88324b1d48ce89cca17c19e0f72f535 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 19 May 2020 11:32:24 -0400 Subject: DRTVWR-476: Revert "Use LLThreadSafeQueue, not boost::fibers::buffered_channel." This reverts commit bf8aea5059f127dcce2fdf613d62c253bb3fa8fd. Try boost::fibers::buffered_channel again with Boost 1.72. --- indra/llmessage/llcoproceduremanager.cpp | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index d252c0e4b0..456448137d 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -33,7 +33,7 @@ #include -#include "llthreadsafequeue.h" +#include #include "llexception.h" #include "stringize.h" @@ -105,7 +105,10 @@ private: CoProcedure_t mProc; }; - typedef LLThreadSafeQueue CoprocQueue_t; + // we use a buffered_channel here rather than unbuffered_channel since we want to be able to + // push values without blocking,even if there's currently no one calling a pop operation (due to + // fiber running right now) + typedef boost::fibers::buffered_channel CoprocQueue_t; // Use shared_ptr to control the lifespan of our CoprocQueue_t instance // because the consuming coroutine might outlive this LLCoprocedurePool // instance. @@ -321,13 +324,14 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced LLUUID id(LLUUID::generateNewID()); LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL; - auto pushed = mPendingCoprocs->tryPushFront(boost::make_shared(name, id, proc)); - // We don't really have a lot of good options if tryPushFront() failed, - // perhaps because the consuming coroutines are gummed up or something. This + auto pushed = mPendingCoprocs->try_push(boost::make_shared(name, id, proc)); + // We don't really have a lot of good options if try_push() failed, + // perhaps because the consuming coroutine is gummed up or something. This // method is probably called from code called by mainloop. If we toss an // llcoro::suspend() call here, we'll circle back for another mainloop // iteration, possibly resulting in being re-entered here. Let's avoid that. - LL_ERRS_IF(! pushed, "CoProcMgr") << "Enqueue failed" << LL_ENDL; + LL_ERRS_IF(pushed != boost::fibers::channel_op_status::success, "CoProcMgr") + << "Enqueue failed because queue is " << int(pushed) << LL_ENDL; ++mPending; return id; @@ -339,19 +343,24 @@ void LLCoprocedurePool::coprocedureInvokerCoro( LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) { QueuedCoproc::ptr_t coproc; + boost::fibers::channel_op_status status; for (;;) { try { - LLCoros::TempStatus st("waiting for work"); - coproc = pendingCoprocs->popBack(); + LLCoros::TempStatus st("waiting for work for 10s"); + status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10)); } - catch (const LLThreadSafeQueueError&) + if (status == boost::fibers::channel_op_status::closed) { - // queue is closed break; } + if(status == boost::fibers::channel_op_status::timeout) + { + LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL; + continue; + } // we actually popped an item --mPending; -- cgit v1.2.3 From 003ba682a1b7555a41f4c095b927d19c96a77256 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 19 May 2020 14:38:14 -0400 Subject: DRTVWR-476: Clean up reverting to boost::fibers::buffered_channel. --- indra/llmessage/llcoproceduremanager.cpp | 1 - 1 file changed, 1 deletion(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 456448137d..4168e0c67b 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -346,7 +346,6 @@ void LLCoprocedurePool::coprocedureInvokerCoro( boost::fibers::channel_op_status status; for (;;) { - try { LLCoros::TempStatus st("waiting for work for 10s"); status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10)); -- cgit v1.2.3 From 13b4bd58324e265db5b6d7392f0202c07af1e303 Mon Sep 17 00:00:00 2001 From: Nicky Dasmijn Date: Tue, 19 May 2020 21:27:16 +0200 Subject: Make sure coproc gets destroyed after each iteration. Making coproc scoped to the for loop will make sure the destructor gets called every loop iteration. Keeping it's scope outside the for loop means the pointer keeps valid till the next assigment that happens inside pop_wait_for when it gets assigned a new value. Triggering the dtor inside pop_wait_for can lead to deadlock when inside the dtor a coroutine tries to call enqueueCoprocedure (this happens). enqueueCoprocedure then will try to grab the lock for try_push but this lock is still held by pop_wait_for. --- indra/llmessage/llcoproceduremanager.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 4168e0c67b..210b83ae2d 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -342,10 +342,10 @@ void LLCoprocedurePool::coprocedureInvokerCoro( CoprocQueuePtr pendingCoprocs, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) { - QueuedCoproc::ptr_t coproc; - boost::fibers::channel_op_status status; for (;;) { + QueuedCoproc::ptr_t coproc; + boost::fibers::channel_op_status status; { LLCoros::TempStatus st("waiting for work for 10s"); status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10)); -- cgit v1.2.3 From b7d60f650d2ca9fdfc3c541d76670c938f2cf48e Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 20 May 2020 10:44:34 -0400 Subject: DRTVWR-476: Fix LLCoprocedurePool::enqueueCoprocedure() shutdown crash. --- indra/llmessage/llcoproceduremanager.cpp | 45 ++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 11 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 210b83ae2d..a7bd836c4d 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -325,16 +325,22 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL; auto pushed = mPendingCoprocs->try_push(boost::make_shared(name, id, proc)); - // We don't really have a lot of good options if try_push() failed, - // perhaps because the consuming coroutine is gummed up or something. This - // method is probably called from code called by mainloop. If we toss an - // llcoro::suspend() call here, we'll circle back for another mainloop - // iteration, possibly resulting in being re-entered here. Let's avoid that. - LL_ERRS_IF(pushed != boost::fibers::channel_op_status::success, "CoProcMgr") - << "Enqueue failed because queue is " << int(pushed) << LL_ENDL; - ++mPending; - - return id; + if (pushed == boost::fibers::channel_op_status::success) + { + ++mPending; + return id; + } + + // Here we didn't succeed in pushing. Shutdown could be the reason. + if (pushed == boost::fibers::channel_op_status::closed) + { + LL_WARNS("CoProcMgr") << "Discarding coprocedure '" << name << "' because shutdown" << LL_ENDL; + return {}; + } + + // The queue should never fill up. + LL_ERRS("CoProcMgr") << "Enqueue failed (" << unsigned(pushed) << ")" << LL_ENDL; + return {}; // never executed, pacify the compiler } //------------------------------------------------------------------------- @@ -344,6 +350,23 @@ void LLCoprocedurePool::coprocedureInvokerCoro( { for (;;) { + // It is VERY IMPORTANT that we instantiate a new ptr_t just before + // the pop_wait_for() call below. When this ptr_t was declared at + // function scope (outside the for loop), NickyD correctly diagnosed a + // mysterious hang condition due to: + // - the second time through the loop, the ptr_t held the last pointer + // to the previous QueuedCoproc, which indirectly held the last + // LLPointer to an LLInventoryCallback instance + // - while holding the lock on pendingCoprocs, pop_wait_for() assigned + // the popped value to the ptr_t variable + // - assignment destroyed the previous value of that variable, which + // indirectly destroyed the LLInventoryCallback + // - whose destructor called ~LLRequestServerAppearanceUpdateOnDestroy() + // - which called LLAppearanceMgr::requestServerAppearanceUpdate() + // - which called enqueueCoprocedure() + // - which tried to acquire the lock on pendingCoprocs... alas. + // Using a fresh, clean ptr_t ensures that no previous value is + // destroyed during pop_wait_for(). QueuedCoproc::ptr_t coproc; boost::fibers::channel_op_status status; { @@ -357,7 +380,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro( if(status == boost::fibers::channel_op_status::timeout) { - LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL; + LL_DEBUGS_ONCE("CoProcMgr") << "pool '" << mPoolName << "' waiting." << LL_ENDL; continue; } // we actually popped an item -- cgit v1.2.3 From cca777fdf51c0737a6c597a48c71c674f73ed7c7 Mon Sep 17 00:00:00 2001 From: Andrey Kleshchev Date: Fri, 24 Jul 2020 23:40:00 +0300 Subject: SL-13679 Event pump DupListenerName crash at login --- indra/llmessage/llcoproceduremanager.cpp | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) (limited to 'indra/llmessage/llcoproceduremanager.cpp') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index a7bd836c4d..42c19e3b1c 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -280,11 +280,14 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), mCoroMapping() { - // store in our LLTempBoundListener so that when the LLCoprocedurePool is - // destroyed, we implicitly disconnect from this LLEventPump - mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( - poolName, - [pendingCoprocs=mPendingCoprocs, poolName](const LLSD& status) + try + { + // store in our LLTempBoundListener so that when the LLCoprocedurePool is + // destroyed, we implicitly disconnect from this LLEventPump + // Monitores application status + mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( + poolName + "_pool", // Make sure it won't repeat names from lleventcoro + [pendingCoprocs = mPendingCoprocs, poolName](const LLSD& status) { auto& statsd = status["status"]; if (statsd.asString() != "running") @@ -298,6 +301,19 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): } return false; }); + } + catch (const LLEventPump::DupListenerName &) + { + // This shounldn't be possible since LLCoprocedurePool is supposed to have unique names, + // yet it somehow did happen, as result pools got '_pool' suffix and this catch. + // + // If this somehow happens again it is better to crash later on shutdown due to pump + // not stopping coroutine and see warning in logs than on startup or during login. + LL_WARNS("CoProcMgr") << "Attempted to register dupplicate listener name: " << poolName + << "_pool. Failed to start listener." << LL_ENDL; + + llassert(0); // Fix Me! Ignoring missing listener! + } for (size_t count = 0; count < mPoolSize; ++count) { -- cgit v1.2.3