diff options
| -rw-r--r-- | indra/llcommon/llthread.cpp | 77 | ||||
| -rw-r--r-- | indra/llcommon/llthread.h | 12 | ||||
| -rw-r--r-- | indra/llcommon/llthreadsafequeue.cpp | 81 | ||||
| -rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 135 | ||||
| -rw-r--r-- | indra/newview/llmainlooprepeater.cpp | 2 | 
5 files changed, 104 insertions, 203 deletions
| diff --git a/indra/llcommon/llthread.cpp b/indra/llcommon/llthread.cpp index 860415bb22..a4171729db 100644 --- a/indra/llcommon/llthread.cpp +++ b/indra/llcommon/llthread.cpp @@ -116,29 +116,27 @@ void LLThread::registerThreadID()  //  // Handed to the APR thread creation function  // -void *APR_THREAD_FUNC LLThread::staticRun(apr_thread_t *apr_threadp, void *datap) +void LLThread::threadRun()  { -    LLThread *threadp = (LLThread *)datap; -  #ifdef LL_WINDOWS -    set_thread_name(-1, threadp->mName.c_str()); +    set_thread_name(-1, mName.c_str());  #endif      // for now, hard code all LLThreads to report to single master thread recorder, which is known to be running on main thread -    threadp->mRecorder = new LLTrace::ThreadRecorder(*LLTrace::get_master_thread_recorder()); +    mRecorder = new LLTrace::ThreadRecorder(*LLTrace::get_master_thread_recorder()); -    sThreadID = threadp->mID; +    sThreadID = mID;      // Run the user supplied function      do       {          try          { -            threadp->run(); +            run();          }          catch (const LLContinueError &e)          { -            LL_WARNS("THREAD") << "ContinueException on thread '" << threadp->mName << +            LL_WARNS("THREAD") << "ContinueException on thread '" << mName <<                  "' reentering run(). Error what is: '" << e.what() << "'" << LL_ENDL;              //output possible call stacks to log file.              LLError::LLCallStacks::print(); @@ -153,39 +151,25 @@ void *APR_THREAD_FUNC LLThread::staticRun(apr_thread_t *apr_threadp, void *datap      //LL_INFOS() << "LLThread::staticRun() Exiting: " << threadp->mName << LL_ENDL; -    delete threadp->mRecorder; -    threadp->mRecorder = NULL; +    delete mRecorder; +    mRecorder = NULL;      // We're done with the run function, this thread is done executing now.      //NB: we are using this flag to sync across threads...we really need memory barriers here      // Todo: add LLMutex per thread instead of flag?      // We are using "while (mStatus != STOPPED) {ms_sleep();}" everywhere. -    threadp->mStatus = STOPPED; - -    return NULL; +    mStatus = STOPPED;  }  LLThread::LLThread(const std::string& name, apr_pool_t *poolp) :      mPaused(FALSE),      mName(name), -    mAPRThreadp(NULL), +    mThreadp(NULL),      mStatus(STOPPED),      mRecorder(NULL)  {      mID = ++sIDIter; - -    // Thread creation probably CAN be paranoid about APR being initialized, if necessary -    if (poolp) -    { -        mIsLocalPool = FALSE; -        mAPRPoolp = poolp; -    } -    else -    { -        mIsLocalPool = TRUE; -        apr_pool_create(&mAPRPoolp, NULL); // Create a subpool for this thread -    }      mRunCondition = new LLCondition();      mDataLock = new LLMutex();      mLocalAPRFilePoolp = NULL ; @@ -217,7 +201,7 @@ void LLThread::shutdown()      // Warning!  If you somehow call the thread destructor from itself,      // the thread will die in an unclean fashion! -    if (mAPRThreadp) +    if (mThreadp)      {          if (!isStopped())          { @@ -248,14 +232,19 @@ void LLThread::shutdown()          {              // This thread just wouldn't stop, even though we gave it time              //LL_WARNS() << "LLThread::~LLThread() exiting thread before clean exit!" << LL_ENDL; -            // Put a stake in its heart. -            apr_thread_exit(mAPRThreadp, -1); +            // Put a stake in its heart. (A very hostile method to force a thread to quit) +#if		LL_WINDOWS +            TerminateThread(mNativeHandle, 0); +#else +            pthread_cancel(mNativeHandle); +#endif +              delete mRecorder;              mRecorder = NULL;              mStatus = STOPPED;              return;          } -        mAPRThreadp = NULL; +        mThreadp = NULL;      }      delete mRunCondition; @@ -263,12 +252,6 @@ void LLThread::shutdown()      delete mDataLock;      mDataLock = NULL; -     -    if (mIsLocalPool && mAPRPoolp) -    { -        apr_pool_destroy(mAPRPoolp); -        mAPRPoolp = 0; -    }      if (mRecorder)      { @@ -287,19 +270,15 @@ void LLThread::start()      // Set thread state to running      mStatus = RUNNING; -    apr_status_t status = -        apr_thread_create(&mAPRThreadp, NULL, staticRun, (void *)this, mAPRPoolp); -     -    if(status == APR_SUCCESS) -    {    -        // We won't bother joining -        apr_thread_detach(mAPRThreadp); +    try +    { +        mThreadp = new std::thread(std::bind(&LLThread::threadRun, this)); +        mNativeHandle = mThreadp->native_handle();      } -    else +    catch (std::system_error& ex)      {          mStatus = STOPPED; -        LL_WARNS() << "failed to start thread " << mName << LL_ENDL; -        ll_apr_warn_status(status); +        LL_WARNS() << "failed to start thread " << mName << " " << ex.what() << LL_ENDL;      }  } @@ -376,11 +355,7 @@ U32 LLThread::currentID()  // static  void LLThread::yield()  { -#if LL_LINUX || LL_SOLARIS -    sched_yield(); // annoyingly, apr_thread_yield  is a noop on linux... -#else -    apr_thread_yield(); -#endif +    std::this_thread::yield();  }  void LLThread::wake() diff --git a/indra/llcommon/llthread.h b/indra/llcommon/llthread.h index dda7fa8ffb..863c9051f3 100644 --- a/indra/llcommon/llthread.h +++ b/indra/llcommon/llthread.h @@ -29,10 +29,10 @@  #include "llapp.h"  #include "llapr.h" -#include "apr_thread_cond.h"  #include "boost/intrusive_ptr.hpp"  #include "llmutex.h"  #include "llrefcount.h" +#include <thread>  LL_COMMON_API void assert_main_thread(); @@ -86,7 +86,6 @@ public:      // this kicks off the apr thread      void start(void); -    apr_pool_t *getAPRPool() { return mAPRPoolp; }      LLVolatileAPRPool* getLocalAPRFilePool() { return mLocalAPRFilePoolp ; }      U32 getID() const { return mID; } @@ -97,19 +96,18 @@ public:      static void registerThreadID();  private: -    BOOL                mPaused; +    bool                mPaused; +    std::thread::native_handle_type mNativeHandle; // for termination in case of issues      // static function passed to APR thread creation routine -    static void *APR_THREAD_FUNC staticRun(struct apr_thread_t *apr_threadp, void *datap); +    void threadRun();  protected:      std::string         mName;      class LLCondition*  mRunCondition;      LLMutex*            mDataLock; -    apr_thread_t        *mAPRThreadp; -    apr_pool_t          *mAPRPoolp; -    BOOL                mIsLocalPool; +    std::thread        *mThreadp;      EThreadStatus       mStatus;      U32                 mID;      LLTrace::ThreadRecorder* mRecorder; diff --git a/indra/llcommon/llthreadsafequeue.cpp b/indra/llcommon/llthreadsafequeue.cpp index 491f920c0f..bde36999ba 100644 --- a/indra/llcommon/llthreadsafequeue.cpp +++ b/indra/llcommon/llthreadsafequeue.cpp @@ -24,87 +24,6 @@   */  #include "linden_common.h" -#include <apr_pools.h> -#include <apr_queue.h>  #include "llthreadsafequeue.h" -#include "llexception.h" - -// LLThreadSafeQueueImplementation -//----------------------------------------------------------------------------- - - -LLThreadSafeQueueImplementation::LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity): -	mOwnsPool(pool == 0), -	mPool(pool), -	mQueue(0) -{ -	if(mOwnsPool) { -		apr_status_t status = apr_pool_create(&mPool, 0); -		if(status != APR_SUCCESS) LLTHROW(LLThreadSafeQueueError("failed to allocate pool")); -	} else { -		; // No op. -	} -	 -	apr_status_t status = apr_queue_create(&mQueue, capacity, mPool); -	if(status != APR_SUCCESS) LLTHROW(LLThreadSafeQueueError("failed to allocate queue")); -} - - -LLThreadSafeQueueImplementation::~LLThreadSafeQueueImplementation() -{ -	if(mQueue != 0) { -		if(apr_queue_size(mQueue) != 0) LL_WARNS() <<  -			"terminating queue which still contains " << apr_queue_size(mQueue) << -			" elements;" << "memory will be leaked" << LL_ENDL; -		apr_queue_term(mQueue); -	} -	if(mOwnsPool && (mPool != 0)) apr_pool_destroy(mPool); -} - - -void LLThreadSafeQueueImplementation::pushFront(void * element) -{ -	apr_status_t status = apr_queue_push(mQueue, element); -	 -	if(status == APR_EINTR) { -		LLTHROW(LLThreadSafeQueueInterrupt()); -	} else if(status != APR_SUCCESS) { -		LLTHROW(LLThreadSafeQueueError("push failed")); -	} else { -		; // Success. -	} -} - - -bool LLThreadSafeQueueImplementation::tryPushFront(void * element){ -	return apr_queue_trypush(mQueue, element) == APR_SUCCESS; -} - - -void * LLThreadSafeQueueImplementation::popBack(void) -{ -	void * element; -	apr_status_t status = apr_queue_pop(mQueue, &element); - -	if(status == APR_EINTR) { -		LLTHROW(LLThreadSafeQueueInterrupt()); -	} else if(status != APR_SUCCESS) { -		LLTHROW(LLThreadSafeQueueError("pop failed")); -	} else { -		return element; -	} -} - - -bool LLThreadSafeQueueImplementation::tryPopBack(void *& element) -{ -	return apr_queue_trypop(mQueue, &element) == APR_SUCCESS; -} - - -size_t LLThreadSafeQueueImplementation::size() -{ -	return apr_queue_size(mQueue); -} diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 45289ef0b4..b0bddac8e5 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -28,12 +28,20 @@  #define LL_LLTHREADSAFEQUEUE_H  #include "llexception.h" +#include <deque>  #include <string> +#if LL_WINDOWS +#pragma warning (push) +#pragma warning (disable:4265) +#endif +// 'std::_Pad' : class has virtual functions, but destructor is not virtual +#include <mutex> +#include <condition_variable> -struct apr_pool_t; // From apr_pools.h -class LLThreadSafeQueueImplementation; // See below. - +#if LL_WINDOWS +#pragma warning (pop) +#endif  //  // A general queue exception. @@ -64,31 +72,6 @@ public:  	}  }; - -struct apr_queue_t; // From apr_queue.h - - -// -// Implementation details.  -// -class LL_COMMON_API LLThreadSafeQueueImplementation -{ -public: -	LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity); -	~LLThreadSafeQueueImplementation(); -	void pushFront(void * element); -	bool tryPushFront(void * element); -	void * popBack(void); -	bool tryPopBack(void *& element); -	size_t size(); -	 -private: -	bool mOwnsPool; -	apr_pool_t * mPool; -	apr_queue_t * mQueue; -}; - -  //  // Implements a thread safe FIFO.  // @@ -100,7 +83,7 @@ public:  	// If the pool is set to NULL one will be allocated and managed by this  	// queue. -	LLThreadSafeQueue(apr_pool_t * pool = 0, unsigned int capacity = 1024); +	LLThreadSafeQueue(U32 capacity = 1024);  	// Add an element to the front of queue (will block if the queue has  	// reached capacity). @@ -128,77 +111,103 @@ public:  	size_t size();  private: -	LLThreadSafeQueueImplementation mImplementation; -}; - +	std::deque< ElementT > mStorage; +	U32 mCapacity; +	std::mutex mLock; +	std::condition_variable mCapacityCond; +	std::condition_variable mEmptyCond; +};  // LLThreadSafeQueue  //----------------------------------------------------------------------------- -  template<typename ElementT> -LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(apr_pool_t * pool, unsigned int capacity): -	mImplementation(pool, capacity) +LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) : +mCapacity(capacity)  { -	; // No op.  }  template<typename ElementT>  void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)  { -	ElementT * elementCopy = new ElementT(element); -	try { -		mImplementation.pushFront(elementCopy); -	} catch (LLThreadSafeQueueInterrupt) { -		delete elementCopy; -		throw; -	} +    while (true) +    { +        std::unique_lock<std::mutex> lock1(mLock); + +        if (mStorage.size() < mCapacity) +        { +            mStorage.push_front(element); +            mEmptyCond.notify_one(); +            return; +        } + +        // Storage Full. Wait for signal. +        mCapacityCond.wait(lock1); +    }  }  template<typename ElementT>  bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)  { -	ElementT * elementCopy = new ElementT(element); -	bool result = mImplementation.tryPushFront(elementCopy); -	if(!result) delete elementCopy; -	return result; +    std::unique_lock<std::mutex> lock1(mLock, std::defer_lock); +    if (!lock1.try_lock()) +        return false; + +    if (mStorage.size() >= mCapacity) +        return false; + +    mStorage.push_front(element); +    mEmptyCond.notify_one(); +    return true;  }  template<typename ElementT>  ElementT LLThreadSafeQueue<ElementT>::popBack(void)  { -	ElementT * element = reinterpret_cast<ElementT *> (mImplementation.popBack()); -	ElementT result(*element); -	delete element; -	return result; +    while (true) +    { +        std::unique_lock<std::mutex> lock1(mLock); + +        if (!mStorage.empty()) +        { +            ElementT value = mStorage.back(); +            mStorage.pop_back(); +            mCapacityCond.notify_one(); +            return value; +        } + +        // Storage empty. Wait for signal. +        mEmptyCond.wait(lock1); +    }  }  template<typename ElementT>  bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)  { -	void * storedElement; -	bool result = mImplementation.tryPopBack(storedElement); -	if(result) { -		ElementT * elementPtr = reinterpret_cast<ElementT *>(storedElement);  -		element = *elementPtr; -		delete elementPtr; -	} else { -		; // No op. -	} -	return result; +    std::unique_lock<std::mutex> lock1(mLock, std::defer_lock); +    if (!lock1.try_lock()) +        return false; + +    if (mStorage.empty()) +        return false; + +    element = mStorage.back(); +    mStorage.pop_back(); +    mCapacityCond.notify_one(); +    return true;  }  template<typename ElementT>  size_t LLThreadSafeQueue<ElementT>::size(void)  { -	return mImplementation.size(); +    std::lock_guard<std::mutex> lock(mLock); +    return mStorage.size();  } -  #endif diff --git a/indra/newview/llmainlooprepeater.cpp b/indra/newview/llmainlooprepeater.cpp index db8d2e4ede..6736e9a950 100644 --- a/indra/newview/llmainlooprepeater.cpp +++ b/indra/newview/llmainlooprepeater.cpp @@ -46,7 +46,7 @@ void LLMainLoopRepeater::start(void)  {  	if(mQueue != 0) return; -	mQueue = new LLThreadSafeQueue<LLSD>(gAPRPoolp, 1024); +	mQueue = new LLThreadSafeQueue<LLSD>(1024);  	mMainLoopConnection = LLEventPumps::instance().  		obtain("mainloop").listen(LLEventPump::inventName(), boost::bind(&LLMainLoopRepeater::onMainLoop, this, _1));  	mRepeaterConnection = LLEventPumps::instance(). | 
