summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRider Linden <rider@lindenlab.com>2015-07-28 15:29:51 -0700
committerRider Linden <rider@lindenlab.com>2015-07-28 15:29:51 -0700
commitb57b0d97bb2fb880084cbcca1b915f8e67b442a5 (patch)
tree787fe3b51aad8fe2b083efcf20fd05bf346bae1a
parent7882396811fdf8b297f6d0c92d8e1e37859fde9d (diff)
Named pools of coroutines.
-rwxr-xr-xindra/newview/app_settings/settings.xml18
-rw-r--r--indra/newview/llcoproceduremanager.cpp288
-rw-r--r--indra/newview/llcoproceduremanager.h56
-rw-r--r--indra/newview/llviewerassetupload.cpp3
4 files changed, 291 insertions, 74 deletions
diff --git a/indra/newview/app_settings/settings.xml b/indra/newview/app_settings/settings.xml
index 2180a7f1a1..b4a4e41884 100755
--- a/indra/newview/app_settings/settings.xml
+++ b/indra/newview/app_settings/settings.xml
@@ -14340,6 +14340,24 @@
<key>Value</key>
<integer>1</integer>
</map>
+ <key>PoolSizeAIS</key>
+ <map>
+ <key>Comment</key>
+ <string>Coroutine Pool size for AIS</string>
+ <key>Type</key>
+ <string>U32</string>
+ <key>Value</key>
+ <integer>25</integer>
+ </map>
+ <key>PoolSizeUpload</key>
+ <map>
+ <key>Comment</key>
+ <string>Coroutine Pool size for Upload</string>
+ <key>Type</key>
+ <string>U32</string>
+ <key>Value</key>
+ <real>1</real>
+ </map>
<!-- Settings below are for back compatibility only.
They are not used in current viewer anymore. But they can't be removed to avoid
diff --git a/indra/newview/llcoproceduremanager.cpp b/indra/newview/llcoproceduremanager.cpp
index d3168985f8..e22a8b8013 100644
--- a/indra/newview/llcoproceduremanager.cpp
+++ b/indra/newview/llcoproceduremanager.cpp
@@ -1,5 +1,5 @@
/**
-* @file llcoproceduremanager.cpp
+* @file LLCoprocedurePool.cpp
* @author Rider Linden
* @brief Singleton class for managing asset uploads to the sim.
*
@@ -33,43 +33,270 @@
#include "llcoproceduremanager.h"
//=========================================================================
-#define COROCOUNT 1
+// Map of pool sizes for known pools
+static std::map<std::string, U32> DefaultPoolSizes;
+
+// *TODO$: When C++11 this can be initialized here as follows:
+// = {{"AIS", 25}, {"Upload", 1}}
+
+#define DEFAULT_POOL_SIZE 5
+
+//=========================================================================
+class LLCoprocedurePool: private boost::noncopyable
+{
+public:
+ typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t;
+
+ LLCoprocedurePool(const std::string &name, size_t size);
+ virtual ~LLCoprocedurePool();
+
+ /// Places the coprocedure on the queue for processing.
+ ///
+ /// @param name Is used for debugging and should identify this coroutine.
+ /// @param proc Is a bound function to be executed
+ ///
+ /// @return This method returns a UUID that can be used later to cancel execution.
+ LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc);
+
+ /// Cancel a coprocedure. If the coprocedure is already being actively executed
+ /// this method calls cancelYieldingOperation() on the associated HttpAdapter
+ /// If it has not yet been dequeued it is simply removed from the queue.
+ bool cancelCoprocedure(const LLUUID &id);
+
+ /// Requests a shutdown of the upload manager. Passing 'true' will perform
+ /// an immediate kill on the upload coroutine.
+ void shutdown(bool hardShutdown = false);
+
+ /// Returns the number of coprocedures in the queue awaiting processing.
+ ///
+ inline size_t countPending() const
+ {
+ return mPendingCoprocs.size();
+ }
+
+ /// Returns the number of coprocedures actively being processed.
+ ///
+ inline size_t countActive() const
+ {
+ return mActiveCoprocs.size();
+ }
+
+ /// Returns the total number of coprocedures either queued or in active processing.
+ ///
+ inline size_t count() const
+ {
+ return countPending() + countActive();
+ }
+
+private:
+ struct QueuedCoproc
+ {
+ typedef boost::shared_ptr<QueuedCoproc> ptr_t;
+
+ QueuedCoproc(const std::string &name, const LLUUID &id, CoProcedure_t proc) :
+ mName(name),
+ mId(id),
+ mProc(proc)
+ {}
+
+ std::string mName;
+ LLUUID mId;
+ CoProcedure_t mProc;
+ };
+
+ // we use a deque here rather than std::queue since we want to be able to
+ // iterate through the queue and potentially erase an entry from the middle.
+ typedef std::deque<QueuedCoproc::ptr_t> CoprocQueue_t;
+ typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t;
+
+ std::string mPoolName;
+ size_t mPoolSize;
+ CoprocQueue_t mPendingCoprocs;
+ ActiveCoproc_t mActiveCoprocs;
+ bool mShutdown;
+ LLEventStream mWakeupTrigger;
+
+ typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;
+ LLCore::HttpRequest::policy_t mHTTPPolicy;
+
+ CoroAdapterMap_t mCoroMapping;
+
+ void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter);
+
+};
//=========================================================================
-LLCoprocedureManager::LLCoprocedureManager():
- LLSingleton<LLCoprocedureManager>(),
+LLCoprocedureManager::LLCoprocedureManager()
+{
+ DefaultPoolSizes.insert(std::map<std::string, U32>::value_type("Upload", 1));
+ DefaultPoolSizes.insert(std::map<std::string, U32>::value_type("AIS", 25));
+}
+
+LLCoprocedureManager::~LLCoprocedureManager()
+{
+
+}
+
+LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::string &poolName)
+{
+ // *TODO: Retrieve the actual number of concurrent coroutines fro gSavedSettings and
+ // clamp to a "reasonable" number.
+ std::string keyName = "PoolSize" + poolName;
+ int size = 5;
+
+ size = gSavedSettings.getU32(keyName);
+ if (size == 0)
+ {
+ std::map<std::string, U32>::iterator it = DefaultPoolSizes.find(poolName);
+ if (it == DefaultPoolSizes.end())
+ size = DEFAULT_POOL_SIZE;
+ else
+ size = (*it).second;
+ gSavedSettings.declareU32(keyName, size, "Coroutine Pool size for " + poolName, LLControlVariable::PERSIST_ALWAYS);
+ LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;
+ }
+
+ poolPtr_t pool = poolPtr_t(new LLCoprocedurePool(poolName, size));
+ mPoolMap.insert(poolMap_t::value_type(poolName, pool));
+
+ return pool;
+}
+
+//-------------------------------------------------------------------------
+LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const std::string &name, CoProcedure_t proc)
+{
+ poolPtr_t targetPool;
+ poolMap_t::iterator it = mPoolMap.find(pool);
+
+ if (it == mPoolMap.end())
+ {
+ targetPool = initializePool(pool);
+ }
+ else
+ {
+ targetPool = (*it).second;
+ }
+
+ if (!targetPool)
+ {
+ LL_WARNS() << "LLCoprocedureManager unable to create coprocedure pool named \"" << pool << "\"" << LL_ENDL;
+ return LLUUID::null;
+ }
+
+ return targetPool->enqueueCoprocedure(name, proc);
+}
+
+void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id)
+{
+ for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+ {
+ if ((*it).second->cancelCoprocedure(id))
+ return;
+ }
+ LL_INFOS() << "Coprocedure not found." << LL_ENDL;
+}
+
+void LLCoprocedureManager::shutdown(bool hardShutdown)
+{
+ for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+ {
+ (*it).second->shutdown(hardShutdown);
+ }
+ mPoolMap.clear();
+}
+
+//-------------------------------------------------------------------------
+size_t LLCoprocedureManager::countPending() const
+{
+ size_t count = 0;
+ for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+ {
+ count += (*it).second->countPending();
+ }
+ return count;
+}
+
+size_t LLCoprocedureManager::countPending(const std::string &pool) const
+{
+ poolMap_t::const_iterator it = mPoolMap.find(pool);
+
+ if (it == mPoolMap.end())
+ return 0;
+ return (*it).second->countPending();
+}
+
+size_t LLCoprocedureManager::countActive() const
+{
+ size_t count = 0;
+ for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+ {
+ count += (*it).second->countActive();
+ }
+ return count;
+}
+
+size_t LLCoprocedureManager::countActive(const std::string &pool) const
+{
+ poolMap_t::const_iterator it = mPoolMap.find(pool);
+
+ if (it == mPoolMap.end())
+ return 0;
+ return (*it).second->countActive();
+}
+
+size_t LLCoprocedureManager::count() const
+{
+ size_t count = 0;
+ for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+ {
+ count += (*it).second->count();
+ }
+ return count;
+}
+
+size_t LLCoprocedureManager::count(const std::string &pool) const
+{
+ poolMap_t::const_iterator it = mPoolMap.find(pool);
+
+ if (it == mPoolMap.end())
+ return 0;
+ return (*it).second->count();
+}
+
+//=========================================================================
+LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
+ mPoolName(poolName),
+ mPoolSize(size),
mPendingCoprocs(),
mShutdown(false),
- mWakeupTrigger("CoprocedureManager", true),
+ mWakeupTrigger("CoprocedurePool" + poolName, true),
mCoroMapping(),
mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID)
{
-
- // *TODO: Retrieve the actual number of concurrent coroutines fro gSavedSettings and
- // clamp to a "reasonable" number.
- for (int count = 0; count < COROCOUNT; ++count)
+ for (size_t count = 0; count < mPoolSize; ++count)
{
LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter =
LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t(
- new LLCoreHttpUtil::HttpCoroutineAdapter("uploadPostAdapter", mHTTPPolicy));
+ new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy));
- std::string uploadCoro = LLCoros::instance().launch("LLCoprocedureManager::coprocedureInvokerCoro",
- boost::bind(&LLCoprocedureManager::coprocedureInvokerCoro, this, httpAdapter));
+ std::string uploadCoro = LLCoros::instance().launch("LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro",
+ boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, httpAdapter));
mCoroMapping.insert(CoroAdapterMap_t::value_type(uploadCoro, httpAdapter));
}
+ LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL;
+
mWakeupTrigger.post(LLSD());
}
-LLCoprocedureManager::~LLCoprocedureManager()
+LLCoprocedurePool::~LLCoprocedurePool()
{
shutdown();
}
-//=========================================================================
-
-void LLCoprocedureManager::shutdown(bool hardShutdown)
+//-------------------------------------------------------------------------
+void LLCoprocedurePool::shutdown(bool hardShutdown)
{
CoroAdapterMap_t::iterator it;
@@ -93,46 +320,47 @@ void LLCoprocedureManager::shutdown(bool hardShutdown)
mPendingCoprocs.clear();
}
-//=========================================================================
-LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &name, LLCoprocedureManager::CoProcedure_t proc)
+//-------------------------------------------------------------------------
+LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoprocedurePool::CoProcedure_t proc)
{
LLUUID id(LLUUID::generateNewID());
mPendingCoprocs.push_back(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc)));
- LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << LL_ENDL;
+ LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
mWakeupTrigger.post(LLSD());
return id;
}
-void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id)
+bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id)
{
// first check the active coroutines. If there, remove it and return.
ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id);
if (itActive != mActiveCoprocs.end())
{
- LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << LL_ENDL;
+ LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
(*itActive).second->cancelYieldingOperation();
mActiveCoprocs.erase(itActive);
- return;
+ return true;
}
for (CoprocQueue_t::iterator it = mPendingCoprocs.begin(); it != mPendingCoprocs.end(); ++it)
{
if ((*it)->mId == id)
{
- LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << LL_ENDL;
+ LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
mPendingCoprocs.erase(it);
- return;
+ return true;
}
}
- LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found." << LL_ENDL;
+ LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found." << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+ return false;
}
-//=========================================================================
-void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
+//-------------------------------------------------------------------------
+void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
{
LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest);
@@ -148,7 +376,7 @@ void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineA
mPendingCoprocs.pop_front();
mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter));
- LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << LL_ENDL;
+ LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
try
{
@@ -161,10 +389,10 @@ void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineA
}
catch (...)
{
- LL_WARNS() << "A non std::exception was thrown from " << coproc->mName << " with id=" << coproc->mId << "." << LL_ENDL;
+ LL_WARNS() << "A non std::exception was thrown from " << coproc->mName << " with id=" << coproc->mId << "." << " in pool \"" << mPoolName << "\"" << LL_ENDL;
}
- LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << LL_ENDL;
+ LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;
ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(coproc->mId);
if (itActive != mActiveCoprocs.end())
diff --git a/indra/newview/llcoproceduremanager.h b/indra/newview/llcoproceduremanager.h
index 6ba3891e87..d7f74af76b 100644
--- a/indra/newview/llcoproceduremanager.h
+++ b/indra/newview/llcoproceduremanager.h
@@ -33,6 +33,8 @@
#include "llcorehttputil.h"
#include "lluuid.h"
+class LLCoprocedurePool;
+
class LLCoprocedureManager : public LLSingleton < LLCoprocedureManager >
{
public:
@@ -47,7 +49,7 @@ public:
/// @param proc Is a bound function to be executed
///
/// @return This method returns a UUID that can be used later to cancel execution.
- LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc);
+ LLUUID enqueueCoprocedure(const std::string &pool, const std::string &name, CoProcedure_t proc);
/// Cancel a coprocedure. If the coprocedure is already being actively executed
/// this method calls cancelYieldingOperation() on the associated HttpAdapter
@@ -60,58 +62,26 @@ public:
/// Returns the number of coprocedures in the queue awaiting processing.
///
- inline size_t countPending() const
- {
- return mPendingCoprocs.size();
- }
+ size_t countPending() const;
+ size_t countPending(const std::string &pool) const;
/// Returns the number of coprocedures actively being processed.
///
- inline size_t countActive() const
- {
- return mActiveCoprocs.size();
- }
+ size_t countActive() const;
+ size_t countActive(const std::string &pool) const;
/// Returns the total number of coprocedures either queued or in active processing.
///
- inline size_t count() const
- {
- return countPending() + countActive();
- }
+ size_t count() const;
+ size_t count(const std::string &pool) const;
private:
- struct QueuedCoproc
- {
- typedef boost::shared_ptr<QueuedCoproc> ptr_t;
-
- QueuedCoproc(const std::string &name, const LLUUID &id, CoProcedure_t proc):
- mName(name),
- mId(id),
- mProc(proc)
- {}
-
- std::string mName;
- LLUUID mId;
- CoProcedure_t mProc;
- };
-
- // we use a deque here rather than std::queue since we want to be able to
- // iterate through the queue and potentially erase an entry from the middle.
- typedef std::deque<QueuedCoproc::ptr_t> CoprocQueue_t;
- typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t;
-
- CoprocQueue_t mPendingCoprocs;
- ActiveCoproc_t mActiveCoprocs;
- bool mShutdown;
- LLEventStream mWakeupTrigger;
-
-
- typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;
- LLCore::HttpRequest::policy_t mHTTPPolicy;
+ typedef boost::shared_ptr<LLCoprocedurePool> poolPtr_t;
+ typedef std::map<std::string, poolPtr_t> poolMap_t;
- CoroAdapterMap_t mCoroMapping;
+ poolMap_t mPoolMap;
- void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter);
+ poolPtr_t initializePool(const std::string &poolName);
};
#endif
diff --git a/indra/newview/llviewerassetupload.cpp b/indra/newview/llviewerassetupload.cpp
index 4ef398d314..6c6d3a4f33 100644
--- a/indra/newview/llviewerassetupload.cpp
+++ b/indra/newview/llviewerassetupload.cpp
@@ -666,7 +666,8 @@ LLUUID LLViewerAssetUpload::EnqueueInventoryUpload(const std::string &url, const
{
std::string procName("LLViewerAssetUpload::AssetInventoryUploadCoproc(");
- LLUUID queueId = LLCoprocedureManager::getInstance()->enqueueCoprocedure(procName + LLAssetType::lookup(uploadInfo->getAssetType()) + ")",
+ LLUUID queueId = LLCoprocedureManager::getInstance()->enqueueCoprocedure("Upload",
+ procName + LLAssetType::lookup(uploadInfo->getAssetType()) + ")",
boost::bind(&LLViewerAssetUpload::AssetInventoryUploadCoproc, _1, _2, url, uploadInfo));
return queueId;