diff options
Diffstat (limited to 'indra/llcommon/llthreadsafequeue.h')
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 554 |
1 files changed, 414 insertions, 140 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 26e0d71d31..68d79cdd12 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -1,6 +1,6 @@ /** * @file llthreadsafequeue.h - * @brief Base classes for thread, mutex and condition handling. + * @brief Queue protected with mutexes for cross-thread use * * $LicenseInfo:firstyear=2004&license=viewerlgpl$ * Second Life Viewer Source Code @@ -27,16 +27,19 @@ #ifndef LL_LLTHREADSAFEQUEUE_H #define LL_LLTHREADSAFEQUEUE_H -#include "llexception.h" -#include <deque> -#include <string> -#include <chrono> -#include "mutex.h" #include "llcoros.h" #include LLCOROS_MUTEX_HEADER #include <boost/fiber/timed_mutex.hpp> #include LLCOROS_CONDVAR_HEADER +#include "llexception.h" +#include "mutex.h" +#include <chrono> +#include <queue> +#include <string> +/***************************************************************************** +* LLThreadSafeQueue +*****************************************************************************/ // // A general queue exception. // @@ -66,70 +69,116 @@ public: } }; -// -// Implements a thread safe FIFO. -// -template<typename ElementT> +/** + * Implements a thread safe FIFO. + */ +// Let the default std::queue default to underlying std::deque. Override if +// desired. +template<typename ElementT, typename QueueT=std::queue<ElementT>> class LLThreadSafeQueue { public: typedef ElementT value_type; - - // If the pool is set to NULL one will be allocated and managed by this - // queue. + + // Limiting the number of pending items prevents unbounded growth of the + // underlying queue. LLThreadSafeQueue(U32 capacity = 1024); - - // Add an element to the front of queue (will block if the queue has - // reached capacity). + virtual ~LLThreadSafeQueue() {} + + // Add an element to the queue (will block if the queue has reached + // capacity). // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. - void pushFront(ElementT const & element); - - // Try to add an element to the front of queue without blocking. Returns + template <typename T> + void push(T&& element); + // legacy name + void pushFront(ElementT const & element) { return push(element); } + + // Add an element to the queue (will block if the queue has reached + // capacity). Return false if the queue is closed before push is possible. + template <typename T> + bool pushIfOpen(T&& element); + + // Try to add an element to the queue without blocking. Returns // true only if the element was actually added. - bool tryPushFront(ElementT const & element); + template <typename T> + bool tryPush(T&& element); + // legacy name + bool tryPushFront(ElementT const & element) { return tryPush(element); } - // Try to add an element to the front of queue, blocking if full but with - // timeout. Returns true if the element was added. + // Try to add an element to the queue, blocking if full but with timeout + // after specified duration. Returns true if the element was added. // There are potentially two different timeouts involved: how long to try // to lock the mutex, versus how long to wait for the queue to stop being // full. Careful settings for each timeout might be orders of magnitude // apart. However, this method conflates them. + template <typename Rep, typename Period, typename T> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + T&& element); + // legacy name template <typename Rep, typename Period> bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout, - ElementT const & element); + ElementT const & element) { return tryPushFor(timeout, element); } + + // Try to add an element to the queue, blocking if full but with + // timeout at specified time_point. Returns true if the element was added. + template <typename Clock, typename Duration, typename T> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + T&& element); + // no legacy name because this is a newer method - // Pop the element at the end of the queue (will block if the queue is + // Pop the element at the head of the queue (will block if the queue is // empty). // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. - ElementT popBack(void); - - // Pop an element from the end of the queue if there is one available. + ElementT pop(void); + // legacy name + ElementT popBack(void) { return pop(); } + + // Pop an element from the head of the queue if there is one available. // Returns true only if an element was popped. - bool tryPopBack(ElementT & element); - + bool tryPop(ElementT & element); + // legacy name + bool tryPopBack(ElementT & element) { return tryPop(element); } + + // Pop the element at the head of the queue, blocking if empty, with + // timeout after specified duration. Returns true if an element was popped. + template <typename Rep, typename Period> + bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, ElementT& element); + // no legacy name because this is a newer method + + // Pop the element at the head of the queue, blocking if empty, with + // timeout at specified time_point. Returns true if an element was popped. + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + ElementT& element); + // no legacy name because this is a newer method + // Returns the size of the queue. size_t size(); + //Returns the capacity of the queue. + U32 capacity() { return mCapacity; } + // closes the queue: - // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt - // - every subsequent tryPushFront() call will return false - // - popBack() calls will return normally until the queue is drained, then - // every subsequent popBack() will throw LLThreadSafeQueueInterrupt - // - tryPopBack() calls will return normally until the queue is drained, - // then every subsequent tryPopBack() call will return false + // - every subsequent push() call will throw LLThreadSafeQueueInterrupt + // - every subsequent tryPush() call will return false + // - pop() calls will return normally until the queue is drained, then + // every subsequent pop() will throw LLThreadSafeQueueInterrupt + // - tryPop() calls will return normally until the queue is drained, + // then every subsequent tryPop() call will return false void close(); - // detect closed state + // producer end: are we prevented from pushing any additional items? bool isClosed(); - // inverse of isClosed() - explicit operator bool(); + // consumer end: are we done, is the queue entirely drained? + bool done(); -private: - std::deque< ElementT > mStorage; +protected: + typedef QueueT queue_type; + QueueT mStorage; U32 mCapacity; bool mClosed; @@ -137,37 +186,154 @@ private: typedef std::unique_lock<decltype(mLock)> lock_t; boost::fibers::condition_variable_any mCapacityCond; boost::fibers::condition_variable_any mEmptyCond; -}; -// LLThreadSafeQueue -//----------------------------------------------------------------------------- + enum pop_result { EMPTY, DONE, WAITING, POPPED }; + // implementation logic, suitable for passing to tryLockUntil() + template <typename Clock, typename Duration> + pop_result tryPopUntil_(lock_t& lock, + const std::chrono::time_point<Clock, Duration>& until, + ElementT& element); + // if we're able to lock immediately, do so and run the passed callable, + // which must accept lock_t& and return bool + template <typename CALLABLE> + bool tryLock(CALLABLE&& callable); + // if we're able to lock before the passed time_point, do so and run the + // passed callable, which must accept lock_t& and return bool + template <typename Clock, typename Duration, typename CALLABLE> + bool tryLockUntil(const std::chrono::time_point<Clock, Duration>& until, + CALLABLE&& callable); + // while lock is locked, really push the passed element, if we can + template <typename T> + bool push_(lock_t& lock, T&& element); + // while lock is locked, really pop the head element, if we can + pop_result pop_(lock_t& lock, ElementT& element); + // Is the current head element ready to pop? We say yes; subclass can + // override as needed. + virtual bool canPop(const ElementT& head) const { return true; } +}; -template<typename ElementT> -LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) : +/***************************************************************************** +* PriorityQueueAdapter +*****************************************************************************/ +namespace LL +{ + /** + * std::priority_queue's API is almost like std::queue, intentionally of + * course, but you must access the element about to pop() as top() rather + * than as front(). Make an adapter for use with LLThreadSafeQueue. + */ + template <typename T, typename Container=std::vector<T>, + typename Compare=std::less<typename Container::value_type>> + class PriorityQueueAdapter + { + public: + // publish all the same types + typedef std::priority_queue<T, Container, Compare> queue_type; + typedef typename queue_type::container_type container_type; + typedef typename queue_type::value_compare value_compare; + typedef typename queue_type::value_type value_type; + typedef typename queue_type::size_type size_type; + typedef typename queue_type::reference reference; + typedef typename queue_type::const_reference const_reference; + + // Although std::queue defines both const and non-const front() + // methods, std::priority_queue defines only const top(). + const_reference front() const { return mQ.top(); } + // std::priority_queue has no equivalent to back(), so it's good that + // LLThreadSafeQueue doesn't use it. + + // All the rest of these merely forward to the corresponding + // queue_type methods. + bool empty() const { return mQ.empty(); } + size_type size() const { return mQ.size(); } + void push(const value_type& value) { mQ.push(value); } + void push(value_type&& value) { mQ.push(std::move(value)); } + template <typename... Args> + void emplace(Args&&... args) { mQ.emplace(std::forward<Args>(args)...); } + void pop() { mQ.pop(); } + + private: + queue_type mQ; + }; +} // namespace LL + + +/***************************************************************************** +* LLThreadSafeQueue implementation +*****************************************************************************/ +template<typename ElementT, typename QueueT> +LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(U32 capacity) : mCapacity(capacity), mClosed(false) { } -template<typename ElementT> -void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) +// if we're able to lock immediately, do so and run the passed callable, which +// must accept lock_t& and return bool +template <typename ElementT, typename QueueT> +template <typename CALLABLE> +bool LLThreadSafeQueue<ElementT, QueueT>::tryLock(CALLABLE&& callable) +{ + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + lock_t lock1(mLock, std::defer_lock); + if (!lock1.try_lock()) + return false; + + return std::forward<CALLABLE>(callable)(lock1); +} + + +// if we're able to lock before the passed time_point, do so and run the +// passed callable, which must accept lock_t& and return bool +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration, typename CALLABLE> +bool LLThreadSafeQueue<ElementT, QueueT>::tryLockUntil( + const std::chrono::time_point<Clock, Duration>& until, + CALLABLE&& callable) { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + lock_t lock1(mLock, std::defer_lock); + if (!lock1.try_lock_until(until)) + return false; + + return std::forward<CALLABLE>(callable)(lock1); +} + + +// while lock is locked, really push the passed element, if we can +template <typename ElementT, typename QueueT> +template <typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::push_(lock_t& lock, T&& element) +{ + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + if (mStorage.size() >= mCapacity) + return false; + + mStorage.push(std::forward<T>(element)); + lock.unlock(); + // now that we've pushed, if somebody's been waiting to pop, signal them + mEmptyCond.notify_one(); + return true; +} + + +template <typename ElementT, typename QueueT> +template <typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::pushIfOpen(T&& element) +{ + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock1(mLock); while (true) { + // On the producer side, it doesn't matter whether the queue has been + // drained or not: the moment either end calls close(), further push() + // operations will fail. if (mClosed) - { - LLTHROW(LLThreadSafeQueueInterrupt()); - } + return false; - if (mStorage.size() < mCapacity) - { - mStorage.push_front(element); - lock1.unlock(); - mEmptyCond.notify_one(); - return; - } + if (push_(lock1, std::forward<T>(element))) + return true; // Storage Full. Wait for signal. mCapacityCond.wait(lock1); @@ -175,142 +341,250 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) } -template <typename ElementT> -template <typename Rep, typename Period> -bool LLThreadSafeQueue<ElementT>::tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout, - ElementT const & element) +template <typename ElementT, typename QueueT> +template<typename T> +void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element) { - // Convert duration to time_point: passing the same timeout duration to - // each of multiple calls is wrong. - auto endpoint = std::chrono::steady_clock::now() + timeout; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + if (! pushIfOpen(std::forward<T>(element))) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } +} - lock_t lock1(mLock, std::defer_lock); - if (!lock1.try_lock_until(endpoint)) - return false; - while (true) - { - if (mClosed) +template<typename ElementT, typename QueueT> +template<typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(T&& element) +{ + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryLock( + [this, element=std::move(element)](lock_t& lock) { - return false; - } + if (mClosed) + return false; + return push_(lock, std::move(element)); + }); +} - if (mStorage.size() < mCapacity) - { - mStorage.push_front(element); - lock1.unlock(); - mEmptyCond.notify_one(); - return true; - } - // Storage Full. Wait for signal. - if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint)) - { - // timed out -- formally we might recheck both conditions above - return false; - } - // If we didn't time out, we were notified for some reason. Loop back - // to check. - } +template <typename ElementT, typename QueueT> +template <typename Rep, typename Period, typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPushFor( + const std::chrono::duration<Rep, Period>& timeout, + T&& element) +{ + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + // Convert duration to time_point: passing the same timeout duration to + // each of multiple calls is wrong. + return tryPushUntil(std::chrono::steady_clock::now() + timeout, + std::forward<T>(element)); } -template<typename ElementT> -bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration, typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil( + const std::chrono::time_point<Clock, Duration>& until, + T&& element) { - lock_t lock1(mLock, std::defer_lock); - if (!lock1.try_lock()) - return false; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryLockUntil( + until, + [this, until, element=std::move(element)](lock_t& lock) + { + while (true) + { + if (mClosed) + { + return false; + } + + if (push_(lock, std::move(element))) + return true; + + // Storage Full. Wait for signal. + if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock, until)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } + }); +} - if (mClosed) - return false; - if (mStorage.size() >= mCapacity) - return false; +// while lock is locked, really pop the head element, if we can +template <typename ElementT, typename QueueT> +typename LLThreadSafeQueue<ElementT, QueueT>::pop_result +LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element) +{ + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + // If mStorage is empty, there's no head element. + if (mStorage.empty()) + return mClosed? DONE : EMPTY; - mStorage.push_front(element); - lock1.unlock(); - mEmptyCond.notify_one(); - return true; + // If there's a head element, pass it to canPop() to see if it's ready to pop. + if (! canPop(mStorage.front())) + return WAITING; + + // std::queue::front() is the element about to pop() + element = mStorage.front(); + mStorage.pop(); + lock.unlock(); + // now that we've popped, if somebody's been waiting to push, signal them + mCapacityCond.notify_one(); + return POPPED; } -template<typename ElementT> -ElementT LLThreadSafeQueue<ElementT>::popBack(void) +template<typename ElementT, typename QueueT> +ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void) { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock1(mLock); + ElementT value; while (true) { - if (!mStorage.empty()) - { - ElementT value = mStorage.back(); - mStorage.pop_back(); - lock1.unlock(); - mCapacityCond.notify_one(); - return value; - } - - if (mClosed) + // On the consumer side, we always try to pop before checking mClosed + // so we can finish draining the queue. + pop_result popped = pop_(lock1, value); + if (popped == POPPED) + return std::move(value); + + // Once the queue is DONE, there will never be any more coming. + if (popped == DONE) { LLTHROW(LLThreadSafeQueueInterrupt()); } - // Storage empty. Wait for signal. + // If we didn't pop because WAITING, i.e. canPop() returned false, + // then even if the producer end has been closed, there's still at + // least one item to drain: wait for it. Or we might be EMPTY, with + // the queue still open. Either way, wait for signal. mEmptyCond.wait(lock1); } } -template<typename ElementT> -bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element) +template<typename ElementT, typename QueueT> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element) { - lock_t lock1(mLock, std::defer_lock); - if (!lock1.try_lock()) - return false; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryLock( + [this, &element](lock_t& lock) + { + // conflate EMPTY, DONE, WAITING: tryPop() behavior when the queue + // is closed is implemented by simple inability to push any new + // elements + return pop_(lock, element) == POPPED; + }); +} - // no need to check mClosed: tryPopBack() behavior when the queue is - // closed is implemented by simple inability to push any new elements - if (mStorage.empty()) - return false; - element = mStorage.back(); - mStorage.pop_back(); - lock1.unlock(); - mCapacityCond.notify_one(); - return true; +template <typename ElementT, typename QueueT> +template <typename Rep, typename Period> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPopFor( + const std::chrono::duration<Rep, Period>& timeout, + ElementT& element) +{ + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + // Convert duration to time_point: passing the same timeout duration to + // each of multiple calls is wrong. + return tryPopUntil(std::chrono::steady_clock::now() + timeout, element); } -template<typename ElementT> -size_t LLThreadSafeQueue<ElementT>::size(void) +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil( + const std::chrono::time_point<Clock, Duration>& until, + ElementT& element) { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryLockUntil( + until, + [this, until, &element](lock_t& lock) + { + // conflate EMPTY, DONE, WAITING + return tryPopUntil_(lock, until, element) == POPPED; + }); +} + + +// body of tryPopUntil(), called once we have the lock +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration> +typename LLThreadSafeQueue<ElementT, QueueT>::pop_result +LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_( + lock_t& lock, + const std::chrono::time_point<Clock, Duration>& until, + ElementT& element) +{ + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + while (true) + { + pop_result popped = pop_(lock, element); + if (popped == POPPED || popped == DONE) + { + // If we succeeded, great! If we've drained the last item, so be + // it. Either way, break the loop and tell caller. + return popped; + } + + // EMPTY or WAITING: wait for signal. + if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) + { + // timed out -- formally we might recheck + // as it is, break loop + return popped; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } +} + + +template<typename ElementT, typename QueueT> +size_t LLThreadSafeQueue<ElementT, QueueT>::size(void) +{ + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock(mLock); return mStorage.size(); } -template<typename ElementT> -void LLThreadSafeQueue<ElementT>::close() + +template<typename ElementT, typename QueueT> +void LLThreadSafeQueue<ElementT, QueueT>::close() { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock(mLock); mClosed = true; lock.unlock(); - // wake up any blocked popBack() calls + // wake up any blocked pop() calls mEmptyCond.notify_all(); - // wake up any blocked pushFront() calls + // wake up any blocked push() calls mCapacityCond.notify_all(); } -template<typename ElementT> -bool LLThreadSafeQueue<ElementT>::isClosed() + +template<typename ElementT, typename QueueT> +bool LLThreadSafeQueue<ElementT, QueueT>::isClosed() { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock(mLock); - return mClosed && mStorage.size() == 0; + return mClosed; } -template<typename ElementT> -LLThreadSafeQueue<ElementT>::operator bool() + +template<typename ElementT, typename QueueT> +bool LLThreadSafeQueue<ElementT, QueueT>::done() { - return ! isClosed(); + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + lock_t lock(mLock); + return mClosed && mStorage.empty(); } #endif |