diff options
Diffstat (limited to 'indra/llcommon/llqueuedthread.cpp')
-rw-r--r-- | indra/llcommon/llqueuedthread.cpp | 303 |
1 files changed, 162 insertions, 141 deletions
diff --git a/indra/llcommon/llqueuedthread.cpp b/indra/llcommon/llqueuedthread.cpp index b06fee0ec2..394212ee0d 100644 --- a/indra/llcommon/llqueuedthread.cpp +++ b/indra/llcommon/llqueuedthread.cpp @@ -26,20 +26,26 @@ #include "linden_common.h" #include "llqueuedthread.h" +#include <chrono> + #include "llstl.h" #include "lltimer.h" // ms_sleep() -#include "lltracethreadrecorder.h" +#include "llmutex.h" //============================================================================ // MAIN THREAD LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool should_pause) : - LLThread(name), - mThreaded(threaded), - mIdleThread(TRUE), - mNextHandle(0), - mStarted(FALSE) + LLThread(name), + mIdleThread(TRUE), + mNextHandle(0), + mStarted(FALSE), + mThreaded(threaded), + mRequestQueue(name, 1024 * 1024) { + llassert(threaded); // not threaded implementation is deprecated + mMainQueue = LL::WorkQueue::getInstance("mainloop"); + if (mThreaded) { if(should_pause) @@ -69,6 +75,11 @@ void LLQueuedThread::shutdown() unpause(); // MAIN THREAD if (mThreaded) { + if (mRequestQueue.size() == 0) + { + mRequestQueue.close(); + } + S32 timeout = 100; for ( ; timeout>0; timeout--) { @@ -104,6 +115,8 @@ void LLQueuedThread::shutdown() { LL_WARNS() << "~LLQueuedThread() called with active requests: " << active_count << LL_ENDL; } + + mRequestQueue.close(); } //---------------------------------------------------------------------------- @@ -112,6 +125,7 @@ void LLQueuedThread::shutdown() // virtual size_t LLQueuedThread::update(F32 max_time_ms) { + LL_PROFILE_ZONE_SCOPED; if (!mStarted) { if (!mThreaded) @@ -125,29 +139,34 @@ size_t LLQueuedThread::update(F32 max_time_ms) size_t LLQueuedThread::updateQueue(F32 max_time_ms) { - F64 max_time = (F64)max_time_ms * .001; - LLTimer timer; - size_t pending = 1; - + LL_PROFILE_ZONE_SCOPED; // Frame Update if (mThreaded) { - pending = getPending(); - if(pending > 0) + // schedule a call to threadedUpdate for every call to updateQueue + if (!isQuitting()) + { + mRequestQueue.post([=]() + { + LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qt - update"); + mIdleThread = FALSE; + threadedUpdate(); + mIdleThread = TRUE; + } + ); + } + + if(getPending() > 0) { - unpause(); - } + unpause(); + } } else { - while (pending > 0) - { - pending = processNextRequest(); - if (max_time && timer.getElapsedTimeF64() > max_time) - break; - } + mRequestQueue.runFor(std::chrono::microseconds((int) (max_time_ms*1000.f))); + threadedUpdate(); } - return pending; + return getPending(); } void LLQueuedThread::incQueue() @@ -166,11 +185,7 @@ void LLQueuedThread::incQueue() // May be called from any thread size_t LLQueuedThread::getPending() { - size_t res; - lockData(); - res = mRequestQueue.size(); - unlockData(); - return res; + return mRequestQueue.size(); } // MAIN thread @@ -195,35 +210,28 @@ void LLQueuedThread::waitOnPending() // MAIN thread void LLQueuedThread::printQueueStats() { - lockData(); - if (!mRequestQueue.empty()) + U32 size = mRequestQueue.size(); + if (size > 0) { - QueuedRequest *req = *mRequestQueue.begin(); - LL_INFOS() << llformat("Pending Requests:%d Current status:%d", mRequestQueue.size(), req->getStatus()) << LL_ENDL; + LL_INFOS() << llformat("Pending Requests:%d ", mRequestQueue.size()) << LL_ENDL; } else { LL_INFOS() << "Queued Thread Idle" << LL_ENDL; } - unlockData(); } // MAIN thread LLQueuedThread::handle_t LLQueuedThread::generateHandle() { - lockData(); - while ((mNextHandle == nullHandle()) || (mRequestHash.find(mNextHandle))) - { - mNextHandle++; - } - const LLQueuedThread::handle_t res = mNextHandle++; - unlockData(); + U32 res = ++mNextHandle; return res; } // MAIN thread bool LLQueuedThread::addRequest(QueuedRequest* req) { + LL_PROFILE_ZONE_SCOPED; if (mStatus == QUITTING) { return false; @@ -231,14 +239,14 @@ bool LLQueuedThread::addRequest(QueuedRequest* req) lockData(); req->setStatus(STATUS_QUEUED); - mRequestQueue.insert(req); - mRequestHash.insert(req); + mRequestHash.insert(req); #if _DEBUG // LL_INFOS() << llformat("LLQueuedThread::Added req [%08d]",handle) << LL_ENDL; #endif unlockData(); - incQueue(); + llassert(!mDataLock->isSelfLocked()); + mRequestQueue.post([this, req]() { processRequest(req); }); return true; } @@ -246,6 +254,7 @@ bool LLQueuedThread::addRequest(QueuedRequest* req) // MAIN thread bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_complete) { + LL_PROFILE_ZONE_SCOPED; llassert (handle != nullHandle()); bool res = false; bool waspaused = isPaused(); @@ -312,6 +321,7 @@ LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle) void LLQueuedThread::abortRequest(handle_t handle, bool autocomplete) { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lockData(); QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); if (req) @@ -333,30 +343,9 @@ void LLQueuedThread::setFlags(handle_t handle, U32 flags) unlockData(); } -void LLQueuedThread::setPriority(handle_t handle, U32 priority) -{ - lockData(); - QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); - if (req) - { - if(req->getStatus() == STATUS_INPROGRESS) - { - // not in list - req->setPriority(priority); - } - else if(req->getStatus() == STATUS_QUEUED) - { - // remove from list then re-insert - llverify(mRequestQueue.erase(req) == 1); - req->setPriority(priority); - mRequestQueue.insert(req); - } - } - unlockData(); -} - bool LLQueuedThread::completeRequest(handle_t handle) { + LL_PROFILE_ZONE_SCOPED; bool res = false; lockData(); QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); @@ -399,88 +388,120 @@ bool LLQueuedThread::check() //============================================================================ // Runs on its OWN thread -size_t LLQueuedThread::processNextRequest() +void LLQueuedThread::processRequest(LLQueuedThread::QueuedRequest* req) { - QueuedRequest *req; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + + mIdleThread = FALSE; + //threadedUpdate(); + // Get next request from pool lockData(); - while(1) + if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING)) { - req = NULL; - if (mRequestQueue.empty()) + LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qtpr - abort"); + req->setStatus(STATUS_ABORTED); + req->finishRequest(false); + if (req->getFlags() & FLAG_AUTO_COMPLETE) { - break; - } - req = *mRequestQueue.begin(); - mRequestQueue.erase(mRequestQueue.begin()); - if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING)) - { - req->setStatus(STATUS_ABORTED); - req->finishRequest(false); - if (req->getFlags() & FLAG_AUTO_COMPLETE) - { - mRequestHash.erase(req); - req->deleteRequest(); + mRequestHash.erase(req); + req->deleteRequest(); // check(); - } - continue; } - llassert_always(req->getStatus() == STATUS_QUEUED); - break; - } - U32 start_priority = 0 ; - if (req) - { - req->setStatus(STATUS_INPROGRESS); - start_priority = req->getPriority(); - } - unlockData(); - - // This is the only place we will call req->setStatus() after - // it has initially been seet to STATUS_QUEUED, so it is - // safe to access req. - if (req) - { - // process request - bool complete = req->processRequest(); - - if (complete) - { - lockData(); - req->setStatus(STATUS_COMPLETE); - req->finishRequest(true); - if (req->getFlags() & FLAG_AUTO_COMPLETE) - { - mRequestHash.erase(req); - req->deleteRequest(); -// check(); - } - unlockData(); - } - else - { - lockData(); - req->setStatus(STATUS_QUEUED); - mRequestQueue.insert(req); - unlockData(); - if (mThreaded && start_priority < PRIORITY_NORMAL) - { - ms_sleep(1); // sleep the thread a little - } - } - - LLTrace::get_thread_recorder()->pushToParent(); + unlockData(); } + else + { + llassert_always(req->getStatus() == STATUS_QUEUED); + + if (req) + { + req->setStatus(STATUS_INPROGRESS); + } + unlockData(); + + // This is the only place we will call req->setStatus() after + // it has initially been seet to STATUS_QUEUED, so it is + // safe to access req. + if (req) + { + // process request + bool complete = req->processRequest(); + + if (complete) + { + LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qtpr - complete"); + lockData(); + req->setStatus(STATUS_COMPLETE); + req->finishRequest(true); + if (req->getFlags() & FLAG_AUTO_COMPLETE) + { + mRequestHash.erase(req); + req->deleteRequest(); + // check(); + } + unlockData(); + } + else + { + LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qtpr - retry"); + //put back on queue and try again in 0.1ms + lockData(); + req->setStatus(STATUS_QUEUED); + + unlockData(); + + llassert(!mDataLock->isSelfLocked()); + +#if 0 + // try again on next frame + // NOTE: tried using "post" with a time in the future, but this + // would invariably cause this thread to wait for a long time (10+ ms) + // while work is pending + bool ret = LL::WorkQueue::postMaybe( + mMainQueue, + [=]() + { + LL_PROFILE_ZONE_NAMED("processRequest - retry"); + mRequestQueue.post([=]() + { + LL_PROFILE_ZONE_NAMED("processRequest - retry"); // <-- not redundant, track retry on both queues + processRequest(req); + }); + }); + llassert(ret); +#else + using namespace std::chrono_literals; + auto retry_time = LL::WorkQueue::TimePoint::clock::now() + 16ms; + mRequestQueue.post([=] + { + LL_PROFILE_ZONE_NAMED("processRequest - retry"); + if (LL::WorkQueue::TimePoint::clock::now() < retry_time) + { + auto sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(retry_time - LL::WorkQueue::TimePoint::clock::now()); + + if (sleep_time.count() > 0) + { + ms_sleep(sleep_time.count()); + } + } + processRequest(req); + }); +#endif + + } + } + } - return getPending(); + mIdleThread = TRUE; } // virtual bool LLQueuedThread::runCondition() { // mRunCondition must be locked here - if (mRequestQueue.empty() && mIdleThread) + if (mRequestQueue.size() == 0 && mIdleThread) return false; else return true; @@ -494,18 +515,13 @@ void LLQueuedThread::run() startThread(); mStarted = TRUE; - while (1) + + /*while (1) { + LL_PROFILE_ZONE_SCOPED; // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state. checkPause(); - if (isQuitting()) - { - LLTrace::get_thread_recorder()->pushToParent(); - endThread(); - break; - } - mIdleThread = FALSE; threadedUpdate(); @@ -514,12 +530,18 @@ void LLQueuedThread::run() if (pending_work == 0) { + //LL_PROFILE_ZONE_NAMED("LLQueuedThread - sleep"); mIdleThread = TRUE; - ms_sleep(1); + //ms_sleep(1); } //LLThread::yield(); // thread should yield after each request - } + }*/ + mRequestQueue.runUntilClose(); + + endThread(); LL_INFOS() << "LLQueuedThread " << mName << " EXITING." << LL_ENDL; + + } // virtual @@ -539,10 +561,9 @@ void LLQueuedThread::threadedUpdate() //============================================================================ -LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 priority, U32 flags) : +LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 flags) : LLSimpleHashEntry<LLQueuedThread::handle_t>(handle), mStatus(STATUS_UNKNOWN), - mPriority(priority), mFlags(flags) { } |