diff options
Diffstat (limited to 'indra/llmessage/llcoproceduremanager.cpp')
-rw-r--r-- | indra/llmessage/llcoproceduremanager.cpp | 33 |
1 files changed, 21 insertions, 12 deletions
diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index a8f6b8aa67..0b161ad2b4 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -94,13 +94,17 @@ private: // we use a buffered_channel here rather than unbuffered_channel since we want to be able to // push values without blocking,even if there's currently no one calling a pop operation (due to - // fibber running right now) + // fiber running right now) typedef boost::fibers::buffered_channel<QueuedCoproc::ptr_t> CoprocQueue_t; + // Use shared_ptr to control the lifespan of our CoprocQueue_t instance + // because the consuming coroutine might outlive this LLCoprocedurePool + // instance. + typedef boost::shared_ptr<CoprocQueue_t> CoprocQueuePtr; typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t; std::string mPoolName; size_t mPoolSize; - CoprocQueue_t mPendingCoprocs; + CoprocQueuePtr mPendingCoprocs; ActiveCoproc_t mActiveCoprocs; LLTempBoundListener mStatusListener; @@ -109,7 +113,8 @@ private: CoroAdapterMap_t mCoroMapping; - void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); + void coprocedureInvokerCoro(CoprocQueuePtr pendingCoprocs, + LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); }; //========================================================================= @@ -214,7 +219,7 @@ void LLCoprocedureManager::close(const std::string &pool) LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPoolName(poolName), mPoolSize(size), - mPendingCoprocs(DEFAULT_QUEUE_SIZE), + mPendingCoprocs(boost::make_shared<CoprocQueue_t>(DEFAULT_QUEUE_SIZE)), mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), mCoroMapping() { @@ -222,7 +227,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): // destroyed, we implicitly disconnect from this LLEventPump mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( poolName, - [this, poolName](const LLSD& status) + [pendingCoprocs=mPendingCoprocs, poolName](const LLSD& status) { auto& statsd = status["status"]; if (statsd.asString() != "running") @@ -232,7 +237,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): << LL_ENDL; // This should ensure that all waiting coprocedures in this // pool will wake up and terminate. - mPendingCoprocs.close(); + pendingCoprocs->close(); } return false; }); @@ -241,8 +246,10 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): { LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy)); - std::string pooledCoro = LLCoros::instance().launch("LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", - boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, httpAdapter)); + std::string pooledCoro = LLCoros::instance().launch( + "LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", + boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, + mPendingCoprocs, httpAdapter)); mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter)); } @@ -259,14 +266,16 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced { LLUUID id(LLUUID::generateNewID()); - mPendingCoprocs.push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); + mPendingCoprocs->push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; return id; } //------------------------------------------------------------------------- -void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) +void LLCoprocedurePool::coprocedureInvokerCoro( + CoprocQueuePtr pendingCoprocs, + LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) { QueuedCoproc::ptr_t coproc; boost::fibers::channel_op_status status; @@ -274,7 +283,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap { { LLCoros::TempStatus st("waiting for work for 10s"); - status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10)); + status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10)); } if (status == boost::fibers::channel_op_status::closed) { @@ -313,5 +322,5 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap void LLCoprocedurePool::close() { - mPendingCoprocs.close(); + mPendingCoprocs->close(); } |