diff options
Diffstat (limited to 'indra/llcommon/llthreadsafequeue.h')
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 99 |
1 files changed, 75 insertions, 24 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index b0bddac8e5..8f5e0f3bf3 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -30,18 +30,9 @@ #include "llexception.h" #include <deque> #include <string> - -#if LL_WINDOWS -#pragma warning (push) -#pragma warning (disable:4265) -#endif -// 'std::_Pad' : class has virtual functions, but destructor is not virtual -#include <mutex> -#include <condition_variable> - -#if LL_WINDOWS -#pragma warning (pop) -#endif +#include "mutex.h" +#include "llcoros.h" +#include LLCOROS_CONDVAR_HEADER // // A general queue exception. @@ -88,7 +79,7 @@ public: // Add an element to the front of queue (will block if the queue has // reached capacity). // - // This call will raise an interrupt error if the queue is deleted while + // This call will raise an interrupt error if the queue is closed while // the caller is blocked. void pushFront(ElementT const & element); @@ -99,7 +90,7 @@ public: // Pop the element at the end of the queue (will block if the queue is // empty). // - // This call will raise an interrupt error if the queue is deleted while + // This call will raise an interrupt error if the queue is closed while // the caller is blocked. ElementT popBack(void); @@ -110,13 +101,29 @@ public: // Returns the size of the queue. size_t size(); + // closes the queue: + // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt + // - every subsequent tryPushFront() call will return false + // - popBack() calls will return normally until the queue is drained, then + // every subsequent popBack() will throw LLThreadSafeQueueInterrupt + // - tryPopBack() calls will return normally until the queue is drained, + // then every subsequent tryPopBack() call will return false + void close(); + + // detect closed state + bool isClosed(); + // inverse of isClosed() + explicit operator bool(); + private: std::deque< ElementT > mStorage; U32 mCapacity; + bool mClosed; - std::mutex mLock; - std::condition_variable mCapacityCond; - std::condition_variable mEmptyCond; + LLCoros::Mutex mLock; + typedef LLCoros::LockType lock_t; + LLCoros::ConditionVariable mCapacityCond; + LLCoros::ConditionVariable mEmptyCond; }; // LLThreadSafeQueue @@ -124,7 +131,8 @@ private: template<typename ElementT> LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) : -mCapacity(capacity) + mCapacity(capacity), + mClosed(false) { } @@ -132,13 +140,18 @@ mCapacity(capacity) template<typename ElementT> void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) { + lock_t lock1(mLock); while (true) { - std::unique_lock<std::mutex> lock1(mLock); + if (mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } if (mStorage.size() < mCapacity) { mStorage.push_front(element); + lock1.unlock(); mEmptyCond.notify_one(); return; } @@ -152,14 +165,18 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) { - std::unique_lock<std::mutex> lock1(mLock, std::defer_lock); + lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; + if (mClosed) + return false; + if (mStorage.size() >= mCapacity) return false; mStorage.push_front(element); + lock1.unlock(); mEmptyCond.notify_one(); return true; } @@ -168,18 +185,23 @@ bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) template<typename ElementT> ElementT LLThreadSafeQueue<ElementT>::popBack(void) { + lock_t lock1(mLock); while (true) { - std::unique_lock<std::mutex> lock1(mLock); - if (!mStorage.empty()) { ElementT value = mStorage.back(); mStorage.pop_back(); + lock1.unlock(); mCapacityCond.notify_one(); return value; } + if (mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + // Storage empty. Wait for signal. mEmptyCond.wait(lock1); } @@ -189,15 +211,18 @@ ElementT LLThreadSafeQueue<ElementT>::popBack(void) template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element) { - std::unique_lock<std::mutex> lock1(mLock, std::defer_lock); + lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; + // no need to check mClosed: tryPopBack() behavior when the queue is + // closed is implemented by simple inability to push any new elements if (mStorage.empty()) return false; element = mStorage.back(); mStorage.pop_back(); + lock1.unlock(); mCapacityCond.notify_one(); return true; } @@ -206,8 +231,34 @@ bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element) template<typename ElementT> size_t LLThreadSafeQueue<ElementT>::size(void) { - std::lock_guard<std::mutex> lock(mLock); + lock_t lock(mLock); return mStorage.size(); } +template<typename ElementT> +void LLThreadSafeQueue<ElementT>::close() +{ + lock_t lock(mLock); + mClosed = true; + lock.unlock(); + // wake up any blocked popBack() calls + mEmptyCond.notify_all(); + // wake up any blocked pushFront() calls + mCapacityCond.notify_all(); +} + +template<typename ElementT> +bool LLThreadSafeQueue<ElementT>::isClosed() +{ + lock_t lock(mLock); + return mClosed; +} + +template<typename ElementT> +LLThreadSafeQueue<ElementT>::operator bool() +{ + lock_t lock(mLock); + return ! mClosed; +} + #endif |