diff options
Diffstat (limited to 'indra/llcommon/llthreadsafequeue.h')
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 156 |
1 files changed, 130 insertions, 26 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index b0bddac8e5..30dd507f73 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -30,18 +30,12 @@ #include "llexception.h" #include <deque> #include <string> - -#if LL_WINDOWS -#pragma warning (push) -#pragma warning (disable:4265) -#endif -// 'std::_Pad' : class has virtual functions, but destructor is not virtual -#include <mutex> -#include <condition_variable> - -#if LL_WINDOWS -#pragma warning (pop) -#endif +#include <chrono> +#include "mutex.h" +#include "llcoros.h" +#include LLCOROS_MUTEX_HEADER +#include <boost/fiber/timed_mutex.hpp> +#include LLCOROS_CONDVAR_HEADER // // A general queue exception. @@ -88,18 +82,28 @@ public: // Add an element to the front of queue (will block if the queue has // reached capacity). // - // This call will raise an interrupt error if the queue is deleted while + // This call will raise an interrupt error if the queue is closed while // the caller is blocked. void pushFront(ElementT const & element); - // Try to add an element to the front ofqueue without blocking. Returns + // Try to add an element to the front of queue without blocking. Returns // true only if the element was actually added. bool tryPushFront(ElementT const & element); - + + // Try to add an element to the front of queue, blocking if full but with + // timeout. Returns true if the element was added. + // There are potentially two different timeouts involved: how long to try + // to lock the mutex, versus how long to wait for the queue to stop being + // full. Careful settings for each timeout might be orders of magnitude + // apart. However, this method conflates them. + template <typename Rep, typename Period> + bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout, + ElementT const & element); + // Pop the element at the end of the queue (will block if the queue is // empty). // - // This call will raise an interrupt error if the queue is deleted while + // This call will raise an interrupt error if the queue is closed while // the caller is blocked. ElementT popBack(void); @@ -110,13 +114,29 @@ public: // Returns the size of the queue. size_t size(); + // closes the queue: + // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt + // - every subsequent tryPushFront() call will return false + // - popBack() calls will return normally until the queue is drained, then + // every subsequent popBack() will throw LLThreadSafeQueueInterrupt + // - tryPopBack() calls will return normally until the queue is drained, + // then every subsequent tryPopBack() call will return false + void close(); + + // detect closed state + bool isClosed(); + // inverse of isClosed() + explicit operator bool(); + private: std::deque< ElementT > mStorage; U32 mCapacity; + bool mClosed; - std::mutex mLock; - std::condition_variable mCapacityCond; - std::condition_variable mEmptyCond; + boost::fibers::timed_mutex mLock; + typedef std::unique_lock<decltype(mLock)> lock_t; + boost::fibers::condition_variable_any mCapacityCond; + boost::fibers::condition_variable_any mEmptyCond; }; // LLThreadSafeQueue @@ -124,7 +144,8 @@ private: template<typename ElementT> LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) : -mCapacity(capacity) + mCapacity(capacity), + mClosed(false) { } @@ -132,13 +153,18 @@ mCapacity(capacity) template<typename ElementT> void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) { + lock_t lock1(mLock); while (true) { - std::unique_lock<std::mutex> lock1(mLock); + if (mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } if (mStorage.size() < mCapacity) { mStorage.push_front(element); + lock1.unlock(); mEmptyCond.notify_one(); return; } @@ -149,17 +175,61 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) } +template <typename ElementT> +template <typename Rep, typename Period> +bool LLThreadSafeQueue<ElementT>::tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout, + ElementT const & element) +{ + // Convert duration to time_point: passing the same timeout duration to + // each of multiple calls is wrong. + auto endpoint = std::chrono::steady_clock::now() + timeout; + + lock_t lock1(mLock, std::defer_lock); + if (!lock1.try_lock_until(endpoint)) + return false; + + while (true) + { + if (mClosed) + { + return false; + } + + if (mStorage.size() < mCapacity) + { + mStorage.push_front(element); + lock1.unlock(); + mEmptyCond.notify_one(); + return true; + } + + // Storage Full. Wait for signal. + if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } +} + + template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) { - std::unique_lock<std::mutex> lock1(mLock, std::defer_lock); + lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; + if (mClosed) + return false; + if (mStorage.size() >= mCapacity) return false; mStorage.push_front(element); + lock1.unlock(); mEmptyCond.notify_one(); return true; } @@ -168,18 +238,23 @@ bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) template<typename ElementT> ElementT LLThreadSafeQueue<ElementT>::popBack(void) { + lock_t lock1(mLock); while (true) { - std::unique_lock<std::mutex> lock1(mLock); - if (!mStorage.empty()) { ElementT value = mStorage.back(); mStorage.pop_back(); + lock1.unlock(); mCapacityCond.notify_one(); return value; } + if (mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + // Storage empty. Wait for signal. mEmptyCond.wait(lock1); } @@ -189,15 +264,18 @@ ElementT LLThreadSafeQueue<ElementT>::popBack(void) template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element) { - std::unique_lock<std::mutex> lock1(mLock, std::defer_lock); + lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; + // no need to check mClosed: tryPopBack() behavior when the queue is + // closed is implemented by simple inability to push any new elements if (mStorage.empty()) return false; element = mStorage.back(); mStorage.pop_back(); + lock1.unlock(); mCapacityCond.notify_one(); return true; } @@ -206,8 +284,34 @@ bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element) template<typename ElementT> size_t LLThreadSafeQueue<ElementT>::size(void) { - std::lock_guard<std::mutex> lock(mLock); + lock_t lock(mLock); return mStorage.size(); } +template<typename ElementT> +void LLThreadSafeQueue<ElementT>::close() +{ + lock_t lock(mLock); + mClosed = true; + lock.unlock(); + // wake up any blocked popBack() calls + mEmptyCond.notify_all(); + // wake up any blocked pushFront() calls + mCapacityCond.notify_all(); +} + +template<typename ElementT> +bool LLThreadSafeQueue<ElementT>::isClosed() +{ + lock_t lock(mLock); + return mClosed; +} + +template<typename ElementT> +LLThreadSafeQueue<ElementT>::operator bool() +{ + lock_t lock(mLock); + return ! mClosed; +} + #endif |