diff options
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 78 |
1 files changed, 70 insertions, 8 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 3f49dbccc2..bac536f7ee 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -78,7 +78,7 @@ public: // Add an element to the front of queue (will block if the queue has // reached capacity). // - // This call will raise an interrupt error if the queue is deleted while + // This call will raise an interrupt error if the queue is closed while // the caller is blocked. void pushFront(ElementT const & element); @@ -89,7 +89,7 @@ public: // Pop the element at the end of the queue (will block if the queue is // empty). // - // This call will raise an interrupt error if the queue is deleted while + // This call will raise an interrupt error if the queue is closed while // the caller is blocked. ElementT popBack(void); @@ -100,11 +100,27 @@ public: // Returns the size of the queue. size_t size(); + // closes the queue: + // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt + // - every subsequent tryPushFront() call will return false + // - popBack() calls will return normally until the queue is drained, then + // every subsequent popBack() will throw LLThreadSafeQueueInterrupt + // - tryPopBack() calls will return normally until the queue is drained, + // then every subsequent tryPopBack() call will return false + void close(); + + // detect closed state + bool isClosed(); + // inverse of isClosed() + explicit operator bool(); + private: std::deque< ElementT > mStorage; U32 mCapacity; + bool mClosed; boost::fibers::mutex mLock; + typedef std::unique_lock<decltype(mLock)> lock_t; boost::fibers::condition_variable mCapacityCond; boost::fibers::condition_variable mEmptyCond; }; @@ -114,7 +130,8 @@ private: template<typename ElementT> LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) : -mCapacity(capacity) + mCapacity(capacity), + mClosed(false) { } @@ -122,12 +139,18 @@ mCapacity(capacity) template<typename ElementT> void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) { - std::unique_lock<decltype(mLock)> lock1(mLock); + lock_t lock1(mLock); while (true) { + if (mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + if (mStorage.size() < mCapacity) { mStorage.push_front(element); + lock1.unlock(); mEmptyCond.notify_one(); return; } @@ -141,14 +164,18 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) { - std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock); + lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; + if (mClosed) + return false; + if (mStorage.size() >= mCapacity) return false; mStorage.push_front(element); + lock1.unlock(); mEmptyCond.notify_one(); return true; } @@ -157,17 +184,23 @@ bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) template<typename ElementT> ElementT LLThreadSafeQueue<ElementT>::popBack(void) { - std::unique_lock<decltype(mLock)> lock1(mLock); + lock_t lock1(mLock); while (true) { if (!mStorage.empty()) { ElementT value = mStorage.back(); mStorage.pop_back(); + lock1.unlock(); mCapacityCond.notify_one(); return value; } + if (mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + // Storage empty. Wait for signal. mEmptyCond.wait(lock1); } @@ -177,15 +210,18 @@ ElementT LLThreadSafeQueue<ElementT>::popBack(void) template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element) { - std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock); + lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; + // no need to check mClosed: tryPopBack() behavior when the queue is + // closed is implemented by simple inability to push any new elements if (mStorage.empty()) return false; element = mStorage.back(); mStorage.pop_back(); + lock1.unlock(); mCapacityCond.notify_one(); return true; } @@ -194,8 +230,34 @@ bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element) template<typename ElementT> size_t LLThreadSafeQueue<ElementT>::size(void) { - std::lock_guard<decltype(mLock)> lock(mLock); + lock_t lock(mLock); return mStorage.size(); } +template<typename ElementT> +void LLThreadSafeQueue<ElementT>::close() +{ + lock_t lock(mLock); + mClosed = true; + lock.unlock(); + // wake up any blocked popBack() calls + mEmptyCond.notify_all(); + // wake up any blocked pushFront() calls + mCapacityCond.notify_all(); +} + +template<typename ElementT> +bool LLThreadSafeQueue<ElementT>::isClosed() +{ + lock_t lock(mLock); + return mClosed; +} + +template<typename ElementT> +LLThreadSafeQueue<ElementT>::operator bool() +{ + lock_t lock(mLock); + return ! mClosed; +} + #endif |