summaryrefslogtreecommitdiff
path: root/indra/llmessage
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llmessage')
-rw-r--r--indra/llmessage/CMakeLists.txt9
-rw-r--r--indra/llmessage/llavatarnamecache.cpp4
-rw-r--r--indra/llmessage/llbuffer.cpp1
-rw-r--r--indra/llmessage/llbufferstream.cpp1
-rw-r--r--indra/llmessage/llcoproceduremanager.cpp317
-rw-r--r--indra/llmessage/llcoproceduremanager.h10
-rw-r--r--indra/llmessage/llexperiencecache.cpp6
-rw-r--r--indra/llmessage/llproxy.cpp4
-rw-r--r--indra/llmessage/llproxy.h1
-rw-r--r--indra/llmessage/tests/llcoproceduremanager_test.cpp178
10 files changed, 345 insertions, 186 deletions
diff --git a/indra/llmessage/CMakeLists.txt b/indra/llmessage/CMakeLists.txt
index e0922c0667..2f99ca069e 100644
--- a/indra/llmessage/CMakeLists.txt
+++ b/indra/llmessage/CMakeLists.txt
@@ -217,7 +217,7 @@ target_link_libraries(
${NGHTTP2_LIBRARIES}
${XMLRPCEPI_LIBRARIES}
${LLCOREHTTP_LIBRARIES}
- ${BOOST_COROUTINE_LIBRARY}
+ ${BOOST_FIBER_LIBRARY}
${BOOST_CONTEXT_LIBRARY}
${BOOST_SYSTEM_LIBRARY}
rt
@@ -235,7 +235,7 @@ target_link_libraries(
${NGHTTP2_LIBRARIES}
${XMLRPCEPI_LIBRARIES}
${LLCOREHTTP_LIBRARIES}
- ${BOOST_COROUTINE_LIBRARY}
+ ${BOOST_FIBER_LIBRARY}
${BOOST_CONTEXT_LIBRARY}
${BOOST_SYSTEM_LIBRARY}
)
@@ -244,6 +244,7 @@ endif(LINUX)
# tests
if (LL_TESTS)
SET(llmessage_TEST_SOURCE_FILES
+ llcoproceduremanager.cpp
llnamevalue.cpp
lltrustedmessageservice.cpp
lltemplatemessagedispatcher.cpp
@@ -264,7 +265,7 @@ if (LINUX)
${LLMESSAGE_LIBRARIES}
${LLCOREHTTP_LIBRARIES}
${JSONCPP_LIBRARIES}
- ${BOOST_COROUTINE_LIBRARY}
+ ${BOOST_FIBER_LIBRARY}
${BOOST_CONTEXT_LIBRARY}
rt
${GOOGLEMOCK_LIBRARIES}
@@ -280,7 +281,7 @@ else (LINUX)
${LLMESSAGE_LIBRARIES}
${LLCOREHTTP_LIBRARIES}
${JSONCPP_LIBRARIES}
- ${BOOST_COROUTINE_LIBRARY}
+ ${BOOST_FIBER_LIBRARY}
${BOOST_CONTEXT_LIBRARY}
${GOOGLEMOCK_LIBRARIES}
)
diff --git a/indra/llmessage/llavatarnamecache.cpp b/indra/llmessage/llavatarnamecache.cpp
index 6a287f0cc5..fbd65cc67b 100644
--- a/indra/llmessage/llavatarnamecache.cpp
+++ b/indra/llmessage/llavatarnamecache.cpp
@@ -134,7 +134,7 @@ LLAvatarNameCache::~LLAvatarNameCache()
void LLAvatarNameCache::requestAvatarNameCache_(std::string url, std::vector<LLUUID> agentIds)
{
- LL_DEBUGS("AvNameCache") << "Entering coroutine " << LLCoros::instance().getName()
+ LL_DEBUGS("AvNameCache") << "Entering coroutine " << LLCoros::getName()
<< " with url '" << url << "', requesting " << agentIds.size() << " Agent Ids" << LL_ENDL;
// Check pointer that can be cleaned up by cleanupClass()
@@ -188,7 +188,7 @@ void LLAvatarNameCache::requestAvatarNameCache_(std::string url, std::vector<LLU
}
catch (...)
{
- LOG_UNHANDLED_EXCEPTION(STRINGIZE("coroutine " << LLCoros::instance().getName()
+ LOG_UNHANDLED_EXCEPTION(STRINGIZE("coroutine " << LLCoros::getName()
<< "('" << url << "', " << agentIds.size()
<< " http result: " << httpResults.asString()
<< " Agent Ids)"));
diff --git a/indra/llmessage/llbuffer.cpp b/indra/llmessage/llbuffer.cpp
index 1a0eceba0f..cfe38605ad 100644
--- a/indra/llmessage/llbuffer.cpp
+++ b/indra/llmessage/llbuffer.cpp
@@ -32,6 +32,7 @@
#include "llmath.h"
#include "llstl.h"
#include "llthread.h"
+#include "llmutex.h"
#include <iterator>
#define ASSERT_LLBUFFERARRAY_MUTEX_LOCKED() llassert(!mMutexp || mMutexp->isSelfLocked())
diff --git a/indra/llmessage/llbufferstream.cpp b/indra/llmessage/llbufferstream.cpp
index ff1c9993cc..39508c1c52 100644
--- a/indra/llmessage/llbufferstream.cpp
+++ b/indra/llmessage/llbufferstream.cpp
@@ -31,6 +31,7 @@
#include "llbuffer.h"
#include "llthread.h"
+#include "llmutex.h"
static const S32 DEFAULT_OUTPUT_SEGMENT_SIZE = 1024 * 4;
diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp
index 74cdff2b00..d252c0e4b0 100644
--- a/indra/llmessage/llcoproceduremanager.cpp
+++ b/indra/llmessage/llcoproceduremanager.cpp
@@ -25,23 +25,29 @@
* $/LicenseInfo$
*/
-#include "linden_common.h"
+#include "llwin32headers.h"
+
+#include "linden_common.h"
+
#include "llcoproceduremanager.h"
+
+#include <chrono>
+
+#include "llthreadsafequeue.h"
+
#include "llexception.h"
#include "stringize.h"
-#include <boost/assign.hpp>
//=========================================================================
// Map of pool sizes for known pools
-// *TODO$: When C++11 this can be initialized here as follows:
-// = {{"AIS", 25}, {"Upload", 1}}
-static std::map<std::string, U32> DefaultPoolSizes =
- boost::assign::map_list_of
- (std::string("Upload"), 1)
- (std::string("AIS"), 1);
- // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync.
+static const std::map<std::string, U32> DefaultPoolSizes{
+ {std::string("Upload"), 1},
+ {std::string("AIS"), 1},
+ // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync.
+};
-#define DEFAULT_POOL_SIZE 5
+static const U32 DEFAULT_POOL_SIZE = 5;
+static const U32 DEFAULT_QUEUE_SIZE = 4096;
//=========================================================================
class LLCoprocedurePool: private boost::noncopyable
@@ -50,7 +56,7 @@ public:
typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t;
LLCoprocedurePool(const std::string &name, size_t size);
- virtual ~LLCoprocedurePool();
+ ~LLCoprocedurePool();
/// Places the coprocedure on the queue for processing.
///
@@ -60,20 +66,11 @@ public:
/// @return This method returns a UUID that can be used later to cancel execution.
LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc);
- /// Cancel a coprocedure. If the coprocedure is already being actively executed
- /// this method calls cancelSuspendedOperation() on the associated HttpAdapter
- /// If it has not yet been dequeued it is simply removed from the queue.
- bool cancelCoprocedure(const LLUUID &id);
-
- /// Requests a shutdown of the upload manager. Passing 'true' will perform
- /// an immediate kill on the upload coroutine.
- void shutdown(bool hardShutdown = false);
-
/// Returns the number of coprocedures in the queue awaiting processing.
///
inline size_t countPending() const
{
- return mPendingCoprocs.size();
+ return mPending;
}
/// Returns the number of coprocedures actively being processed.
@@ -90,6 +87,8 @@ public:
return countPending() + countActive();
}
+ void close();
+
private:
struct QueuedCoproc
{
@@ -106,25 +105,26 @@ private:
CoProcedure_t mProc;
};
- // we use a deque here rather than std::queue since we want to be able to
- // iterate through the queue and potentially erase an entry from the middle.
- typedef std::deque<QueuedCoproc::ptr_t> CoprocQueue_t;
+ typedef LLThreadSafeQueue<QueuedCoproc::ptr_t> CoprocQueue_t;
+ // Use shared_ptr to control the lifespan of our CoprocQueue_t instance
+ // because the consuming coroutine might outlive this LLCoprocedurePool
+ // instance.
+ typedef boost::shared_ptr<CoprocQueue_t> CoprocQueuePtr;
typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t;
std::string mPoolName;
- size_t mPoolSize;
- CoprocQueue_t mPendingCoprocs;
+ size_t mPoolSize, mPending{0};
+ CoprocQueuePtr mPendingCoprocs;
ActiveCoproc_t mActiveCoprocs;
- bool mShutdown;
- LLEventStream mWakeupTrigger;
+ LLTempBoundListener mStatusListener;
typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;
LLCore::HttpRequest::policy_t mHTTPPolicy;
CoroAdapterMap_t mCoroMapping;
- void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter);
-
+ void coprocedureInvokerCoro(CoprocQueuePtr pendingCoprocs,
+ LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter);
};
//=========================================================================
@@ -134,7 +134,7 @@ LLCoprocedureManager::LLCoprocedureManager()
LLCoprocedureManager::~LLCoprocedureManager()
{
-
+ close();
}
LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::string &poolName)
@@ -143,33 +143,34 @@ LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::
std::string keyName = "PoolSize" + poolName;
int size = 0;
- if (poolName.empty())
- LL_ERRS("CoprocedureManager") << "Poolname must not be empty" << LL_ENDL;
+ LL_ERRS_IF(poolName.empty(), "CoprocedureManager") << "Poolname must not be empty" << LL_ENDL;
- if (mPropertyQueryFn && !mPropertyQueryFn.empty())
+ if (mPropertyQueryFn)
{
size = mPropertyQueryFn(keyName);
}
if (size == 0)
- { // if not found grab the know default... if there is no known
+ {
+ // if not found grab the know default... if there is no known
// default use a reasonable number like 5.
- std::map<std::string, U32>::iterator it = DefaultPoolSizes.find(poolName);
- if (it == DefaultPoolSizes.end())
- size = DEFAULT_POOL_SIZE;
- else
- size = (*it).second;
+ auto it = DefaultPoolSizes.find(poolName);
+ size = (it != DefaultPoolSizes.end()) ? it->second : DEFAULT_POOL_SIZE;
- if (mPropertyDefineFn && !mPropertyDefineFn.empty())
+ if (mPropertyDefineFn)
+ {
mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName);
- LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;
+ }
+
+ LL_WARNS("CoProcMgr") << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;
}
poolPtr_t pool(new LLCoprocedurePool(poolName, size));
- mPoolMap.insert(poolMap_t::value_type(poolName, pool));
+ LL_ERRS_IF(!pool, "CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL;
+
+ bool inserted = mPoolMap.emplace(poolName, pool).second;
+ LL_ERRS_IF(!inserted, "CoprocedureManager") << "Unable to add pool named \"" << poolName << "\" to map. FATAL!" << LL_ENDL;
- if (!pool)
- LL_ERRS("CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL;
return pool;
}
@@ -178,40 +179,13 @@ LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const s
{
// Attempt to find the pool and enqueue the procedure. If the pool does
// not exist, create it.
- poolPtr_t targetPool;
poolMap_t::iterator it = mPoolMap.find(pool);
- if (it == mPoolMap.end())
- {
- targetPool = initializePool(pool);
- }
- else
- {
- targetPool = (*it).second;
- }
+ poolPtr_t targetPool = (it != mPoolMap.end()) ? it->second : initializePool(pool);
return targetPool->enqueueCoprocedure(name, proc);
}
-void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id)
-{
- for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
- {
- if ((*it).second->cancelCoprocedure(id))
- return;
- }
- LL_INFOS() << "Coprocedure not found." << LL_ENDL;
-}
-
-void LLCoprocedureManager::shutdown(bool hardShutdown)
-{
- for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
- {
- (*it).second->shutdown(hardShutdown);
- }
- mPoolMap.clear();
-}
-
void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpdate_t updatefn)
{
mPropertyQueryFn = queryfn;
@@ -222,9 +196,9 @@ void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpd
size_t LLCoprocedureManager::countPending() const
{
size_t count = 0;
- for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+ for (const auto& pair : mPoolMap)
{
- count += (*it).second->countPending();
+ count += pair.second->countPending();
}
return count;
}
@@ -235,7 +209,7 @@ size_t LLCoprocedureManager::countPending(const std::string &pool) const
if (it == mPoolMap.end())
return 0;
- return (*it).second->countPending();
+ return it->second->countPending();
}
size_t LLCoprocedureManager::countActive() const
@@ -243,7 +217,7 @@ size_t LLCoprocedureManager::countActive() const
size_t count = 0;
for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
{
- count += (*it).second->countActive();
+ count += it->second->countActive();
}
return count;
}
@@ -253,16 +227,18 @@ size_t LLCoprocedureManager::countActive(const std::string &pool) const
poolMap_t::const_iterator it = mPoolMap.find(pool);
if (it == mPoolMap.end())
+ {
return 0;
- return (*it).second->countActive();
+ }
+ return it->second->countActive();
}
size_t LLCoprocedureManager::count() const
{
size_t count = 0;
- for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+ for (const auto& pair : mPoolMap)
{
- count += (*it).second->count();
+ count += pair.second->count();
}
return count;
}
@@ -273,59 +249,70 @@ size_t LLCoprocedureManager::count(const std::string &pool) const
if (it == mPoolMap.end())
return 0;
- return (*it).second->count();
+ return it->second->count();
+}
+
+void LLCoprocedureManager::close()
+{
+ for(auto & poolEntry : mPoolMap)
+ {
+ poolEntry.second->close();
+ }
+}
+
+void LLCoprocedureManager::close(const std::string &pool)
+{
+ poolMap_t::iterator it = mPoolMap.find(pool);
+ if (it != mPoolMap.end())
+ {
+ it->second->close();
+ }
}
//=========================================================================
LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
mPoolName(poolName),
mPoolSize(size),
- mPendingCoprocs(),
- mShutdown(false),
- mWakeupTrigger("CoprocedurePool" + poolName, true),
- mCoroMapping(),
- mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID)
+ mPendingCoprocs(boost::make_shared<CoprocQueue_t>(DEFAULT_QUEUE_SIZE)),
+ mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID),
+ mCoroMapping()
{
+ // store in our LLTempBoundListener so that when the LLCoprocedurePool is
+ // destroyed, we implicitly disconnect from this LLEventPump
+ mStatusListener = LLEventPumps::instance().obtain("LLApp").listen(
+ poolName,
+ [pendingCoprocs=mPendingCoprocs, poolName](const LLSD& status)
+ {
+ auto& statsd = status["status"];
+ if (statsd.asString() != "running")
+ {
+ LL_INFOS("CoProcMgr") << "Pool " << poolName
+ << " closing queue because status " << statsd
+ << LL_ENDL;
+ // This should ensure that all waiting coprocedures in this
+ // pool will wake up and terminate.
+ pendingCoprocs->close();
+ }
+ return false;
+ });
+
for (size_t count = 0; count < mPoolSize; ++count)
{
LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy));
- std::string pooledCoro = LLCoros::instance().launch("LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro",
- boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, httpAdapter));
+ std::string pooledCoro = LLCoros::instance().launch(
+ "LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro",
+ boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this,
+ mPendingCoprocs, httpAdapter));
mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter));
}
- LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL;
-
- mWakeupTrigger.post(LLSD());
+ LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items, queue max " << DEFAULT_QUEUE_SIZE << LL_ENDL;
}
LLCoprocedurePool::~LLCoprocedurePool()
{
- shutdown();
-}
-
-//-------------------------------------------------------------------------
-void LLCoprocedurePool::shutdown(bool hardShutdown)
-{
- CoroAdapterMap_t::iterator it;
-
- for (it = mCoroMapping.begin(); it != mCoroMapping.end(); ++it)
- {
- if (hardShutdown)
- {
- LLCoros::instance().kill((*it).first);
- }
- if ((*it).second)
- {
- (*it).second->cancelSuspendedOperation();
- }
- }
-
- mShutdown = true;
- mCoroMapping.clear();
- mPendingCoprocs.clear();
}
//-------------------------------------------------------------------------
@@ -333,76 +320,66 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced
{
LLUUID id(LLUUID::generateNewID());
- mPendingCoprocs.push_back(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc)));
- LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
-
- mWakeupTrigger.post(LLSD());
+ LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL;
+ auto pushed = mPendingCoprocs->tryPushFront(boost::make_shared<QueuedCoproc>(name, id, proc));
+ // We don't really have a lot of good options if tryPushFront() failed,
+ // perhaps because the consuming coroutines are gummed up or something. This
+ // method is probably called from code called by mainloop. If we toss an
+ // llcoro::suspend() call here, we'll circle back for another mainloop
+ // iteration, possibly resulting in being re-entered here. Let's avoid that.
+ LL_ERRS_IF(! pushed, "CoProcMgr") << "Enqueue failed" << LL_ENDL;
+ ++mPending;
return id;
}
-bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id)
+//-------------------------------------------------------------------------
+void LLCoprocedurePool::coprocedureInvokerCoro(
+ CoprocQueuePtr pendingCoprocs,
+ LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
{
- // first check the active coroutines. If there, remove it and return.
- ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id);
- if (itActive != mActiveCoprocs.end())
- {
- LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
- (*itActive).second->cancelSuspendedOperation();
- mActiveCoprocs.erase(itActive);
- return true;
- }
-
- for (CoprocQueue_t::iterator it = mPendingCoprocs.begin(); it != mPendingCoprocs.end(); ++it)
+ QueuedCoproc::ptr_t coproc;
+ for (;;)
{
- if ((*it)->mId == id)
+ try
{
- LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
- mPendingCoprocs.erase(it);
- return true;
+ LLCoros::TempStatus st("waiting for work");
+ coproc = pendingCoprocs->popBack();
}
- }
-
- LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found in pool \"" << mPoolName << "\"" << LL_ENDL;
- return false;
-}
-
-//-------------------------------------------------------------------------
-void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
-{
- LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest);
-
- while (!mShutdown)
- {
- llcoro::suspendUntilEventOn(mWakeupTrigger);
- if (mShutdown)
- break;
-
- while (!mPendingCoprocs.empty())
+ catch (const LLThreadSafeQueueError&)
{
- QueuedCoproc::ptr_t coproc = mPendingCoprocs.front();
- mPendingCoprocs.pop_front();
- ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first;
+ // queue is closed
+ break;
+ }
- LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+ // we actually popped an item
+ --mPending;
- try
- {
- coproc->mProc(httpAdapter, coproc->mId);
- }
- catch (...)
- {
- LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName
- << "', id=" << coproc->mId.asString()
- << ") in pool '" << mPoolName << "'"));
- // must NOT omit this or we deplete the pool
- mActiveCoprocs.erase(itActive);
- throw;
- }
+ ActiveCoproc_t::iterator itActive = mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter)).first;
- LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+ LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\" (" << mPending << " left)" << LL_ENDL;
+ try
+ {
+ coproc->mProc(httpAdapter, coproc->mId);
+ }
+ catch (...)
+ {
+ LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName
+ << "', id=" << coproc->mId.asString()
+ << ") in pool '" << mPoolName << "'"));
+ // must NOT omit this or we deplete the pool
mActiveCoprocs.erase(itActive);
+ continue;
}
+
+ LL_DEBUGS("CoProcMgr") << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+
+ mActiveCoprocs.erase(itActive);
}
}
+
+void LLCoprocedurePool::close()
+{
+ mPendingCoprocs->close();
+}
diff --git a/indra/llmessage/llcoproceduremanager.h b/indra/llmessage/llcoproceduremanager.h
index 7d0e83180c..70204ba02b 100644
--- a/indra/llmessage/llcoproceduremanager.h
+++ b/indra/llmessage/llcoproceduremanager.h
@@ -32,6 +32,7 @@
#include "llcoros.h"
#include "llcorehttputil.h"
#include "lluuid.h"
+#include <boost/smart_ptr/shared_ptr.hpp>
class LLCoprocedurePool;
@@ -57,11 +58,7 @@ public:
/// Cancel a coprocedure. If the coprocedure is already being actively executed
/// this method calls cancelYieldingOperation() on the associated HttpAdapter
/// If it has not yet been dequeued it is simply removed from the queue.
- void cancelCoprocedure(const LLUUID &id);
-
- /// Requests a shutdown of the upload manager. Passing 'true' will perform
- /// an immediate kill on the upload coroutine.
- void shutdown(bool hardShutdown = false);
+ //void cancelCoprocedure(const LLUUID &id);
void setPropertyMethods(SettingQuery_t queryfn, SettingUpdate_t updatefn);
@@ -80,6 +77,9 @@ public:
size_t count() const;
size_t count(const std::string &pool) const;
+ void close();
+ void close(const std::string &pool);
+
private:
typedef boost::shared_ptr<LLCoprocedurePool> poolPtr_t;
diff --git a/indra/llmessage/llexperiencecache.cpp b/indra/llmessage/llexperiencecache.cpp
index aa7b3c1260..7d96ac4b02 100644
--- a/indra/llmessage/llexperiencecache.cpp
+++ b/indra/llmessage/llexperiencecache.cpp
@@ -338,10 +338,10 @@ void LLExperienceCache::requestExperiences()
F64 now = LLFrameTimer::getTotalSeconds();
const U32 EXP_URL_SEND_THRESHOLD = 3000;
- const U32 PAGE_SIZE = EXP_URL_SEND_THRESHOLD / UUID_STR_LENGTH;
+ const U32 PAGE_SIZE1 = EXP_URL_SEND_THRESHOLD / UUID_STR_LENGTH;
std::ostringstream ostr;
- ostr << urlBase << "?page_size=" << PAGE_SIZE;
+ ostr << urlBase << "?page_size=" << PAGE_SIZE1;
RequestQueue_t requests;
while (!mRequestQueue.empty())
@@ -360,7 +360,7 @@ void LLExperienceCache::requestExperiences()
boost::bind(&LLExperienceCache::requestExperiencesCoro, this, _1, ostr.str(), requests) );
ostr.str(std::string());
- ostr << urlBase << "?page_size=" << PAGE_SIZE;
+ ostr << urlBase << "?page_size=" << PAGE_SIZE1;
requests.clear();
}
}
diff --git a/indra/llmessage/llproxy.cpp b/indra/llmessage/llproxy.cpp
index 950599217f..86bcfe6881 100644
--- a/indra/llmessage/llproxy.cpp
+++ b/indra/llmessage/llproxy.cpp
@@ -115,9 +115,9 @@ S32 LLProxy::proxyHandshake(LLHost proxy)
U32 request_size = socks_username.size() + socks_password.size() + 3;
char * password_auth = new char[request_size];
password_auth[0] = 0x01;
- password_auth[1] = socks_username.size();
+ password_auth[1] = (char)(socks_username.size());
memcpy(&password_auth[2], socks_username.c_str(), socks_username.size());
- password_auth[socks_username.size() + 2] = socks_password.size();
+ password_auth[socks_username.size() + 2] = (char)(socks_password.size());
memcpy(&password_auth[socks_username.size() + 3], socks_password.c_str(), socks_password.size());
authmethod_password_reply_t password_reply;
diff --git a/indra/llmessage/llproxy.h b/indra/llmessage/llproxy.h
index 87891901ad..a1ffa9e5d5 100644
--- a/indra/llmessage/llproxy.h
+++ b/indra/llmessage/llproxy.h
@@ -32,6 +32,7 @@
#include "llmemory.h"
#include "llsingleton.h"
#include "llthread.h"
+#include "llmutex.h"
#include <curl/curl.h>
#include <string>
diff --git a/indra/llmessage/tests/llcoproceduremanager_test.cpp b/indra/llmessage/tests/llcoproceduremanager_test.cpp
new file mode 100644
index 0000000000..9db13a37b5
--- /dev/null
+++ b/indra/llmessage/tests/llcoproceduremanager_test.cpp
@@ -0,0 +1,178 @@
+/**
+ * @file llcoproceduremanager_test.cpp
+ * @author Brad
+ * @date 2019-02
+ * @brief LLCoprocedureManager unit test
+ *
+ * $LicenseInfo:firstyear=2019&license=viewerlgpl$
+ * Second Life Viewer Source Code
+ * Copyright (C) 2010, Linden Research, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation;
+ * version 2.1 of the License only.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
+ * $/LicenseInfo$
+ */
+
+#include "llwin32headers.h"
+
+#include "linden_common.h"
+#include "llsdserialize.h"
+
+#include "../llcoproceduremanager.h"
+
+#include <functional>
+
+#include <boost/fiber/fiber.hpp>
+#include <boost/fiber/buffered_channel.hpp>
+#include <boost/fiber/unbuffered_channel.hpp>
+
+#include "../test/lltut.h"
+#include "../test/sync.h"
+
+
+#if LL_WINDOWS
+// disable unreachable code warnings
+#pragma warning(disable: 4702)
+#endif
+
+LLCoreHttpUtil::HttpCoroutineAdapter::HttpCoroutineAdapter(std::string const&, unsigned int, unsigned int)
+{
+}
+
+void LLCoreHttpUtil::HttpCoroutineAdapter::cancelSuspendedOperation()
+{
+}
+
+LLCoreHttpUtil::HttpCoroutineAdapter::~HttpCoroutineAdapter()
+{
+}
+
+LLCore::HttpRequest::HttpRequest()
+{
+}
+
+LLCore::HttpRequest::~HttpRequest()
+{
+}
+
+namespace tut
+{
+ struct coproceduremanager_test
+ {
+ coproceduremanager_test()
+ {
+ }
+
+ ~coproceduremanager_test()
+ {
+ LLCoprocedureManager::instance().close();
+ }
+ };
+ typedef test_group<coproceduremanager_test> coproceduremanager_t;
+ typedef coproceduremanager_t::object coproceduremanager_object_t;
+ tut::coproceduremanager_t tut_coproceduremanager("LLCoprocedureManager");
+
+
+ template<> template<>
+ void coproceduremanager_object_t::test<1>()
+ {
+ Sync sync;
+ int foo = 0;
+ LLUUID queueId = LLCoprocedureManager::instance().enqueueCoprocedure("PoolName", "ProcName",
+ [&foo, &sync] (LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t & ptr, const LLUUID & id) {
+ sync.bump();
+ foo = 1;
+ });
+
+ sync.yield();
+ ensure_equals("coprocedure failed to update foo", foo, 1);
+
+ LLCoprocedureManager::instance().close("PoolName");
+ }
+
+ template<> template<>
+ void coproceduremanager_object_t::test<2>()
+ {
+ const size_t capacity = 2;
+ boost::fibers::buffered_channel<std::function<void(void)>> chan(capacity);
+
+ boost::fibers::fiber worker([&chan]() {
+ chan.value_pop()();
+ });
+
+ chan.push([]() {
+ LL_INFOS("Test") << "test 1" << LL_ENDL;
+ });
+
+ worker.join();
+ }
+
+ template<> template<>
+ void coproceduremanager_object_t::test<3>()
+ {
+ boost::fibers::unbuffered_channel<std::function<void(void)>> chan;
+
+ boost::fibers::fiber worker([&chan]() {
+ chan.value_pop()();
+ });
+
+ chan.push([]() {
+ LL_INFOS("Test") << "test 1" << LL_ENDL;
+ });
+
+ worker.join();
+ }
+
+ template<> template<>
+ void coproceduremanager_object_t::test<4>()
+ {
+ boost::fibers::buffered_channel<std::function<void(void)>> chan(4);
+
+ boost::fibers::fiber worker([&chan]() {
+ std::function<void(void)> f;
+
+ // using namespace std::chrono_literals;
+ // const auto timeout = 5s;
+ // boost::fibers::channel_op_status status;
+ while (chan.pop(f) != boost::fibers::channel_op_status::closed)
+ {
+ LL_INFOS("CoWorker") << "got coproc" << LL_ENDL;
+ f();
+ }
+ LL_INFOS("CoWorker") << "got closed" << LL_ENDL;
+ });
+
+ int counter = 0;
+
+ for (int i = 0; i < 5; ++i)
+ {
+ LL_INFOS("CoMain") << "pushing coproc " << i << LL_ENDL;
+ chan.push([&counter]() {
+ LL_INFOS("CoProc") << "in coproc" << LL_ENDL;
+ ++counter;
+ });
+ }
+
+ LL_INFOS("CoMain") << "closing channel" << LL_ENDL;
+ chan.close();
+
+ LL_INFOS("CoMain") << "joining worker" << LL_ENDL;
+ worker.join();
+
+ LL_INFOS("CoMain") << "checking count" << LL_ENDL;
+ ensure_equals("coprocedure failed to update counter", counter, 5);
+ }
+} // namespace tut