diff options
author | Nat Goodspeed <nat@lindenlab.com> | 2019-11-14 15:40:06 -0500 |
---|---|---|
committer | Nat Goodspeed <nat@lindenlab.com> | 2020-03-25 19:06:13 -0400 |
commit | b461b5dcefb753c908af5c62fb21049dc9f594b8 (patch) | |
tree | d3130087ae36a052276b3eb246eb7090f8c734c4 /indra/llmessage | |
parent | 7826683fa264add84ef7d87fae5f962d27471a19 (diff) |
DRTVWR-476: Manually count items in LLCoprocedurePool's pending queue.
Reinstate LLCoprocedureManager::countPending() and count() methods. These were
removed because boost::fibers::buffered_channel has no size() method, but
since all users run within a single thread, it works to increment and
decrement a simple counter.
Add count information and max queue size to log messages.
Diffstat (limited to 'indra/llmessage')
-rw-r--r-- | indra/llmessage/llcoproceduremanager.cpp | 74 |
1 files changed, 68 insertions, 6 deletions
diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 0b161ad2b4..1c925b7eea 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -33,7 +33,6 @@ #include <chrono> -#include <boost/assign.hpp> #include <boost/fiber/buffered_channel.hpp> #include "llexception.h" @@ -67,6 +66,13 @@ public: /// @return This method returns a UUID that can be used later to cancel execution. LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc); + /// Returns the number of coprocedures in the queue awaiting processing. + /// + inline size_t countPending() const + { + return mPending; + } + /// Returns the number of coprocedures actively being processed. /// inline size_t countActive() const @@ -74,6 +80,13 @@ public: return mActiveCoprocs.size(); } + /// Returns the total number of coprocedures either queued or in active processing. + /// + inline size_t count() const + { + return countPending() + countActive(); + } + void close(); private: @@ -103,7 +116,7 @@ private: typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t; std::string mPoolName; - size_t mPoolSize; + size_t mPoolSize, mPending{0}; CoprocQueuePtr mPendingCoprocs; ActiveCoproc_t mActiveCoprocs; LLTempBoundListener mStatusListener; @@ -185,6 +198,26 @@ void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpd mPropertyDefineFn = updatefn; } +//------------------------------------------------------------------------- +size_t LLCoprocedureManager::countPending() const +{ + size_t count = 0; + for (const auto& pair : mPoolMap) + { + count += pair.second->countPending(); + } + return count; +} + +size_t LLCoprocedureManager::countPending(const std::string &pool) const +{ + poolMap_t::const_iterator it = mPoolMap.find(pool); + + if (it == mPoolMap.end()) + return 0; + return it->second->countPending(); +} + size_t LLCoprocedureManager::countActive() const { size_t count = 0; @@ -206,6 +239,25 @@ size_t LLCoprocedureManager::countActive(const std::string &pool) const return it->second->countActive(); } +size_t LLCoprocedureManager::count() const +{ + size_t count = 0; + for (const auto& pair : mPoolMap) + { + count += pair.second->count(); + } + return count; +} + +size_t LLCoprocedureManager::count(const std::string &pool) const +{ + poolMap_t::const_iterator it = mPoolMap.find(pool); + + if (it == mPoolMap.end()) + return 0; + return it->second->count(); +} + void LLCoprocedureManager::close(const std::string &pool) { poolMap_t::iterator it = mPoolMap.find(pool); @@ -254,7 +306,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter)); } - LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; + LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items, queue max " << DEFAULT_QUEUE_SIZE << LL_ENDL; } LLCoprocedurePool::~LLCoprocedurePool() @@ -266,8 +318,16 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced { LLUUID id(LLUUID::generateNewID()); - mPendingCoprocs->push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); - LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL; + auto pushed = mPendingCoprocs->try_push(boost::make_shared<QueuedCoproc>(name, id, proc)); + // We don't really have a lot of good options if try_push() failed, + // perhaps because the consuming coroutine is gummed up or something. This + // method is probably called from code called by mainloop. If we toss an + // llcoro::suspend() call here, we'll circle back for another mainloop + // iteration, possibly resulting in being re-entered here. Let's avoid that. + LL_ERRS_IF(pushed != boost::fibers::channel_op_status::success, "CoProcMgr") + << "Enqueue failed because queue is " << int(pushed) << LL_ENDL; + ++mPending; return id; } @@ -295,10 +355,12 @@ void LLCoprocedurePool::coprocedureInvokerCoro( LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL; continue; } + // we actually popped an item + --mPending; ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; - LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\" (" << mPending << " left)" << LL_ENDL; try { |