diff options
author | andreykproductengine <andreykproductengine@lindenlab.com> | 2019-01-15 22:27:28 +0200 |
---|---|---|
committer | andreykproductengine <andreykproductengine@lindenlab.com> | 2019-01-15 22:27:28 +0200 |
commit | 8a92a771ba547d9d6a8bd1234e2e0b144a8bfcf5 (patch) | |
tree | 24787dcf00ea5155462358fd6c24ddccd174e8bb | |
parent | af0e498293c86dc7bb1ae7911bf932b7b287e115 (diff) |
SL-10291 Replace apr thread with standard C++11 functionality
-rw-r--r-- | indra/llcommon/llthread.cpp | 77 | ||||
-rw-r--r-- | indra/llcommon/llthread.h | 12 | ||||
-rw-r--r-- | indra/llcommon/llthreadsafequeue.cpp | 81 | ||||
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 135 | ||||
-rw-r--r-- | indra/newview/llmainlooprepeater.cpp | 2 |
5 files changed, 104 insertions, 203 deletions
diff --git a/indra/llcommon/llthread.cpp b/indra/llcommon/llthread.cpp index 860415bb22..a4171729db 100644 --- a/indra/llcommon/llthread.cpp +++ b/indra/llcommon/llthread.cpp @@ -116,29 +116,27 @@ void LLThread::registerThreadID() // // Handed to the APR thread creation function // -void *APR_THREAD_FUNC LLThread::staticRun(apr_thread_t *apr_threadp, void *datap) +void LLThread::threadRun() { - LLThread *threadp = (LLThread *)datap; - #ifdef LL_WINDOWS - set_thread_name(-1, threadp->mName.c_str()); + set_thread_name(-1, mName.c_str()); #endif // for now, hard code all LLThreads to report to single master thread recorder, which is known to be running on main thread - threadp->mRecorder = new LLTrace::ThreadRecorder(*LLTrace::get_master_thread_recorder()); + mRecorder = new LLTrace::ThreadRecorder(*LLTrace::get_master_thread_recorder()); - sThreadID = threadp->mID; + sThreadID = mID; // Run the user supplied function do { try { - threadp->run(); + run(); } catch (const LLContinueError &e) { - LL_WARNS("THREAD") << "ContinueException on thread '" << threadp->mName << + LL_WARNS("THREAD") << "ContinueException on thread '" << mName << "' reentering run(). Error what is: '" << e.what() << "'" << LL_ENDL; //output possible call stacks to log file. LLError::LLCallStacks::print(); @@ -153,39 +151,25 @@ void *APR_THREAD_FUNC LLThread::staticRun(apr_thread_t *apr_threadp, void *datap //LL_INFOS() << "LLThread::staticRun() Exiting: " << threadp->mName << LL_ENDL; - delete threadp->mRecorder; - threadp->mRecorder = NULL; + delete mRecorder; + mRecorder = NULL; // We're done with the run function, this thread is done executing now. //NB: we are using this flag to sync across threads...we really need memory barriers here // Todo: add LLMutex per thread instead of flag? // We are using "while (mStatus != STOPPED) {ms_sleep();}" everywhere. - threadp->mStatus = STOPPED; - - return NULL; + mStatus = STOPPED; } LLThread::LLThread(const std::string& name, apr_pool_t *poolp) : mPaused(FALSE), mName(name), - mAPRThreadp(NULL), + mThreadp(NULL), mStatus(STOPPED), mRecorder(NULL) { mID = ++sIDIter; - - // Thread creation probably CAN be paranoid about APR being initialized, if necessary - if (poolp) - { - mIsLocalPool = FALSE; - mAPRPoolp = poolp; - } - else - { - mIsLocalPool = TRUE; - apr_pool_create(&mAPRPoolp, NULL); // Create a subpool for this thread - } mRunCondition = new LLCondition(); mDataLock = new LLMutex(); mLocalAPRFilePoolp = NULL ; @@ -217,7 +201,7 @@ void LLThread::shutdown() // Warning! If you somehow call the thread destructor from itself, // the thread will die in an unclean fashion! - if (mAPRThreadp) + if (mThreadp) { if (!isStopped()) { @@ -248,14 +232,19 @@ void LLThread::shutdown() { // This thread just wouldn't stop, even though we gave it time //LL_WARNS() << "LLThread::~LLThread() exiting thread before clean exit!" << LL_ENDL; - // Put a stake in its heart. - apr_thread_exit(mAPRThreadp, -1); + // Put a stake in its heart. (A very hostile method to force a thread to quit) +#if LL_WINDOWS + TerminateThread(mNativeHandle, 0); +#else + pthread_cancel(mNativeHandle); +#endif + delete mRecorder; mRecorder = NULL; mStatus = STOPPED; return; } - mAPRThreadp = NULL; + mThreadp = NULL; } delete mRunCondition; @@ -263,12 +252,6 @@ void LLThread::shutdown() delete mDataLock; mDataLock = NULL; - - if (mIsLocalPool && mAPRPoolp) - { - apr_pool_destroy(mAPRPoolp); - mAPRPoolp = 0; - } if (mRecorder) { @@ -287,19 +270,15 @@ void LLThread::start() // Set thread state to running mStatus = RUNNING; - apr_status_t status = - apr_thread_create(&mAPRThreadp, NULL, staticRun, (void *)this, mAPRPoolp); - - if(status == APR_SUCCESS) - { - // We won't bother joining - apr_thread_detach(mAPRThreadp); + try + { + mThreadp = new std::thread(std::bind(&LLThread::threadRun, this)); + mNativeHandle = mThreadp->native_handle(); } - else + catch (std::system_error& ex) { mStatus = STOPPED; - LL_WARNS() << "failed to start thread " << mName << LL_ENDL; - ll_apr_warn_status(status); + LL_WARNS() << "failed to start thread " << mName << " " << ex.what() << LL_ENDL; } } @@ -376,11 +355,7 @@ U32 LLThread::currentID() // static void LLThread::yield() { -#if LL_LINUX || LL_SOLARIS - sched_yield(); // annoyingly, apr_thread_yield is a noop on linux... -#else - apr_thread_yield(); -#endif + std::this_thread::yield(); } void LLThread::wake() diff --git a/indra/llcommon/llthread.h b/indra/llcommon/llthread.h index dda7fa8ffb..863c9051f3 100644 --- a/indra/llcommon/llthread.h +++ b/indra/llcommon/llthread.h @@ -29,10 +29,10 @@ #include "llapp.h" #include "llapr.h" -#include "apr_thread_cond.h" #include "boost/intrusive_ptr.hpp" #include "llmutex.h" #include "llrefcount.h" +#include <thread> LL_COMMON_API void assert_main_thread(); @@ -86,7 +86,6 @@ public: // this kicks off the apr thread void start(void); - apr_pool_t *getAPRPool() { return mAPRPoolp; } LLVolatileAPRPool* getLocalAPRFilePool() { return mLocalAPRFilePoolp ; } U32 getID() const { return mID; } @@ -97,19 +96,18 @@ public: static void registerThreadID(); private: - BOOL mPaused; + bool mPaused; + std::thread::native_handle_type mNativeHandle; // for termination in case of issues // static function passed to APR thread creation routine - static void *APR_THREAD_FUNC staticRun(struct apr_thread_t *apr_threadp, void *datap); + void threadRun(); protected: std::string mName; class LLCondition* mRunCondition; LLMutex* mDataLock; - apr_thread_t *mAPRThreadp; - apr_pool_t *mAPRPoolp; - BOOL mIsLocalPool; + std::thread *mThreadp; EThreadStatus mStatus; U32 mID; LLTrace::ThreadRecorder* mRecorder; diff --git a/indra/llcommon/llthreadsafequeue.cpp b/indra/llcommon/llthreadsafequeue.cpp index 491f920c0f..bde36999ba 100644 --- a/indra/llcommon/llthreadsafequeue.cpp +++ b/indra/llcommon/llthreadsafequeue.cpp @@ -24,87 +24,6 @@ */ #include "linden_common.h" -#include <apr_pools.h> -#include <apr_queue.h> #include "llthreadsafequeue.h" -#include "llexception.h" - -// LLThreadSafeQueueImplementation -//----------------------------------------------------------------------------- - - -LLThreadSafeQueueImplementation::LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity): - mOwnsPool(pool == 0), - mPool(pool), - mQueue(0) -{ - if(mOwnsPool) { - apr_status_t status = apr_pool_create(&mPool, 0); - if(status != APR_SUCCESS) LLTHROW(LLThreadSafeQueueError("failed to allocate pool")); - } else { - ; // No op. - } - - apr_status_t status = apr_queue_create(&mQueue, capacity, mPool); - if(status != APR_SUCCESS) LLTHROW(LLThreadSafeQueueError("failed to allocate queue")); -} - - -LLThreadSafeQueueImplementation::~LLThreadSafeQueueImplementation() -{ - if(mQueue != 0) { - if(apr_queue_size(mQueue) != 0) LL_WARNS() << - "terminating queue which still contains " << apr_queue_size(mQueue) << - " elements;" << "memory will be leaked" << LL_ENDL; - apr_queue_term(mQueue); - } - if(mOwnsPool && (mPool != 0)) apr_pool_destroy(mPool); -} - - -void LLThreadSafeQueueImplementation::pushFront(void * element) -{ - apr_status_t status = apr_queue_push(mQueue, element); - - if(status == APR_EINTR) { - LLTHROW(LLThreadSafeQueueInterrupt()); - } else if(status != APR_SUCCESS) { - LLTHROW(LLThreadSafeQueueError("push failed")); - } else { - ; // Success. - } -} - - -bool LLThreadSafeQueueImplementation::tryPushFront(void * element){ - return apr_queue_trypush(mQueue, element) == APR_SUCCESS; -} - - -void * LLThreadSafeQueueImplementation::popBack(void) -{ - void * element; - apr_status_t status = apr_queue_pop(mQueue, &element); - - if(status == APR_EINTR) { - LLTHROW(LLThreadSafeQueueInterrupt()); - } else if(status != APR_SUCCESS) { - LLTHROW(LLThreadSafeQueueError("pop failed")); - } else { - return element; - } -} - - -bool LLThreadSafeQueueImplementation::tryPopBack(void *& element) -{ - return apr_queue_trypop(mQueue, &element) == APR_SUCCESS; -} - - -size_t LLThreadSafeQueueImplementation::size() -{ - return apr_queue_size(mQueue); -} diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 45289ef0b4..b0bddac8e5 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -28,12 +28,20 @@ #define LL_LLTHREADSAFEQUEUE_H #include "llexception.h" +#include <deque> #include <string> +#if LL_WINDOWS +#pragma warning (push) +#pragma warning (disable:4265) +#endif +// 'std::_Pad' : class has virtual functions, but destructor is not virtual +#include <mutex> +#include <condition_variable> -struct apr_pool_t; // From apr_pools.h -class LLThreadSafeQueueImplementation; // See below. - +#if LL_WINDOWS +#pragma warning (pop) +#endif // // A general queue exception. @@ -64,31 +72,6 @@ public: } }; - -struct apr_queue_t; // From apr_queue.h - - -// -// Implementation details. -// -class LL_COMMON_API LLThreadSafeQueueImplementation -{ -public: - LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity); - ~LLThreadSafeQueueImplementation(); - void pushFront(void * element); - bool tryPushFront(void * element); - void * popBack(void); - bool tryPopBack(void *& element); - size_t size(); - -private: - bool mOwnsPool; - apr_pool_t * mPool; - apr_queue_t * mQueue; -}; - - // // Implements a thread safe FIFO. // @@ -100,7 +83,7 @@ public: // If the pool is set to NULL one will be allocated and managed by this // queue. - LLThreadSafeQueue(apr_pool_t * pool = 0, unsigned int capacity = 1024); + LLThreadSafeQueue(U32 capacity = 1024); // Add an element to the front of queue (will block if the queue has // reached capacity). @@ -128,77 +111,103 @@ public: size_t size(); private: - LLThreadSafeQueueImplementation mImplementation; -}; - + std::deque< ElementT > mStorage; + U32 mCapacity; + std::mutex mLock; + std::condition_variable mCapacityCond; + std::condition_variable mEmptyCond; +}; // LLThreadSafeQueue //----------------------------------------------------------------------------- - template<typename ElementT> -LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(apr_pool_t * pool, unsigned int capacity): - mImplementation(pool, capacity) +LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) : +mCapacity(capacity) { - ; // No op. } template<typename ElementT> void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) { - ElementT * elementCopy = new ElementT(element); - try { - mImplementation.pushFront(elementCopy); - } catch (LLThreadSafeQueueInterrupt) { - delete elementCopy; - throw; - } + while (true) + { + std::unique_lock<std::mutex> lock1(mLock); + + if (mStorage.size() < mCapacity) + { + mStorage.push_front(element); + mEmptyCond.notify_one(); + return; + } + + // Storage Full. Wait for signal. + mCapacityCond.wait(lock1); + } } template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) { - ElementT * elementCopy = new ElementT(element); - bool result = mImplementation.tryPushFront(elementCopy); - if(!result) delete elementCopy; - return result; + std::unique_lock<std::mutex> lock1(mLock, std::defer_lock); + if (!lock1.try_lock()) + return false; + + if (mStorage.size() >= mCapacity) + return false; + + mStorage.push_front(element); + mEmptyCond.notify_one(); + return true; } template<typename ElementT> ElementT LLThreadSafeQueue<ElementT>::popBack(void) { - ElementT * element = reinterpret_cast<ElementT *> (mImplementation.popBack()); - ElementT result(*element); - delete element; - return result; + while (true) + { + std::unique_lock<std::mutex> lock1(mLock); + + if (!mStorage.empty()) + { + ElementT value = mStorage.back(); + mStorage.pop_back(); + mCapacityCond.notify_one(); + return value; + } + + // Storage empty. Wait for signal. + mEmptyCond.wait(lock1); + } } template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element) { - void * storedElement; - bool result = mImplementation.tryPopBack(storedElement); - if(result) { - ElementT * elementPtr = reinterpret_cast<ElementT *>(storedElement); - element = *elementPtr; - delete elementPtr; - } else { - ; // No op. - } - return result; + std::unique_lock<std::mutex> lock1(mLock, std::defer_lock); + if (!lock1.try_lock()) + return false; + + if (mStorage.empty()) + return false; + + element = mStorage.back(); + mStorage.pop_back(); + mCapacityCond.notify_one(); + return true; } template<typename ElementT> size_t LLThreadSafeQueue<ElementT>::size(void) { - return mImplementation.size(); + std::lock_guard<std::mutex> lock(mLock); + return mStorage.size(); } - #endif diff --git a/indra/newview/llmainlooprepeater.cpp b/indra/newview/llmainlooprepeater.cpp index db8d2e4ede..6736e9a950 100644 --- a/indra/newview/llmainlooprepeater.cpp +++ b/indra/newview/llmainlooprepeater.cpp @@ -46,7 +46,7 @@ void LLMainLoopRepeater::start(void) { if(mQueue != 0) return; - mQueue = new LLThreadSafeQueue<LLSD>(gAPRPoolp, 1024); + mQueue = new LLThreadSafeQueue<LLSD>(1024); mMainLoopConnection = LLEventPumps::instance(). obtain("mainloop").listen(LLEventPump::inventName(), boost::bind(&LLMainLoopRepeater::onMainLoop, this, _1)); mRepeaterConnection = LLEventPumps::instance(). |