summaryrefslogtreecommitdiff
path: root/indra/llcommon/llqueuedthread.cpp
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2024-04-25 09:13:23 -0400
committerNat Goodspeed <nat@lindenlab.com>2024-04-25 09:13:23 -0400
commitf162693a23fe5cfda8dab3857718624033812d30 (patch)
tree0768f9ea570b248b48e4caa33103e3d55c625466 /indra/llcommon/llqueuedthread.cpp
parentd8931c9269a90cd01f6f6ff4de83b8fb41df11d3 (diff)
parentd98fc504a1d4bc292ba86acdda053c8b4598a193 (diff)
Merge Maint YZ branch 'main' into DRTVWR-588-cleanup-timers
Diffstat (limited to 'indra/llcommon/llqueuedthread.cpp')
-rw-r--r--indra/llcommon/llqueuedthread.cpp303
1 files changed, 162 insertions, 141 deletions
diff --git a/indra/llcommon/llqueuedthread.cpp b/indra/llcommon/llqueuedthread.cpp
index b06fee0ec2..394212ee0d 100644
--- a/indra/llcommon/llqueuedthread.cpp
+++ b/indra/llcommon/llqueuedthread.cpp
@@ -26,20 +26,26 @@
#include "linden_common.h"
#include "llqueuedthread.h"
+#include <chrono>
+
#include "llstl.h"
#include "lltimer.h" // ms_sleep()
-#include "lltracethreadrecorder.h"
+#include "llmutex.h"
//============================================================================
// MAIN THREAD
LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool should_pause) :
- LLThread(name),
- mThreaded(threaded),
- mIdleThread(TRUE),
- mNextHandle(0),
- mStarted(FALSE)
+ LLThread(name),
+ mIdleThread(TRUE),
+ mNextHandle(0),
+ mStarted(FALSE),
+ mThreaded(threaded),
+ mRequestQueue(name, 1024 * 1024)
{
+ llassert(threaded); // not threaded implementation is deprecated
+ mMainQueue = LL::WorkQueue::getInstance("mainloop");
+
if (mThreaded)
{
if(should_pause)
@@ -69,6 +75,11 @@ void LLQueuedThread::shutdown()
unpause(); // MAIN THREAD
if (mThreaded)
{
+ if (mRequestQueue.size() == 0)
+ {
+ mRequestQueue.close();
+ }
+
S32 timeout = 100;
for ( ; timeout>0; timeout--)
{
@@ -104,6 +115,8 @@ void LLQueuedThread::shutdown()
{
LL_WARNS() << "~LLQueuedThread() called with active requests: " << active_count << LL_ENDL;
}
+
+ mRequestQueue.close();
}
//----------------------------------------------------------------------------
@@ -112,6 +125,7 @@ void LLQueuedThread::shutdown()
// virtual
size_t LLQueuedThread::update(F32 max_time_ms)
{
+ LL_PROFILE_ZONE_SCOPED;
if (!mStarted)
{
if (!mThreaded)
@@ -125,29 +139,34 @@ size_t LLQueuedThread::update(F32 max_time_ms)
size_t LLQueuedThread::updateQueue(F32 max_time_ms)
{
- F64 max_time = (F64)max_time_ms * .001;
- LLTimer timer;
- size_t pending = 1;
-
+ LL_PROFILE_ZONE_SCOPED;
// Frame Update
if (mThreaded)
{
- pending = getPending();
- if(pending > 0)
+ // schedule a call to threadedUpdate for every call to updateQueue
+ if (!isQuitting())
+ {
+ mRequestQueue.post([=]()
+ {
+ LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qt - update");
+ mIdleThread = FALSE;
+ threadedUpdate();
+ mIdleThread = TRUE;
+ }
+ );
+ }
+
+ if(getPending() > 0)
{
- unpause();
- }
+ unpause();
+ }
}
else
{
- while (pending > 0)
- {
- pending = processNextRequest();
- if (max_time && timer.getElapsedTimeF64() > max_time)
- break;
- }
+ mRequestQueue.runFor(std::chrono::microseconds((int) (max_time_ms*1000.f)));
+ threadedUpdate();
}
- return pending;
+ return getPending();
}
void LLQueuedThread::incQueue()
@@ -166,11 +185,7 @@ void LLQueuedThread::incQueue()
// May be called from any thread
size_t LLQueuedThread::getPending()
{
- size_t res;
- lockData();
- res = mRequestQueue.size();
- unlockData();
- return res;
+ return mRequestQueue.size();
}
// MAIN thread
@@ -195,35 +210,28 @@ void LLQueuedThread::waitOnPending()
// MAIN thread
void LLQueuedThread::printQueueStats()
{
- lockData();
- if (!mRequestQueue.empty())
+ U32 size = mRequestQueue.size();
+ if (size > 0)
{
- QueuedRequest *req = *mRequestQueue.begin();
- LL_INFOS() << llformat("Pending Requests:%d Current status:%d", mRequestQueue.size(), req->getStatus()) << LL_ENDL;
+ LL_INFOS() << llformat("Pending Requests:%d ", mRequestQueue.size()) << LL_ENDL;
}
else
{
LL_INFOS() << "Queued Thread Idle" << LL_ENDL;
}
- unlockData();
}
// MAIN thread
LLQueuedThread::handle_t LLQueuedThread::generateHandle()
{
- lockData();
- while ((mNextHandle == nullHandle()) || (mRequestHash.find(mNextHandle)))
- {
- mNextHandle++;
- }
- const LLQueuedThread::handle_t res = mNextHandle++;
- unlockData();
+ U32 res = ++mNextHandle;
return res;
}
// MAIN thread
bool LLQueuedThread::addRequest(QueuedRequest* req)
{
+ LL_PROFILE_ZONE_SCOPED;
if (mStatus == QUITTING)
{
return false;
@@ -231,14 +239,14 @@ bool LLQueuedThread::addRequest(QueuedRequest* req)
lockData();
req->setStatus(STATUS_QUEUED);
- mRequestQueue.insert(req);
- mRequestHash.insert(req);
+ mRequestHash.insert(req);
#if _DEBUG
// LL_INFOS() << llformat("LLQueuedThread::Added req [%08d]",handle) << LL_ENDL;
#endif
unlockData();
- incQueue();
+ llassert(!mDataLock->isSelfLocked());
+ mRequestQueue.post([this, req]() { processRequest(req); });
return true;
}
@@ -246,6 +254,7 @@ bool LLQueuedThread::addRequest(QueuedRequest* req)
// MAIN thread
bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_complete)
{
+ LL_PROFILE_ZONE_SCOPED;
llassert (handle != nullHandle());
bool res = false;
bool waspaused = isPaused();
@@ -312,6 +321,7 @@ LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle)
void LLQueuedThread::abortRequest(handle_t handle, bool autocomplete)
{
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
lockData();
QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
if (req)
@@ -333,30 +343,9 @@ void LLQueuedThread::setFlags(handle_t handle, U32 flags)
unlockData();
}
-void LLQueuedThread::setPriority(handle_t handle, U32 priority)
-{
- lockData();
- QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
- if (req)
- {
- if(req->getStatus() == STATUS_INPROGRESS)
- {
- // not in list
- req->setPriority(priority);
- }
- else if(req->getStatus() == STATUS_QUEUED)
- {
- // remove from list then re-insert
- llverify(mRequestQueue.erase(req) == 1);
- req->setPriority(priority);
- mRequestQueue.insert(req);
- }
- }
- unlockData();
-}
-
bool LLQueuedThread::completeRequest(handle_t handle)
{
+ LL_PROFILE_ZONE_SCOPED;
bool res = false;
lockData();
QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
@@ -399,88 +388,120 @@ bool LLQueuedThread::check()
//============================================================================
// Runs on its OWN thread
-size_t LLQueuedThread::processNextRequest()
+void LLQueuedThread::processRequest(LLQueuedThread::QueuedRequest* req)
{
- QueuedRequest *req;
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+
+ mIdleThread = FALSE;
+ //threadedUpdate();
+
// Get next request from pool
lockData();
- while(1)
+ if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING))
{
- req = NULL;
- if (mRequestQueue.empty())
+ LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qtpr - abort");
+ req->setStatus(STATUS_ABORTED);
+ req->finishRequest(false);
+ if (req->getFlags() & FLAG_AUTO_COMPLETE)
{
- break;
- }
- req = *mRequestQueue.begin();
- mRequestQueue.erase(mRequestQueue.begin());
- if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING))
- {
- req->setStatus(STATUS_ABORTED);
- req->finishRequest(false);
- if (req->getFlags() & FLAG_AUTO_COMPLETE)
- {
- mRequestHash.erase(req);
- req->deleteRequest();
+ mRequestHash.erase(req);
+ req->deleteRequest();
// check();
- }
- continue;
}
- llassert_always(req->getStatus() == STATUS_QUEUED);
- break;
- }
- U32 start_priority = 0 ;
- if (req)
- {
- req->setStatus(STATUS_INPROGRESS);
- start_priority = req->getPriority();
- }
- unlockData();
-
- // This is the only place we will call req->setStatus() after
- // it has initially been seet to STATUS_QUEUED, so it is
- // safe to access req.
- if (req)
- {
- // process request
- bool complete = req->processRequest();
-
- if (complete)
- {
- lockData();
- req->setStatus(STATUS_COMPLETE);
- req->finishRequest(true);
- if (req->getFlags() & FLAG_AUTO_COMPLETE)
- {
- mRequestHash.erase(req);
- req->deleteRequest();
-// check();
- }
- unlockData();
- }
- else
- {
- lockData();
- req->setStatus(STATUS_QUEUED);
- mRequestQueue.insert(req);
- unlockData();
- if (mThreaded && start_priority < PRIORITY_NORMAL)
- {
- ms_sleep(1); // sleep the thread a little
- }
- }
-
- LLTrace::get_thread_recorder()->pushToParent();
+ unlockData();
}
+ else
+ {
+ llassert_always(req->getStatus() == STATUS_QUEUED);
+
+ if (req)
+ {
+ req->setStatus(STATUS_INPROGRESS);
+ }
+ unlockData();
+
+ // This is the only place we will call req->setStatus() after
+ // it has initially been seet to STATUS_QUEUED, so it is
+ // safe to access req.
+ if (req)
+ {
+ // process request
+ bool complete = req->processRequest();
+
+ if (complete)
+ {
+ LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qtpr - complete");
+ lockData();
+ req->setStatus(STATUS_COMPLETE);
+ req->finishRequest(true);
+ if (req->getFlags() & FLAG_AUTO_COMPLETE)
+ {
+ mRequestHash.erase(req);
+ req->deleteRequest();
+ // check();
+ }
+ unlockData();
+ }
+ else
+ {
+ LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qtpr - retry");
+ //put back on queue and try again in 0.1ms
+ lockData();
+ req->setStatus(STATUS_QUEUED);
+
+ unlockData();
+
+ llassert(!mDataLock->isSelfLocked());
+
+#if 0
+ // try again on next frame
+ // NOTE: tried using "post" with a time in the future, but this
+ // would invariably cause this thread to wait for a long time (10+ ms)
+ // while work is pending
+ bool ret = LL::WorkQueue::postMaybe(
+ mMainQueue,
+ [=]()
+ {
+ LL_PROFILE_ZONE_NAMED("processRequest - retry");
+ mRequestQueue.post([=]()
+ {
+ LL_PROFILE_ZONE_NAMED("processRequest - retry"); // <-- not redundant, track retry on both queues
+ processRequest(req);
+ });
+ });
+ llassert(ret);
+#else
+ using namespace std::chrono_literals;
+ auto retry_time = LL::WorkQueue::TimePoint::clock::now() + 16ms;
+ mRequestQueue.post([=]
+ {
+ LL_PROFILE_ZONE_NAMED("processRequest - retry");
+ if (LL::WorkQueue::TimePoint::clock::now() < retry_time)
+ {
+ auto sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(retry_time - LL::WorkQueue::TimePoint::clock::now());
+
+ if (sleep_time.count() > 0)
+ {
+ ms_sleep(sleep_time.count());
+ }
+ }
+ processRequest(req);
+ });
+#endif
+
+ }
+ }
+ }
- return getPending();
+ mIdleThread = TRUE;
}
// virtual
bool LLQueuedThread::runCondition()
{
// mRunCondition must be locked here
- if (mRequestQueue.empty() && mIdleThread)
+ if (mRequestQueue.size() == 0 && mIdleThread)
return false;
else
return true;
@@ -494,18 +515,13 @@ void LLQueuedThread::run()
startThread();
mStarted = TRUE;
- while (1)
+
+ /*while (1)
{
+ LL_PROFILE_ZONE_SCOPED;
// this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state.
checkPause();
- if (isQuitting())
- {
- LLTrace::get_thread_recorder()->pushToParent();
- endThread();
- break;
- }
-
mIdleThread = FALSE;
threadedUpdate();
@@ -514,12 +530,18 @@ void LLQueuedThread::run()
if (pending_work == 0)
{
+ //LL_PROFILE_ZONE_NAMED("LLQueuedThread - sleep");
mIdleThread = TRUE;
- ms_sleep(1);
+ //ms_sleep(1);
}
//LLThread::yield(); // thread should yield after each request
- }
+ }*/
+ mRequestQueue.runUntilClose();
+
+ endThread();
LL_INFOS() << "LLQueuedThread " << mName << " EXITING." << LL_ENDL;
+
+
}
// virtual
@@ -539,10 +561,9 @@ void LLQueuedThread::threadedUpdate()
//============================================================================
-LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 priority, U32 flags) :
+LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 flags) :
LLSimpleHashEntry<LLQueuedThread::handle_t>(handle),
mStatus(STATUS_UNKNOWN),
- mPriority(priority),
mFlags(flags)
{
}