summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--indra/llcommon/llapp.cpp36
-rw-r--r--indra/llcommon/lleventcoro.cpp80
-rw-r--r--indra/llmessage/llcoproceduremanager.cpp47
3 files changed, 138 insertions, 25 deletions
diff --git a/indra/llcommon/llapp.cpp b/indra/llcommon/llapp.cpp
index 421af3006e..3dab632aef 100644
--- a/indra/llcommon/llapp.cpp
+++ b/indra/llcommon/llapp.cpp
@@ -49,6 +49,8 @@
#include "google_breakpad/exception_handler.h"
#include "stringize.h"
#include "llcleanup.h"
+#include "llevents.h"
+#include "llsdutil.h"
//
// Signal handling
@@ -561,10 +563,42 @@ void LLApp::runErrorHandler()
LLApp::setStopped();
}
+namespace
+{
+
+static std::map<LLApp::EAppStatus, const char*> statusDesc
+{
+ { LLApp::APP_STATUS_RUNNING, "running" },
+ { LLApp::APP_STATUS_QUITTING, "quitting" },
+ { LLApp::APP_STATUS_STOPPED, "stopped" },
+ { LLApp::APP_STATUS_ERROR, "error" }
+};
+
+} // anonymous namespace
+
// static
void LLApp::setStatus(EAppStatus status)
{
- sStatus = status;
+ sStatus = status;
+
+ // This can also happen very late in the application lifecycle -- don't
+ // resurrect a deleted LLSingleton
+ if (! LLEventPumps::wasDeleted())
+ {
+ // notify interested parties of status change
+ LLSD statsd;
+ auto found = statusDesc.find(status);
+ if (found != statusDesc.end())
+ {
+ statsd = found->second;
+ }
+ else
+ {
+ // unknown status? at least report value
+ statsd = LLSD::Integer(status);
+ }
+ LLEventPumps::instance().obtain("LLApp").post(llsd::map("status", statsd));
+ }
}
diff --git a/indra/llcommon/lleventcoro.cpp b/indra/llcommon/lleventcoro.cpp
index b374c9fa04..23e0012a1a 100644
--- a/indra/llcommon/lleventcoro.cpp
+++ b/indra/llcommon/lleventcoro.cpp
@@ -32,6 +32,7 @@
#include "lleventcoro.h"
// STL headers
#include <chrono>
+#include <exception>
// std headers
// external library headers
#include <boost/fiber/operations.hpp>
@@ -40,6 +41,7 @@
#include "llsdutil.h"
#include "llerror.h"
#include "llcoros.h"
+#include "stringize.h"
namespace
{
@@ -106,29 +108,39 @@ void storeToLLSDPath(LLSD& dest, const LLSD& path, const LLSD& value)
void llcoro::suspend()
{
+ LLCoros::checkStop();
+ LLCoros::TempStatus st("waiting one tick");
boost::this_fiber::yield();
}
void llcoro::suspendUntilTimeout(float seconds)
{
+ LLCoros::checkStop();
// The fact that we accept non-integer seconds means we should probably
// use granularity finer than one second. However, given the overhead of
// the rest of our processing, it seems silly to use granularity finer
// than a millisecond.
+ LLCoros::TempStatus st(STRINGIZE("waiting for " << seconds << "s"));
boost::this_fiber::sleep_for(std::chrono::milliseconds(long(seconds * 1000)));
}
namespace
{
-LLBoundListener postAndSuspendSetup(const std::string& callerName,
- const std::string& listenerName,
- LLCoros::Promise<LLSD>& promise,
- const LLSD& event,
- const LLEventPumpOrPumpName& requestPumpP,
- const LLEventPumpOrPumpName& replyPumpP,
- const LLSD& replyPumpNamePath)
+// returns a listener on replyPumpP, also on "mainloop" -- both should be
+// stored in LLTempBoundListeners on the caller's stack frame
+std::pair<LLBoundListener, LLBoundListener>
+postAndSuspendSetup(const std::string& callerName,
+ const std::string& listenerName,
+ LLCoros::Promise<LLSD>& promise,
+ const LLSD& event,
+ const LLEventPumpOrPumpName& requestPumpP,
+ const LLEventPumpOrPumpName& replyPumpP,
+ const LLSD& replyPumpNamePath)
{
+ // Before we get any farther -- should we be stopping instead of
+ // suspending?
+ LLCoros::checkStop();
// Get the consuming attribute for THIS coroutine, the one that's about to
// suspend. Don't call get_consuming() in the lambda body: that would
// return the consuming attribute for some other coroutine, most likely
@@ -138,6 +150,38 @@ LLBoundListener postAndSuspendSetup(const std::string& callerName,
// value to the promise, thus fulfilling its future
llassert_always_msg(replyPumpP, ("replyPump required for " + callerName));
LLEventPump& replyPump(replyPumpP.getPump());
+ // The relative order of the two listen() calls below would only matter if
+ // "LLApp" were an LLEventMailDrop. But if we ever go there, we'd want to
+ // notice the pending LLApp status first.
+ LLBoundListener stopper(
+ LLEventPumps::instance().obtain("LLApp").listen(
+ listenerName,
+ [&promise, listenerName](const LLSD& status)
+ {
+ // anything except "running" should wake up the waiting
+ // coroutine
+ auto& statsd = status["status"];
+ if (statsd.asString() != "running")
+ {
+ LL_DEBUGS("lleventcoro") << listenerName
+ << " spotted status " << statsd
+ << ", throwing Stopping" << LL_ENDL;
+ try
+ {
+ promise.set_exception(
+ std::make_exception_ptr(
+ LLCoros::Stopping("status " + statsd.asString())));
+ }
+ catch (const boost::fibers::promise_already_satisfied& exc)
+ {
+ LL_WARNS("lleventcoro") << listenerName
+ << " couldn't throw Stopping "
+ "because promise already set" << LL_ENDL;
+ }
+ }
+ // do not consume -- every listener must see status
+ return false;
+ }));
LLBoundListener connection(
replyPump.listen(
listenerName,
@@ -160,6 +204,7 @@ LLBoundListener postAndSuspendSetup(const std::string& callerName,
return false;
}
}));
+
// skip the "post" part if requestPump is default-constructed
if (requestPumpP)
{
@@ -179,7 +224,7 @@ LLBoundListener postAndSuspendSetup(const std::string& callerName,
LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName
<< " about to wait on LLEventPump " << replyPump.getName()
<< LL_ENDL;
- return connection;
+ return { connection, stopper };
}
} // anonymous
@@ -190,15 +235,17 @@ LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requ
LLCoros::Promise<LLSD> promise;
std::string listenerName(listenerNameForCoro());
- // Store connection into an LLTempBoundListener so we implicitly
+ // Store both connections into LLTempBoundListeners so we implicitly
// disconnect on return from this function.
- LLTempBoundListener connection =
+ auto connections =
postAndSuspendSetup("postAndSuspend()", listenerName, promise,
event, requestPump, replyPump, replyPumpNamePath);
+ LLTempBoundListener connection(connections.first), stopper(connections.second);
// declare the future
LLCoros::Future<LLSD> future = LLCoros::getFuture(promise);
// calling get() on the future makes us wait for it
+ LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName()));
LLSD value(future.get());
LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName
<< " resuming with " << value << LL_ENDL;
@@ -215,17 +262,22 @@ LLSD llcoro::postAndSuspendWithTimeout(const LLSD& event,
LLCoros::Promise<LLSD> promise;
std::string listenerName(listenerNameForCoro());
- // Store connection into an LLTempBoundListener so we implicitly
+ // Store both connections into LLTempBoundListeners so we implicitly
// disconnect on return from this function.
- LLTempBoundListener connection =
+ auto connections =
postAndSuspendSetup("postAndSuspendWithTimeout()", listenerName, promise,
event, requestPump, replyPump, replyPumpNamePath);
+ LLTempBoundListener connection(connections.first), stopper(connections.second);
// declare the future
LLCoros::Future<LLSD> future = LLCoros::getFuture(promise);
// wait for specified timeout
- boost::fibers::future_status status =
- future.wait_for(std::chrono::milliseconds(long(timeout * 1000)));
+ boost::fibers::future_status status;
+ {
+ LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName()
+ << " for " << timeout << "s"));
+ status = future.wait_for(std::chrono::milliseconds(long(timeout * 1000)));
+ }
// if the future is NOT yet ready, return timeoutResult instead
if (status == boost::fibers::future_status::timeout)
{
diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp
index 89eb00a2b7..a8f6b8aa67 100644
--- a/indra/llmessage/llcoproceduremanager.cpp
+++ b/indra/llmessage/llcoproceduremanager.cpp
@@ -102,6 +102,7 @@ private:
size_t mPoolSize;
CoprocQueue_t mPendingCoprocs;
ActiveCoproc_t mActiveCoprocs;
+ LLTempBoundListener mStatusListener;
typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;
LLCore::HttpRequest::policy_t mHTTPPolicy;
@@ -149,7 +150,7 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::
mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName);
}
- LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;
+ LL_WARNS("CoProcMgr") << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;
}
poolPtr_t pool(new LLCoprocedurePool(poolName, size));
@@ -214,9 +215,28 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
mPoolName(poolName),
mPoolSize(size),
mPendingCoprocs(DEFAULT_QUEUE_SIZE),
- mCoroMapping(),
- mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID)
+ mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID),
+ mCoroMapping()
{
+ // store in our LLTempBoundListener so that when the LLCoprocedurePool is
+ // destroyed, we implicitly disconnect from this LLEventPump
+ mStatusListener = LLEventPumps::instance().obtain("LLApp").listen(
+ poolName,
+ [this, poolName](const LLSD& status)
+ {
+ auto& statsd = status["status"];
+ if (statsd.asString() != "running")
+ {
+ LL_INFOS("CoProcMgr") << "Pool " << poolName
+ << " closing queue because status " << statsd
+ << LL_ENDL;
+ // This should ensure that all waiting coprocedures in this
+ // pool will wake up and terminate.
+ mPendingCoprocs.close();
+ }
+ return false;
+ });
+
for (size_t count = 0; count < mPoolSize; ++count)
{
LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy));
@@ -227,7 +247,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter));
}
- LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL;
+ LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL;
}
LLCoprocedurePool::~LLCoprocedurePool()
@@ -240,7 +260,7 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced
LLUUID id(LLUUID::generateNewID());
mPendingCoprocs.push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc)));
- LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+ LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
return id;
}
@@ -250,8 +270,17 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap
{
QueuedCoproc::ptr_t coproc;
boost::fibers::channel_op_status status;
- while ((status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10))) != boost::fibers::channel_op_status::closed)
+ for (;;)
{
+ {
+ LLCoros::TempStatus st("waiting for work for 10s");
+ status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10));
+ }
+ if (status == boost::fibers::channel_op_status::closed)
+ {
+ break;
+ }
+
if(status == boost::fibers::channel_op_status::timeout)
{
LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL;
@@ -260,8 +289,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap
ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first;
- // Nicky: This is super spammy. Consider using LL_DEBUGS here?
- LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+ LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
try
{
@@ -277,8 +305,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap
throw;
}
- // Nicky: This is super spammy. Consider using LL_DEBUGS here?
- LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+ LL_DEBUGS("CoProcMgr") << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;
mActiveCoprocs.erase(itActive);
}