diff options
| author | Rider Linden <rider@lindenlab.com> | 2015-07-28 15:29:51 -0700 | 
|---|---|---|
| committer | Rider Linden <rider@lindenlab.com> | 2015-07-28 15:29:51 -0700 | 
| commit | b57b0d97bb2fb880084cbcca1b915f8e67b442a5 (patch) | |
| tree | 787fe3b51aad8fe2b083efcf20fd05bf346bae1a | |
| parent | 7882396811fdf8b297f6d0c92d8e1e37859fde9d (diff) | |
Named pools of coroutines.
| -rwxr-xr-x | indra/newview/app_settings/settings.xml | 18 | ||||
| -rw-r--r-- | indra/newview/llcoproceduremanager.cpp | 288 | ||||
| -rw-r--r-- | indra/newview/llcoproceduremanager.h | 56 | ||||
| -rw-r--r-- | indra/newview/llviewerassetupload.cpp | 3 | 
4 files changed, 291 insertions, 74 deletions
diff --git a/indra/newview/app_settings/settings.xml b/indra/newview/app_settings/settings.xml index 2180a7f1a1..b4a4e41884 100755 --- a/indra/newview/app_settings/settings.xml +++ b/indra/newview/app_settings/settings.xml @@ -14340,6 +14340,24 @@        <key>Value</key>        <integer>1</integer>      </map> +    <key>PoolSizeAIS</key> +        <map> +        <key>Comment</key> +            <string>Coroutine Pool size for AIS</string> +        <key>Type</key> +            <string>U32</string> +        <key>Value</key> +            <integer>25</integer> +        </map> +    <key>PoolSizeUpload</key> +        <map> +        <key>Comment</key> +            <string>Coroutine Pool size for Upload</string> +        <key>Type</key> +            <string>U32</string> +        <key>Value</key> +            <real>1</real> +        </map>      <!-- Settings below are for back compatibility only.      They are not used in current viewer anymore. But they can't be removed to avoid diff --git a/indra/newview/llcoproceduremanager.cpp b/indra/newview/llcoproceduremanager.cpp index d3168985f8..e22a8b8013 100644 --- a/indra/newview/llcoproceduremanager.cpp +++ b/indra/newview/llcoproceduremanager.cpp @@ -1,5 +1,5 @@  /** -* @file llcoproceduremanager.cpp +* @file LLCoprocedurePool.cpp  * @author Rider Linden  * @brief Singleton class for managing asset uploads to the sim.  * @@ -33,43 +33,270 @@  #include "llcoproceduremanager.h"  //========================================================================= -#define COROCOUNT 1 +// Map of pool sizes for known pools +static std::map<std::string, U32> DefaultPoolSizes; + +// *TODO$: When C++11 this can be initialized here as follows: +// = {{"AIS", 25}, {"Upload", 1}} + +#define DEFAULT_POOL_SIZE 5 + +//========================================================================= +class LLCoprocedurePool: private boost::noncopyable +{ +public: +    typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t; + +    LLCoprocedurePool(const std::string &name, size_t size); +    virtual ~LLCoprocedurePool(); + +    /// Places the coprocedure on the queue for processing.  +    ///  +    /// @param name Is used for debugging and should identify this coroutine. +    /// @param proc Is a bound function to be executed  +    ///  +    /// @return This method returns a UUID that can be used later to cancel execution. +    LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc); + +    /// Cancel a coprocedure. If the coprocedure is already being actively executed  +    /// this method calls cancelYieldingOperation() on the associated HttpAdapter +    /// If it has not yet been dequeued it is simply removed from the queue. +    bool cancelCoprocedure(const LLUUID &id); + +    /// Requests a shutdown of the upload manager. Passing 'true' will perform  +    /// an immediate kill on the upload coroutine. +    void shutdown(bool hardShutdown = false); + +    /// Returns the number of coprocedures in the queue awaiting processing. +    /// +    inline size_t countPending() const +    { +        return mPendingCoprocs.size(); +    } + +    /// Returns the number of coprocedures actively being processed. +    /// +    inline size_t countActive() const +    { +        return mActiveCoprocs.size(); +    } + +    /// Returns the total number of coprocedures either queued or in active processing. +    /// +    inline size_t count() const +    { +        return countPending() + countActive(); +    } + +private: +    struct QueuedCoproc +    { +        typedef boost::shared_ptr<QueuedCoproc> ptr_t; + +        QueuedCoproc(const std::string &name, const LLUUID &id, CoProcedure_t proc) : +            mName(name), +            mId(id), +            mProc(proc) +        {} + +        std::string mName; +        LLUUID mId; +        CoProcedure_t mProc; +    }; + +    // we use a deque here rather than std::queue since we want to be able to  +    // iterate through the queue and potentially erase an entry from the middle. +    typedef std::deque<QueuedCoproc::ptr_t>  CoprocQueue_t; +    typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t; + +    std::string     mPoolName; +    size_t          mPoolSize; +    CoprocQueue_t   mPendingCoprocs; +    ActiveCoproc_t  mActiveCoprocs; +    bool            mShutdown; +    LLEventStream   mWakeupTrigger; + +    typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t; +    LLCore::HttpRequest::policy_t mHTTPPolicy; + +    CoroAdapterMap_t mCoroMapping; + +    void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); + +};  //========================================================================= -LLCoprocedureManager::LLCoprocedureManager(): -    LLSingleton<LLCoprocedureManager>(), +LLCoprocedureManager::LLCoprocedureManager() +{ +    DefaultPoolSizes.insert(std::map<std::string, U32>::value_type("Upload", 1)); +    DefaultPoolSizes.insert(std::map<std::string, U32>::value_type("AIS", 25)); +} + +LLCoprocedureManager::~LLCoprocedureManager() +{ + +} + +LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::string &poolName) +{ +    // *TODO: Retrieve the actual number of concurrent coroutines fro gSavedSettings and +    // clamp to a "reasonable" number. +    std::string keyName = "PoolSize" + poolName; +    int size = 5; + +    size = gSavedSettings.getU32(keyName); +    if (size == 0) +    { +        std::map<std::string, U32>::iterator it = DefaultPoolSizes.find(poolName); +        if (it == DefaultPoolSizes.end()) +            size = DEFAULT_POOL_SIZE; +        else +            size = (*it).second; +        gSavedSettings.declareU32(keyName, size, "Coroutine Pool size for " + poolName, LLControlVariable::PERSIST_ALWAYS); +        LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL; +    } + +    poolPtr_t pool = poolPtr_t(new LLCoprocedurePool(poolName, size)); +    mPoolMap.insert(poolMap_t::value_type(poolName, pool)); + +    return pool; +} + +//------------------------------------------------------------------------- +LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const std::string &name, CoProcedure_t proc) +{ +    poolPtr_t targetPool; +    poolMap_t::iterator it = mPoolMap.find(pool); + +    if (it == mPoolMap.end()) +    { +        targetPool = initializePool(pool); +    } +    else +    { +        targetPool = (*it).second; +    } + +    if (!targetPool) +    { +        LL_WARNS() << "LLCoprocedureManager unable to create coprocedure pool named \"" << pool << "\"" << LL_ENDL; +        return LLUUID::null; +    } + +    return targetPool->enqueueCoprocedure(name, proc); +} + +void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) +{ +    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +    { +        if ((*it).second->cancelCoprocedure(id)) +            return; +    } +    LL_INFOS() << "Coprocedure not found." << LL_ENDL; +} + +void LLCoprocedureManager::shutdown(bool hardShutdown) +{ +    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +    { +        (*it).second->shutdown(hardShutdown); +    } +    mPoolMap.clear(); +} + +//------------------------------------------------------------------------- +size_t LLCoprocedureManager::countPending() const +{ +    size_t count = 0; +    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +    { +        count += (*it).second->countPending(); +    } +    return count; +} + +size_t LLCoprocedureManager::countPending(const std::string &pool) const +{ +    poolMap_t::const_iterator it = mPoolMap.find(pool); + +    if (it == mPoolMap.end()) +        return 0; +    return (*it).second->countPending(); +} + +size_t LLCoprocedureManager::countActive() const +{ +    size_t count = 0; +    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +    { +        count += (*it).second->countActive(); +    } +    return count; +} + +size_t LLCoprocedureManager::countActive(const std::string &pool) const +{ +    poolMap_t::const_iterator it = mPoolMap.find(pool); + +    if (it == mPoolMap.end()) +        return 0; +    return (*it).second->countActive(); +} + +size_t LLCoprocedureManager::count() const +{ +    size_t count = 0; +    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +    { +        count += (*it).second->count(); +    } +    return count; +} + +size_t LLCoprocedureManager::count(const std::string &pool) const +{ +    poolMap_t::const_iterator it = mPoolMap.find(pool); + +    if (it == mPoolMap.end()) +        return 0; +    return (*it).second->count(); +} + +//========================================================================= +LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): +    mPoolName(poolName), +    mPoolSize(size),      mPendingCoprocs(),      mShutdown(false), -    mWakeupTrigger("CoprocedureManager", true), +    mWakeupTrigger("CoprocedurePool" + poolName, true),      mCoroMapping(),      mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID)  { - -    // *TODO: Retrieve the actual number of concurrent coroutines fro gSavedSettings and -    // clamp to a "reasonable" number. -    for (int count = 0; count < COROCOUNT; ++count) +    for (size_t count = 0; count < mPoolSize; ++count)      {          LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter =              LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t( -            new LLCoreHttpUtil::HttpCoroutineAdapter("uploadPostAdapter", mHTTPPolicy)); +            new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy)); -        std::string uploadCoro = LLCoros::instance().launch("LLCoprocedureManager::coprocedureInvokerCoro", -            boost::bind(&LLCoprocedureManager::coprocedureInvokerCoro, this, httpAdapter)); +        std::string uploadCoro = LLCoros::instance().launch("LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", +            boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, httpAdapter));          mCoroMapping.insert(CoroAdapterMap_t::value_type(uploadCoro, httpAdapter));      } +    LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; +      mWakeupTrigger.post(LLSD());  } -LLCoprocedureManager::~LLCoprocedureManager()  +LLCoprocedurePool::~LLCoprocedurePool()   {      shutdown();  } -//========================================================================= - -void LLCoprocedureManager::shutdown(bool hardShutdown) +//------------------------------------------------------------------------- +void LLCoprocedurePool::shutdown(bool hardShutdown)  {      CoroAdapterMap_t::iterator it; @@ -93,46 +320,47 @@ void LLCoprocedureManager::shutdown(bool hardShutdown)      mPendingCoprocs.clear();  } -//========================================================================= -LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &name, LLCoprocedureManager::CoProcedure_t proc) +//------------------------------------------------------------------------- +LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoprocedurePool::CoProcedure_t proc)  {      LLUUID id(LLUUID::generateNewID());      mPendingCoprocs.push_back(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); -    LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << LL_ENDL; +    LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;      mWakeupTrigger.post(LLSD());      return id;  } -void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) +bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id)  {      // first check the active coroutines.  If there, remove it and return.      ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id);      if (itActive != mActiveCoprocs.end())      { -        LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << LL_ENDL; +        LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;          (*itActive).second->cancelYieldingOperation();          mActiveCoprocs.erase(itActive); -        return; +        return true;      }      for (CoprocQueue_t::iterator it = mPendingCoprocs.begin(); it != mPendingCoprocs.end(); ++it)      {          if ((*it)->mId == id)          { -            LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << LL_ENDL; +            LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;              mPendingCoprocs.erase(it); -            return; +            return true;          }      } -    LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found." << LL_ENDL; +    LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found." << " in pool \"" << mPoolName << "\"" << LL_ENDL; +    return false;  } -//========================================================================= -void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) +//------------------------------------------------------------------------- +void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)  {      LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest); @@ -148,7 +376,7 @@ void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineA              mPendingCoprocs.pop_front();              mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)); -            LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << LL_ENDL; +            LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;              try              { @@ -161,10 +389,10 @@ void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineA              }              catch (...)              { -                LL_WARNS() << "A non std::exception was thrown from " << coproc->mName << " with id=" << coproc->mId << "." << LL_ENDL; +                LL_WARNS() << "A non std::exception was thrown from " << coproc->mName << " with id=" << coproc->mId << "." << " in pool \"" << mPoolName << "\"" << LL_ENDL;              } -            LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << LL_ENDL; +            LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;              ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(coproc->mId);              if (itActive != mActiveCoprocs.end()) diff --git a/indra/newview/llcoproceduremanager.h b/indra/newview/llcoproceduremanager.h index 6ba3891e87..d7f74af76b 100644 --- a/indra/newview/llcoproceduremanager.h +++ b/indra/newview/llcoproceduremanager.h @@ -33,6 +33,8 @@  #include "llcorehttputil.h"  #include "lluuid.h" +class LLCoprocedurePool; +  class LLCoprocedureManager : public LLSingleton < LLCoprocedureManager >  {  public: @@ -47,7 +49,7 @@ public:      /// @param proc Is a bound function to be executed       ///       /// @return This method returns a UUID that can be used later to cancel execution. -    LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc); +    LLUUID enqueueCoprocedure(const std::string &pool, const std::string &name, CoProcedure_t proc);      /// Cancel a coprocedure. If the coprocedure is already being actively executed       /// this method calls cancelYieldingOperation() on the associated HttpAdapter @@ -60,58 +62,26 @@ public:      /// Returns the number of coprocedures in the queue awaiting processing.      /// -    inline size_t countPending() const -    { -        return mPendingCoprocs.size(); -    } +    size_t countPending() const; +    size_t countPending(const std::string &pool) const;      /// Returns the number of coprocedures actively being processed.      /// -    inline size_t countActive() const -    { -        return mActiveCoprocs.size(); -    } +    size_t countActive() const; +    size_t countActive(const std::string &pool) const;      /// Returns the total number of coprocedures either queued or in active processing.      /// -    inline size_t count() const -    { -        return countPending() + countActive(); -    } +    size_t count() const; +    size_t count(const std::string &pool) const;  private: -    struct QueuedCoproc -    { -        typedef boost::shared_ptr<QueuedCoproc> ptr_t; - -        QueuedCoproc(const std::string &name, const LLUUID &id, CoProcedure_t proc): -            mName(name), -            mId(id), -            mProc(proc) -        {} - -        std::string mName; -        LLUUID mId; -        CoProcedure_t mProc; -    }; -     -    // we use a deque here rather than std::queue since we want to be able to  -    // iterate through the queue and potentially erase an entry from the middle. -    typedef std::deque<QueuedCoproc::ptr_t>  CoprocQueue_t;   -    typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t; - -    CoprocQueue_t   mPendingCoprocs; -    ActiveCoproc_t  mActiveCoprocs; -    bool            mShutdown; -    LLEventStream   mWakeupTrigger; - - -    typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t; -    LLCore::HttpRequest::policy_t mHTTPPolicy; +    typedef boost::shared_ptr<LLCoprocedurePool> poolPtr_t; +    typedef std::map<std::string, poolPtr_t> poolMap_t; -    CoroAdapterMap_t mCoroMapping; +    poolMap_t mPoolMap; -    void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); +    poolPtr_t initializePool(const std::string &poolName);  };  #endif diff --git a/indra/newview/llviewerassetupload.cpp b/indra/newview/llviewerassetupload.cpp index 4ef398d314..6c6d3a4f33 100644 --- a/indra/newview/llviewerassetupload.cpp +++ b/indra/newview/llviewerassetupload.cpp @@ -666,7 +666,8 @@ LLUUID LLViewerAssetUpload::EnqueueInventoryUpload(const std::string &url, const  {      std::string procName("LLViewerAssetUpload::AssetInventoryUploadCoproc("); -    LLUUID queueId = LLCoprocedureManager::getInstance()->enqueueCoprocedure(procName + LLAssetType::lookup(uploadInfo->getAssetType()) + ")", +    LLUUID queueId = LLCoprocedureManager::getInstance()->enqueueCoprocedure("Upload",  +        procName + LLAssetType::lookup(uploadInfo->getAssetType()) + ")",          boost::bind(&LLViewerAssetUpload::AssetInventoryUploadCoproc, _1, _2, url, uploadInfo));      return queueId;  | 
