diff options
Diffstat (limited to 'indra/llcommon/llworkerthread.cpp')
-rw-r--r-- | indra/llcommon/llworkerthread.cpp | 325 |
1 files changed, 196 insertions, 129 deletions
diff --git a/indra/llcommon/llworkerthread.cpp b/indra/llcommon/llworkerthread.cpp index a9370c8f6d..31f5c1cfcd 100644 --- a/indra/llcommon/llworkerthread.cpp +++ b/indra/llcommon/llworkerthread.cpp @@ -14,98 +14,86 @@ #endif //============================================================================ - -/*static*/ LLWorkerThread* LLWorkerThread::sLocal = NULL; -/*static*/ std::set<LLWorkerThread*> LLWorkerThread::sThreadList; - -//============================================================================ // Run on MAIN thread -//static -void LLWorkerThread::initClass(bool local_is_threaded, bool local_run_always) +LLWorkerThread::LLWorkerThread(const std::string& name, bool threaded) : + LLQueuedThread(name, threaded), + mWorkerAPRPoolp(NULL) { - if (!sLocal) - { - sLocal = new LLWorkerThread(local_is_threaded, local_run_always); - } + apr_pool_create(&mWorkerAPRPoolp, NULL); + mDeleteMutex = new LLMutex(getAPRPool()); } -//static -void LLWorkerThread::cleanupClass() +LLWorkerThread::~LLWorkerThread() { - if (sLocal) + // Delete any workers in the delete queue (should be safe - had better be!) + if (!mDeleteList.empty()) { - while (sLocal->getPending()) - { - sLocal->update(0); - } - delete sLocal; - sLocal = NULL; - llassert(sThreadList.size() == 0); + llwarns << "Worker Thread: " << mName << " destroyed with " << mDeleteList.size() + << " entries in delete list." << llendl; } -} -//static -S32 LLWorkerThread::updateClass(U32 ms_elapsed) -{ - for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++) - { - (*iter)->update(ms_elapsed); - } - return getAllPending(); + delete mDeleteMutex; + + // ~LLQueuedThread() will be called here } -//static -S32 LLWorkerThread::getAllPending() +// virtual +S32 LLWorkerThread::update(U32 max_time_ms) { - S32 res = 0; - for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++) + S32 res = LLQueuedThread::update(max_time_ms); + // Delete scheduled workers + std::vector<LLWorkerClass*> delete_list; + std::vector<LLWorkerClass*> abort_list; + mDeleteMutex->lock(); + for (delete_list_t::iterator iter = mDeleteList.begin(); + iter != mDeleteList.end(); ) { - res += (*iter)->getPending(); + delete_list_t::iterator curiter = iter++; + LLWorkerClass* worker = *curiter; + if (worker->deleteOK()) + { + if (worker->getFlags(LLWorkerClass::WCF_WORK_FINISHED)) + { + delete_list.push_back(worker); + mDeleteList.erase(curiter); + } + else if (!worker->getFlags(LLWorkerClass::WCF_ABORT_REQUESTED)) + { + abort_list.push_back(worker); + } + } } - return res; -} - -//static -void LLWorkerThread::pauseAll() -{ - for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++) + mDeleteMutex->unlock(); + // abort and delete after releasing mutex + for (std::vector<LLWorkerClass*>::iterator iter = abort_list.begin(); + iter != abort_list.end(); ++iter) { - (*iter)->pause(); + (*iter)->abortWork(false); } -} - -//static -void LLWorkerThread::waitOnAllPending() -{ - for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++) + for (std::vector<LLWorkerClass*>::iterator iter = delete_list.begin(); + iter != delete_list.end(); ++iter) { - (*iter)->waitOnPending(); + LLWorkerClass* worker = *iter; + if (worker->mRequestHandle) + { + // Finished but not completed + completeRequest(worker->mRequestHandle); + worker->mRequestHandle = LLWorkerThread::nullHandle(); + worker->clearFlags(LLWorkerClass::WCF_HAVE_WORK); + } + delete *iter; } + return res; } //---------------------------------------------------------------------------- -LLWorkerThread::LLWorkerThread(bool threaded, bool runalways) : - LLQueuedThread("Worker", threaded, runalways) -{ - sThreadList.insert(this); -} - -LLWorkerThread::~LLWorkerThread() -{ - llverify(sThreadList.erase(this) == 1); - // ~LLQueuedThread() will be called here -} - -//---------------------------------------------------------------------------- - - -LLWorkerThread::handle_t LLWorkerThread::add(LLWorkerClass* workerclass, S32 param, U32 priority) +LLWorkerThread::handle_t LLWorkerThread::addWorkRequest(LLWorkerClass* workerclass, S32 param, U32 priority) { handle_t handle = generateHandle(); - Request* req = new Request(handle, priority, workerclass, param); + WorkRequest* req = new WorkRequest(handle, priority, workerclass, param); bool res = addRequest(req); if (!res) @@ -118,63 +106,80 @@ LLWorkerThread::handle_t LLWorkerThread::add(LLWorkerClass* workerclass, S32 par return handle; } -//============================================================================ -// Runs on its OWN thread - -bool LLWorkerThread::processRequest(QueuedRequest* qreq) +void LLWorkerThread::deleteWorker(LLWorkerClass* workerclass) { - Request *req = (Request*)qreq; - - req->getWorkerClass()->setWorking(true); - - bool complete = req->getWorkerClass()->doWork(req->getParam()); - - req->getWorkerClass()->setWorking(false); - - LLThread::yield(); // worker thread should yield after each request - - return complete; + mDeleteMutex->lock(); + mDeleteList.push_back(workerclass); + mDeleteMutex->unlock(); } //============================================================================ +// Runs on its OWN thread -LLWorkerThread::Request::Request(handle_t handle, U32 priority, LLWorkerClass* workerclass, S32 param) : +LLWorkerThread::WorkRequest::WorkRequest(handle_t handle, U32 priority, LLWorkerClass* workerclass, S32 param) : LLQueuedThread::QueuedRequest(handle, priority), mWorkerClass(workerclass), mParam(param) { } -void LLWorkerThread::Request::deleteRequest() +LLWorkerThread::WorkRequest::~WorkRequest() +{ +} + +// virtual (required for access by LLWorkerThread) +void LLWorkerThread::WorkRequest::deleteRequest() { LLQueuedThread::QueuedRequest::deleteRequest(); } +// virtual +bool LLWorkerThread::WorkRequest::processRequest() +{ + LLWorkerClass* workerclass = getWorkerClass(); + workerclass->setWorking(true); + bool complete = workerclass->doWork(getParam()); + workerclass->setWorking(false); + return complete; +} + +// virtual +void LLWorkerThread::WorkRequest::finishRequest(bool completed) +{ + LLWorkerClass* workerclass = getWorkerClass(); + workerclass->finishWork(getParam(), completed); + U32 flags = LLWorkerClass::WCF_WORK_FINISHED | (completed ? 0 : LLWorkerClass::WCF_WORK_ABORTED); + workerclass->setFlags(flags); +} + //============================================================================ // LLWorkerClass:: operates in main thread LLWorkerClass::LLWorkerClass(LLWorkerThread* workerthread, const std::string& name) : mWorkerThread(workerthread), mWorkerClassName(name), - mWorkHandle(LLWorkerThread::nullHandle()), + mRequestHandle(LLWorkerThread::nullHandle()), + mMutex(workerthread->getWorkerAPRPool()), mWorkFlags(0) { if (!mWorkerThread) { - mWorkerThread = LLWorkerThread::sLocal; + llerrs << "LLWorkerClass() called with NULL workerthread: " << name << llendl; } } + LLWorkerClass::~LLWorkerClass() { - if (mWorkHandle != LLWorkerThread::nullHandle()) + llassert_always(!(mWorkFlags & WCF_WORKING)); + llassert_always(mWorkFlags & WCF_DELETE_REQUESTED); + if (mRequestHandle != LLWorkerThread::nullHandle()) { - LLWorkerThread::Request* workreq = (LLWorkerThread::Request*)mWorkerThread->getRequest(mWorkHandle); + LLWorkerThread::WorkRequest* workreq = (LLWorkerThread::WorkRequest*)mWorkerThread->getRequest(mRequestHandle); if (!workreq) { llerrs << "LLWorkerClass destroyed with stale work handle" << llendl; } - if (workreq->getStatus() != LLWorkerThread::STATUS_ABORT && - workreq->getStatus() != LLWorkerThread::STATUS_ABORTED && + if (workreq->getStatus() != LLWorkerThread::STATUS_ABORTED && workreq->getStatus() != LLWorkerThread::STATUS_COMPLETE) { llerrs << "LLWorkerClass destroyed with active worker! Worker Status: " << workreq->getStatus() << llendl; @@ -184,21 +189,58 @@ LLWorkerClass::~LLWorkerClass() void LLWorkerClass::setWorkerThread(LLWorkerThread* workerthread) { - if (mWorkHandle != LLWorkerThread::nullHandle()) + mMutex.lock(); + if (mRequestHandle != LLWorkerThread::nullHandle()) { llerrs << "LLWorkerClass attempt to change WorkerThread with active worker!" << llendl; } mWorkerThread = workerthread; + mMutex.unlock(); +} + +//---------------------------------------------------------------------------- + +//virtual +void LLWorkerClass::finishWork(S32 param, bool success) +{ +} + +//virtual +bool LLWorkerClass::deleteOK() +{ + return true; // default always OK +} + +//---------------------------------------------------------------------------- + +// Called from worker thread +void LLWorkerClass::setWorking(bool working) +{ + mMutex.lock(); + if (working) + { + llassert_always(!(mWorkFlags & WCF_WORKING)); + setFlags(WCF_WORKING); + } + else + { + llassert_always((mWorkFlags & WCF_WORKING)); + clearFlags(WCF_WORKING); + } + mMutex.unlock(); } //---------------------------------------------------------------------------- bool LLWorkerClass::yield() { - llassert(mWorkFlags & WCF_WORKING); LLThread::yield(); mWorkerThread->checkPause(); - return (getFlags() & WCF_ABORT_REQUESTED) ? true : false; + bool res; + mMutex.lock(); + res = (getFlags() & WCF_ABORT_REQUESTED) ? true : false; + mMutex.unlock(); + return res; } //---------------------------------------------------------------------------- @@ -206,7 +248,9 @@ bool LLWorkerClass::yield() // calls startWork, adds doWork() to queue void LLWorkerClass::addWork(S32 param, U32 priority) { - if (mWorkHandle != LLWorkerThread::nullHandle()) + mMutex.lock(); + llassert_always(!(mWorkFlags & (WCF_WORKING|WCF_HAVE_WORK))); + if (mRequestHandle != LLWorkerThread::nullHandle()) { llerrs << "LLWorkerClass attempt to add work with active worker!" << llendl; } @@ -214,70 +258,93 @@ void LLWorkerClass::addWork(S32 param, U32 priority) // llinfos << "addWork: " << mWorkerClassName << " Param: " << param << llendl; #endif startWork(param); - mWorkHandle = mWorkerThread->add(this, param, priority); + clearFlags(WCF_WORK_FINISHED|WCF_WORK_ABORTED); + setFlags(WCF_HAVE_WORK); + mRequestHandle = mWorkerThread->addWorkRequest(this, param, priority); + mMutex.unlock(); } -void LLWorkerClass::abortWork() +void LLWorkerClass::abortWork(bool autocomplete) { + mMutex.lock(); #if _DEBUG -// LLWorkerThread::Request* workreq = mWorkerThread->getRequest(mWorkHandle); +// LLWorkerThread::WorkRequest* workreq = mWorkerThread->getRequest(mRequestHandle); // if (workreq) // llinfos << "abortWork: " << mWorkerClassName << " Param: " << workreq->getParam() << llendl; #endif - mWorkerThread->abortRequest(mWorkHandle); - setFlags(WCF_ABORT_REQUESTED); + if (mRequestHandle != LLWorkerThread::nullHandle()) + { + mWorkerThread->abortRequest(mRequestHandle, autocomplete); + mWorkerThread->setPriority(mRequestHandle, LLQueuedThread::PRIORITY_IMMEDIATE); + setFlags(WCF_ABORT_REQUESTED); + } + mMutex.unlock(); } // if doWork is complete or aborted, call endWork() and return true -bool LLWorkerClass::checkWork() +bool LLWorkerClass::checkWork(bool aborting) { + LLMutexLock lock(&mMutex); bool complete = false, abort = false; - LLWorkerThread::Request* workreq = (LLWorkerThread::Request*)mWorkerThread->getRequest(mWorkHandle); - llassert(workreq); - if (getFlags(WCF_ABORT_REQUESTED) || workreq->getStatus() == LLWorkerThread::STATUS_ABORTED) + if (mRequestHandle != LLWorkerThread::nullHandle()) { - complete = true; - abort = true; + LLWorkerThread::WorkRequest* workreq = (LLWorkerThread::WorkRequest*)mWorkerThread->getRequest(mRequestHandle); + llassert_always(workreq); + LLQueuedThread::status_t status = workreq->getStatus(); + if (status == LLWorkerThread::STATUS_ABORTED) + { + complete = true; + abort = true; + } + else if (status == LLWorkerThread::STATUS_COMPLETE) + { + complete = true; + } + else + { + llassert_always(!aborting || (workreq->getFlags() & LLQueuedThread::FLAG_ABORT)); + } + if (complete) + { + llassert_always(!(getFlags(WCF_WORKING))); + endWork(workreq->getParam(), abort); + mWorkerThread->completeRequest(mRequestHandle); + mRequestHandle = LLWorkerThread::nullHandle(); + clearFlags(WCF_HAVE_WORK); + } } - else if (workreq->getStatus() == LLWorkerThread::STATUS_COMPLETE) + else { complete = true; } - if (complete) - { -#if _DEBUG -// llinfos << "endWork: " << mWorkerClassName << " Param: " << workreq->getParam() << llendl; -#endif - endWork(workreq->getParam(), abort); - mWorkerThread->completeRequest(mWorkHandle); - mWorkHandle = LLWorkerThread::nullHandle(); - } return complete; } -void LLWorkerClass::killWork() +void LLWorkerClass::scheduleDelete() { - if (haveWork()) + bool do_delete = false; + mMutex.lock(); + if (!(getFlags(WCF_DELETE_REQUESTED))) { - abortWork(); - bool paused = mWorkerThread->isPaused(); - while (!checkWork()) - { - mWorkerThread->updateQueue(0); - } - if (paused) - { - mWorkerThread->pause(); - } + setFlags(WCF_DELETE_REQUESTED); + do_delete = true; + } + mMutex.unlock(); + if (do_delete) + { + mWorkerThread->deleteWorker(this); } } void LLWorkerClass::setPriority(U32 priority) { - if (haveWork()) + mMutex.lock(); + if (mRequestHandle != LLWorkerThread::nullHandle()) { - mWorkerThread->setPriority(mWorkHandle, priority); + mRequestPriority = priority; + mWorkerThread->setPriority(mRequestHandle, priority); } + mMutex.unlock(); } //============================================================================ |