diff options
author | Nat Goodspeed <nat@lindenlab.com> | 2019-10-22 17:14:26 -0400 |
---|---|---|
committer | Nat Goodspeed <nat@lindenlab.com> | 2020-03-25 19:02:24 -0400 |
commit | 1345a02b21a83bc4ee7ff72efc1858e956f18c53 (patch) | |
tree | a85a70d9691eb5fb35063b971e729cca30ab7e23 | |
parent | 28a54c2f7b25b9fda763c51746701f0cd21c8018 (diff) |
DRTVWR-476: Terminate long-lived coroutines to avoid shutdown crash.
Add LLCoros::TempStatus instances around known suspension points so
printActiveCoroutines() can report what each suspended coroutine is waiting
for.
Similarly, sprinkle checkStop() calls at known suspension points.
Make LLApp::setStatus() post an event to a new LLEventPump "LLApp" with a
string corresponding to the status value being set, but only until
~LLEventPumps() -- since setStatus() also gets called very late in the
application's lifetime.
Make postAndSuspendSetup() (used by postAndSuspend(), suspendUntilEventOn(),
postAndSuspendWithTimeout(), suspendUntilEventOnWithTimeout()) add a listener
on the new "LLApp" LLEventPump that pushes the new LLCoros::Stopping exception
to the coroutine waiting on the LLCoros::Promise. Make it return the new
LLBoundListener along with the previous one.
Accordingly, make postAndSuspend() and postAndSuspendWithTimeout() store the
new LLBoundListener returned by postAndSuspendSetup() in a LLTempBoundListener
(as with the previous one) so it will automatically disconnect once the wait
is over.
Make each LLCoprocedurePool instance listen on "LLApp" with a listener that
closes the queue on which new work items are dispatched. Closing the queue
causes the waiting dispatch coroutine to terminate. Store the connection in an
LLTempBoundListener on the LLCoprocedurePool so it will disconnect
automatically on destruction.
Refactor the loop in coprocedureInvokerCoro() to instantiate TempStatus around
the suspending call.
Change a couple spammy LL_INFOS() calls to LL_DEBUGS(). Give all logging calls
in that module a "CoProcMgr" tag to make it straightforward to re-enable the
LL_DEBUGS() calls as desired.
-rw-r--r-- | indra/llcommon/llapp.cpp | 36 | ||||
-rw-r--r-- | indra/llcommon/lleventcoro.cpp | 80 | ||||
-rw-r--r-- | indra/llmessage/llcoproceduremanager.cpp | 47 |
3 files changed, 138 insertions, 25 deletions
diff --git a/indra/llcommon/llapp.cpp b/indra/llcommon/llapp.cpp index 421af3006e..3dab632aef 100644 --- a/indra/llcommon/llapp.cpp +++ b/indra/llcommon/llapp.cpp @@ -49,6 +49,8 @@ #include "google_breakpad/exception_handler.h" #include "stringize.h" #include "llcleanup.h" +#include "llevents.h" +#include "llsdutil.h" // // Signal handling @@ -561,10 +563,42 @@ void LLApp::runErrorHandler() LLApp::setStopped(); } +namespace +{ + +static std::map<LLApp::EAppStatus, const char*> statusDesc +{ + { LLApp::APP_STATUS_RUNNING, "running" }, + { LLApp::APP_STATUS_QUITTING, "quitting" }, + { LLApp::APP_STATUS_STOPPED, "stopped" }, + { LLApp::APP_STATUS_ERROR, "error" } +}; + +} // anonymous namespace + // static void LLApp::setStatus(EAppStatus status) { - sStatus = status; + sStatus = status; + + // This can also happen very late in the application lifecycle -- don't + // resurrect a deleted LLSingleton + if (! LLEventPumps::wasDeleted()) + { + // notify interested parties of status change + LLSD statsd; + auto found = statusDesc.find(status); + if (found != statusDesc.end()) + { + statsd = found->second; + } + else + { + // unknown status? at least report value + statsd = LLSD::Integer(status); + } + LLEventPumps::instance().obtain("LLApp").post(llsd::map("status", statsd)); + } } diff --git a/indra/llcommon/lleventcoro.cpp b/indra/llcommon/lleventcoro.cpp index b374c9fa04..23e0012a1a 100644 --- a/indra/llcommon/lleventcoro.cpp +++ b/indra/llcommon/lleventcoro.cpp @@ -32,6 +32,7 @@ #include "lleventcoro.h" // STL headers #include <chrono> +#include <exception> // std headers // external library headers #include <boost/fiber/operations.hpp> @@ -40,6 +41,7 @@ #include "llsdutil.h" #include "llerror.h" #include "llcoros.h" +#include "stringize.h" namespace { @@ -106,29 +108,39 @@ void storeToLLSDPath(LLSD& dest, const LLSD& path, const LLSD& value) void llcoro::suspend() { + LLCoros::checkStop(); + LLCoros::TempStatus st("waiting one tick"); boost::this_fiber::yield(); } void llcoro::suspendUntilTimeout(float seconds) { + LLCoros::checkStop(); // The fact that we accept non-integer seconds means we should probably // use granularity finer than one second. However, given the overhead of // the rest of our processing, it seems silly to use granularity finer // than a millisecond. + LLCoros::TempStatus st(STRINGIZE("waiting for " << seconds << "s")); boost::this_fiber::sleep_for(std::chrono::milliseconds(long(seconds * 1000))); } namespace { -LLBoundListener postAndSuspendSetup(const std::string& callerName, - const std::string& listenerName, - LLCoros::Promise<LLSD>& promise, - const LLSD& event, - const LLEventPumpOrPumpName& requestPumpP, - const LLEventPumpOrPumpName& replyPumpP, - const LLSD& replyPumpNamePath) +// returns a listener on replyPumpP, also on "mainloop" -- both should be +// stored in LLTempBoundListeners on the caller's stack frame +std::pair<LLBoundListener, LLBoundListener> +postAndSuspendSetup(const std::string& callerName, + const std::string& listenerName, + LLCoros::Promise<LLSD>& promise, + const LLSD& event, + const LLEventPumpOrPumpName& requestPumpP, + const LLEventPumpOrPumpName& replyPumpP, + const LLSD& replyPumpNamePath) { + // Before we get any farther -- should we be stopping instead of + // suspending? + LLCoros::checkStop(); // Get the consuming attribute for THIS coroutine, the one that's about to // suspend. Don't call get_consuming() in the lambda body: that would // return the consuming attribute for some other coroutine, most likely @@ -138,6 +150,38 @@ LLBoundListener postAndSuspendSetup(const std::string& callerName, // value to the promise, thus fulfilling its future llassert_always_msg(replyPumpP, ("replyPump required for " + callerName)); LLEventPump& replyPump(replyPumpP.getPump()); + // The relative order of the two listen() calls below would only matter if + // "LLApp" were an LLEventMailDrop. But if we ever go there, we'd want to + // notice the pending LLApp status first. + LLBoundListener stopper( + LLEventPumps::instance().obtain("LLApp").listen( + listenerName, + [&promise, listenerName](const LLSD& status) + { + // anything except "running" should wake up the waiting + // coroutine + auto& statsd = status["status"]; + if (statsd.asString() != "running") + { + LL_DEBUGS("lleventcoro") << listenerName + << " spotted status " << statsd + << ", throwing Stopping" << LL_ENDL; + try + { + promise.set_exception( + std::make_exception_ptr( + LLCoros::Stopping("status " + statsd.asString()))); + } + catch (const boost::fibers::promise_already_satisfied& exc) + { + LL_WARNS("lleventcoro") << listenerName + << " couldn't throw Stopping " + "because promise already set" << LL_ENDL; + } + } + // do not consume -- every listener must see status + return false; + })); LLBoundListener connection( replyPump.listen( listenerName, @@ -160,6 +204,7 @@ LLBoundListener postAndSuspendSetup(const std::string& callerName, return false; } })); + // skip the "post" part if requestPump is default-constructed if (requestPumpP) { @@ -179,7 +224,7 @@ LLBoundListener postAndSuspendSetup(const std::string& callerName, LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName << " about to wait on LLEventPump " << replyPump.getName() << LL_ENDL; - return connection; + return { connection, stopper }; } } // anonymous @@ -190,15 +235,17 @@ LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requ LLCoros::Promise<LLSD> promise; std::string listenerName(listenerNameForCoro()); - // Store connection into an LLTempBoundListener so we implicitly + // Store both connections into LLTempBoundListeners so we implicitly // disconnect on return from this function. - LLTempBoundListener connection = + auto connections = postAndSuspendSetup("postAndSuspend()", listenerName, promise, event, requestPump, replyPump, replyPumpNamePath); + LLTempBoundListener connection(connections.first), stopper(connections.second); // declare the future LLCoros::Future<LLSD> future = LLCoros::getFuture(promise); // calling get() on the future makes us wait for it + LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName())); LLSD value(future.get()); LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName << " resuming with " << value << LL_ENDL; @@ -215,17 +262,22 @@ LLSD llcoro::postAndSuspendWithTimeout(const LLSD& event, LLCoros::Promise<LLSD> promise; std::string listenerName(listenerNameForCoro()); - // Store connection into an LLTempBoundListener so we implicitly + // Store both connections into LLTempBoundListeners so we implicitly // disconnect on return from this function. - LLTempBoundListener connection = + auto connections = postAndSuspendSetup("postAndSuspendWithTimeout()", listenerName, promise, event, requestPump, replyPump, replyPumpNamePath); + LLTempBoundListener connection(connections.first), stopper(connections.second); // declare the future LLCoros::Future<LLSD> future = LLCoros::getFuture(promise); // wait for specified timeout - boost::fibers::future_status status = - future.wait_for(std::chrono::milliseconds(long(timeout * 1000))); + boost::fibers::future_status status; + { + LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName() + << " for " << timeout << "s")); + status = future.wait_for(std::chrono::milliseconds(long(timeout * 1000))); + } // if the future is NOT yet ready, return timeoutResult instead if (status == boost::fibers::future_status::timeout) { diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 89eb00a2b7..a8f6b8aa67 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -102,6 +102,7 @@ private: size_t mPoolSize; CoprocQueue_t mPendingCoprocs; ActiveCoproc_t mActiveCoprocs; + LLTempBoundListener mStatusListener; typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t; LLCore::HttpRequest::policy_t mHTTPPolicy; @@ -149,7 +150,7 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std:: mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName); } - LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; + LL_WARNS("CoProcMgr") << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; } poolPtr_t pool(new LLCoprocedurePool(poolName, size)); @@ -214,9 +215,28 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPoolName(poolName), mPoolSize(size), mPendingCoprocs(DEFAULT_QUEUE_SIZE), - mCoroMapping(), - mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID) + mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), + mCoroMapping() { + // store in our LLTempBoundListener so that when the LLCoprocedurePool is + // destroyed, we implicitly disconnect from this LLEventPump + mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( + poolName, + [this, poolName](const LLSD& status) + { + auto& statsd = status["status"]; + if (statsd.asString() != "running") + { + LL_INFOS("CoProcMgr") << "Pool " << poolName + << " closing queue because status " << statsd + << LL_ENDL; + // This should ensure that all waiting coprocedures in this + // pool will wake up and terminate. + mPendingCoprocs.close(); + } + return false; + }); + for (size_t count = 0; count < mPoolSize; ++count) { LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy)); @@ -227,7 +247,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter)); } - LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; + LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; } LLCoprocedurePool::~LLCoprocedurePool() @@ -240,7 +260,7 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced LLUUID id(LLUUID::generateNewID()); mPendingCoprocs.push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); - LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; return id; } @@ -250,8 +270,17 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap { QueuedCoproc::ptr_t coproc; boost::fibers::channel_op_status status; - while ((status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10))) != boost::fibers::channel_op_status::closed) + for (;;) { + { + LLCoros::TempStatus st("waiting for work for 10s"); + status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10)); + } + if (status == boost::fibers::channel_op_status::closed) + { + break; + } + if(status == boost::fibers::channel_op_status::timeout) { LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL; @@ -260,8 +289,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; - // Nicky: This is super spammy. Consider using LL_DEBUGS here? - LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; try { @@ -277,8 +305,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap throw; } - // Nicky: This is super spammy. Consider using LL_DEBUGS here? - LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; + LL_DEBUGS("CoProcMgr") << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; mActiveCoprocs.erase(itActive); } |