diff options
| author | Brad Kittenbrink <brad@lindenlab.com> | 2019-03-11 17:42:39 -0700 | 
|---|---|---|
| committer | Nat Goodspeed <nat@lindenlab.com> | 2020-03-25 18:39:21 -0400 | 
| commit | 997bdfc88682de36f02931a22d3baa23f00b6ddb (patch) | |
| tree | 24f60a174630963f8f65c34843076ab2faef1edc | |
| parent | a6f31e9167c75982bb5eaba96ec6ac1f70ee31fb (diff) | |
First draft of boost::fibers::unbuffered_channel based implementation of LLCoprocedureManager
| -rw-r--r-- | indra/llmessage/llcoproceduremanager.cpp | 301 | ||||
| -rw-r--r-- | indra/llmessage/llcoproceduremanager.h | 4 | ||||
| -rw-r--r-- | indra/llmessage/tests/llcoproceduremanager_test.cpp | 81 | 
3 files changed, 234 insertions, 152 deletions
| diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 4c85dd999a..9501181509 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -25,23 +25,23 @@  * $/LicenseInfo$  */ -#include "linden_common.h"  +#include "linden_common.h"  #include "llcoproceduremanager.h"  #include "llexception.h"  #include "stringize.h"  #include <boost/assign.hpp> +#include <boost/fiber/unbuffered_channel.hpp>  //=========================================================================  // Map of pool sizes for known pools -// *TODO$: When C++11 this can be initialized here as follows: -// = {{"AIS", 25}, {"Upload", 1}} -static std::map<std::string, U32> DefaultPoolSizes =  -    boost::assign::map_list_of -        (std::string("Upload"),  1) -        (std::string("AIS"),     1);     -        // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync. +static const std::map<std::string, U32> DefaultPoolSizes = +{ +    {"Upload",  1}, +    {"AIS",     1}, +    // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync. +}; -#define DEFAULT_POOL_SIZE 5 +static const U32 DEFAULT_POOL_SIZE = 5;  //=========================================================================  class LLCoprocedurePool: private boost::noncopyable @@ -50,7 +50,7 @@ public:      typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t;      LLCoprocedurePool(const std::string &name, size_t size); -    virtual ~LLCoprocedurePool(); +    ~LLCoprocedurePool();      /// Places the coprocedure on the queue for processing.       ///  @@ -63,18 +63,18 @@ public:      /// Cancel a coprocedure. If the coprocedure is already being actively executed       /// this method calls cancelSuspendedOperation() on the associated HttpAdapter      /// If it has not yet been dequeued it is simply removed from the queue. -    bool cancelCoprocedure(const LLUUID &id); +    //bool cancelCoprocedure(const LLUUID &id);      /// Requests a shutdown of the upload manager. Passing 'true' will perform       /// an immediate kill on the upload coroutine. -    void shutdown(bool hardShutdown = false); +    //void shutdown(bool hardShutdown = false); -    /// Returns the number of coprocedures in the queue awaiting processing. -    /// -    inline size_t countPending() const -    { -        return mPendingCoprocs.size(); -    } +//    /// Returns the number of coprocedures in the queue awaiting processing. +//    /// +//    inline size_t countPending() const +//    { +//        return mPendingCoprocs.size(); +//    }      /// Returns the number of coprocedures actively being processed.      /// @@ -83,13 +83,15 @@ public:          return mActiveCoprocs.size();      } -    /// Returns the total number of coprocedures either queued or in active processing. -    /// -    inline size_t count() const -    { -        return countPending() + countActive(); -    } +//    /// Returns the total number of coprocedures either queued or in active processing. +//    /// +//    inline size_t count() const +//    { +//        return countPending() + countActive(); +//    } +    void close(); +      private:      struct QueuedCoproc      { @@ -108,15 +110,15 @@ private:      // we use a deque here rather than std::queue since we want to be able to       // iterate through the queue and potentially erase an entry from the middle. -    typedef std::deque<QueuedCoproc::ptr_t>  CoprocQueue_t; +    // TODO - make this queue be backed by an unbuffered_channel +    typedef boost::fibers::unbuffered_channel<QueuedCoproc::ptr_t>  CoprocQueue_t;      typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t;      std::string     mPoolName;      size_t          mPoolSize;      CoprocQueue_t   mPendingCoprocs;      ActiveCoproc_t  mActiveCoprocs; -    bool            mShutdown; -    LLEventStream   mWakeupTrigger; +    //bool            mShutdown;      typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;      LLCore::HttpRequest::policy_t mHTTPPolicy; @@ -143,8 +145,7 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::      std::string keyName = "PoolSize" + poolName;      int size = 0; -    if (poolName.empty()) -        LL_ERRS("CoprocedureManager") << "Poolname must not be empty" << LL_ENDL; +    LL_ERRS_IF(poolName.empty(), "CoprocedureManager") << "Poolname must not be empty" << LL_ENDL;      if (mPropertyQueryFn && !mPropertyQueryFn.empty())      { @@ -152,24 +153,26 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::      }      if (size == 0) -    {   // if not found grab the know default... if there is no known  +    { +        // if not found grab the know default... if there is no known           // default use a reasonable number like 5. -        std::map<std::string, U32>::iterator it = DefaultPoolSizes.find(poolName); -        if (it == DefaultPoolSizes.end()) -            size = DEFAULT_POOL_SIZE; -        else -            size = (*it).second; +        auto it = DefaultPoolSizes.find(poolName); +        size = (it != DefaultPoolSizes.end()) ? it->second : DEFAULT_POOL_SIZE;          if (mPropertyDefineFn && !mPropertyDefineFn.empty()) +        {              mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName); +        } +          LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;      }      poolPtr_t pool(new LLCoprocedurePool(poolName, size)); -    mPoolMap.insert(poolMap_t::value_type(poolName, pool)); +    LL_ERRS_IF(!pool, "CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL; + +    bool inserted = mPoolMap.emplace(poolName, pool).second; +    LL_ERRS_IF(!inserted, "CoprocedureManager") << "Unable to add pool named \"" << poolName << "\" to map. FATAL!" << LL_ENDL; -    if (!pool) -        LL_ERRS("CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL;      return pool;  } @@ -178,30 +181,24 @@ LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const s  {      // Attempt to find the pool and enqueue the procedure.  If the pool does       // not exist, create it. -    poolPtr_t targetPool;      poolMap_t::iterator it = mPoolMap.find(pool); -    if (it == mPoolMap.end()) -    { -        targetPool = initializePool(pool); -    } -    else -    { -        targetPool = (*it).second; -    } +    poolPtr_t targetPool = (it != mPoolMap.end()) ? it->second : initializePool(pool);      return targetPool->enqueueCoprocedure(name, proc);  } -void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) -{ -    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) -    { -        if ((*it).second->cancelCoprocedure(id)) -            return; -    } -    LL_INFOS() << "Coprocedure not found." << LL_ENDL; -} +//void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id) +//{ +//    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +//    { +//        if (it->second->cancelCoprocedure(id)) +//        { +//            return; +//        } +//    } +//    LL_INFOS() << "Coprocedure not found." << LL_ENDL; +//}  /*==========================================================================*|  void LLCoprocedureManager::shutdown(bool hardShutdown) @@ -220,32 +217,32 @@ void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpd      mPropertyDefineFn = updatefn;  } -//------------------------------------------------------------------------- -size_t LLCoprocedureManager::countPending() const -{ -    size_t count = 0; -    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) -    { -        count += (*it).second->countPending(); -    } -    return count; -} - -size_t LLCoprocedureManager::countPending(const std::string &pool) const -{ -    poolMap_t::const_iterator it = mPoolMap.find(pool); - -    if (it == mPoolMap.end()) -        return 0; -    return (*it).second->countPending(); -} +////------------------------------------------------------------------------- +//size_t LLCoprocedureManager::countPending() const +//{ +//    size_t count = 0; +//    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +//    { +//        count += (*it).second->countPending(); +//    } +//    return count; +//} +// +//size_t LLCoprocedureManager::countPending(const std::string &pool) const +//{ +//    poolMap_t::const_iterator it = mPoolMap.find(pool); +// +//    if (it == mPoolMap.end()) +//        return 0; +//    return (*it).second->countPending(); +//}  size_t LLCoprocedureManager::countActive() const  {      size_t count = 0;      for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)      { -        count += (*it).second->countActive(); +        count += it->second->countActive();      }      return count;  } @@ -255,27 +252,38 @@ size_t LLCoprocedureManager::countActive(const std::string &pool) const      poolMap_t::const_iterator it = mPoolMap.find(pool);      if (it == mPoolMap.end()) +    {          return 0; -    return (*it).second->countActive(); +    } +    return it->second->countActive();  } -size_t LLCoprocedureManager::count() const +//size_t LLCoprocedureManager::count() const +//{ +//    size_t count = 0; +//    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +//    { +//        count += (*it).second->count(); +//    } +//    return count; +//} +// +//size_t LLCoprocedureManager::count(const std::string &pool) const +//{ +//    poolMap_t::const_iterator it = mPoolMap.find(pool); +// +//    if (it == mPoolMap.end()) +//        return 0; +//    return (*it).second->count(); +//} + +void LLCoprocedureManager::close(const std::string &pool)  { -    size_t count = 0; -    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it) +    poolMap_t::iterator it = mPoolMap.find(pool); +    if (it != mPoolMap.end())      { -        count += (*it).second->count(); +        it->second->close();      } -    return count; -} - -size_t LLCoprocedureManager::count(const std::string &pool) const -{ -    poolMap_t::const_iterator it = mPoolMap.find(pool); - -    if (it == mPoolMap.end()) -        return 0; -    return (*it).second->count();  }  //========================================================================= @@ -283,8 +291,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):      mPoolName(poolName),      mPoolSize(size),      mPendingCoprocs(), -    mShutdown(false), -    mWakeupTrigger("CoprocedurePool" + poolName, true), +    //mShutdown(false),      mCoroMapping(),      mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID)  { @@ -299,8 +306,6 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):      }      LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL; - -    mWakeupTrigger.post(LLSD());  }  LLCoprocedurePool::~LLCoprocedurePool()  @@ -339,76 +344,70 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced  {      LLUUID id(LLUUID::generateNewID()); -    mPendingCoprocs.push_back(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); +    mPendingCoprocs.push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc)));      LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; -    mWakeupTrigger.post(LLSD()); -      return id;  } -bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id) -{ -    // first check the active coroutines.  If there, remove it and return. -    ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id); -    if (itActive != mActiveCoprocs.end()) -    { -        LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; -        (*itActive).second->cancelSuspendedOperation(); -        mActiveCoprocs.erase(itActive); -        return true; -    } - -    for (CoprocQueue_t::iterator it = mPendingCoprocs.begin(); it != mPendingCoprocs.end(); ++it) -    { -        if ((*it)->mId == id) -        { -            LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; -            mPendingCoprocs.erase(it); -            return true; -        } -    } - -    LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found in pool \"" << mPoolName << "\"" << LL_ENDL; -    return false; -} +//bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id) +//{ +//    // first check the active coroutines.  If there, remove it and return. +//    ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id); +//    if (itActive != mActiveCoprocs.end()) +//    { +//        LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; +//        (*itActive).second->cancelSuspendedOperation(); +//        mActiveCoprocs.erase(itActive); +//        return true; +//    } +// +////    for (auto it: mPendingCoprocs) +////    { +////        if ((*it)->mId == id) +////        { +////            LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; +////            mPendingCoprocs.erase(it); +////            return true; +////        } +////    } +// +//    LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found in pool \"" << mPoolName << "\"" << LL_ENDL; +//    return false; +//}  //-------------------------------------------------------------------------  void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)  {      LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest); -    while (!mShutdown) +    QueuedCoproc::ptr_t coproc; +    while (mPendingCoprocs.pop(coproc) != boost::fibers::channel_op_status::closed)      { -        llcoro::suspendUntilEventOn(mWakeupTrigger); -        if (mShutdown) -            break; -         -        while (!mPendingCoprocs.empty()) -        { -            QueuedCoproc::ptr_t coproc = mPendingCoprocs.front(); -            mPendingCoprocs.pop_front(); -            ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; - -            LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; - -            try -            { -                coproc->mProc(httpAdapter, coproc->mId); -            } -            catch (...) -            { -                LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName -                                                  << "', id=" << coproc->mId.asString() -                                                  << ") in pool '" << mPoolName << "'")); -                // must NOT omit this or we deplete the pool -                mActiveCoprocs.erase(itActive); -                throw; -            } - -            LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; +        ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first; +        LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; + +        try +        { +            coproc->mProc(httpAdapter, coproc->mId); +        } +        catch (...) +        { +            LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName +                                              << "', id=" << coproc->mId.asString() +                                              << ") in pool '" << mPoolName << "'")); +            // must NOT omit this or we deplete the pool              mActiveCoprocs.erase(itActive); +            throw;          } + +        LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL; + +        mActiveCoprocs.erase(itActive);      }  } + +void LLCoprocedurePool::close() { +    mPendingCoprocs.close(); +} diff --git a/indra/llmessage/llcoproceduremanager.h b/indra/llmessage/llcoproceduremanager.h index ba6f97355c..93d699a714 100644 --- a/indra/llmessage/llcoproceduremanager.h +++ b/indra/llmessage/llcoproceduremanager.h @@ -57,7 +57,7 @@ public:      /// Cancel a coprocedure. If the coprocedure is already being actively executed       /// this method calls cancelYieldingOperation() on the associated HttpAdapter      /// If it has not yet been dequeued it is simply removed from the queue. -    void cancelCoprocedure(const LLUUID &id); +    //void cancelCoprocedure(const LLUUID &id);  /*==========================================================================*|      /// Requests a shutdown of the upload manager. Passing 'true' will perform  @@ -82,6 +82,8 @@ public:      size_t count() const;      size_t count(const std::string &pool) const; +    void close(const std::string &pool); +      private:      typedef boost::shared_ptr<LLCoprocedurePool> poolPtr_t; diff --git a/indra/llmessage/tests/llcoproceduremanager_test.cpp b/indra/llmessage/tests/llcoproceduremanager_test.cpp index e503798d88..17535abd1e 100644 --- a/indra/llmessage/tests/llcoproceduremanager_test.cpp +++ b/indra/llmessage/tests/llcoproceduremanager_test.cpp @@ -31,6 +31,12 @@  #include "../llcoproceduremanager.h" +#include <functional> + +#include <boost/fiber/fiber.hpp> +#include <boost/fiber/buffered_channel.hpp> +#include <boost/fiber/unbuffered_channel.hpp> +  #include "../test/lltut.h" @@ -81,5 +87,80 @@ namespace tut                  foo = 1;              });          ensure_equals("coprocedure failed to update foo", foo, 1); +         +        LLCoprocedureManager::instance().close("PoolName"); +    } + +    template<> template<> +    void coproceduremanager_object_t::test<2>() +    { +        const size_t capacity = 2; +        boost::fibers::buffered_channel<std::function<void(void)>> chan(capacity); + +        boost::fibers::fiber worker([&chan]() { +            chan.value_pop()(); +        }); + +        chan.push([]() { +            LL_INFOS("Test") << "test 1" << LL_ENDL; +        }); + +        worker.join(); +    } + +    template<> template<> +    void coproceduremanager_object_t::test<3>() +    { +        boost::fibers::unbuffered_channel<std::function<void(void)>> chan; + +        boost::fibers::fiber worker([&chan]() { +            chan.value_pop()(); +        }); + +        chan.push([]() { +            LL_INFOS("Test") << "test 1" << LL_ENDL; +        }); + +        worker.join(); +    } + +    template<> template<> +    void coproceduremanager_object_t::test<4>() +    { +        boost::fibers::buffered_channel<std::function<void(void)>> chan(4); + +        boost::fibers::fiber worker([&chan]() { +            std::function<void(void)> f; + +            // using namespace std::chrono_literals; +            // const auto timeout = 5s; +            // boost::fibers::channel_op_status status; +            while (chan.pop(f) != boost::fibers::channel_op_status::closed) +            { +                LL_INFOS("CoWorker") << "got coproc" << LL_ENDL; +                f(); +            } +            LL_INFOS("CoWorker") << "got closed" << LL_ENDL; +        }); + +        int counter = 0; + +        for (int i = 0; i < 5; ++i) +        { +            LL_INFOS("CoMain") << "pushing coproc " << i << LL_ENDL; +            chan.push([&counter]() { +                LL_INFOS("CoProc") << "in coproc" << LL_ENDL; +                ++counter; +            }); +        } + +        LL_INFOS("CoMain") << "closing channel" << LL_ENDL; +        chan.close(); + +        LL_INFOS("CoMain") << "joining worker" << LL_ENDL; +        worker.join(); + +        LL_INFOS("CoMain") << "checking count" << LL_ENDL; +        ensure_equals("coprocedure failed to update counter", counter, 5);      }  }  // namespace tut | 
