diff options
Diffstat (limited to 'indra/llcommon/llthreadsafequeue.h')
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 135 |
1 files changed, 72 insertions, 63 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 45289ef0b4..b0bddac8e5 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -28,12 +28,20 @@ #define LL_LLTHREADSAFEQUEUE_H #include "llexception.h" +#include <deque> #include <string> +#if LL_WINDOWS +#pragma warning (push) +#pragma warning (disable:4265) +#endif +// 'std::_Pad' : class has virtual functions, but destructor is not virtual +#include <mutex> +#include <condition_variable> -struct apr_pool_t; // From apr_pools.h -class LLThreadSafeQueueImplementation; // See below. - +#if LL_WINDOWS +#pragma warning (pop) +#endif // // A general queue exception. @@ -64,31 +72,6 @@ public: } }; - -struct apr_queue_t; // From apr_queue.h - - -// -// Implementation details. -// -class LL_COMMON_API LLThreadSafeQueueImplementation -{ -public: - LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity); - ~LLThreadSafeQueueImplementation(); - void pushFront(void * element); - bool tryPushFront(void * element); - void * popBack(void); - bool tryPopBack(void *& element); - size_t size(); - -private: - bool mOwnsPool; - apr_pool_t * mPool; - apr_queue_t * mQueue; -}; - - // // Implements a thread safe FIFO. // @@ -100,7 +83,7 @@ public: // If the pool is set to NULL one will be allocated and managed by this // queue. - LLThreadSafeQueue(apr_pool_t * pool = 0, unsigned int capacity = 1024); + LLThreadSafeQueue(U32 capacity = 1024); // Add an element to the front of queue (will block if the queue has // reached capacity). @@ -128,77 +111,103 @@ public: size_t size(); private: - LLThreadSafeQueueImplementation mImplementation; -}; - + std::deque< ElementT > mStorage; + U32 mCapacity; + std::mutex mLock; + std::condition_variable mCapacityCond; + std::condition_variable mEmptyCond; +}; // LLThreadSafeQueue //----------------------------------------------------------------------------- - template<typename ElementT> -LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(apr_pool_t * pool, unsigned int capacity): - mImplementation(pool, capacity) +LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) : +mCapacity(capacity) { - ; // No op. } template<typename ElementT> void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) { - ElementT * elementCopy = new ElementT(element); - try { - mImplementation.pushFront(elementCopy); - } catch (LLThreadSafeQueueInterrupt) { - delete elementCopy; - throw; - } + while (true) + { + std::unique_lock<std::mutex> lock1(mLock); + + if (mStorage.size() < mCapacity) + { + mStorage.push_front(element); + mEmptyCond.notify_one(); + return; + } + + // Storage Full. Wait for signal. + mCapacityCond.wait(lock1); + } } template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) { - ElementT * elementCopy = new ElementT(element); - bool result = mImplementation.tryPushFront(elementCopy); - if(!result) delete elementCopy; - return result; + std::unique_lock<std::mutex> lock1(mLock, std::defer_lock); + if (!lock1.try_lock()) + return false; + + if (mStorage.size() >= mCapacity) + return false; + + mStorage.push_front(element); + mEmptyCond.notify_one(); + return true; } template<typename ElementT> ElementT LLThreadSafeQueue<ElementT>::popBack(void) { - ElementT * element = reinterpret_cast<ElementT *> (mImplementation.popBack()); - ElementT result(*element); - delete element; - return result; + while (true) + { + std::unique_lock<std::mutex> lock1(mLock); + + if (!mStorage.empty()) + { + ElementT value = mStorage.back(); + mStorage.pop_back(); + mCapacityCond.notify_one(); + return value; + } + + // Storage empty. Wait for signal. + mEmptyCond.wait(lock1); + } } template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element) { - void * storedElement; - bool result = mImplementation.tryPopBack(storedElement); - if(result) { - ElementT * elementPtr = reinterpret_cast<ElementT *>(storedElement); - element = *elementPtr; - delete elementPtr; - } else { - ; // No op. - } - return result; + std::unique_lock<std::mutex> lock1(mLock, std::defer_lock); + if (!lock1.try_lock()) + return false; + + if (mStorage.empty()) + return false; + + element = mStorage.back(); + mStorage.pop_back(); + mCapacityCond.notify_one(); + return true; } template<typename ElementT> size_t LLThreadSafeQueue<ElementT>::size(void) { - return mImplementation.size(); + std::lock_guard<std::mutex> lock(mLock); + return mStorage.size(); } - #endif |