diff options
author | James Cook <james@lindenlab.com> | 2007-01-02 08:33:20 +0000 |
---|---|---|
committer | James Cook <james@lindenlab.com> | 2007-01-02 08:33:20 +0000 |
commit | 420b91db29485df39fd6e724e782c449158811cb (patch) | |
tree | b471a94563af914d3ed3edd3e856d21cb1b69945 /indra/llcommon/llqueuedthread.cpp |
Print done when done.
Diffstat (limited to 'indra/llcommon/llqueuedthread.cpp')
-rw-r--r-- | indra/llcommon/llqueuedthread.cpp | 491 |
1 files changed, 491 insertions, 0 deletions
diff --git a/indra/llcommon/llqueuedthread.cpp b/indra/llcommon/llqueuedthread.cpp new file mode 100644 index 0000000000..bc41927d38 --- /dev/null +++ b/indra/llcommon/llqueuedthread.cpp @@ -0,0 +1,491 @@ +/** + * @file llqueuedthread.cpp + * + * Copyright (c) 2004-$CurrentYear$, Linden Research, Inc. + * $License$ + */ + +#include "linden_common.h" +#include "llqueuedthread.h" +#include "llstl.h" + +//============================================================================ + +// MAIN THREAD +LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool runalways) : + LLThread(name), + mThreaded(threaded), + mRunAlways(runalways), + mIdleThread(TRUE), + mNextHandle(0) +{ + if (mThreaded) + { + start(); + } +} + +// MAIN THREAD +LLQueuedThread::~LLQueuedThread() +{ + setQuitting(); + + unpause(); // MAIN THREAD + if (mThreaded) + { + S32 timeout = 100; + for ( ; timeout>0; timeout--) + { + if (isStopped()) + { + break; + } + ms_sleep(100); + LLThread::yield(); + } + if (timeout == 0) + { + llwarns << "~LLQueuedThread (" << mName << ") timed out!" << llendl; + } + } + else + { + mStatus = STOPPED; + } + + QueuedRequest* req; + while ( (req = (QueuedRequest*)mRequestHash.pop_element()) ) + { + req->deleteRequest(); + } + + // ~LLThread() will be called here +} + +//---------------------------------------------------------------------------- + +// MAIN THREAD +void LLQueuedThread::update(U32 ms_elapsed) +{ + updateQueue(0); +} + +void LLQueuedThread::updateQueue(S32 inc) +{ + // If mRunAlways == TRUE, unpause the thread whenever we put something into the queue. + // If mRunAlways == FALSE, we only unpause the thread when updateQueue() is called from the main loop (i.e. between rendered frames) + + if (inc == 0) // Frame Update + { + if (mThreaded) + { + unpause(); + wake(); // Wake the thread up if necessary. + } + else + { + while (processNextRequest() > 0) + ; + } + } + else + { + // Something has been added to the queue + if (mRunAlways) + { + if (mThreaded) + { + wake(); // Wake the thread up if necessary. + } + else + { + while(processNextRequest() > 0) + ; + } + } + } +} + +//virtual +// May be called from any thread +S32 LLQueuedThread::getPending(bool child_thread) +{ + S32 res; + lockData(); + res = mRequestQueue.size(); + unlockData(); + return res; +} + +// MAIN thread +void LLQueuedThread::waitOnPending() +{ + while(1) + { + updateQueue(0); + + if (mIdleThread) + { + break; + } + if (mThreaded) + { + yield(); + } + } + return; +} + +// MAIN thread +void LLQueuedThread::printQueueStats() +{ + lockData(); + if (!mRequestQueue.empty()) + { + QueuedRequest *req = *mRequestQueue.begin(); + llinfos << llformat("Pending Requests:%d Current status:%d", mRequestQueue.size(), req->getStatus()) << llendl; + } + else + { + llinfos << "Queued Thread Idle" << llendl; + } + unlockData(); +} + +// MAIN thread +LLQueuedThread::handle_t LLQueuedThread::generateHandle() +{ + lockData(); + while ((mNextHandle == nullHandle()) || (mRequestHash.find(mNextHandle))) + { + mNextHandle++; + } + unlockData(); + return mNextHandle++; +} + +// MAIN thread +bool LLQueuedThread::addRequest(QueuedRequest* req) +{ + if (mStatus == QUITTING) + { + return false; + } + + lockData(); + req->setStatus(STATUS_QUEUED); + mRequestQueue.insert(req); + mRequestHash.insert(req); +#if _DEBUG +// llinfos << llformat("LLQueuedThread::Added req [%08d]",handle) << llendl; +#endif + unlockData(); + + updateQueue(1); + + return true; +} + +// MAIN thread +bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_complete) +{ + llassert (handle != nullHandle()) + bool res = false; + bool waspaused = isPaused(); + bool done = false; + while(!done) + { + updateQueue(0); // unpauses + lockData(); + QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); + if (!req) + { + done = true; // request does not exist + } + else if (req->getStatus() == STATUS_COMPLETE) + { + res = true; + if (auto_complete) + { + mRequestHash.erase(handle); + req->deleteRequest(); +// check(); + } + done = true; + } + unlockData(); + + if (!done && mThreaded) + { + yield(); + } + } + if (waspaused) + { + pause(); + } + return res; +} + +// MAIN thread +LLQueuedThread::QueuedRequest* LLQueuedThread::getRequest(handle_t handle) +{ + if (handle == nullHandle()) + { + return 0; + } + lockData(); + QueuedRequest* res = (QueuedRequest*)mRequestHash.find(handle); + unlockData(); + return res; +} + +LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle) +{ + status_t res = STATUS_EXPIRED; + lockData(); + QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); + if (req) + { + res = req->getStatus(); + } + unlockData(); + return res; +} + +LLQueuedThread::status_t LLQueuedThread::abortRequest(handle_t handle, U32 flags) +{ + status_t res = STATUS_EXPIRED; + lockData(); + QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); + if (req) + { + res = req->abortRequest(flags); + if ((flags & AUTO_COMPLETE) && (res == STATUS_COMPLETE)) + { + mRequestHash.erase(handle); + req->deleteRequest(); +// check(); + } +#if _DEBUG +// llinfos << llformat("LLQueuedThread::Aborted req [%08d]",handle) << llendl; +#endif + } + unlockData(); + return res; +} + +// MAIN thread +LLQueuedThread::status_t LLQueuedThread::setFlags(handle_t handle, U32 flags) +{ + status_t res = STATUS_EXPIRED; + lockData(); + QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); + if (req) + { + res = req->setFlags(flags); + } + unlockData(); + return res; +} + +void LLQueuedThread::setPriority(handle_t handle, U32 priority) +{ + lockData(); + QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); + if (req && (req->getStatus() == STATUS_QUEUED)) + { + llverify(mRequestQueue.erase(req) == 1); + req->setPriority(priority); + mRequestQueue.insert(req); + } + unlockData(); +} + +bool LLQueuedThread::completeRequest(handle_t handle) +{ + bool res = false; + lockData(); + QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle); + if (req) + { + llassert(req->getStatus() != STATUS_QUEUED && req->getStatus() != STATUS_ABORT); +#if _DEBUG +// llinfos << llformat("LLQueuedThread::Completed req [%08d]",handle) << llendl; +#endif + mRequestHash.erase(handle); + req->deleteRequest(); +// check(); + res = true; + } + unlockData(); + return res; +} + +bool LLQueuedThread::check() +{ +#if 0 // not a reliable check once mNextHandle wraps, just for quick and dirty debugging + for (int i=0; i<REQUEST_HASH_SIZE; i++) + { + LLSimpleHashEntry<handle_t>* entry = mRequestHash.get_element_at_index(i); + while (entry) + { + if (entry->getHashKey() > mNextHandle) + { + llerrs << "Hash Error" << llendl; + return false; + } + entry = entry->getNextEntry(); + } + } +#endif + return true; +} + +//============================================================================ +// Runs on its OWN thread + +int LLQueuedThread::processNextRequest() +{ + QueuedRequest *req = 0; + // Get next request from pool + lockData(); + while(1) + { + if (!mRequestQueue.empty()) + { + req = *mRequestQueue.begin(); + mRequestQueue.erase(mRequestQueue.begin()); + } + if (req && req->getStatus() == STATUS_ABORT) + { + req->setStatus(STATUS_ABORTED); + req = 0; + } + else + { + llassert (!req || req->getStatus() == STATUS_QUEUED) + break; + } + } + if (req) + { + req->setStatus(STATUS_INPROGRESS); + } + unlockData(); + + // This is the only place we will cal req->setStatus() after + // it has initially been seet to STATUS_QUEUED, so it is + // safe to access req. + if (req) + { + // process request + bool complete = processRequest(req); + + if (complete) + { + lockData(); + req->setStatus(STATUS_COMPLETE); + req->finishRequest(); + if (req->getFlags() & AUTO_COMPLETE) + { + llverify(mRequestHash.erase(req)) + req->deleteRequest(); +// check(); + } + unlockData(); + } + else + { + lockData(); + req->setStatus(STATUS_QUEUED); + mRequestQueue.insert(req); + unlockData(); + } + } + + int res; + if (getPending(true) == 0) + { + if (isQuitting()) + { + res = -1; // exit thread + } + else + { + res = 0; + } + } + else + { + res = 1; + } + return res; +} + +bool LLQueuedThread::runCondition() +{ + // mRunCondition must be locked here + return (mRequestQueue.empty() && mIdleThread) ? FALSE : TRUE; +} + +void LLQueuedThread::run() +{ + llinfos << "QUEUED THREAD STARTING" << llendl; + + while (1) + { + // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state. + checkPause(); + + if(isQuitting()) + break; + + //llinfos << "QUEUED THREAD RUNNING, queue size = " << mRequestQueue.size() << llendl; + + mIdleThread = FALSE; + + int res = processNextRequest(); + if (res == 0) + { + mIdleThread = TRUE; + } + + if (res < 0) // finished working and want to exit + { + break; + } + } + + llinfos << "QUEUED THREAD " << mName << " EXITING." << llendl; +} + +//============================================================================ + +LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 priority, U32 flags) : + LLSimpleHashEntry<LLQueuedThread::handle_t>(handle), + mStatus(STATUS_UNKNOWN), + mPriority(priority), + mFlags(flags) +{ +} + +LLQueuedThread::QueuedRequest::~QueuedRequest() +{ + if (mStatus != STATUS_DELETE) + { + llerrs << "Attemt to directly delete a LLQueuedThread::QueuedRequest; use deleteRequest()" << llendl; + } +} + +//virtual +void LLQueuedThread::QueuedRequest::finishRequest() +{ +} + +//virtual +void LLQueuedThread::QueuedRequest::deleteRequest() +{ + setStatus(STATUS_DELETE); + delete this; +} |