summaryrefslogtreecommitdiff
path: root/indra/llmessage
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2020-05-19 11:32:24 -0400
committerNat Goodspeed <nat@lindenlab.com>2020-05-19 11:32:24 -0400
commit9d428662f88324b1d48ce89cca17c19e0f72f535 (patch)
treefc4b74cb62dcc7833327b7cd931199c09be92bba /indra/llmessage
parentdbfbe5419c51679741f8de7a03c91d61bb478e0c (diff)
DRTVWR-476: Revert "Use LLThreadSafeQueue, not boost::fibers::buffered_channel."
This reverts commit bf8aea5059f127dcce2fdf613d62c253bb3fa8fd. Try boost::fibers::buffered_channel again with Boost 1.72.
Diffstat (limited to 'indra/llmessage')
-rw-r--r--indra/llmessage/llcoproceduremanager.cpp29
1 files changed, 19 insertions, 10 deletions
diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp
index d252c0e4b0..456448137d 100644
--- a/indra/llmessage/llcoproceduremanager.cpp
+++ b/indra/llmessage/llcoproceduremanager.cpp
@@ -33,7 +33,7 @@
#include <chrono>
-#include "llthreadsafequeue.h"
+#include <boost/fiber/buffered_channel.hpp>
#include "llexception.h"
#include "stringize.h"
@@ -105,7 +105,10 @@ private:
CoProcedure_t mProc;
};
- typedef LLThreadSafeQueue<QueuedCoproc::ptr_t> CoprocQueue_t;
+ // we use a buffered_channel here rather than unbuffered_channel since we want to be able to
+ // push values without blocking,even if there's currently no one calling a pop operation (due to
+ // fiber running right now)
+ typedef boost::fibers::buffered_channel<QueuedCoproc::ptr_t> CoprocQueue_t;
// Use shared_ptr to control the lifespan of our CoprocQueue_t instance
// because the consuming coroutine might outlive this LLCoprocedurePool
// instance.
@@ -321,13 +324,14 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced
LLUUID id(LLUUID::generateNewID());
LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL;
- auto pushed = mPendingCoprocs->tryPushFront(boost::make_shared<QueuedCoproc>(name, id, proc));
- // We don't really have a lot of good options if tryPushFront() failed,
- // perhaps because the consuming coroutines are gummed up or something. This
+ auto pushed = mPendingCoprocs->try_push(boost::make_shared<QueuedCoproc>(name, id, proc));
+ // We don't really have a lot of good options if try_push() failed,
+ // perhaps because the consuming coroutine is gummed up or something. This
// method is probably called from code called by mainloop. If we toss an
// llcoro::suspend() call here, we'll circle back for another mainloop
// iteration, possibly resulting in being re-entered here. Let's avoid that.
- LL_ERRS_IF(! pushed, "CoProcMgr") << "Enqueue failed" << LL_ENDL;
+ LL_ERRS_IF(pushed != boost::fibers::channel_op_status::success, "CoProcMgr")
+ << "Enqueue failed because queue is " << int(pushed) << LL_ENDL;
++mPending;
return id;
@@ -339,19 +343,24 @@ void LLCoprocedurePool::coprocedureInvokerCoro(
LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
{
QueuedCoproc::ptr_t coproc;
+ boost::fibers::channel_op_status status;
for (;;)
{
try
{
- LLCoros::TempStatus st("waiting for work");
- coproc = pendingCoprocs->popBack();
+ LLCoros::TempStatus st("waiting for work for 10s");
+ status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10));
}
- catch (const LLThreadSafeQueueError&)
+ if (status == boost::fibers::channel_op_status::closed)
{
- // queue is closed
break;
}
+ if(status == boost::fibers::channel_op_status::timeout)
+ {
+ LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL;
+ continue;
+ }
// we actually popped an item
--mPending;