summaryrefslogtreecommitdiff
path: root/indra/llmessage
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2019-11-14 15:40:06 -0500
committerNat Goodspeed <nat@lindenlab.com>2020-03-25 19:06:13 -0400
commitb461b5dcefb753c908af5c62fb21049dc9f594b8 (patch)
treed3130087ae36a052276b3eb246eb7090f8c734c4 /indra/llmessage
parent7826683fa264add84ef7d87fae5f962d27471a19 (diff)
DRTVWR-476: Manually count items in LLCoprocedurePool's pending queue.
Reinstate LLCoprocedureManager::countPending() and count() methods. These were removed because boost::fibers::buffered_channel has no size() method, but since all users run within a single thread, it works to increment and decrement a simple counter. Add count information and max queue size to log messages.
Diffstat (limited to 'indra/llmessage')
-rw-r--r--indra/llmessage/llcoproceduremanager.cpp74
1 files changed, 68 insertions, 6 deletions
diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp
index 0b161ad2b4..1c925b7eea 100644
--- a/indra/llmessage/llcoproceduremanager.cpp
+++ b/indra/llmessage/llcoproceduremanager.cpp
@@ -33,7 +33,6 @@
#include <chrono>
-#include <boost/assign.hpp>
#include <boost/fiber/buffered_channel.hpp>
#include "llexception.h"
@@ -67,6 +66,13 @@ public:
/// @return This method returns a UUID that can be used later to cancel execution.
LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc);
+ /// Returns the number of coprocedures in the queue awaiting processing.
+ ///
+ inline size_t countPending() const
+ {
+ return mPending;
+ }
+
/// Returns the number of coprocedures actively being processed.
///
inline size_t countActive() const
@@ -74,6 +80,13 @@ public:
return mActiveCoprocs.size();
}
+ /// Returns the total number of coprocedures either queued or in active processing.
+ ///
+ inline size_t count() const
+ {
+ return countPending() + countActive();
+ }
+
void close();
private:
@@ -103,7 +116,7 @@ private:
typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t;
std::string mPoolName;
- size_t mPoolSize;
+ size_t mPoolSize, mPending{0};
CoprocQueuePtr mPendingCoprocs;
ActiveCoproc_t mActiveCoprocs;
LLTempBoundListener mStatusListener;
@@ -185,6 +198,26 @@ void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpd
mPropertyDefineFn = updatefn;
}
+//-------------------------------------------------------------------------
+size_t LLCoprocedureManager::countPending() const
+{
+ size_t count = 0;
+ for (const auto& pair : mPoolMap)
+ {
+ count += pair.second->countPending();
+ }
+ return count;
+}
+
+size_t LLCoprocedureManager::countPending(const std::string &pool) const
+{
+ poolMap_t::const_iterator it = mPoolMap.find(pool);
+
+ if (it == mPoolMap.end())
+ return 0;
+ return it->second->countPending();
+}
+
size_t LLCoprocedureManager::countActive() const
{
size_t count = 0;
@@ -206,6 +239,25 @@ size_t LLCoprocedureManager::countActive(const std::string &pool) const
return it->second->countActive();
}
+size_t LLCoprocedureManager::count() const
+{
+ size_t count = 0;
+ for (const auto& pair : mPoolMap)
+ {
+ count += pair.second->count();
+ }
+ return count;
+}
+
+size_t LLCoprocedureManager::count(const std::string &pool) const
+{
+ poolMap_t::const_iterator it = mPoolMap.find(pool);
+
+ if (it == mPoolMap.end())
+ return 0;
+ return it->second->count();
+}
+
void LLCoprocedureManager::close(const std::string &pool)
{
poolMap_t::iterator it = mPoolMap.find(pool);
@@ -254,7 +306,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter));
}
- LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL;
+ LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items, queue max " << DEFAULT_QUEUE_SIZE << LL_ENDL;
}
LLCoprocedurePool::~LLCoprocedurePool()
@@ -266,8 +318,16 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced
{
LLUUID id(LLUUID::generateNewID());
- mPendingCoprocs->push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc)));
- LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+ LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL;
+ auto pushed = mPendingCoprocs->try_push(boost::make_shared<QueuedCoproc>(name, id, proc));
+ // We don't really have a lot of good options if try_push() failed,
+ // perhaps because the consuming coroutine is gummed up or something. This
+ // method is probably called from code called by mainloop. If we toss an
+ // llcoro::suspend() call here, we'll circle back for another mainloop
+ // iteration, possibly resulting in being re-entered here. Let's avoid that.
+ LL_ERRS_IF(pushed != boost::fibers::channel_op_status::success, "CoProcMgr")
+ << "Enqueue failed because queue is " << int(pushed) << LL_ENDL;
+ ++mPending;
return id;
}
@@ -295,10 +355,12 @@ void LLCoprocedurePool::coprocedureInvokerCoro(
LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL;
continue;
}
+ // we actually popped an item
+ --mPending;
ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first;
- LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+ LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\" (" << mPending << " left)" << LL_ENDL;
try
{