diff options
Diffstat (limited to 'indra/llmessage')
| -rw-r--r-- | indra/llmessage/CMakeLists.txt | 9 | ||||
| -rw-r--r-- | indra/llmessage/llassetstorage.cpp | 30 | ||||
| -rw-r--r-- | indra/llmessage/llavatarnamecache.cpp | 4 | ||||
| -rw-r--r-- | indra/llmessage/llbuffer.cpp | 1 | ||||
| -rw-r--r-- | indra/llmessage/llbufferstream.cpp | 1 | ||||
| -rw-r--r-- | indra/llmessage/llcoproceduremanager.cpp | 358 | ||||
| -rw-r--r-- | indra/llmessage/llcoproceduremanager.h | 10 | ||||
| -rw-r--r-- | indra/llmessage/llexperiencecache.cpp | 6 | ||||
| -rw-r--r-- | indra/llmessage/llextendedstatus.h | 64 | ||||
| -rw-r--r-- | indra/llmessage/lliosocket.cpp | 15 | ||||
| -rw-r--r-- | indra/llmessage/llproxy.cpp | 4 | ||||
| -rw-r--r-- | indra/llmessage/llproxy.h | 1 | ||||
| -rw-r--r-- | indra/llmessage/lltransfertargetvfile.cpp | 2 | ||||
| -rw-r--r-- | indra/llmessage/llxfer.cpp | 2 | ||||
| -rw-r--r-- | indra/llmessage/llxfer_mem.cpp | 2 | ||||
| -rw-r--r-- | indra/llmessage/message.cpp | 41 | ||||
| -rw-r--r-- | indra/llmessage/message.h | 132 | ||||
| -rw-r--r-- | indra/llmessage/tests/llcoproceduremanager_test.cpp | 178 | 
18 files changed, 598 insertions, 262 deletions
diff --git a/indra/llmessage/CMakeLists.txt b/indra/llmessage/CMakeLists.txt index e0922c0667..2f99ca069e 100644 --- a/indra/llmessage/CMakeLists.txt +++ b/indra/llmessage/CMakeLists.txt @@ -217,7 +217,7 @@ target_link_libraries(    ${NGHTTP2_LIBRARIES}    ${XMLRPCEPI_LIBRARIES}    ${LLCOREHTTP_LIBRARIES} -  ${BOOST_COROUTINE_LIBRARY} +  ${BOOST_FIBER_LIBRARY}    ${BOOST_CONTEXT_LIBRARY}    ${BOOST_SYSTEM_LIBRARY}    rt @@ -235,7 +235,7 @@ target_link_libraries(    ${NGHTTP2_LIBRARIES}    ${XMLRPCEPI_LIBRARIES}    ${LLCOREHTTP_LIBRARIES} -  ${BOOST_COROUTINE_LIBRARY} +  ${BOOST_FIBER_LIBRARY}    ${BOOST_CONTEXT_LIBRARY}    ${BOOST_SYSTEM_LIBRARY}    ) @@ -244,6 +244,7 @@ endif(LINUX)  # tests  if (LL_TESTS)    SET(llmessage_TEST_SOURCE_FILES +    llcoproceduremanager.cpp      llnamevalue.cpp      lltrustedmessageservice.cpp      lltemplatemessagedispatcher.cpp @@ -264,7 +265,7 @@ if (LINUX)      ${LLMESSAGE_LIBRARIES}      ${LLCOREHTTP_LIBRARIES}      ${JSONCPP_LIBRARIES} -    ${BOOST_COROUTINE_LIBRARY} +    ${BOOST_FIBER_LIBRARY}      ${BOOST_CONTEXT_LIBRARY}      rt      ${GOOGLEMOCK_LIBRARIES} @@ -280,7 +281,7 @@ else (LINUX)      ${LLMESSAGE_LIBRARIES}      ${LLCOREHTTP_LIBRARIES}      ${JSONCPP_LIBRARIES} -    ${BOOST_COROUTINE_LIBRARY} +    ${BOOST_FIBER_LIBRARY}      ${BOOST_CONTEXT_LIBRARY}      ${GOOGLEMOCK_LIBRARIES}      ) diff --git a/indra/llmessage/llassetstorage.cpp b/indra/llmessage/llassetstorage.cpp index 18b2b124e1..d7801b6ddc 100644 --- a/indra/llmessage/llassetstorage.cpp +++ b/indra/llmessage/llassetstorage.cpp @@ -426,11 +426,11 @@ void LLAssetStorage::_cleanupRequests(BOOL all, S32 error)          LLAssetRequest* tmp = *curiter;          if (tmp->mUpCallback)          { -            tmp->mUpCallback(tmp->getUUID(), tmp->mUserData, error, LL_EXSTAT_NONE); +            tmp->mUpCallback(tmp->getUUID(), tmp->mUserData, error, LLExtStat::NONE);          }          if (tmp->mDownCallback)          { -            tmp->mDownCallback(mVFS, tmp->getUUID(), tmp->getType(), tmp->mUserData, error, LL_EXSTAT_NONE); +            tmp->mDownCallback(mVFS, tmp->getUUID(), tmp->getType(), tmp->mUserData, error, LLExtStat::NONE);          }          if (tmp->mInfoCallback)          { @@ -465,7 +465,7 @@ bool LLAssetStorage::findInStaticVFSAndInvokeCallback(const LLUUID& uuid, LLAsse              // we've already got the file              if (callback)              { -                callback(mStaticVFS, uuid, type, user_data, LL_ERR_NOERR, LL_EXSTAT_VFS_CACHED); +                callback(mStaticVFS, uuid, type, user_data, LL_ERR_NOERR, LLExtStat::VFS_CACHED);              }              return true;          } @@ -506,7 +506,7 @@ void LLAssetStorage::getAssetData(const LLUUID uuid,          if (callback)          {              add(sFailedDownloadCount, 1); -            callback(mVFS, uuid, type, user_data, LL_ERR_ASSET_REQUEST_FAILED, LL_EXSTAT_NONE); +            callback(mVFS, uuid, type, user_data, LL_ERR_ASSET_REQUEST_FAILED, LLExtStat::NONE);          }          return;      } @@ -517,7 +517,7 @@ void LLAssetStorage::getAssetData(const LLUUID uuid,          if (callback)          {              add(sFailedDownloadCount, 1); -            callback(mVFS, uuid, type, user_data, LL_ERR_ASSET_REQUEST_NOT_IN_DATABASE, LL_EXSTAT_NULL_UUID); +            callback(mVFS, uuid, type, user_data, LL_ERR_ASSET_REQUEST_NOT_IN_DATABASE, LLExtStat::NULL_UUID);          }          return;      } @@ -540,7 +540,7 @@ void LLAssetStorage::getAssetData(const LLUUID uuid,          // unless there's a weird error          if (callback)          { -            callback(mVFS, uuid, type, user_data, LL_ERR_NOERR, LL_EXSTAT_VFS_CACHED); +            callback(mVFS, uuid, type, user_data, LL_ERR_NOERR, LLExtStat::VFS_CACHED);          }          LL_DEBUGS("AssetStorage") << "ASSET_TRACE asset " << uuid << " found in VFS" << LL_ENDL; @@ -694,7 +694,7 @@ void LLAssetStorage::downloadCompleteCallback(          }      } -    removeAndCallbackPendingDownloads(file_id, file_type, callback_id, callback_type, ext_status, result); +    removeAndCallbackPendingDownloads(file_id, file_type, callback_id, callback_type, result, ext_status);  }  void LLAssetStorage::getEstateAsset( @@ -719,7 +719,7 @@ void LLAssetStorage::getEstateAsset(          if (callback)          {              add(sFailedDownloadCount, 1); -            callback(mVFS, asset_id, atype, user_data, LL_ERR_ASSET_REQUEST_NOT_IN_DATABASE, LL_EXSTAT_NULL_UUID); +            callback(mVFS, asset_id, atype, user_data, LL_ERR_ASSET_REQUEST_NOT_IN_DATABASE, LLExtStat::NULL_UUID);          }          return;      } @@ -741,7 +741,7 @@ void LLAssetStorage::getEstateAsset(          // unless there's a weird error          if (callback)          { -            callback(mVFS, asset_id, atype, user_data, LL_ERR_NOERR, LL_EXSTAT_VFS_CACHED); +            callback(mVFS, asset_id, atype, user_data, LL_ERR_NOERR, LLExtStat::VFS_CACHED);          }      }      else @@ -792,7 +792,7 @@ void LLAssetStorage::getEstateAsset(              if (callback)              {                  add(sFailedDownloadCount, 1); -                callback(mVFS, asset_id, atype, user_data, LL_ERR_CIRCUIT_GONE, LL_EXSTAT_NO_UPSTREAM); +                callback(mVFS, asset_id, atype, user_data, LL_ERR_CIRCUIT_GONE, LLExtStat::NO_UPSTREAM);              }          }      } @@ -885,7 +885,7 @@ void LLAssetStorage::getInvItemAsset(          // unless there's a weird error          if (callback)          { -            callback(mVFS, asset_id, atype, user_data, LL_ERR_NOERR, LL_EXSTAT_VFS_CACHED); +            callback(mVFS, asset_id, atype, user_data, LL_ERR_NOERR, LLExtStat::VFS_CACHED);          }      }      else @@ -936,7 +936,7 @@ void LLAssetStorage::getInvItemAsset(              if (callback)              {                  add(sFailedDownloadCount, 1); -                callback(mVFS, asset_id, atype, user_data, LL_ERR_CIRCUIT_GONE, LL_EXSTAT_NO_UPSTREAM); +                callback(mVFS, asset_id, atype, user_data, LL_ERR_CIRCUIT_GONE, LLExtStat::NO_UPSTREAM);              }          }      } @@ -1034,7 +1034,7 @@ void LLAssetStorage::processUploadComplete(LLMessageSystem *msg, void **user_dat      msg->getBOOLFast(_PREHASH_AssetBlock, _PREHASH_Success, success);      asset_type = (LLAssetType::EType)asset_type_s8; -    this_ptr->_callUploadCallbacks(uuid, asset_type, success, LL_EXSTAT_NONE); +    this_ptr->_callUploadCallbacks(uuid, asset_type, success, LLExtStat::NONE);  }  void LLAssetStorage::_callUploadCallbacks(const LLUUID &uuid, LLAssetType::EType asset_type, BOOL success, LLExtStat ext_status ) @@ -1288,12 +1288,12 @@ bool LLAssetStorage::deletePendingRequestImpl(LLAssetStorage::request_list_t* re          // Run callbacks.          if (req->mUpCallback)          { -            req->mUpCallback(req->getUUID(), req->mUserData, error, LL_EXSTAT_REQUEST_DROPPED); +            req->mUpCallback(req->getUUID(), req->mUserData, error, LLExtStat::REQUEST_DROPPED);          }          if (req->mDownCallback)          {              add(sFailedDownloadCount, 1); -            req->mDownCallback(mVFS, req->getUUID(), req->getType(), req->mUserData, error, LL_EXSTAT_REQUEST_DROPPED); +            req->mDownCallback(mVFS, req->getUUID(), req->getType(), req->mUserData, error, LLExtStat::REQUEST_DROPPED);          }          if (req->mInfoCallback)          { diff --git a/indra/llmessage/llavatarnamecache.cpp b/indra/llmessage/llavatarnamecache.cpp index 6a287f0cc5..fbd65cc67b 100644 --- a/indra/llmessage/llavatarnamecache.cpp +++ b/indra/llmessage/llavatarnamecache.cpp @@ -134,7 +134,7 @@ LLAvatarNameCache::~LLAvatarNameCache()  void LLAvatarNameCache::requestAvatarNameCache_(std::string url, std::vector<LLUUID> agentIds)  { -    LL_DEBUGS("AvNameCache") << "Entering coroutine " << LLCoros::instance().getName() +    LL_DEBUGS("AvNameCache") << "Entering coroutine " << LLCoros::getName()          << " with url '" << url << "', requesting " << agentIds.size() << " Agent Ids" << LL_ENDL;      // Check pointer that can be cleaned up by cleanupClass() @@ -188,7 +188,7 @@ void LLAvatarNameCache::requestAvatarNameCache_(std::string url, std::vector<LLU      }      catch (...)      { -        LOG_UNHANDLED_EXCEPTION(STRINGIZE("coroutine " << LLCoros::instance().getName() +        LOG_UNHANDLED_EXCEPTION(STRINGIZE("coroutine " << LLCoros::getName()                                            << "('" << url << "', " << agentIds.size()                                            << " http result: " << httpResults.asString()                                            << " Agent Ids)")); diff --git a/indra/llmessage/llbuffer.cpp b/indra/llmessage/llbuffer.cpp index 1a0eceba0f..cfe38605ad 100644 --- a/indra/llmessage/llbuffer.cpp +++ b/indra/llmessage/llbuffer.cpp @@ -32,6 +32,7 @@  #include "llmath.h"  #include "llstl.h"  #include "llthread.h" +#include "llmutex.h"  #include <iterator>  #define ASSERT_LLBUFFERARRAY_MUTEX_LOCKED() llassert(!mMutexp || mMutexp->isSelfLocked()) diff --git a/indra/llmessage/llbufferstream.cpp b/indra/llmessage/llbufferstream.cpp index ff1c9993cc..39508c1c52 100644 --- a/indra/llmessage/llbufferstream.cpp +++ b/indra/llmessage/llbufferstream.cpp @@ -31,6 +31,7 @@  #include "llbuffer.h"  #include "llthread.h" +#include "llmutex.h"  static const S32 DEFAULT_OUTPUT_SEGMENT_SIZE = 1024 * 4; diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 74cdff2b00..42c19e3b1c 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -25,23 +25,29 @@  * $/LicenseInfo$  */ -#include "linden_common.h"  +#include "llwin32headers.h" + +#include "linden_common.h" +  #include "llcoproceduremanager.h" + +#include <chrono> + +#include <boost/fiber/buffered_channel.hpp> +  #include "llexception.h"  #include "stringize.h" -#include <boost/assign.hpp>  //=========================================================================  // Map of pool sizes for known pools -// *TODO$: When C++11 this can be initialized here as follows: -// = {{"AIS", 25}, {"Upload", 1}} -static std::map<std::string, U32> DefaultPoolSizes =  -    boost::assign::map_list_of -        (std::string("Upload"),  1) -        (std::string("AIS"),     1);     -        // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync. +static const std::map<std::string, U32> DefaultPoolSizes{ +	{std::string("Upload"),  1}, +    {std::string("AIS"),     1}, +    // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync. +}; -#define DEFAULT_POOL_SIZE 5 +static const U32 DEFAULT_POOL_SIZE = 5; +static const U32 DEFAULT_QUEUE_SIZE = 4096;  //=========================================================================  class LLCoprocedurePool: private boost::noncopyable @@ -50,7 +56,7 @@ public:      typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t;      LLCoprocedurePool(const std::string &name, size_t size); -    virtual ~LLCoprocedurePool(); +    ~LLCoprocedurePool();      /// Places the coprocedure on the queue for processing.       ///  @@ -60,20 +66,11 @@ public:      /// @return This method returns a UUID that can be used later to cancel execution.      LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc); -    /// Cancel a coprocedure. If the coprocedure is already being actively executed  -    /// this method calls cancelSuspendedOperation() on the associated HttpAdapter -    /// If it has not yet been dequeued it is simply removed from the queue. -    bool cancelCoprocedure(const LLUUID &id); - -    /// Requests a shutdown of the upload manager. Passing 'true' will perform  -    /// an immediate kill on the upload coroutine. -    void shutdown(bool hardShutdown = false); -      /// Returns the number of coprocedures in the queue awaiting processing.      ///      inline size_t countPending() const      { -        return mPendingCoprocs.size(); +        return mPending;      }      /// Returns the number of coprocedures actively being processed. @@ -90,6 +87,8 @@ public:          return countPending() + countActive();      } +    void close(); +      private:      struct QueuedCoproc      { @@ -106,25 +105,29 @@ private:          CoProcedure_t mProc;      }; -    // we use a deque here rather than std::queue since we want to be able to  -    // iterate through the queue and potentially erase an entry from the middle. -    typedef std::deque<QueuedCoproc::ptr_t>  CoprocQueue_t; +    // we use a buffered_channel here rather than unbuffered_channel since we want to be able to  +    // push values without blocking,even if there's currently no one calling a pop operation (due to +    // fiber running right now) +    typedef boost::fibers::buffered_channel<QueuedCoproc::ptr_t>  CoprocQueue_t; +    // Use shared_ptr to control the lifespan of our CoprocQueue_t instance +    // because the consuming coroutine might outlive this LLCoprocedurePool +    // instance. +    typedef boost::shared_ptr<CoprocQueue_t> CoprocQueuePtr;      typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t;      std::string     mPoolName; -    size_t          mPoolSize; -    CoprocQueue_t   mPendingCoprocs; +    size_t          mPoolSize, mPending{0}; +    CoprocQueuePtr  mPendingCoprocs;      ActiveCoproc_t  mActiveCoprocs; -    bool            mShutdown; -    LLEventStream   mWakeupTrigger; +    LLTempBoundListener mStatusListener;      typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;      LLCore::HttpRequest::policy_t mHTTPPolicy;      CoroAdapterMap_t mCoroMapping; -    void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); - +    void coprocedureInvokerCoro(CoprocQueuePtr pendingCoprocs, +                                LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter);  };  //========================================================================= @@ -134,7 +137,7 @@ LLCoprocedureManager::LLCoprocedureManager()  LLCoprocedureManager::~LLCoprocedureManager()  { - +    close();  }  LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::string &poolName) @@ -143,33 +146,34 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::      std::string keyName = "PoolSize" + poolName;      int size = 0; -    if (poolName.empty()) -        LL_ERRS("CoprocedureManager") << "Poolname must not be empty" << LL_ENDL; +    LL_ERRS_IF(poolName.empty(), "CoprocedureManager") << "Poolname must not be empty" << LL_ENDL; -    if (mPropertyQueryFn && !mPropertyQueryFn.empty()) +    if (mPropertyQueryFn)      {          size = mPropertyQueryFn(keyName);      }      if (size == 0) -    {   // if not found grab the know default... if there is no known  +    { +        // if not found grab the know default... if there is no known           // default use a reasonable number like 5. -        std::map<std::string, U32>::iterator it = DefaultPoolSizes.find(poolName); -        if (it == DefaultPoolSizes.end()) -            size = DEFAULT_POOL_SIZE; -        else -            size = (*it).second; +        auto it = DefaultPoolSizes.find(poolName); +        size = (it != DefaultPoolSizes.end()) ? it->second : DEFAULT_POOL_SIZE; -        if (mPropertyDefineFn && !mPropertyDefineFn.empty()) +        if (mPropertyDefineFn) +        {              mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName); -        LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; +        } + +        LL_WARNS("CoProcMgr") << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;      }      poolPtr_t pool(new LLCoprocedurePool(poolName, size)); -    mPoolMap.insert(poolMap_t::value_type(poolName, pool)); +    LL_ERRS_IF(!pool, "CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL; + +    bool inserted = mPoolMap.emplace(poolName, pool).second; +    LL_ERRS_IF(!inserted, "CoprocedureManager") << "Unable to add pool named \"" << poolName << "\" to map. FATAL!" << LL_ENDL; -    if (!pool) -        LL_ERRS("CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL;      return pool;  } @@ -178,40 +182,13 @@ LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const s  {      // Attempt to find the pool and enqueue the procedure.  If the pool does       // not exist, create it. -    poolPtr_t targetPool;      poolMap_t::iterator it = mPoolMap.find(pool); -    if (it == mPoolMap.end()) -    { -        targetPool = initializePool(pool); -    } -    else -    { -        targetPool = (*it).second; -    } +    poolPtr_t targetPool = (it != mPoolMap.end()) ? it->second : initializePool(pool);      return targetPool->enqueueCoprocedure(name, proc);  } -void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) -{ -    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) -    { -        if ((*it).second->cancelCoprocedure(id)) -            return; -    } -    LL_INFOS() << "Coprocedure not found." << LL_ENDL; -} - -void LLCoprocedureManager::shutdown(bool hardShutdown) -{ -    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) -    { -        (*it).second->shutdown(hardShutdown); -    } -    mPoolMap.clear(); -} -  void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpdate_t updatefn)  {      mPropertyQueryFn = queryfn; @@ -222,9 +199,9 @@ void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpd  size_t LLCoprocedureManager::countPending() const  {      size_t count = 0; -    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +    for (const auto& pair : mPoolMap)      { -        count += (*it).second->countPending(); +        count += pair.second->countPending();      }      return count;  } @@ -235,7 +212,7 @@ size_t LLCoprocedureManager::countPending(const std::string &pool) const      if (it == mPoolMap.end())          return 0; -    return (*it).second->countPending(); +    return it->second->countPending();  }  size_t LLCoprocedureManager::countActive() const @@ -243,7 +220,7 @@ size_t LLCoprocedureManager::countActive() const      size_t count = 0;      for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)      { -        count += (*it).second->countActive(); +        count += it->second->countActive();      }      return count;  } @@ -253,16 +230,18 @@ size_t LLCoprocedureManager::countActive(const std::string &pool) const      poolMap_t::const_iterator it = mPoolMap.find(pool);      if (it == mPoolMap.end()) +    {          return 0; -    return (*it).second->countActive(); +    } +    return it->second->countActive();  }  size_t LLCoprocedureManager::count() const  {      size_t count = 0; -    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +    for (const auto& pair : mPoolMap)      { -        count += (*it).second->count(); +        count += pair.second->count();      }      return count;  } @@ -273,59 +252,86 @@ size_t LLCoprocedureManager::count(const std::string &pool) const      if (it == mPoolMap.end())          return 0; -    return (*it).second->count(); +    return it->second->count(); +} + +void LLCoprocedureManager::close() +{ +    for(auto & poolEntry : mPoolMap) +    { +        poolEntry.second->close(); +    } +} + +void LLCoprocedureManager::close(const std::string &pool) +{ +    poolMap_t::iterator it = mPoolMap.find(pool); +    if (it != mPoolMap.end()) +    { +        it->second->close(); +    }  }  //=========================================================================  LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):      mPoolName(poolName),      mPoolSize(size), -    mPendingCoprocs(), -    mShutdown(false), -    mWakeupTrigger("CoprocedurePool" + poolName, true), -    mCoroMapping(), -    mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID) +    mPendingCoprocs(boost::make_shared<CoprocQueue_t>(DEFAULT_QUEUE_SIZE)), +    mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), +    mCoroMapping()  { +    try +    { +        // store in our LLTempBoundListener so that when the LLCoprocedurePool is +        // destroyed, we implicitly disconnect from this LLEventPump +        // Monitores application status +        mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( +            poolName + "_pool", // Make sure it won't repeat names from lleventcoro +            [pendingCoprocs = mPendingCoprocs, poolName](const LLSD& status) +        { +            auto& statsd = status["status"]; +            if (statsd.asString() != "running") +            { +                LL_INFOS("CoProcMgr") << "Pool " << poolName +                                      << " closing queue because status " << statsd +                                      << LL_ENDL; +                // This should ensure that all waiting coprocedures in this +                // pool will wake up and terminate. +                pendingCoprocs->close(); +            } +            return false; +        }); +    } +    catch (const LLEventPump::DupListenerName &) +    { +        // This shounldn't be possible since LLCoprocedurePool is supposed to have unique names, +        // yet it somehow did happen, as result pools got '_pool' suffix and this catch. +        // +        // If this somehow happens again it is better to crash later on shutdown due to pump +        // not stopping coroutine and see warning in logs than on startup or during login. +        LL_WARNS("CoProcMgr") << "Attempted to register dupplicate listener name: " << poolName +                              << "_pool. Failed to start listener." << LL_ENDL; + +        llassert(0); // Fix Me! Ignoring missing listener! +    } +      for (size_t count = 0; count < mPoolSize; ++count)      {          LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy)); -        std::string pooledCoro = LLCoros::instance().launch("LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", -            boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, httpAdapter)); +        std::string pooledCoro = LLCoros::instance().launch( +            "LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", +            boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, +                        mPendingCoprocs, httpAdapter));          mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter));      } -    LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; - -    mWakeupTrigger.post(LLSD()); +    LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items, queue max " << DEFAULT_QUEUE_SIZE << LL_ENDL;  }  LLCoprocedurePool::~LLCoprocedurePool()   { -    shutdown(); -} - -//------------------------------------------------------------------------- -void LLCoprocedurePool::shutdown(bool hardShutdown) -{ -    CoroAdapterMap_t::iterator it; - -    for (it = mCoroMapping.begin(); it != mCoroMapping.end(); ++it) -    { -        if (hardShutdown) -        { -            LLCoros::instance().kill((*it).first); -        } -        if ((*it).second) -        { -            (*it).second->cancelSuspendedOperation(); -        } -    } - -    mShutdown = true; -    mCoroMapping.clear(); -    mPendingCoprocs.clear();  }  //------------------------------------------------------------------------- @@ -333,76 +339,94 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced  {      LLUUID id(LLUUID::generateNewID()); -    mPendingCoprocs.push_back(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); -    LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; - -    mWakeupTrigger.post(LLSD()); - -    return id; -} - -bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id) -{ -    // first check the active coroutines.  If there, remove it and return. -    ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id); -    if (itActive != mActiveCoprocs.end()) +    LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL; +    auto pushed = mPendingCoprocs->try_push(boost::make_shared<QueuedCoproc>(name, id, proc)); +    if (pushed == boost::fibers::channel_op_status::success)      { -        LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; -        (*itActive).second->cancelSuspendedOperation(); -        mActiveCoprocs.erase(itActive); -        return true; +        ++mPending; +        return id;      } -    for (CoprocQueue_t::iterator it = mPendingCoprocs.begin(); it != mPendingCoprocs.end(); ++it) +    // Here we didn't succeed in pushing. Shutdown could be the reason. +    if (pushed == boost::fibers::channel_op_status::closed)      { -        if ((*it)->mId == id) -        { -            LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; -            mPendingCoprocs.erase(it); -            return true; -        } +        LL_WARNS("CoProcMgr") << "Discarding coprocedure '" << name << "' because shutdown" << LL_ENDL; +        return {};      } -    LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found in pool \"" << mPoolName << "\"" << LL_ENDL; -    return false; +    // The queue should never fill up. +    LL_ERRS("CoProcMgr") << "Enqueue failed (" << unsigned(pushed) << ")" << LL_ENDL; +    return {};                      // never executed, pacify the compiler  }  //------------------------------------------------------------------------- -void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) +void LLCoprocedurePool::coprocedureInvokerCoro( +    CoprocQueuePtr pendingCoprocs, +    LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)  { -    LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest); - -    while (!mShutdown) +    for (;;)      { -        llcoro::suspendUntilEventOn(mWakeupTrigger); -        if (mShutdown) -            break; -         -        while (!mPendingCoprocs.empty()) +        // It is VERY IMPORTANT that we instantiate a new ptr_t just before +        // the pop_wait_for() call below. When this ptr_t was declared at +        // function scope (outside the for loop), NickyD correctly diagnosed a +        // mysterious hang condition due to: +        // - the second time through the loop, the ptr_t held the last pointer +        //   to the previous QueuedCoproc, which indirectly held the last +        //   LLPointer to an LLInventoryCallback instance +        // - while holding the lock on pendingCoprocs, pop_wait_for() assigned +        //   the popped value to the ptr_t variable +        // - assignment destroyed the previous value of that variable, which +        //   indirectly destroyed the LLInventoryCallback +        // - whose destructor called ~LLRequestServerAppearanceUpdateOnDestroy() +        // - which called LLAppearanceMgr::requestServerAppearanceUpdate() +        // - which called enqueueCoprocedure() +        // - which tried to acquire the lock on pendingCoprocs... alas. +        // Using a fresh, clean ptr_t ensures that no previous value is +        // destroyed during pop_wait_for(). +        QueuedCoproc::ptr_t coproc; +        boost::fibers::channel_op_status status; +        { +            LLCoros::TempStatus st("waiting for work for 10s"); +            status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10)); +        } +        if (status == boost::fibers::channel_op_status::closed)          { -            QueuedCoproc::ptr_t coproc = mPendingCoprocs.front(); -            mPendingCoprocs.pop_front(); -            ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; +            break; +        } -            LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; +        if(status == boost::fibers::channel_op_status::timeout) +        { +            LL_DEBUGS_ONCE("CoProcMgr") << "pool '" << mPoolName << "' waiting." << LL_ENDL; +            continue; +        } +        // we actually popped an item +        --mPending; -            try -            { -                coproc->mProc(httpAdapter, coproc->mId); -            } -            catch (...) -            { -                LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName -                                                  << "', id=" << coproc->mId.asString() -                                                  << ") in pool '" << mPoolName << "'")); -                // must NOT omit this or we deplete the pool -                mActiveCoprocs.erase(itActive); -                throw; -            } +        ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; -            LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; +        LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\" (" << mPending << " left)" << LL_ENDL; +        try +        { +            coproc->mProc(httpAdapter, coproc->mId); +        } +        catch (...) +        { +            LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName +                                              << "', id=" << coproc->mId.asString() +                                              << ") in pool '" << mPoolName << "'")); +            // must NOT omit this or we deplete the pool              mActiveCoprocs.erase(itActive); +            continue;          } + +        LL_DEBUGS("CoProcMgr") << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; + +        mActiveCoprocs.erase(itActive);      }  } + +void LLCoprocedurePool::close() +{ +    mPendingCoprocs->close(); +} diff --git a/indra/llmessage/llcoproceduremanager.h b/indra/llmessage/llcoproceduremanager.h index 7d0e83180c..70204ba02b 100644 --- a/indra/llmessage/llcoproceduremanager.h +++ b/indra/llmessage/llcoproceduremanager.h @@ -32,6 +32,7 @@  #include "llcoros.h"  #include "llcorehttputil.h"  #include "lluuid.h" +#include <boost/smart_ptr/shared_ptr.hpp>  class LLCoprocedurePool; @@ -57,11 +58,7 @@ public:      /// Cancel a coprocedure. If the coprocedure is already being actively executed       /// this method calls cancelYieldingOperation() on the associated HttpAdapter      /// If it has not yet been dequeued it is simply removed from the queue. -    void cancelCoprocedure(const LLUUID &id); - -    /// Requests a shutdown of the upload manager. Passing 'true' will perform  -    /// an immediate kill on the upload coroutine. -    void shutdown(bool hardShutdown = false); +    //void cancelCoprocedure(const LLUUID &id);      void setPropertyMethods(SettingQuery_t queryfn, SettingUpdate_t updatefn); @@ -80,6 +77,9 @@ public:      size_t count() const;      size_t count(const std::string &pool) const; +    void close(); +    void close(const std::string &pool); +      private:      typedef boost::shared_ptr<LLCoprocedurePool> poolPtr_t; diff --git a/indra/llmessage/llexperiencecache.cpp b/indra/llmessage/llexperiencecache.cpp index aa7b3c1260..7d96ac4b02 100644 --- a/indra/llmessage/llexperiencecache.cpp +++ b/indra/llmessage/llexperiencecache.cpp @@ -338,10 +338,10 @@ void LLExperienceCache::requestExperiences()  	F64 now = LLFrameTimer::getTotalSeconds();      const U32 EXP_URL_SEND_THRESHOLD = 3000; -    const U32 PAGE_SIZE = EXP_URL_SEND_THRESHOLD / UUID_STR_LENGTH; +    const U32 PAGE_SIZE1 = EXP_URL_SEND_THRESHOLD / UUID_STR_LENGTH;      std::ostringstream ostr; -    ostr << urlBase << "?page_size=" << PAGE_SIZE; +    ostr << urlBase << "?page_size=" << PAGE_SIZE1;      RequestQueue_t  requests;      while (!mRequestQueue.empty()) @@ -360,7 +360,7 @@ void LLExperienceCache::requestExperiences()                  boost::bind(&LLExperienceCache::requestExperiencesCoro, this, _1, ostr.str(), requests) );              ostr.str(std::string()); -            ostr << urlBase << "?page_size=" << PAGE_SIZE; +            ostr << urlBase << "?page_size=" << PAGE_SIZE1;              requests.clear();          }      } diff --git a/indra/llmessage/llextendedstatus.h b/indra/llmessage/llextendedstatus.h index 8ce173d1ff..9923d73c1a 100644 --- a/indra/llmessage/llextendedstatus.h +++ b/indra/llmessage/llextendedstatus.h @@ -28,40 +28,36 @@  #ifndef LL_LLEXTENDEDSTATUS_H  #define LL_LLEXTENDEDSTATUS_H - -typedef S32 LLExtStat; - - -// Status provider groups - Top bits indicate which status type it is -// Zero is common status code (next section) -const LLExtStat LL_EXSTAT_CURL_RESULT	= 1L<<30; // serviced by curl - use 1L if we really implement the below -const LLExtStat LL_EXSTAT_RES_RESULT	= 2L<<30; // serviced by resident copy -const LLExtStat LL_EXSTAT_VFS_RESULT	= 3L<<30; // serviced by vfs - - -// Common Status Codes -// -const LLExtStat LL_EXSTAT_NONE				= 0x00000; // No extra info here - sorry! -const LLExtStat LL_EXSTAT_NULL_UUID			= 0x10001; // null asset ID -const LLExtStat LL_EXSTAT_NO_UPSTREAM		= 0x10002; // attempt to upload without a valid upstream method/provider -const LLExtStat LL_EXSTAT_REQUEST_DROPPED	= 0x10003; // request was dropped unserviced -const LLExtStat LL_EXSTAT_NONEXISTENT_FILE	= 0x10004; // trying to upload a file that doesn't exist -const LLExtStat LL_EXSTAT_BLOCKED_FILE		= 0x10005; // trying to upload a file that we can't open - - -// curl status codes: -// -// Mask off LL_EXSTAT_CURL_RESULT for original result and -// see: libraries/include/curl/curl.h - - -// Memory-Resident status codes: -// None at present - - -// VFS status codes: -const LLExtStat LL_EXSTAT_VFS_CACHED	= LL_EXSTAT_VFS_RESULT | 0x0001; -const LLExtStat LL_EXSTAT_VFS_CORRUPT	= LL_EXSTAT_VFS_RESULT | 0x0002; +enum class LLExtStat: uint32_t +{ +	// Status provider groups - Top bits indicate which status type it is +	// Zero is common status code (next section) +	CURL_RESULT	= 1UL<<30, // serviced by curl - use 1L if we really implement the below +	RES_RESULT	= 2UL<<30, // serviced by resident copy +	VFS_RESULT	= 3UL<<30, // serviced by vfs + + +	// Common Status Codes +	// +	NONE			= 0x00000, // No extra info here - sorry! +	NULL_UUID		= 0x10001, // null asset ID +	NO_UPSTREAM		= 0x10002, // attempt to upload without a valid upstream method/provider +	REQUEST_DROPPED	= 0x10003, // request was dropped unserviced +	NONEXISTENT_FILE= 0x10004, // trying to upload a file that doesn't exist +	BLOCKED_FILE	= 0x10005, // trying to upload a file that we can't open + +	// curl status codes: +	// +	// Mask off CURL_RESULT for original result and +	// see: libraries/include/curl/curl.h + +	// Memory-Resident status codes: +	// None at present + +	// VFS status codes: +	VFS_CACHED	= VFS_RESULT | 0x0001, +	VFS_CORRUPT	= VFS_RESULT | 0x0002, +};  #endif // LL_LLEXTENDEDSTATUS_H diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp index 7caf0766b7..a9cc71c365 100644 --- a/indra/llmessage/lliosocket.cpp +++ b/indra/llmessage/lliosocket.cpp @@ -62,9 +62,9 @@ bool is_addr_in_use(apr_status_t status)  #endif  } -#if LL_LINUX +#if ! LL_WINDOWS  // Define this to see the actual file descriptors being tossed around. -//#define LL_DEBUG_SOCKET_FILE_DESCRIPTORS 1 +#define LL_DEBUG_SOCKET_FILE_DESCRIPTORS 1  #if LL_DEBUG_SOCKET_FILE_DESCRIPTORS  #include "apr_portable.h"  #endif @@ -77,7 +77,7 @@ void ll_debug_socket(const char* msg, apr_socket_t* apr_sock)  #if LL_DEBUG_SOCKET_FILE_DESCRIPTORS  	if(!apr_sock)  	{ -		LL_DEBUGS() << "Socket -- " << (msg?msg:"") << ": no socket." << LL_ENDL; +		LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << ": no socket." << LL_ENDL;  		return;  	}  	// *TODO: Why doesn't this work? @@ -85,12 +85,12 @@ void ll_debug_socket(const char* msg, apr_socket_t* apr_sock)  	int os_sock;  	if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock))  	{ -		LL_DEBUGS() << "Socket -- " << (msg?msg:"") << " on fd " << os_sock +		LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << " on fd " << os_sock  			<< " at " << apr_sock << LL_ENDL;  	}  	else  	{ -		LL_DEBUGS() << "Socket -- " << (msg?msg:"") << " no fd " +		LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << " no fd "  			<< " at " << apr_sock << LL_ENDL;  	}  #endif @@ -144,6 +144,9 @@ LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port, const c  		if(new_pool) apr_pool_destroy(new_pool);  		return rv;  	} +	// At this point, the new LLSocket instance takes ownership of new_pool, +	// which is why no early return below this call explicitly destroys it: it +	// is instead cleaned up by ~LLSocket().  	rv = ptr_t(new LLSocket(socket, new_pool));  	if(port > 0)  	{ @@ -186,7 +189,7 @@ LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port, const c  			}  		}  	} -	else +	else // port <= 0  	{  		// we need to indicate that we have an ephemeral port if the  		// previous calls were successful. It will diff --git a/indra/llmessage/llproxy.cpp b/indra/llmessage/llproxy.cpp index 950599217f..86bcfe6881 100644 --- a/indra/llmessage/llproxy.cpp +++ b/indra/llmessage/llproxy.cpp @@ -115,9 +115,9 @@ S32 LLProxy::proxyHandshake(LLHost proxy)  		U32 request_size = socks_username.size() + socks_password.size() + 3;  		char * password_auth = new char[request_size];  		password_auth[0] = 0x01; -		password_auth[1] = socks_username.size(); +		password_auth[1] = (char)(socks_username.size());  		memcpy(&password_auth[2], socks_username.c_str(), socks_username.size()); -		password_auth[socks_username.size() + 2] = socks_password.size(); +		password_auth[socks_username.size() + 2] = (char)(socks_password.size());  		memcpy(&password_auth[socks_username.size() + 3], socks_password.c_str(), socks_password.size());  		authmethod_password_reply_t password_reply; diff --git a/indra/llmessage/llproxy.h b/indra/llmessage/llproxy.h index 87891901ad..a1ffa9e5d5 100644 --- a/indra/llmessage/llproxy.h +++ b/indra/llmessage/llproxy.h @@ -32,6 +32,7 @@  #include "llmemory.h"  #include "llsingleton.h"  #include "llthread.h" +#include "llmutex.h"  #include <curl/curl.h>  #include <string> diff --git a/indra/llmessage/lltransfertargetvfile.cpp b/indra/llmessage/lltransfertargetvfile.cpp index a572c68a7f..b27f0881e0 100644 --- a/indra/llmessage/lltransfertargetvfile.cpp +++ b/indra/llmessage/lltransfertargetvfile.cpp @@ -227,7 +227,7 @@ void LLTransferTargetVFile::completionCallback(const LLTSCode status)                  mParams.getAssetID(),                  mParams.getAssetType(),                  mParams.mRequestDatap, -                LL_EXSTAT_NONE); +				LLExtStat::NONE);          }          delete mParams.mRequestDatap;          mParams.mRequestDatap = NULL; diff --git a/indra/llmessage/llxfer.cpp b/indra/llmessage/llxfer.cpp index 32e0e2cc3b..93d5cfc131 100644 --- a/indra/llmessage/llxfer.cpp +++ b/indra/llmessage/llxfer.cpp @@ -319,7 +319,7 @@ S32 LLXfer::processEOF()  	if (mCallback)  	{ -		mCallback(mCallbackDataHandle,mCallbackResult,LL_EXSTAT_NONE); +		mCallback(mCallbackDataHandle,mCallbackResult, LLExtStat::NONE);  	}  	return(retval); diff --git a/indra/llmessage/llxfer_mem.cpp b/indra/llmessage/llxfer_mem.cpp index 78a3e4f558..da8534ecdc 100644 --- a/indra/llmessage/llxfer_mem.cpp +++ b/indra/llmessage/llxfer_mem.cpp @@ -112,7 +112,7 @@ S32 LLXfer_Mem::processEOF()  	if (mCallback)  	{ -		mCallback((void *)mBuffer,mBufferLength,mCallbackDataHandle,mCallbackResult,LL_EXSTAT_NONE); +		mCallback((void *)mBuffer,mBufferLength,mCallbackDataHandle,mCallbackResult, LLExtStat::NONE);  	}  	return(retval); diff --git a/indra/llmessage/message.cpp b/indra/llmessage/message.cpp index 6ef4025ab1..da62bb12e8 100644 --- a/indra/llmessage/message.cpp +++ b/indra/llmessage/message.cpp @@ -117,8 +117,8 @@ void LLMessageHandlerBridge::post(LLHTTPNode::ResponsePtr response,  	gMessageSystem->mLastSender = LLHost(input["sender"].asString());  	gMessageSystem->mPacketsIn += 1;  	gMessageSystem->mLLSDMessageReader->setMessage(namePtr, input["body"]); -	gMessageSystem->mMessageReader = gMessageSystem->mLLSDMessageReader; -	 +	LockMessageReader rdr(gMessageSystem->mMessageReader, gMessageSystem->mLLSDMessageReader); +  	if(gMessageSystem->callHandler(namePtr, false, gMessageSystem))  	{  		response->result(LLSD()); @@ -189,7 +189,7 @@ void LLMessageSystem::init()  	mTimingCallbackData = NULL;  	mMessageBuilder = NULL; -	mMessageReader = NULL; +	LockMessageReader(mMessageReader, NULL);  }  // Read file and build message templates @@ -230,7 +230,6 @@ LLMessageSystem::LLMessageSystem(const std::string& filename, U32 port,  	mTemplateMessageReader = new LLTemplateMessageReader(mMessageNumbers);  	mLLSDMessageReader = new LLSDMessageReader(); -	mMessageReader = NULL;  	// initialize various bits of net info  	mSocket = 0; @@ -330,7 +329,6 @@ LLMessageSystem::~LLMessageSystem()  	delete mTemplateMessageReader;  	mTemplateMessageReader = NULL; -	mMessageReader = NULL;  	delete mTemplateMessageBuilder;  	mTemplateMessageBuilder = NULL; @@ -480,11 +478,12 @@ LLCircuitData* LLMessageSystem::findCircuit(const LLHost& host,  }  // Returns TRUE if a valid, on-circuit message has been received. -BOOL LLMessageSystem::checkMessages( S64 frame_count ) +// Requiring a non-const LockMessageChecker reference ensures that +// mMessageReader has been set to mTemplateMessageReader. +BOOL LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count )  {  	// Pump   	BOOL	valid_packet = FALSE; -	mMessageReader = mTemplateMessageReader;  	LLTransferTargetVFile::updateQueue(); @@ -748,7 +747,7 @@ S32	LLMessageSystem::getReceiveBytes() const  } -void LLMessageSystem::processAcks(F32 collect_time) +void LLMessageSystem::processAcks(LockMessageChecker&, F32 collect_time)  {  	F64Seconds mt_sec = getMessageTimeSeconds();  	{ @@ -2062,8 +2061,9 @@ void LLMessageSystem::dispatch(  		return;  	}  	// enable this for output of message names -	//LL_INFOS("Messaging") << "< \"" << msg_name << "\"" << LL_ENDL; -	//LL_DEBUGS() << "data: " << LLSDNotationStreamer(message) << LL_ENDL;	    +	LL_DEBUGS("Messaging") << "< \"" << msg_name << "\"" << LL_ENDL; +	LL_DEBUGS("Messaging") << "context: " << context << LL_ENDL; +	LL_DEBUGS("Messaging") << "message: " << message << LL_ENDL;	     	handler->post(responsep, context, message);  } @@ -3268,6 +3268,8 @@ void null_message_callback(LLMessageSystem *msg, void **data)  // up, and then sending auth messages.  void LLMessageSystem::establishBidirectionalTrust(const LLHost &host, S64 frame_count )  { +	LockMessageChecker lmc(this); +  	std::string shared_secret = get_shared_secret();  	if(shared_secret.empty())  	{ @@ -3287,7 +3289,7 @@ void LLMessageSystem::establishBidirectionalTrust(const LLHost &host, S64 frame_  		addU8Fast(_PREHASH_PingID, 0);  		addU32Fast(_PREHASH_OldestUnacked, 0);  		sendMessage(host); -		if (checkMessages( frame_count )) +		if (lmc.checkMessages( frame_count ))  		{  			if (isMessageFast(_PREHASH_CompletePingCheck) &&  			    (getSender() == host)) @@ -3295,7 +3297,7 @@ void LLMessageSystem::establishBidirectionalTrust(const LLHost &host, S64 frame_  				break;  			}  		} -		processAcks(); +		lmc.processAcks();  		ms_sleep(1);  	} @@ -3314,8 +3316,8 @@ void LLMessageSystem::establishBidirectionalTrust(const LLHost &host, S64 frame_  		cdp = mCircuitInfo.findCircuit(host);  		if(!cdp) break; // no circuit anymore, no point continuing.  		if(cdp->getTrusted()) break; // circuit is trusted. -		checkMessages(frame_count); -		processAcks(); +		lmc.checkMessages(frame_count); +		lmc.processAcks();  		ms_sleep(1);  	}  } @@ -3973,11 +3975,18 @@ void LLMessageSystem::setTimeDecodesSpamThreshold( F32 seconds )  	LLMessageReader::setTimeDecodesSpamThreshold(seconds);  } +LockMessageChecker::LockMessageChecker(LLMessageSystem* msgsystem): +    // for the lifespan of this LockMessageChecker instance, use +    // LLTemplateMessageReader as msgsystem's mMessageReader +    LockMessageReader(msgsystem->mMessageReader, msgsystem->mTemplateMessageReader), +    mMessageSystem(msgsystem) +{} +  // HACK! babbage: return true if message rxed via either UDP or HTTP  // TODO: babbage: move gServicePump in to LLMessageSystem? -bool LLMessageSystem::checkAllMessages(S64 frame_count, LLPumpIO* http_pump) +bool LLMessageSystem::checkAllMessages(LockMessageChecker& lmc, S64 frame_count, LLPumpIO* http_pump)  { -	if(checkMessages(frame_count)) +	if(lmc.checkMessages(frame_count))  	{  		return true;  	} diff --git a/indra/llmessage/message.h b/indra/llmessage/message.h index 0af5a1b96d..52dbf871db 100644 --- a/indra/llmessage/message.h +++ b/indra/llmessage/message.h @@ -61,6 +61,8 @@  #include "llstoredmessage.h"  #include "boost/function.hpp"  #include "llpounceable.h" +#include "llcoros.h" +#include LLCOROS_MUTEX_HEADER  const U32 MESSAGE_MAX_STRINGS_LENGTH = 64;  const U32 MESSAGE_NUMBER_OF_HASH_BUCKETS = 8192; @@ -199,6 +201,91 @@ public:  	virtual void complete(const LLHost& host, const LLUUID& agent) const = 0;  }; +/** + * SL-12204: We've observed crashes when consumer code sets + * LLMessageSystem::mMessageReader, assuming that all subsequent processing of + * the current message will use the same mMessageReader value -- only to have + * a different coroutine sneak in and replace mMessageReader before + * completion. This is a limitation of sharing a stateful global resource for + * message parsing; instead code receiving a new message should instantiate a + * (trivially constructed) local message parser and use that. + * + * Until then, when one coroutine sets a particular LLMessageReader subclass + * as the current message reader, ensure that no other coroutine can replace + * it until the first coroutine has finished with its message. + * + * This is achieved with two helper classes. LLMessageSystem::mMessageReader + * is now an LLMessageReaderPointer instance, which can efficiently compare or + * dereference its contained LLMessageReader* but which cannot be directly + * assigned. To change the value of LLMessageReaderPointer, you must + * instantiate LockMessageReader with the LLMessageReader* you wish to make + * current. mMessageReader will have that value for the lifetime of the + * LockMessageReader instance, then revert to nullptr. Moreover, as its name + * implies, LockMessageReader locks the mutex in LLMessageReaderPointer so + * that any other coroutine instantiating LockMessageReader will block until + * the first coroutine has destroyed its instance. + */ +class LLMessageReaderPointer +{ +public: +    LLMessageReaderPointer(): mPtr(nullptr) {} +    // It is essential that comparison and dereferencing must be fast, which +    // is why we don't check for nullptr when dereferencing. +    LLMessageReader* operator->() const { return mPtr; } +    bool operator==(const LLMessageReader* other) const { return mPtr == other; } +    bool operator!=(const LLMessageReader* other) const { return ! (*this == other); } +private: +    // Only LockMessageReader can set mPtr. +    friend class LockMessageReader; +    LLMessageReader* mPtr; +    LLCoros::Mutex mMutex; +}; + +/** + * To set mMessageReader to nullptr: + * + * @code + * // use an anonymous instance that is destroyed immediately + * LockMessageReader(gMessageSystem->mMessageReader, nullptr); + * @endcode + * + * Why do we still require going through LockMessageReader at all? Because it + * would be Bad if any coroutine set mMessageReader to nullptr while another + * coroutine was still parsing a message. + */ +class LockMessageReader +{ +public: +    LockMessageReader(LLMessageReaderPointer& var, LLMessageReader* instance): +        mVar(var.mPtr), +        mLock(var.mMutex) +    { +        mVar = instance; +    } +    // Some compilers reportedly fail to suppress generating implicit copy +    // operations even though we have a move-only LockType data member. +    LockMessageReader(const LockMessageReader&) = delete; +    LockMessageReader& operator=(const LockMessageReader&) = delete; +    ~LockMessageReader() +    { +        mVar = nullptr; +    } +private: +    // capture a reference to LLMessageReaderPointer::mPtr +    decltype(LLMessageReaderPointer::mPtr)& mVar; +    // while holding a lock on LLMessageReaderPointer::mMutex +    LLCoros::LockType mLock; +}; + +/** + * LockMessageReader is great as long as you only need mMessageReader locked + * during a single LLMessageSystem function call. However, empirically the + * sequence from checkAllMessages() through processAcks() need mMessageReader + * locked to LLTemplateMessageReader. Enforce that by making them require an + * instance of LockMessageChecker. + */ +class LockMessageChecker; +  class LLMessageSystem : public LLMessageSenderInterface  {   private: @@ -331,8 +418,8 @@ public:  	bool addCircuitCode(U32 code, const LLUUID& session_id);  	BOOL	poll(F32 seconds); // Number of seconds that we want to block waiting for data, returns if data was received -	BOOL	checkMessages( S64 frame_count = 0 ); -	void	processAcks(F32 collect_time = 0.f); +	BOOL	checkMessages(LockMessageChecker&, S64 frame_count = 0 ); +	void	processAcks(LockMessageChecker&, F32 collect_time = 0.f);  	BOOL	isMessageFast(const char *msg);  	BOOL	isMessage(const char *msg) @@ -730,7 +817,7 @@ public:  		const LLSD& data);  	// Check UDP messages and pump http_pump to receive HTTP messages. -	bool checkAllMessages(S64 frame_count, LLPumpIO* http_pump); +	bool checkAllMessages(LockMessageChecker&, S64 frame_count, LLPumpIO* http_pump);  	// Moved to allow access from LLTemplateMessageDispatcher  	void clearReceiveState(); @@ -817,12 +904,13 @@ private:  	LLMessageBuilder* mMessageBuilder;  	LLTemplateMessageBuilder* mTemplateMessageBuilder;  	LLSDMessageBuilder* mLLSDMessageBuilder; -	LLMessageReader* mMessageReader; +	LLMessageReaderPointer mMessageReader;  	LLTemplateMessageReader* mTemplateMessageReader;  	LLSDMessageReader* mLLSDMessageReader;  	friend class LLMessageHandlerBridge; -	 +	friend class LockMessageChecker; +  	bool callHandler(const char *name, bool trustedSource,  					 LLMessageSystem* msg); @@ -835,6 +923,40 @@ private:  // external hook into messaging system  extern LLPounceable<LLMessageSystem*, LLPounceableStatic> gMessageSystem; +// Implementation of LockMessageChecker depends on definition of +// LLMessageSystem, hence must follow it. +class LockMessageChecker: public LockMessageReader +{ +public: +    LockMessageChecker(LLMessageSystem* msgsystem); + +    // For convenience, provide forwarding wrappers so you can call (e.g.) +    // checkAllMessages() on your LockMessageChecker instance instead of +    // passing the instance to LLMessageSystem::checkAllMessages(). Use +    // perfect forwarding to avoid having to maintain these wrappers in sync +    // with the target methods. +    template <typename... ARGS> +    bool checkAllMessages(ARGS&&... args) +    { +        return mMessageSystem->checkAllMessages(*this, std::forward<ARGS>(args)...); +    } + +    template <typename... ARGS> +    bool checkMessages(ARGS&&... args) +    { +        return mMessageSystem->checkMessages(*this, std::forward<ARGS>(args)...); +    } + +    template <typename... ARGS> +    void processAcks(ARGS&&... args) +    { +        return mMessageSystem->processAcks(*this, std::forward<ARGS>(args)...); +    } + +private: +    LLMessageSystem* mMessageSystem; +}; +  // Must specific overall system version, which is used to determine  // if a patch is available in the message template checksum verification.  // Return true if able to initialize system. diff --git a/indra/llmessage/tests/llcoproceduremanager_test.cpp b/indra/llmessage/tests/llcoproceduremanager_test.cpp new file mode 100644 index 0000000000..9db13a37b5 --- /dev/null +++ b/indra/llmessage/tests/llcoproceduremanager_test.cpp @@ -0,0 +1,178 @@ +/**  + * @file llcoproceduremanager_test.cpp + * @author Brad + * @date 2019-02 + * @brief LLCoprocedureManager unit test + * + * $LicenseInfo:firstyear=2019&license=viewerlgpl$ + * Second Life Viewer Source Code + * Copyright (C) 2010, Linden Research, Inc. + *  + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; + * version 2.1 of the License only. + *  + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + *  + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA + *  + * Linden Research, Inc., 945 Battery Street, San Francisco, CA  94111  USA + * $/LicenseInfo$ + */ + +#include "llwin32headers.h" + +#include "linden_common.h" +#include "llsdserialize.h" + +#include "../llcoproceduremanager.h" + +#include <functional> + +#include <boost/fiber/fiber.hpp> +#include <boost/fiber/buffered_channel.hpp> +#include <boost/fiber/unbuffered_channel.hpp> + +#include "../test/lltut.h" +#include "../test/sync.h" + + +#if LL_WINDOWS +// disable unreachable code warnings +#pragma warning(disable: 4702) +#endif + +LLCoreHttpUtil::HttpCoroutineAdapter::HttpCoroutineAdapter(std::string const&, unsigned int, unsigned int) +{ +} + +void LLCoreHttpUtil::HttpCoroutineAdapter::cancelSuspendedOperation() +{ +} + +LLCoreHttpUtil::HttpCoroutineAdapter::~HttpCoroutineAdapter() +{ +} + +LLCore::HttpRequest::HttpRequest() +{ +} + +LLCore::HttpRequest::~HttpRequest() +{ +} + +namespace tut +{ +    struct coproceduremanager_test +    { +        coproceduremanager_test() +        { +        } + +        ~coproceduremanager_test() +        { +            LLCoprocedureManager::instance().close(); +        } +    }; +    typedef test_group<coproceduremanager_test> coproceduremanager_t; +    typedef coproceduremanager_t::object coproceduremanager_object_t; +    tut::coproceduremanager_t tut_coproceduremanager("LLCoprocedureManager"); + + +    template<> template<> +    void coproceduremanager_object_t::test<1>() +    { +        Sync sync; +        int foo = 0; +        LLUUID queueId = LLCoprocedureManager::instance().enqueueCoprocedure("PoolName", "ProcName", +            [&foo, &sync] (LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t & ptr, const LLUUID & id) { +                sync.bump(); +                foo = 1; +            }); + +        sync.yield(); +        ensure_equals("coprocedure failed to update foo", foo, 1); +         +        LLCoprocedureManager::instance().close("PoolName"); +    } + +    template<> template<> +    void coproceduremanager_object_t::test<2>() +    { +        const size_t capacity = 2; +        boost::fibers::buffered_channel<std::function<void(void)>> chan(capacity); + +        boost::fibers::fiber worker([&chan]() { +            chan.value_pop()(); +        }); + +        chan.push([]() { +            LL_INFOS("Test") << "test 1" << LL_ENDL; +        }); + +        worker.join(); +    } + +    template<> template<> +    void coproceduremanager_object_t::test<3>() +    { +        boost::fibers::unbuffered_channel<std::function<void(void)>> chan; + +        boost::fibers::fiber worker([&chan]() { +            chan.value_pop()(); +        }); + +        chan.push([]() { +            LL_INFOS("Test") << "test 1" << LL_ENDL; +        }); + +        worker.join(); +    } + +    template<> template<> +    void coproceduremanager_object_t::test<4>() +    { +        boost::fibers::buffered_channel<std::function<void(void)>> chan(4); + +        boost::fibers::fiber worker([&chan]() { +            std::function<void(void)> f; + +            // using namespace std::chrono_literals; +            // const auto timeout = 5s; +            // boost::fibers::channel_op_status status; +            while (chan.pop(f) != boost::fibers::channel_op_status::closed) +            { +                LL_INFOS("CoWorker") << "got coproc" << LL_ENDL; +                f(); +            } +            LL_INFOS("CoWorker") << "got closed" << LL_ENDL; +        }); + +        int counter = 0; + +        for (int i = 0; i < 5; ++i) +        { +            LL_INFOS("CoMain") << "pushing coproc " << i << LL_ENDL; +            chan.push([&counter]() { +                LL_INFOS("CoProc") << "in coproc" << LL_ENDL; +                ++counter; +            }); +        } + +        LL_INFOS("CoMain") << "closing channel" << LL_ENDL; +        chan.close(); + +        LL_INFOS("CoMain") << "joining worker" << LL_ENDL; +        worker.join(); + +        LL_INFOS("CoMain") << "checking count" << LL_ENDL; +        ensure_equals("coprocedure failed to update counter", counter, 5); +    } +}  // namespace tut  | 
