summaryrefslogtreecommitdiff
path: root/indra/llcommon/llworkerthread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llcommon/llworkerthread.cpp')
-rw-r--r--indra/llcommon/llworkerthread.cpp325
1 files changed, 196 insertions, 129 deletions
diff --git a/indra/llcommon/llworkerthread.cpp b/indra/llcommon/llworkerthread.cpp
index a9370c8f6d..31f5c1cfcd 100644
--- a/indra/llcommon/llworkerthread.cpp
+++ b/indra/llcommon/llworkerthread.cpp
@@ -14,98 +14,86 @@
#endif
//============================================================================
-
-/*static*/ LLWorkerThread* LLWorkerThread::sLocal = NULL;
-/*static*/ std::set<LLWorkerThread*> LLWorkerThread::sThreadList;
-
-//============================================================================
// Run on MAIN thread
-//static
-void LLWorkerThread::initClass(bool local_is_threaded, bool local_run_always)
+LLWorkerThread::LLWorkerThread(const std::string& name, bool threaded) :
+ LLQueuedThread(name, threaded),
+ mWorkerAPRPoolp(NULL)
{
- if (!sLocal)
- {
- sLocal = new LLWorkerThread(local_is_threaded, local_run_always);
- }
+ apr_pool_create(&mWorkerAPRPoolp, NULL);
+ mDeleteMutex = new LLMutex(getAPRPool());
}
-//static
-void LLWorkerThread::cleanupClass()
+LLWorkerThread::~LLWorkerThread()
{
- if (sLocal)
+ // Delete any workers in the delete queue (should be safe - had better be!)
+ if (!mDeleteList.empty())
{
- while (sLocal->getPending())
- {
- sLocal->update(0);
- }
- delete sLocal;
- sLocal = NULL;
- llassert(sThreadList.size() == 0);
+ llwarns << "Worker Thread: " << mName << " destroyed with " << mDeleteList.size()
+ << " entries in delete list." << llendl;
}
-}
-//static
-S32 LLWorkerThread::updateClass(U32 ms_elapsed)
-{
- for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++)
- {
- (*iter)->update(ms_elapsed);
- }
- return getAllPending();
+ delete mDeleteMutex;
+
+ // ~LLQueuedThread() will be called here
}
-//static
-S32 LLWorkerThread::getAllPending()
+// virtual
+S32 LLWorkerThread::update(U32 max_time_ms)
{
- S32 res = 0;
- for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++)
+ S32 res = LLQueuedThread::update(max_time_ms);
+ // Delete scheduled workers
+ std::vector<LLWorkerClass*> delete_list;
+ std::vector<LLWorkerClass*> abort_list;
+ mDeleteMutex->lock();
+ for (delete_list_t::iterator iter = mDeleteList.begin();
+ iter != mDeleteList.end(); )
{
- res += (*iter)->getPending();
+ delete_list_t::iterator curiter = iter++;
+ LLWorkerClass* worker = *curiter;
+ if (worker->deleteOK())
+ {
+ if (worker->getFlags(LLWorkerClass::WCF_WORK_FINISHED))
+ {
+ delete_list.push_back(worker);
+ mDeleteList.erase(curiter);
+ }
+ else if (!worker->getFlags(LLWorkerClass::WCF_ABORT_REQUESTED))
+ {
+ abort_list.push_back(worker);
+ }
+ }
}
- return res;
-}
-
-//static
-void LLWorkerThread::pauseAll()
-{
- for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++)
+ mDeleteMutex->unlock();
+ // abort and delete after releasing mutex
+ for (std::vector<LLWorkerClass*>::iterator iter = abort_list.begin();
+ iter != abort_list.end(); ++iter)
{
- (*iter)->pause();
+ (*iter)->abortWork(false);
}
-}
-
-//static
-void LLWorkerThread::waitOnAllPending()
-{
- for (std::set<LLWorkerThread*>::iterator iter = sThreadList.begin(); iter != sThreadList.end(); iter++)
+ for (std::vector<LLWorkerClass*>::iterator iter = delete_list.begin();
+ iter != delete_list.end(); ++iter)
{
- (*iter)->waitOnPending();
+ LLWorkerClass* worker = *iter;
+ if (worker->mRequestHandle)
+ {
+ // Finished but not completed
+ completeRequest(worker->mRequestHandle);
+ worker->mRequestHandle = LLWorkerThread::nullHandle();
+ worker->clearFlags(LLWorkerClass::WCF_HAVE_WORK);
+ }
+ delete *iter;
}
+ return res;
}
//----------------------------------------------------------------------------
-LLWorkerThread::LLWorkerThread(bool threaded, bool runalways) :
- LLQueuedThread("Worker", threaded, runalways)
-{
- sThreadList.insert(this);
-}
-
-LLWorkerThread::~LLWorkerThread()
-{
- llverify(sThreadList.erase(this) == 1);
- // ~LLQueuedThread() will be called here
-}
-
-//----------------------------------------------------------------------------
-
-
-LLWorkerThread::handle_t LLWorkerThread::add(LLWorkerClass* workerclass, S32 param, U32 priority)
+LLWorkerThread::handle_t LLWorkerThread::addWorkRequest(LLWorkerClass* workerclass, S32 param, U32 priority)
{
handle_t handle = generateHandle();
- Request* req = new Request(handle, priority, workerclass, param);
+ WorkRequest* req = new WorkRequest(handle, priority, workerclass, param);
bool res = addRequest(req);
if (!res)
@@ -118,63 +106,80 @@ LLWorkerThread::handle_t LLWorkerThread::add(LLWorkerClass* workerclass, S32 par
return handle;
}
-//============================================================================
-// Runs on its OWN thread
-
-bool LLWorkerThread::processRequest(QueuedRequest* qreq)
+void LLWorkerThread::deleteWorker(LLWorkerClass* workerclass)
{
- Request *req = (Request*)qreq;
-
- req->getWorkerClass()->setWorking(true);
-
- bool complete = req->getWorkerClass()->doWork(req->getParam());
-
- req->getWorkerClass()->setWorking(false);
-
- LLThread::yield(); // worker thread should yield after each request
-
- return complete;
+ mDeleteMutex->lock();
+ mDeleteList.push_back(workerclass);
+ mDeleteMutex->unlock();
}
//============================================================================
+// Runs on its OWN thread
-LLWorkerThread::Request::Request(handle_t handle, U32 priority, LLWorkerClass* workerclass, S32 param) :
+LLWorkerThread::WorkRequest::WorkRequest(handle_t handle, U32 priority, LLWorkerClass* workerclass, S32 param) :
LLQueuedThread::QueuedRequest(handle, priority),
mWorkerClass(workerclass),
mParam(param)
{
}
-void LLWorkerThread::Request::deleteRequest()
+LLWorkerThread::WorkRequest::~WorkRequest()
+{
+}
+
+// virtual (required for access by LLWorkerThread)
+void LLWorkerThread::WorkRequest::deleteRequest()
{
LLQueuedThread::QueuedRequest::deleteRequest();
}
+// virtual
+bool LLWorkerThread::WorkRequest::processRequest()
+{
+ LLWorkerClass* workerclass = getWorkerClass();
+ workerclass->setWorking(true);
+ bool complete = workerclass->doWork(getParam());
+ workerclass->setWorking(false);
+ return complete;
+}
+
+// virtual
+void LLWorkerThread::WorkRequest::finishRequest(bool completed)
+{
+ LLWorkerClass* workerclass = getWorkerClass();
+ workerclass->finishWork(getParam(), completed);
+ U32 flags = LLWorkerClass::WCF_WORK_FINISHED | (completed ? 0 : LLWorkerClass::WCF_WORK_ABORTED);
+ workerclass->setFlags(flags);
+}
+
//============================================================================
// LLWorkerClass:: operates in main thread
LLWorkerClass::LLWorkerClass(LLWorkerThread* workerthread, const std::string& name)
: mWorkerThread(workerthread),
mWorkerClassName(name),
- mWorkHandle(LLWorkerThread::nullHandle()),
+ mRequestHandle(LLWorkerThread::nullHandle()),
+ mMutex(workerthread->getWorkerAPRPool()),
mWorkFlags(0)
{
if (!mWorkerThread)
{
- mWorkerThread = LLWorkerThread::sLocal;
+ llerrs << "LLWorkerClass() called with NULL workerthread: " << name << llendl;
}
}
+
LLWorkerClass::~LLWorkerClass()
{
- if (mWorkHandle != LLWorkerThread::nullHandle())
+ llassert_always(!(mWorkFlags & WCF_WORKING));
+ llassert_always(mWorkFlags & WCF_DELETE_REQUESTED);
+ if (mRequestHandle != LLWorkerThread::nullHandle())
{
- LLWorkerThread::Request* workreq = (LLWorkerThread::Request*)mWorkerThread->getRequest(mWorkHandle);
+ LLWorkerThread::WorkRequest* workreq = (LLWorkerThread::WorkRequest*)mWorkerThread->getRequest(mRequestHandle);
if (!workreq)
{
llerrs << "LLWorkerClass destroyed with stale work handle" << llendl;
}
- if (workreq->getStatus() != LLWorkerThread::STATUS_ABORT &&
- workreq->getStatus() != LLWorkerThread::STATUS_ABORTED &&
+ if (workreq->getStatus() != LLWorkerThread::STATUS_ABORTED &&
workreq->getStatus() != LLWorkerThread::STATUS_COMPLETE)
{
llerrs << "LLWorkerClass destroyed with active worker! Worker Status: " << workreq->getStatus() << llendl;
@@ -184,21 +189,58 @@ LLWorkerClass::~LLWorkerClass()
void LLWorkerClass::setWorkerThread(LLWorkerThread* workerthread)
{
- if (mWorkHandle != LLWorkerThread::nullHandle())
+ mMutex.lock();
+ if (mRequestHandle != LLWorkerThread::nullHandle())
{
llerrs << "LLWorkerClass attempt to change WorkerThread with active worker!" << llendl;
}
mWorkerThread = workerthread;
+ mMutex.unlock();
+}
+
+//----------------------------------------------------------------------------
+
+//virtual
+void LLWorkerClass::finishWork(S32 param, bool success)
+{
+}
+
+//virtual
+bool LLWorkerClass::deleteOK()
+{
+ return true; // default always OK
+}
+
+//----------------------------------------------------------------------------
+
+// Called from worker thread
+void LLWorkerClass::setWorking(bool working)
+{
+ mMutex.lock();
+ if (working)
+ {
+ llassert_always(!(mWorkFlags & WCF_WORKING));
+ setFlags(WCF_WORKING);
+ }
+ else
+ {
+ llassert_always((mWorkFlags & WCF_WORKING));
+ clearFlags(WCF_WORKING);
+ }
+ mMutex.unlock();
}
//----------------------------------------------------------------------------
bool LLWorkerClass::yield()
{
- llassert(mWorkFlags & WCF_WORKING);
LLThread::yield();
mWorkerThread->checkPause();
- return (getFlags() & WCF_ABORT_REQUESTED) ? true : false;
+ bool res;
+ mMutex.lock();
+ res = (getFlags() & WCF_ABORT_REQUESTED) ? true : false;
+ mMutex.unlock();
+ return res;
}
//----------------------------------------------------------------------------
@@ -206,7 +248,9 @@ bool LLWorkerClass::yield()
// calls startWork, adds doWork() to queue
void LLWorkerClass::addWork(S32 param, U32 priority)
{
- if (mWorkHandle != LLWorkerThread::nullHandle())
+ mMutex.lock();
+ llassert_always(!(mWorkFlags & (WCF_WORKING|WCF_HAVE_WORK)));
+ if (mRequestHandle != LLWorkerThread::nullHandle())
{
llerrs << "LLWorkerClass attempt to add work with active worker!" << llendl;
}
@@ -214,70 +258,93 @@ void LLWorkerClass::addWork(S32 param, U32 priority)
// llinfos << "addWork: " << mWorkerClassName << " Param: " << param << llendl;
#endif
startWork(param);
- mWorkHandle = mWorkerThread->add(this, param, priority);
+ clearFlags(WCF_WORK_FINISHED|WCF_WORK_ABORTED);
+ setFlags(WCF_HAVE_WORK);
+ mRequestHandle = mWorkerThread->addWorkRequest(this, param, priority);
+ mMutex.unlock();
}
-void LLWorkerClass::abortWork()
+void LLWorkerClass::abortWork(bool autocomplete)
{
+ mMutex.lock();
#if _DEBUG
-// LLWorkerThread::Request* workreq = mWorkerThread->getRequest(mWorkHandle);
+// LLWorkerThread::WorkRequest* workreq = mWorkerThread->getRequest(mRequestHandle);
// if (workreq)
// llinfos << "abortWork: " << mWorkerClassName << " Param: " << workreq->getParam() << llendl;
#endif
- mWorkerThread->abortRequest(mWorkHandle);
- setFlags(WCF_ABORT_REQUESTED);
+ if (mRequestHandle != LLWorkerThread::nullHandle())
+ {
+ mWorkerThread->abortRequest(mRequestHandle, autocomplete);
+ mWorkerThread->setPriority(mRequestHandle, LLQueuedThread::PRIORITY_IMMEDIATE);
+ setFlags(WCF_ABORT_REQUESTED);
+ }
+ mMutex.unlock();
}
// if doWork is complete or aborted, call endWork() and return true
-bool LLWorkerClass::checkWork()
+bool LLWorkerClass::checkWork(bool aborting)
{
+ LLMutexLock lock(&mMutex);
bool complete = false, abort = false;
- LLWorkerThread::Request* workreq = (LLWorkerThread::Request*)mWorkerThread->getRequest(mWorkHandle);
- llassert(workreq);
- if (getFlags(WCF_ABORT_REQUESTED) || workreq->getStatus() == LLWorkerThread::STATUS_ABORTED)
+ if (mRequestHandle != LLWorkerThread::nullHandle())
{
- complete = true;
- abort = true;
+ LLWorkerThread::WorkRequest* workreq = (LLWorkerThread::WorkRequest*)mWorkerThread->getRequest(mRequestHandle);
+ llassert_always(workreq);
+ LLQueuedThread::status_t status = workreq->getStatus();
+ if (status == LLWorkerThread::STATUS_ABORTED)
+ {
+ complete = true;
+ abort = true;
+ }
+ else if (status == LLWorkerThread::STATUS_COMPLETE)
+ {
+ complete = true;
+ }
+ else
+ {
+ llassert_always(!aborting || (workreq->getFlags() & LLQueuedThread::FLAG_ABORT));
+ }
+ if (complete)
+ {
+ llassert_always(!(getFlags(WCF_WORKING)));
+ endWork(workreq->getParam(), abort);
+ mWorkerThread->completeRequest(mRequestHandle);
+ mRequestHandle = LLWorkerThread::nullHandle();
+ clearFlags(WCF_HAVE_WORK);
+ }
}
- else if (workreq->getStatus() == LLWorkerThread::STATUS_COMPLETE)
+ else
{
complete = true;
}
- if (complete)
- {
-#if _DEBUG
-// llinfos << "endWork: " << mWorkerClassName << " Param: " << workreq->getParam() << llendl;
-#endif
- endWork(workreq->getParam(), abort);
- mWorkerThread->completeRequest(mWorkHandle);
- mWorkHandle = LLWorkerThread::nullHandle();
- }
return complete;
}
-void LLWorkerClass::killWork()
+void LLWorkerClass::scheduleDelete()
{
- if (haveWork())
+ bool do_delete = false;
+ mMutex.lock();
+ if (!(getFlags(WCF_DELETE_REQUESTED)))
{
- abortWork();
- bool paused = mWorkerThread->isPaused();
- while (!checkWork())
- {
- mWorkerThread->updateQueue(0);
- }
- if (paused)
- {
- mWorkerThread->pause();
- }
+ setFlags(WCF_DELETE_REQUESTED);
+ do_delete = true;
+ }
+ mMutex.unlock();
+ if (do_delete)
+ {
+ mWorkerThread->deleteWorker(this);
}
}
void LLWorkerClass::setPriority(U32 priority)
{
- if (haveWork())
+ mMutex.lock();
+ if (mRequestHandle != LLWorkerThread::nullHandle())
{
- mWorkerThread->setPriority(mWorkHandle, priority);
+ mRequestPriority = priority;
+ mWorkerThread->setPriority(mRequestHandle, priority);
}
+ mMutex.unlock();
}
//============================================================================