summaryrefslogtreecommitdiff
path: root/indra/llmessage
diff options
context:
space:
mode:
authorBrad Kittenbrink <brad@lindenlab.com>2019-03-11 17:42:39 -0700
committerNat Goodspeed <nat@lindenlab.com>2020-03-25 18:39:21 -0400
commit997bdfc88682de36f02931a22d3baa23f00b6ddb (patch)
tree24f60a174630963f8f65c34843076ab2faef1edc /indra/llmessage
parenta6f31e9167c75982bb5eaba96ec6ac1f70ee31fb (diff)
First draft of boost::fibers::unbuffered_channel based implementation of LLCoprocedureManager
Diffstat (limited to 'indra/llmessage')
-rw-r--r--indra/llmessage/llcoproceduremanager.cpp301
-rw-r--r--indra/llmessage/llcoproceduremanager.h4
-rw-r--r--indra/llmessage/tests/llcoproceduremanager_test.cpp81
3 files changed, 234 insertions, 152 deletions
diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp
index 4c85dd999a..9501181509 100644
--- a/indra/llmessage/llcoproceduremanager.cpp
+++ b/indra/llmessage/llcoproceduremanager.cpp
@@ -25,23 +25,23 @@
* $/LicenseInfo$
*/
-#include "linden_common.h"
+#include "linden_common.h"
#include "llcoproceduremanager.h"
#include "llexception.h"
#include "stringize.h"
#include <boost/assign.hpp>
+#include <boost/fiber/unbuffered_channel.hpp>
//=========================================================================
// Map of pool sizes for known pools
-// *TODO$: When C++11 this can be initialized here as follows:
-// = {{"AIS", 25}, {"Upload", 1}}
-static std::map<std::string, U32> DefaultPoolSizes =
- boost::assign::map_list_of
- (std::string("Upload"), 1)
- (std::string("AIS"), 1);
- // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync.
+static const std::map<std::string, U32> DefaultPoolSizes =
+{
+ {"Upload", 1},
+ {"AIS", 1},
+ // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync.
+};
-#define DEFAULT_POOL_SIZE 5
+static const U32 DEFAULT_POOL_SIZE = 5;
//=========================================================================
class LLCoprocedurePool: private boost::noncopyable
@@ -50,7 +50,7 @@ public:
typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t;
LLCoprocedurePool(const std::string &name, size_t size);
- virtual ~LLCoprocedurePool();
+ ~LLCoprocedurePool();
/// Places the coprocedure on the queue for processing.
///
@@ -63,18 +63,18 @@ public:
/// Cancel a coprocedure. If the coprocedure is already being actively executed
/// this method calls cancelSuspendedOperation() on the associated HttpAdapter
/// If it has not yet been dequeued it is simply removed from the queue.
- bool cancelCoprocedure(const LLUUID &id);
+ //bool cancelCoprocedure(const LLUUID &id);
/// Requests a shutdown of the upload manager. Passing 'true' will perform
/// an immediate kill on the upload coroutine.
- void shutdown(bool hardShutdown = false);
+ //void shutdown(bool hardShutdown = false);
- /// Returns the number of coprocedures in the queue awaiting processing.
- ///
- inline size_t countPending() const
- {
- return mPendingCoprocs.size();
- }
+// /// Returns the number of coprocedures in the queue awaiting processing.
+// ///
+// inline size_t countPending() const
+// {
+// return mPendingCoprocs.size();
+// }
/// Returns the number of coprocedures actively being processed.
///
@@ -83,13 +83,15 @@ public:
return mActiveCoprocs.size();
}
- /// Returns the total number of coprocedures either queued or in active processing.
- ///
- inline size_t count() const
- {
- return countPending() + countActive();
- }
+// /// Returns the total number of coprocedures either queued or in active processing.
+// ///
+// inline size_t count() const
+// {
+// return countPending() + countActive();
+// }
+ void close();
+
private:
struct QueuedCoproc
{
@@ -108,15 +110,15 @@ private:
// we use a deque here rather than std::queue since we want to be able to
// iterate through the queue and potentially erase an entry from the middle.
- typedef std::deque<QueuedCoproc::ptr_t> CoprocQueue_t;
+ // TODO - make this queue be backed by an unbuffered_channel
+ typedef boost::fibers::unbuffered_channel<QueuedCoproc::ptr_t> CoprocQueue_t;
typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t;
std::string mPoolName;
size_t mPoolSize;
CoprocQueue_t mPendingCoprocs;
ActiveCoproc_t mActiveCoprocs;
- bool mShutdown;
- LLEventStream mWakeupTrigger;
+ //bool mShutdown;
typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;
LLCore::HttpRequest::policy_t mHTTPPolicy;
@@ -143,8 +145,7 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::
std::string keyName = "PoolSize" + poolName;
int size = 0;
- if (poolName.empty())
- LL_ERRS("CoprocedureManager") << "Poolname must not be empty" << LL_ENDL;
+ LL_ERRS_IF(poolName.empty(), "CoprocedureManager") << "Poolname must not be empty" << LL_ENDL;
if (mPropertyQueryFn && !mPropertyQueryFn.empty())
{
@@ -152,24 +153,26 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::
}
if (size == 0)
- { // if not found grab the know default... if there is no known
+ {
+ // if not found grab the know default... if there is no known
// default use a reasonable number like 5.
- std::map<std::string, U32>::iterator it = DefaultPoolSizes.find(poolName);
- if (it == DefaultPoolSizes.end())
- size = DEFAULT_POOL_SIZE;
- else
- size = (*it).second;
+ auto it = DefaultPoolSizes.find(poolName);
+ size = (it != DefaultPoolSizes.end()) ? it->second : DEFAULT_POOL_SIZE;
if (mPropertyDefineFn && !mPropertyDefineFn.empty())
+ {
mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName);
+ }
+
LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;
}
poolPtr_t pool(new LLCoprocedurePool(poolName, size));
- mPoolMap.insert(poolMap_t::value_type(poolName, pool));
+ LL_ERRS_IF(!pool, "CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL;
+
+ bool inserted = mPoolMap.emplace(poolName, pool).second;
+ LL_ERRS_IF(!inserted, "CoprocedureManager") << "Unable to add pool named \"" << poolName << "\" to map. FATAL!" << LL_ENDL;
- if (!pool)
- LL_ERRS("CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL;
return pool;
}
@@ -178,30 +181,24 @@ LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const s
{
// Attempt to find the pool and enqueue the procedure. If the pool does
// not exist, create it.
- poolPtr_t targetPool;
poolMap_t::iterator it = mPoolMap.find(pool);
- if (it == mPoolMap.end())
- {
- targetPool = initializePool(pool);
- }
- else
- {
- targetPool = (*it).second;
- }
+ poolPtr_t targetPool = (it != mPoolMap.end()) ? it->second : initializePool(pool);
return targetPool->enqueueCoprocedure(name, proc);
}
-void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id)
-{
- for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
- {
- if ((*it).second->cancelCoprocedure(id))
- return;
- }
- LL_INFOS() << "Coprocedure not found." << LL_ENDL;
-}
+//void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id)
+//{
+// for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+// {
+// if (it->second->cancelCoprocedure(id))
+// {
+// return;
+// }
+// }
+// LL_INFOS() << "Coprocedure not found." << LL_ENDL;
+//}
/*==========================================================================*|
void LLCoprocedureManager::shutdown(bool hardShutdown)
@@ -220,32 +217,32 @@ void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpd
mPropertyDefineFn = updatefn;
}
-//-------------------------------------------------------------------------
-size_t LLCoprocedureManager::countPending() const
-{
- size_t count = 0;
- for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
- {
- count += (*it).second->countPending();
- }
- return count;
-}
-
-size_t LLCoprocedureManager::countPending(const std::string &pool) const
-{
- poolMap_t::const_iterator it = mPoolMap.find(pool);
-
- if (it == mPoolMap.end())
- return 0;
- return (*it).second->countPending();
-}
+////-------------------------------------------------------------------------
+//size_t LLCoprocedureManager::countPending() const
+//{
+// size_t count = 0;
+// for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+// {
+// count += (*it).second->countPending();
+// }
+// return count;
+//}
+//
+//size_t LLCoprocedureManager::countPending(const std::string &pool) const
+//{
+// poolMap_t::const_iterator it = mPoolMap.find(pool);
+//
+// if (it == mPoolMap.end())
+// return 0;
+// return (*it).second->countPending();
+//}
size_t LLCoprocedureManager::countActive() const
{
size_t count = 0;
for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
{
- count += (*it).second->countActive();
+ count += it->second->countActive();
}
return count;
}
@@ -255,27 +252,38 @@ size_t LLCoprocedureManager::countActive(const std::string &pool) const
poolMap_t::const_iterator it = mPoolMap.find(pool);
if (it == mPoolMap.end())
+ {
return 0;
- return (*it).second->countActive();
+ }
+ return it->second->countActive();
}
-size_t LLCoprocedureManager::count() const
+//size_t LLCoprocedureManager::count() const
+//{
+// size_t count = 0;
+// for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+// {
+// count += (*it).second->count();
+// }
+// return count;
+//}
+//
+//size_t LLCoprocedureManager::count(const std::string &pool) const
+//{
+// poolMap_t::const_iterator it = mPoolMap.find(pool);
+//
+// if (it == mPoolMap.end())
+// return 0;
+// return (*it).second->count();
+//}
+
+void LLCoprocedureManager::close(const std::string &pool)
{
- size_t count = 0;
- for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+ poolMap_t::iterator it = mPoolMap.find(pool);
+ if (it != mPoolMap.end())
{
- count += (*it).second->count();
+ it->second->close();
}
- return count;
-}
-
-size_t LLCoprocedureManager::count(const std::string &pool) const
-{
- poolMap_t::const_iterator it = mPoolMap.find(pool);
-
- if (it == mPoolMap.end())
- return 0;
- return (*it).second->count();
}
//=========================================================================
@@ -283,8 +291,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
mPoolName(poolName),
mPoolSize(size),
mPendingCoprocs(),
- mShutdown(false),
- mWakeupTrigger("CoprocedurePool" + poolName, true),
+ //mShutdown(false),
mCoroMapping(),
mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID)
{
@@ -299,8 +306,6 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
}
LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL;
-
- mWakeupTrigger.post(LLSD());
}
LLCoprocedurePool::~LLCoprocedurePool()
@@ -339,76 +344,70 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced
{
LLUUID id(LLUUID::generateNewID());
- mPendingCoprocs.push_back(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc)));
+ mPendingCoprocs.push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc)));
LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
- mWakeupTrigger.post(LLSD());
-
return id;
}
-bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id)
-{
- // first check the active coroutines. If there, remove it and return.
- ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id);
- if (itActive != mActiveCoprocs.end())
- {
- LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
- (*itActive).second->cancelSuspendedOperation();
- mActiveCoprocs.erase(itActive);
- return true;
- }
-
- for (CoprocQueue_t::iterator it = mPendingCoprocs.begin(); it != mPendingCoprocs.end(); ++it)
- {
- if ((*it)->mId == id)
- {
- LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
- mPendingCoprocs.erase(it);
- return true;
- }
- }
-
- LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found in pool \"" << mPoolName << "\"" << LL_ENDL;
- return false;
-}
+//bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id)
+//{
+// // first check the active coroutines. If there, remove it and return.
+// ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id);
+// if (itActive != mActiveCoprocs.end())
+// {
+// LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+// (*itActive).second->cancelSuspendedOperation();
+// mActiveCoprocs.erase(itActive);
+// return true;
+// }
+//
+//// for (auto it: mPendingCoprocs)
+//// {
+//// if ((*it)->mId == id)
+//// {
+//// LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+//// mPendingCoprocs.erase(it);
+//// return true;
+//// }
+//// }
+//
+// LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found in pool \"" << mPoolName << "\"" << LL_ENDL;
+// return false;
+//}
//-------------------------------------------------------------------------
void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
{
LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest);
- while (!mShutdown)
+ QueuedCoproc::ptr_t coproc;
+ while (mPendingCoprocs.pop(coproc) != boost::fibers::channel_op_status::closed)
{
- llcoro::suspendUntilEventOn(mWakeupTrigger);
- if (mShutdown)
- break;
-
- while (!mPendingCoprocs.empty())
- {
- QueuedCoproc::ptr_t coproc = mPendingCoprocs.front();
- mPendingCoprocs.pop_front();
- ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first;
-
- LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
-
- try
- {
- coproc->mProc(httpAdapter, coproc->mId);
- }
- catch (...)
- {
- LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName
- << "', id=" << coproc->mId.asString()
- << ") in pool '" << mPoolName << "'"));
- // must NOT omit this or we deplete the pool
- mActiveCoprocs.erase(itActive);
- throw;
- }
-
- LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+ ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first;
+ LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+
+ try
+ {
+ coproc->mProc(httpAdapter, coproc->mId);
+ }
+ catch (...)
+ {
+ LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName
+ << "', id=" << coproc->mId.asString()
+ << ") in pool '" << mPoolName << "'"));
+ // must NOT omit this or we deplete the pool
mActiveCoprocs.erase(itActive);
+ throw;
}
+
+ LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+
+ mActiveCoprocs.erase(itActive);
}
}
+
+void LLCoprocedurePool::close() {
+ mPendingCoprocs.close();
+}
diff --git a/indra/llmessage/llcoproceduremanager.h b/indra/llmessage/llcoproceduremanager.h
index ba6f97355c..93d699a714 100644
--- a/indra/llmessage/llcoproceduremanager.h
+++ b/indra/llmessage/llcoproceduremanager.h
@@ -57,7 +57,7 @@ public:
/// Cancel a coprocedure. If the coprocedure is already being actively executed
/// this method calls cancelYieldingOperation() on the associated HttpAdapter
/// If it has not yet been dequeued it is simply removed from the queue.
- void cancelCoprocedure(const LLUUID &id);
+ //void cancelCoprocedure(const LLUUID &id);
/*==========================================================================*|
/// Requests a shutdown of the upload manager. Passing 'true' will perform
@@ -82,6 +82,8 @@ public:
size_t count() const;
size_t count(const std::string &pool) const;
+ void close(const std::string &pool);
+
private:
typedef boost::shared_ptr<LLCoprocedurePool> poolPtr_t;
diff --git a/indra/llmessage/tests/llcoproceduremanager_test.cpp b/indra/llmessage/tests/llcoproceduremanager_test.cpp
index e503798d88..17535abd1e 100644
--- a/indra/llmessage/tests/llcoproceduremanager_test.cpp
+++ b/indra/llmessage/tests/llcoproceduremanager_test.cpp
@@ -31,6 +31,12 @@
#include "../llcoproceduremanager.h"
+#include <functional>
+
+#include <boost/fiber/fiber.hpp>
+#include <boost/fiber/buffered_channel.hpp>
+#include <boost/fiber/unbuffered_channel.hpp>
+
#include "../test/lltut.h"
@@ -81,5 +87,80 @@ namespace tut
foo = 1;
});
ensure_equals("coprocedure failed to update foo", foo, 1);
+
+ LLCoprocedureManager::instance().close("PoolName");
+ }
+
+ template<> template<>
+ void coproceduremanager_object_t::test<2>()
+ {
+ const size_t capacity = 2;
+ boost::fibers::buffered_channel<std::function<void(void)>> chan(capacity);
+
+ boost::fibers::fiber worker([&chan]() {
+ chan.value_pop()();
+ });
+
+ chan.push([]() {
+ LL_INFOS("Test") << "test 1" << LL_ENDL;
+ });
+
+ worker.join();
+ }
+
+ template<> template<>
+ void coproceduremanager_object_t::test<3>()
+ {
+ boost::fibers::unbuffered_channel<std::function<void(void)>> chan;
+
+ boost::fibers::fiber worker([&chan]() {
+ chan.value_pop()();
+ });
+
+ chan.push([]() {
+ LL_INFOS("Test") << "test 1" << LL_ENDL;
+ });
+
+ worker.join();
+ }
+
+ template<> template<>
+ void coproceduremanager_object_t::test<4>()
+ {
+ boost::fibers::buffered_channel<std::function<void(void)>> chan(4);
+
+ boost::fibers::fiber worker([&chan]() {
+ std::function<void(void)> f;
+
+ // using namespace std::chrono_literals;
+ // const auto timeout = 5s;
+ // boost::fibers::channel_op_status status;
+ while (chan.pop(f) != boost::fibers::channel_op_status::closed)
+ {
+ LL_INFOS("CoWorker") << "got coproc" << LL_ENDL;
+ f();
+ }
+ LL_INFOS("CoWorker") << "got closed" << LL_ENDL;
+ });
+
+ int counter = 0;
+
+ for (int i = 0; i < 5; ++i)
+ {
+ LL_INFOS("CoMain") << "pushing coproc " << i << LL_ENDL;
+ chan.push([&counter]() {
+ LL_INFOS("CoProc") << "in coproc" << LL_ENDL;
+ ++counter;
+ });
+ }
+
+ LL_INFOS("CoMain") << "closing channel" << LL_ENDL;
+ chan.close();
+
+ LL_INFOS("CoMain") << "joining worker" << LL_ENDL;
+ worker.join();
+
+ LL_INFOS("CoMain") << "checking count" << LL_ENDL;
+ ensure_equals("coprocedure failed to update counter", counter, 5);
}
} // namespace tut