summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorandreykproductengine <andreykproductengine@lindenlab.com>2019-01-15 22:27:28 +0200
committerandreykproductengine <andreykproductengine@lindenlab.com>2019-01-15 22:27:28 +0200
commit8a92a771ba547d9d6a8bd1234e2e0b144a8bfcf5 (patch)
tree24787dcf00ea5155462358fd6c24ddccd174e8bb
parentaf0e498293c86dc7bb1ae7911bf932b7b287e115 (diff)
SL-10291 Replace apr thread with standard C++11 functionality
-rw-r--r--indra/llcommon/llthread.cpp77
-rw-r--r--indra/llcommon/llthread.h12
-rw-r--r--indra/llcommon/llthreadsafequeue.cpp81
-rw-r--r--indra/llcommon/llthreadsafequeue.h135
-rw-r--r--indra/newview/llmainlooprepeater.cpp2
5 files changed, 104 insertions, 203 deletions
diff --git a/indra/llcommon/llthread.cpp b/indra/llcommon/llthread.cpp
index 860415bb22..a4171729db 100644
--- a/indra/llcommon/llthread.cpp
+++ b/indra/llcommon/llthread.cpp
@@ -116,29 +116,27 @@ void LLThread::registerThreadID()
//
// Handed to the APR thread creation function
//
-void *APR_THREAD_FUNC LLThread::staticRun(apr_thread_t *apr_threadp, void *datap)
+void LLThread::threadRun()
{
- LLThread *threadp = (LLThread *)datap;
-
#ifdef LL_WINDOWS
- set_thread_name(-1, threadp->mName.c_str());
+ set_thread_name(-1, mName.c_str());
#endif
// for now, hard code all LLThreads to report to single master thread recorder, which is known to be running on main thread
- threadp->mRecorder = new LLTrace::ThreadRecorder(*LLTrace::get_master_thread_recorder());
+ mRecorder = new LLTrace::ThreadRecorder(*LLTrace::get_master_thread_recorder());
- sThreadID = threadp->mID;
+ sThreadID = mID;
// Run the user supplied function
do
{
try
{
- threadp->run();
+ run();
}
catch (const LLContinueError &e)
{
- LL_WARNS("THREAD") << "ContinueException on thread '" << threadp->mName <<
+ LL_WARNS("THREAD") << "ContinueException on thread '" << mName <<
"' reentering run(). Error what is: '" << e.what() << "'" << LL_ENDL;
//output possible call stacks to log file.
LLError::LLCallStacks::print();
@@ -153,39 +151,25 @@ void *APR_THREAD_FUNC LLThread::staticRun(apr_thread_t *apr_threadp, void *datap
//LL_INFOS() << "LLThread::staticRun() Exiting: " << threadp->mName << LL_ENDL;
- delete threadp->mRecorder;
- threadp->mRecorder = NULL;
+ delete mRecorder;
+ mRecorder = NULL;
// We're done with the run function, this thread is done executing now.
//NB: we are using this flag to sync across threads...we really need memory barriers here
// Todo: add LLMutex per thread instead of flag?
// We are using "while (mStatus != STOPPED) {ms_sleep();}" everywhere.
- threadp->mStatus = STOPPED;
-
- return NULL;
+ mStatus = STOPPED;
}
LLThread::LLThread(const std::string& name, apr_pool_t *poolp) :
mPaused(FALSE),
mName(name),
- mAPRThreadp(NULL),
+ mThreadp(NULL),
mStatus(STOPPED),
mRecorder(NULL)
{
mID = ++sIDIter;
-
- // Thread creation probably CAN be paranoid about APR being initialized, if necessary
- if (poolp)
- {
- mIsLocalPool = FALSE;
- mAPRPoolp = poolp;
- }
- else
- {
- mIsLocalPool = TRUE;
- apr_pool_create(&mAPRPoolp, NULL); // Create a subpool for this thread
- }
mRunCondition = new LLCondition();
mDataLock = new LLMutex();
mLocalAPRFilePoolp = NULL ;
@@ -217,7 +201,7 @@ void LLThread::shutdown()
// Warning! If you somehow call the thread destructor from itself,
// the thread will die in an unclean fashion!
- if (mAPRThreadp)
+ if (mThreadp)
{
if (!isStopped())
{
@@ -248,14 +232,19 @@ void LLThread::shutdown()
{
// This thread just wouldn't stop, even though we gave it time
//LL_WARNS() << "LLThread::~LLThread() exiting thread before clean exit!" << LL_ENDL;
- // Put a stake in its heart.
- apr_thread_exit(mAPRThreadp, -1);
+ // Put a stake in its heart. (A very hostile method to force a thread to quit)
+#if LL_WINDOWS
+ TerminateThread(mNativeHandle, 0);
+#else
+ pthread_cancel(mNativeHandle);
+#endif
+
delete mRecorder;
mRecorder = NULL;
mStatus = STOPPED;
return;
}
- mAPRThreadp = NULL;
+ mThreadp = NULL;
}
delete mRunCondition;
@@ -263,12 +252,6 @@ void LLThread::shutdown()
delete mDataLock;
mDataLock = NULL;
-
- if (mIsLocalPool && mAPRPoolp)
- {
- apr_pool_destroy(mAPRPoolp);
- mAPRPoolp = 0;
- }
if (mRecorder)
{
@@ -287,19 +270,15 @@ void LLThread::start()
// Set thread state to running
mStatus = RUNNING;
- apr_status_t status =
- apr_thread_create(&mAPRThreadp, NULL, staticRun, (void *)this, mAPRPoolp);
-
- if(status == APR_SUCCESS)
- {
- // We won't bother joining
- apr_thread_detach(mAPRThreadp);
+ try
+ {
+ mThreadp = new std::thread(std::bind(&LLThread::threadRun, this));
+ mNativeHandle = mThreadp->native_handle();
}
- else
+ catch (std::system_error& ex)
{
mStatus = STOPPED;
- LL_WARNS() << "failed to start thread " << mName << LL_ENDL;
- ll_apr_warn_status(status);
+ LL_WARNS() << "failed to start thread " << mName << " " << ex.what() << LL_ENDL;
}
}
@@ -376,11 +355,7 @@ U32 LLThread::currentID()
// static
void LLThread::yield()
{
-#if LL_LINUX || LL_SOLARIS
- sched_yield(); // annoyingly, apr_thread_yield is a noop on linux...
-#else
- apr_thread_yield();
-#endif
+ std::this_thread::yield();
}
void LLThread::wake()
diff --git a/indra/llcommon/llthread.h b/indra/llcommon/llthread.h
index dda7fa8ffb..863c9051f3 100644
--- a/indra/llcommon/llthread.h
+++ b/indra/llcommon/llthread.h
@@ -29,10 +29,10 @@
#include "llapp.h"
#include "llapr.h"
-#include "apr_thread_cond.h"
#include "boost/intrusive_ptr.hpp"
#include "llmutex.h"
#include "llrefcount.h"
+#include <thread>
LL_COMMON_API void assert_main_thread();
@@ -86,7 +86,6 @@ public:
// this kicks off the apr thread
void start(void);
- apr_pool_t *getAPRPool() { return mAPRPoolp; }
LLVolatileAPRPool* getLocalAPRFilePool() { return mLocalAPRFilePoolp ; }
U32 getID() const { return mID; }
@@ -97,19 +96,18 @@ public:
static void registerThreadID();
private:
- BOOL mPaused;
+ bool mPaused;
+ std::thread::native_handle_type mNativeHandle; // for termination in case of issues
// static function passed to APR thread creation routine
- static void *APR_THREAD_FUNC staticRun(struct apr_thread_t *apr_threadp, void *datap);
+ void threadRun();
protected:
std::string mName;
class LLCondition* mRunCondition;
LLMutex* mDataLock;
- apr_thread_t *mAPRThreadp;
- apr_pool_t *mAPRPoolp;
- BOOL mIsLocalPool;
+ std::thread *mThreadp;
EThreadStatus mStatus;
U32 mID;
LLTrace::ThreadRecorder* mRecorder;
diff --git a/indra/llcommon/llthreadsafequeue.cpp b/indra/llcommon/llthreadsafequeue.cpp
index 491f920c0f..bde36999ba 100644
--- a/indra/llcommon/llthreadsafequeue.cpp
+++ b/indra/llcommon/llthreadsafequeue.cpp
@@ -24,87 +24,6 @@
*/
#include "linden_common.h"
-#include <apr_pools.h>
-#include <apr_queue.h>
#include "llthreadsafequeue.h"
-#include "llexception.h"
-
-// LLThreadSafeQueueImplementation
-//-----------------------------------------------------------------------------
-
-
-LLThreadSafeQueueImplementation::LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity):
- mOwnsPool(pool == 0),
- mPool(pool),
- mQueue(0)
-{
- if(mOwnsPool) {
- apr_status_t status = apr_pool_create(&mPool, 0);
- if(status != APR_SUCCESS) LLTHROW(LLThreadSafeQueueError("failed to allocate pool"));
- } else {
- ; // No op.
- }
-
- apr_status_t status = apr_queue_create(&mQueue, capacity, mPool);
- if(status != APR_SUCCESS) LLTHROW(LLThreadSafeQueueError("failed to allocate queue"));
-}
-
-
-LLThreadSafeQueueImplementation::~LLThreadSafeQueueImplementation()
-{
- if(mQueue != 0) {
- if(apr_queue_size(mQueue) != 0) LL_WARNS() <<
- "terminating queue which still contains " << apr_queue_size(mQueue) <<
- " elements;" << "memory will be leaked" << LL_ENDL;
- apr_queue_term(mQueue);
- }
- if(mOwnsPool && (mPool != 0)) apr_pool_destroy(mPool);
-}
-
-
-void LLThreadSafeQueueImplementation::pushFront(void * element)
-{
- apr_status_t status = apr_queue_push(mQueue, element);
-
- if(status == APR_EINTR) {
- LLTHROW(LLThreadSafeQueueInterrupt());
- } else if(status != APR_SUCCESS) {
- LLTHROW(LLThreadSafeQueueError("push failed"));
- } else {
- ; // Success.
- }
-}
-
-
-bool LLThreadSafeQueueImplementation::tryPushFront(void * element){
- return apr_queue_trypush(mQueue, element) == APR_SUCCESS;
-}
-
-
-void * LLThreadSafeQueueImplementation::popBack(void)
-{
- void * element;
- apr_status_t status = apr_queue_pop(mQueue, &element);
-
- if(status == APR_EINTR) {
- LLTHROW(LLThreadSafeQueueInterrupt());
- } else if(status != APR_SUCCESS) {
- LLTHROW(LLThreadSafeQueueError("pop failed"));
- } else {
- return element;
- }
-}
-
-
-bool LLThreadSafeQueueImplementation::tryPopBack(void *& element)
-{
- return apr_queue_trypop(mQueue, &element) == APR_SUCCESS;
-}
-
-
-size_t LLThreadSafeQueueImplementation::size()
-{
- return apr_queue_size(mQueue);
-}
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 45289ef0b4..b0bddac8e5 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -28,12 +28,20 @@
#define LL_LLTHREADSAFEQUEUE_H
#include "llexception.h"
+#include <deque>
#include <string>
+#if LL_WINDOWS
+#pragma warning (push)
+#pragma warning (disable:4265)
+#endif
+// 'std::_Pad' : class has virtual functions, but destructor is not virtual
+#include <mutex>
+#include <condition_variable>
-struct apr_pool_t; // From apr_pools.h
-class LLThreadSafeQueueImplementation; // See below.
-
+#if LL_WINDOWS
+#pragma warning (pop)
+#endif
//
// A general queue exception.
@@ -64,31 +72,6 @@ public:
}
};
-
-struct apr_queue_t; // From apr_queue.h
-
-
-//
-// Implementation details.
-//
-class LL_COMMON_API LLThreadSafeQueueImplementation
-{
-public:
- LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity);
- ~LLThreadSafeQueueImplementation();
- void pushFront(void * element);
- bool tryPushFront(void * element);
- void * popBack(void);
- bool tryPopBack(void *& element);
- size_t size();
-
-private:
- bool mOwnsPool;
- apr_pool_t * mPool;
- apr_queue_t * mQueue;
-};
-
-
//
// Implements a thread safe FIFO.
//
@@ -100,7 +83,7 @@ public:
// If the pool is set to NULL one will be allocated and managed by this
// queue.
- LLThreadSafeQueue(apr_pool_t * pool = 0, unsigned int capacity = 1024);
+ LLThreadSafeQueue(U32 capacity = 1024);
// Add an element to the front of queue (will block if the queue has
// reached capacity).
@@ -128,77 +111,103 @@ public:
size_t size();
private:
- LLThreadSafeQueueImplementation mImplementation;
-};
-
+ std::deque< ElementT > mStorage;
+ U32 mCapacity;
+ std::mutex mLock;
+ std::condition_variable mCapacityCond;
+ std::condition_variable mEmptyCond;
+};
// LLThreadSafeQueue
//-----------------------------------------------------------------------------
-
template<typename ElementT>
-LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(apr_pool_t * pool, unsigned int capacity):
- mImplementation(pool, capacity)
+LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) :
+mCapacity(capacity)
{
- ; // No op.
}
template<typename ElementT>
void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
{
- ElementT * elementCopy = new ElementT(element);
- try {
- mImplementation.pushFront(elementCopy);
- } catch (LLThreadSafeQueueInterrupt) {
- delete elementCopy;
- throw;
- }
+ while (true)
+ {
+ std::unique_lock<std::mutex> lock1(mLock);
+
+ if (mStorage.size() < mCapacity)
+ {
+ mStorage.push_front(element);
+ mEmptyCond.notify_one();
+ return;
+ }
+
+ // Storage Full. Wait for signal.
+ mCapacityCond.wait(lock1);
+ }
}
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
{
- ElementT * elementCopy = new ElementT(element);
- bool result = mImplementation.tryPushFront(elementCopy);
- if(!result) delete elementCopy;
- return result;
+ std::unique_lock<std::mutex> lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock())
+ return false;
+
+ if (mStorage.size() >= mCapacity)
+ return false;
+
+ mStorage.push_front(element);
+ mEmptyCond.notify_one();
+ return true;
}
template<typename ElementT>
ElementT LLThreadSafeQueue<ElementT>::popBack(void)
{
- ElementT * element = reinterpret_cast<ElementT *> (mImplementation.popBack());
- ElementT result(*element);
- delete element;
- return result;
+ while (true)
+ {
+ std::unique_lock<std::mutex> lock1(mLock);
+
+ if (!mStorage.empty())
+ {
+ ElementT value = mStorage.back();
+ mStorage.pop_back();
+ mCapacityCond.notify_one();
+ return value;
+ }
+
+ // Storage empty. Wait for signal.
+ mEmptyCond.wait(lock1);
+ }
}
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
{
- void * storedElement;
- bool result = mImplementation.tryPopBack(storedElement);
- if(result) {
- ElementT * elementPtr = reinterpret_cast<ElementT *>(storedElement);
- element = *elementPtr;
- delete elementPtr;
- } else {
- ; // No op.
- }
- return result;
+ std::unique_lock<std::mutex> lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock())
+ return false;
+
+ if (mStorage.empty())
+ return false;
+
+ element = mStorage.back();
+ mStorage.pop_back();
+ mCapacityCond.notify_one();
+ return true;
}
template<typename ElementT>
size_t LLThreadSafeQueue<ElementT>::size(void)
{
- return mImplementation.size();
+ std::lock_guard<std::mutex> lock(mLock);
+ return mStorage.size();
}
-
#endif
diff --git a/indra/newview/llmainlooprepeater.cpp b/indra/newview/llmainlooprepeater.cpp
index db8d2e4ede..6736e9a950 100644
--- a/indra/newview/llmainlooprepeater.cpp
+++ b/indra/newview/llmainlooprepeater.cpp
@@ -46,7 +46,7 @@ void LLMainLoopRepeater::start(void)
{
if(mQueue != 0) return;
- mQueue = new LLThreadSafeQueue<LLSD>(gAPRPoolp, 1024);
+ mQueue = new LLThreadSafeQueue<LLSD>(1024);
mMainLoopConnection = LLEventPumps::instance().
obtain("mainloop").listen(LLEventPump::inventName(), boost::bind(&LLMainLoopRepeater::onMainLoop, this, _1));
mRepeaterConnection = LLEventPumps::instance().