diff options
| -rw-r--r-- | indra/llcommon/llapp.cpp | 36 | ||||
| -rw-r--r-- | indra/llcommon/lleventcoro.cpp | 80 | ||||
| -rw-r--r-- | indra/llmessage/llcoproceduremanager.cpp | 47 | 
3 files changed, 138 insertions, 25 deletions
| diff --git a/indra/llcommon/llapp.cpp b/indra/llcommon/llapp.cpp index 421af3006e..3dab632aef 100644 --- a/indra/llcommon/llapp.cpp +++ b/indra/llcommon/llapp.cpp @@ -49,6 +49,8 @@  #include "google_breakpad/exception_handler.h"  #include "stringize.h"  #include "llcleanup.h" +#include "llevents.h" +#include "llsdutil.h"  //  // Signal handling @@ -561,10 +563,42 @@ void LLApp::runErrorHandler()  	LLApp::setStopped();  } +namespace +{ + +static std::map<LLApp::EAppStatus, const char*> statusDesc +{ +    { LLApp::APP_STATUS_RUNNING,  "running" }, +    { LLApp::APP_STATUS_QUITTING, "quitting" }, +    { LLApp::APP_STATUS_STOPPED,  "stopped" }, +    { LLApp::APP_STATUS_ERROR,    "error" } +}; + +} // anonymous namespace +  // static  void LLApp::setStatus(EAppStatus status)  { -	sStatus = status; +    sStatus = status; + +    // This can also happen very late in the application lifecycle -- don't +    // resurrect a deleted LLSingleton +    if (! LLEventPumps::wasDeleted()) +    { +        // notify interested parties of status change +        LLSD statsd; +        auto found = statusDesc.find(status); +        if (found != statusDesc.end()) +        { +            statsd = found->second; +        } +        else +        { +            // unknown status? at least report value +            statsd = LLSD::Integer(status); +        } +        LLEventPumps::instance().obtain("LLApp").post(llsd::map("status", statsd)); +    }  } diff --git a/indra/llcommon/lleventcoro.cpp b/indra/llcommon/lleventcoro.cpp index b374c9fa04..23e0012a1a 100644 --- a/indra/llcommon/lleventcoro.cpp +++ b/indra/llcommon/lleventcoro.cpp @@ -32,6 +32,7 @@  #include "lleventcoro.h"  // STL headers  #include <chrono> +#include <exception>  // std headers  // external library headers  #include <boost/fiber/operations.hpp> @@ -40,6 +41,7 @@  #include "llsdutil.h"  #include "llerror.h"  #include "llcoros.h" +#include "stringize.h"  namespace  { @@ -106,29 +108,39 @@ void storeToLLSDPath(LLSD& dest, const LLSD& path, const LLSD& value)  void llcoro::suspend()  { +    LLCoros::checkStop(); +    LLCoros::TempStatus st("waiting one tick");      boost::this_fiber::yield();  }  void llcoro::suspendUntilTimeout(float seconds)  { +    LLCoros::checkStop();      // The fact that we accept non-integer seconds means we should probably      // use granularity finer than one second. However, given the overhead of      // the rest of our processing, it seems silly to use granularity finer      // than a millisecond. +    LLCoros::TempStatus st(STRINGIZE("waiting for " << seconds << "s"));      boost::this_fiber::sleep_for(std::chrono::milliseconds(long(seconds * 1000)));  }  namespace  { -LLBoundListener postAndSuspendSetup(const std::string& callerName, -                                    const std::string& listenerName, -                                    LLCoros::Promise<LLSD>& promise, -                                    const LLSD& event, -                                    const LLEventPumpOrPumpName& requestPumpP, -                                    const LLEventPumpOrPumpName& replyPumpP, -                                    const LLSD& replyPumpNamePath) +// returns a listener on replyPumpP, also on "mainloop" -- both should be +// stored in LLTempBoundListeners on the caller's stack frame +std::pair<LLBoundListener, LLBoundListener> +postAndSuspendSetup(const std::string& callerName, +                    const std::string& listenerName, +                    LLCoros::Promise<LLSD>& promise, +                    const LLSD& event, +                    const LLEventPumpOrPumpName& requestPumpP, +                    const LLEventPumpOrPumpName& replyPumpP, +                    const LLSD& replyPumpNamePath)  { +    // Before we get any farther -- should we be stopping instead of +    // suspending? +    LLCoros::checkStop();      // Get the consuming attribute for THIS coroutine, the one that's about to      // suspend. Don't call get_consuming() in the lambda body: that would      // return the consuming attribute for some other coroutine, most likely @@ -138,6 +150,38 @@ LLBoundListener postAndSuspendSetup(const std::string& callerName,      // value to the promise, thus fulfilling its future      llassert_always_msg(replyPumpP, ("replyPump required for " + callerName));      LLEventPump& replyPump(replyPumpP.getPump()); +    // The relative order of the two listen() calls below would only matter if +    // "LLApp" were an LLEventMailDrop. But if we ever go there, we'd want to +    // notice the pending LLApp status first. +    LLBoundListener stopper( +        LLEventPumps::instance().obtain("LLApp").listen( +            listenerName, +            [&promise, listenerName](const LLSD& status) +            { +                // anything except "running" should wake up the waiting +                // coroutine +                auto& statsd = status["status"]; +                if (statsd.asString() != "running") +                { +                    LL_DEBUGS("lleventcoro") << listenerName +                                             << " spotted status " << statsd +                                             << ", throwing Stopping" << LL_ENDL; +                    try +                    { +                        promise.set_exception( +                            std::make_exception_ptr( +                                LLCoros::Stopping("status " + statsd.asString()))); +                    } +                    catch (const boost::fibers::promise_already_satisfied& exc) +                    { +                        LL_WARNS("lleventcoro") << listenerName +                                                << " couldn't throw Stopping " +                                                   "because promise already set" << LL_ENDL; +                    } +                } +                // do not consume -- every listener must see status +                return false; +            }));      LLBoundListener connection(          replyPump.listen(              listenerName, @@ -160,6 +204,7 @@ LLBoundListener postAndSuspendSetup(const std::string& callerName,                      return false;                  }              })); +      // skip the "post" part if requestPump is default-constructed      if (requestPumpP)      { @@ -179,7 +224,7 @@ LLBoundListener postAndSuspendSetup(const std::string& callerName,      LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName                               << " about to wait on LLEventPump " << replyPump.getName()                               << LL_ENDL; -    return connection; +    return { connection, stopper };  }  } // anonymous @@ -190,15 +235,17 @@ LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requ      LLCoros::Promise<LLSD> promise;      std::string listenerName(listenerNameForCoro()); -    // Store connection into an LLTempBoundListener so we implicitly +    // Store both connections into LLTempBoundListeners so we implicitly      // disconnect on return from this function. -    LLTempBoundListener connection = +    auto connections =          postAndSuspendSetup("postAndSuspend()", listenerName, promise,                              event, requestPump, replyPump, replyPumpNamePath); +    LLTempBoundListener connection(connections.first), stopper(connections.second);      // declare the future      LLCoros::Future<LLSD> future = LLCoros::getFuture(promise);      // calling get() on the future makes us wait for it +    LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName()));      LLSD value(future.get());      LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName                               << " resuming with " << value << LL_ENDL; @@ -215,17 +262,22 @@ LLSD llcoro::postAndSuspendWithTimeout(const LLSD& event,      LLCoros::Promise<LLSD> promise;      std::string listenerName(listenerNameForCoro()); -    // Store connection into an LLTempBoundListener so we implicitly +    // Store both connections into LLTempBoundListeners so we implicitly      // disconnect on return from this function. -    LLTempBoundListener connection = +    auto connections =          postAndSuspendSetup("postAndSuspendWithTimeout()", listenerName, promise,                              event, requestPump, replyPump, replyPumpNamePath); +    LLTempBoundListener connection(connections.first), stopper(connections.second);      // declare the future      LLCoros::Future<LLSD> future = LLCoros::getFuture(promise);      // wait for specified timeout -    boost::fibers::future_status status = -        future.wait_for(std::chrono::milliseconds(long(timeout * 1000))); +    boost::fibers::future_status status; +    { +        LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName() +                                         << " for " << timeout << "s")); +        status = future.wait_for(std::chrono::milliseconds(long(timeout * 1000))); +    }      // if the future is NOT yet ready, return timeoutResult instead      if (status == boost::fibers::future_status::timeout)      { diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 89eb00a2b7..a8f6b8aa67 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -102,6 +102,7 @@ private:      size_t          mPoolSize;      CoprocQueue_t   mPendingCoprocs;      ActiveCoproc_t  mActiveCoprocs; +    LLTempBoundListener mStatusListener;      typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;      LLCore::HttpRequest::policy_t mHTTPPolicy; @@ -149,7 +150,7 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::              mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName);          } -        LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; +        LL_WARNS("CoProcMgr") << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;      }      poolPtr_t pool(new LLCoprocedurePool(poolName, size)); @@ -214,9 +215,28 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):      mPoolName(poolName),      mPoolSize(size),      mPendingCoprocs(DEFAULT_QUEUE_SIZE), -    mCoroMapping(), -    mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID) +    mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), +    mCoroMapping()  { +    // store in our LLTempBoundListener so that when the LLCoprocedurePool is +    // destroyed, we implicitly disconnect from this LLEventPump +    mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( +        poolName, +        [this, poolName](const LLSD& status) +        { +            auto& statsd = status["status"]; +            if (statsd.asString() != "running") +            { +                LL_INFOS("CoProcMgr") << "Pool " << poolName +                                      << " closing queue because status " << statsd +                                      << LL_ENDL; +                // This should ensure that all waiting coprocedures in this +                // pool will wake up and terminate. +                mPendingCoprocs.close(); +            } +            return false; +        }); +      for (size_t count = 0; count < mPoolSize; ++count)      {          LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy)); @@ -227,7 +247,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):          mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter));      } -    LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; +    LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL;  }  LLCoprocedurePool::~LLCoprocedurePool()  @@ -240,7 +260,7 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced      LLUUID id(LLUUID::generateNewID());      mPendingCoprocs.push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); -    LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; +    LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;      return id;  } @@ -250,8 +270,17 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap  {      QueuedCoproc::ptr_t coproc;      boost::fibers::channel_op_status status; -    while ((status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10))) != boost::fibers::channel_op_status::closed) +    for (;;)      { +        { +            LLCoros::TempStatus st("waiting for work for 10s"); +            status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10)); +        } +        if (status == boost::fibers::channel_op_status::closed) +        { +            break; +        } +          if(status == boost::fibers::channel_op_status::timeout)          {              LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL; @@ -260,8 +289,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap          ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; -        // Nicky: This is super spammy. Consider using LL_DEBUGS here? -        LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; +        LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;          try          { @@ -277,8 +305,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap              throw;          } -        // Nicky: This is super spammy. Consider using LL_DEBUGS here? -        LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; +        LL_DEBUGS("CoProcMgr") << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;          mActiveCoprocs.erase(itActive);      } | 
