summaryrefslogtreecommitdiff
path: root/indra/llcommon/llqueuedthread.cpp
diff options
context:
space:
mode:
authorJames Cook <james@lindenlab.com>2007-01-02 08:33:20 +0000
committerJames Cook <james@lindenlab.com>2007-01-02 08:33:20 +0000
commit420b91db29485df39fd6e724e782c449158811cb (patch)
treeb471a94563af914d3ed3edd3e856d21cb1b69945 /indra/llcommon/llqueuedthread.cpp
Print done when done.
Diffstat (limited to 'indra/llcommon/llqueuedthread.cpp')
-rw-r--r--indra/llcommon/llqueuedthread.cpp491
1 files changed, 491 insertions, 0 deletions
diff --git a/indra/llcommon/llqueuedthread.cpp b/indra/llcommon/llqueuedthread.cpp
new file mode 100644
index 0000000000..bc41927d38
--- /dev/null
+++ b/indra/llcommon/llqueuedthread.cpp
@@ -0,0 +1,491 @@
+/**
+ * @file llqueuedthread.cpp
+ *
+ * Copyright (c) 2004-$CurrentYear$, Linden Research, Inc.
+ * $License$
+ */
+
+#include "linden_common.h"
+#include "llqueuedthread.h"
+#include "llstl.h"
+
+//============================================================================
+
+// MAIN THREAD
+LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool runalways) :
+ LLThread(name),
+ mThreaded(threaded),
+ mRunAlways(runalways),
+ mIdleThread(TRUE),
+ mNextHandle(0)
+{
+ if (mThreaded)
+ {
+ start();
+ }
+}
+
+// MAIN THREAD
+LLQueuedThread::~LLQueuedThread()
+{
+ setQuitting();
+
+ unpause(); // MAIN THREAD
+ if (mThreaded)
+ {
+ S32 timeout = 100;
+ for ( ; timeout>0; timeout--)
+ {
+ if (isStopped())
+ {
+ break;
+ }
+ ms_sleep(100);
+ LLThread::yield();
+ }
+ if (timeout == 0)
+ {
+ llwarns << "~LLQueuedThread (" << mName << ") timed out!" << llendl;
+ }
+ }
+ else
+ {
+ mStatus = STOPPED;
+ }
+
+ QueuedRequest* req;
+ while ( (req = (QueuedRequest*)mRequestHash.pop_element()) )
+ {
+ req->deleteRequest();
+ }
+
+ // ~LLThread() will be called here
+}
+
+//----------------------------------------------------------------------------
+
+// MAIN THREAD
+void LLQueuedThread::update(U32 ms_elapsed)
+{
+ updateQueue(0);
+}
+
+void LLQueuedThread::updateQueue(S32 inc)
+{
+ // If mRunAlways == TRUE, unpause the thread whenever we put something into the queue.
+ // If mRunAlways == FALSE, we only unpause the thread when updateQueue() is called from the main loop (i.e. between rendered frames)
+
+ if (inc == 0) // Frame Update
+ {
+ if (mThreaded)
+ {
+ unpause();
+ wake(); // Wake the thread up if necessary.
+ }
+ else
+ {
+ while (processNextRequest() > 0)
+ ;
+ }
+ }
+ else
+ {
+ // Something has been added to the queue
+ if (mRunAlways)
+ {
+ if (mThreaded)
+ {
+ wake(); // Wake the thread up if necessary.
+ }
+ else
+ {
+ while(processNextRequest() > 0)
+ ;
+ }
+ }
+ }
+}
+
+//virtual
+// May be called from any thread
+S32 LLQueuedThread::getPending(bool child_thread)
+{
+ S32 res;
+ lockData();
+ res = mRequestQueue.size();
+ unlockData();
+ return res;
+}
+
+// MAIN thread
+void LLQueuedThread::waitOnPending()
+{
+ while(1)
+ {
+ updateQueue(0);
+
+ if (mIdleThread)
+ {
+ break;
+ }
+ if (mThreaded)
+ {
+ yield();
+ }
+ }
+ return;
+}
+
+// MAIN thread
+void LLQueuedThread::printQueueStats()
+{
+ lockData();
+ if (!mRequestQueue.empty())
+ {
+ QueuedRequest *req = *mRequestQueue.begin();
+ llinfos << llformat("Pending Requests:%d Current status:%d", mRequestQueue.size(), req->getStatus()) << llendl;
+ }
+ else
+ {
+ llinfos << "Queued Thread Idle" << llendl;
+ }
+ unlockData();
+}
+
+// MAIN thread
+LLQueuedThread::handle_t LLQueuedThread::generateHandle()
+{
+ lockData();
+ while ((mNextHandle == nullHandle()) || (mRequestHash.find(mNextHandle)))
+ {
+ mNextHandle++;
+ }
+ unlockData();
+ return mNextHandle++;
+}
+
+// MAIN thread
+bool LLQueuedThread::addRequest(QueuedRequest* req)
+{
+ if (mStatus == QUITTING)
+ {
+ return false;
+ }
+
+ lockData();
+ req->setStatus(STATUS_QUEUED);
+ mRequestQueue.insert(req);
+ mRequestHash.insert(req);
+#if _DEBUG
+// llinfos << llformat("LLQueuedThread::Added req [%08d]",handle) << llendl;
+#endif
+ unlockData();
+
+ updateQueue(1);
+
+ return true;
+}
+
+// MAIN thread
+bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_complete)
+{
+ llassert (handle != nullHandle())
+ bool res = false;
+ bool waspaused = isPaused();
+ bool done = false;
+ while(!done)
+ {
+ updateQueue(0); // unpauses
+ lockData();
+ QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
+ if (!req)
+ {
+ done = true; // request does not exist
+ }
+ else if (req->getStatus() == STATUS_COMPLETE)
+ {
+ res = true;
+ if (auto_complete)
+ {
+ mRequestHash.erase(handle);
+ req->deleteRequest();
+// check();
+ }
+ done = true;
+ }
+ unlockData();
+
+ if (!done && mThreaded)
+ {
+ yield();
+ }
+ }
+ if (waspaused)
+ {
+ pause();
+ }
+ return res;
+}
+
+// MAIN thread
+LLQueuedThread::QueuedRequest* LLQueuedThread::getRequest(handle_t handle)
+{
+ if (handle == nullHandle())
+ {
+ return 0;
+ }
+ lockData();
+ QueuedRequest* res = (QueuedRequest*)mRequestHash.find(handle);
+ unlockData();
+ return res;
+}
+
+LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle)
+{
+ status_t res = STATUS_EXPIRED;
+ lockData();
+ QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
+ if (req)
+ {
+ res = req->getStatus();
+ }
+ unlockData();
+ return res;
+}
+
+LLQueuedThread::status_t LLQueuedThread::abortRequest(handle_t handle, U32 flags)
+{
+ status_t res = STATUS_EXPIRED;
+ lockData();
+ QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
+ if (req)
+ {
+ res = req->abortRequest(flags);
+ if ((flags & AUTO_COMPLETE) && (res == STATUS_COMPLETE))
+ {
+ mRequestHash.erase(handle);
+ req->deleteRequest();
+// check();
+ }
+#if _DEBUG
+// llinfos << llformat("LLQueuedThread::Aborted req [%08d]",handle) << llendl;
+#endif
+ }
+ unlockData();
+ return res;
+}
+
+// MAIN thread
+LLQueuedThread::status_t LLQueuedThread::setFlags(handle_t handle, U32 flags)
+{
+ status_t res = STATUS_EXPIRED;
+ lockData();
+ QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
+ if (req)
+ {
+ res = req->setFlags(flags);
+ }
+ unlockData();
+ return res;
+}
+
+void LLQueuedThread::setPriority(handle_t handle, U32 priority)
+{
+ lockData();
+ QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
+ if (req && (req->getStatus() == STATUS_QUEUED))
+ {
+ llverify(mRequestQueue.erase(req) == 1);
+ req->setPriority(priority);
+ mRequestQueue.insert(req);
+ }
+ unlockData();
+}
+
+bool LLQueuedThread::completeRequest(handle_t handle)
+{
+ bool res = false;
+ lockData();
+ QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
+ if (req)
+ {
+ llassert(req->getStatus() != STATUS_QUEUED && req->getStatus() != STATUS_ABORT);
+#if _DEBUG
+// llinfos << llformat("LLQueuedThread::Completed req [%08d]",handle) << llendl;
+#endif
+ mRequestHash.erase(handle);
+ req->deleteRequest();
+// check();
+ res = true;
+ }
+ unlockData();
+ return res;
+}
+
+bool LLQueuedThread::check()
+{
+#if 0 // not a reliable check once mNextHandle wraps, just for quick and dirty debugging
+ for (int i=0; i<REQUEST_HASH_SIZE; i++)
+ {
+ LLSimpleHashEntry<handle_t>* entry = mRequestHash.get_element_at_index(i);
+ while (entry)
+ {
+ if (entry->getHashKey() > mNextHandle)
+ {
+ llerrs << "Hash Error" << llendl;
+ return false;
+ }
+ entry = entry->getNextEntry();
+ }
+ }
+#endif
+ return true;
+}
+
+//============================================================================
+// Runs on its OWN thread
+
+int LLQueuedThread::processNextRequest()
+{
+ QueuedRequest *req = 0;
+ // Get next request from pool
+ lockData();
+ while(1)
+ {
+ if (!mRequestQueue.empty())
+ {
+ req = *mRequestQueue.begin();
+ mRequestQueue.erase(mRequestQueue.begin());
+ }
+ if (req && req->getStatus() == STATUS_ABORT)
+ {
+ req->setStatus(STATUS_ABORTED);
+ req = 0;
+ }
+ else
+ {
+ llassert (!req || req->getStatus() == STATUS_QUEUED)
+ break;
+ }
+ }
+ if (req)
+ {
+ req->setStatus(STATUS_INPROGRESS);
+ }
+ unlockData();
+
+ // This is the only place we will cal req->setStatus() after
+ // it has initially been seet to STATUS_QUEUED, so it is
+ // safe to access req.
+ if (req)
+ {
+ // process request
+ bool complete = processRequest(req);
+
+ if (complete)
+ {
+ lockData();
+ req->setStatus(STATUS_COMPLETE);
+ req->finishRequest();
+ if (req->getFlags() & AUTO_COMPLETE)
+ {
+ llverify(mRequestHash.erase(req))
+ req->deleteRequest();
+// check();
+ }
+ unlockData();
+ }
+ else
+ {
+ lockData();
+ req->setStatus(STATUS_QUEUED);
+ mRequestQueue.insert(req);
+ unlockData();
+ }
+ }
+
+ int res;
+ if (getPending(true) == 0)
+ {
+ if (isQuitting())
+ {
+ res = -1; // exit thread
+ }
+ else
+ {
+ res = 0;
+ }
+ }
+ else
+ {
+ res = 1;
+ }
+ return res;
+}
+
+bool LLQueuedThread::runCondition()
+{
+ // mRunCondition must be locked here
+ return (mRequestQueue.empty() && mIdleThread) ? FALSE : TRUE;
+}
+
+void LLQueuedThread::run()
+{
+ llinfos << "QUEUED THREAD STARTING" << llendl;
+
+ while (1)
+ {
+ // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state.
+ checkPause();
+
+ if(isQuitting())
+ break;
+
+ //llinfos << "QUEUED THREAD RUNNING, queue size = " << mRequestQueue.size() << llendl;
+
+ mIdleThread = FALSE;
+
+ int res = processNextRequest();
+ if (res == 0)
+ {
+ mIdleThread = TRUE;
+ }
+
+ if (res < 0) // finished working and want to exit
+ {
+ break;
+ }
+ }
+
+ llinfos << "QUEUED THREAD " << mName << " EXITING." << llendl;
+}
+
+//============================================================================
+
+LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 priority, U32 flags) :
+ LLSimpleHashEntry<LLQueuedThread::handle_t>(handle),
+ mStatus(STATUS_UNKNOWN),
+ mPriority(priority),
+ mFlags(flags)
+{
+}
+
+LLQueuedThread::QueuedRequest::~QueuedRequest()
+{
+ if (mStatus != STATUS_DELETE)
+ {
+ llerrs << "Attemt to directly delete a LLQueuedThread::QueuedRequest; use deleteRequest()" << llendl;
+ }
+}
+
+//virtual
+void LLQueuedThread::QueuedRequest::finishRequest()
+{
+}
+
+//virtual
+void LLQueuedThread::QueuedRequest::deleteRequest()
+{
+ setStatus(STATUS_DELETE);
+ delete this;
+}