summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2021-10-04 16:19:59 -0400
committerNat Goodspeed <nat@lindenlab.com>2021-10-04 16:19:59 -0400
commitca60fbe72ce086fbdf0821043ad3be6aad06857c (patch)
tree86f8589528ce721e438a51096735884cbdadb353
parent1b1ebdf183e50c6a751493570ee6e643c33c4eda (diff)
SL-16024: LLThreadSafeQueue enhancements
Add LL::PriorityQueueAdapter, a wrapper for std::priority_queue to make its API more closely resemble std::queue for drop-in use as LLThreadSafeQueue's underlying QueueT container. Support move-only element types. Factor out some implementation redundancy: wrap actual push semantics as push_(), actual pop semantics as pop_(). push(), tryPush() and tryPushUntil() now call push_(); pop(), tryPop() and tryPopUntil() now call pop_(). Break out tryLock() and tryLockUntil() methods that, if they can lock, run the passed callable. Then tryPush(), tryPushUntil(), tryPop() and tryPopUntil() pass lambdas containing the meat of the original method body to tryLock() or tryLockUntil(), as appropriate.
-rw-r--r--indra/llcommon/llthreadsafequeue.h358
1 files changed, 235 insertions, 123 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 04f51816d7..c57520c01f 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -37,6 +37,9 @@
#include <queue>
#include <string>
+/*****************************************************************************
+* LLThreadSafeQueue
+*****************************************************************************/
//
// A general queue exception.
//
@@ -77,8 +80,8 @@ class LLThreadSafeQueue
public:
typedef ElementT value_type;
- // If the pool is set to NULL one will be allocated and managed by this
- // queue.
+ // Limiting the number of pending items prevents unbounded growth of the
+ // underlying queue.
LLThreadSafeQueue(U32 capacity = 1024);
// Add an element to the queue (will block if the queue has
@@ -86,13 +89,15 @@ public:
//
// This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
- void push(ElementT const& element);
+ template <typename T>
+ void push(T&& element);
// legacy name
void pushFront(ElementT const & element) { return push(element); }
// Try to add an element to the queue without blocking. Returns
// true only if the element was actually added.
- bool tryPush(ElementT const& element);
+ template <typename T>
+ bool tryPush(T&& element);
// legacy name
bool tryPushFront(ElementT const & element) { return tryPush(element); }
@@ -102,9 +107,9 @@ public:
// to lock the mutex, versus how long to wait for the queue to stop being
// full. Careful settings for each timeout might be orders of magnitude
// apart. However, this method conflates them.
- template <typename Rep, typename Period>
+ template <typename Rep, typename Period, typename T>
bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
- ElementT const & element);
+ T&& element);
// legacy name
template <typename Rep, typename Period>
bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
@@ -112,9 +117,9 @@ public:
// Try to add an element to the queue, blocking if full but with
// timeout at specified time_point. Returns true if the element was added.
- template <typename Clock, typename Duration>
- bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& timeout,
- ElementT const& element);
+ template <typename Clock, typename Duration, typename T>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ T&& element);
// no legacy name because this is a newer method
// Pop the element at the head of the queue (will block if the queue is
@@ -141,7 +146,7 @@ public:
// Pop the element at the head of the queue, blocking if empty, with
// timeout at specified time_point. Returns true if an element was popped.
template <typename Clock, typename Duration>
- bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& timeout,
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
ElementT& element);
// no legacy name because this is a newer method
@@ -172,11 +177,74 @@ protected:
typedef std::unique_lock<decltype(mLock)> lock_t;
boost::fibers::condition_variable_any mCapacityCond;
boost::fibers::condition_variable_any mEmptyCond;
-};
-// LLThreadSafeQueue
-//-----------------------------------------------------------------------------
+ // if we're able to lock immediately, do so and run the passed callable,
+ // which must accept lock_t& and return bool
+ template <typename CALLABLE>
+ bool tryLock(CALLABLE&& callable);
+ // if we're able to lock before the passed time_point, do so and run the
+ // passed callable, which must accept lock_t& and return bool
+ template <typename Clock, typename Duration, typename CALLABLE>
+ bool tryLockUntil(const std::chrono::time_point<Clock, Duration>& until,
+ CALLABLE&& callable);
+ // while lock is locked, really push the passed element, if we can
+ template <typename T>
+ bool push_(lock_t& lock, T&& element);
+ // while lock is locked, really pop the head element, if we can
+ template <typename PRED>
+ bool pop_(lock_t& lock, ElementT& element,
+ PRED&& pred=[](const ElementT&){ return true; });
+};
+/*****************************************************************************
+* PriorityQueueAdapter
+*****************************************************************************/
+namespace LL
+{
+ /**
+ * std::priority_queue's API is almost like std::queue, intentionally of
+ * course, but you must access the element about to pop() as top() rather
+ * than as front(). Make an adapter for use with LLThreadSafeQueue.
+ */
+ template <typename T, typename Container=std::vector<T>,
+ typename Compare=std::less<typename Container::value_type>>
+ class PriorityQueueAdapter
+ {
+ public:
+ // publish all the same types
+ typedef std::priority_queue<T, Container, Compare> queue_type;
+ typedef typename queue_type::container_type container_type;
+ typedef typename queue_type::value_compare value_compare;
+ typedef typename queue_type::value_type value_type;
+ typedef typename queue_type::size_type size_type;
+ typedef typename queue_type::reference reference;
+ typedef typename queue_type::const_reference const_reference;
+
+ // Although std::queue defines both const and non-const front()
+ // methods, std::priority_queue defines only const top().
+ const_reference front() const { return mQ.top(); }
+ // std::priority_queue has no equivalent to back(), so it's good that
+ // LLThreadSafeQueue doesn't use it.
+
+ // All the rest of these merely forward to the corresponding
+ // queue_type methods.
+ bool empty() const { return mQ.empty(); }
+ size_type size() const { return mQ.size(); }
+ void push(const value_type& value) { mQ.push(value); }
+ void push(value_type&& value) { mQ.push(std::move(value)); }
+ template <typename... Args>
+ void emplace(Args&&... args) { mQ.emplace(std::forward<Args>(args)...); }
+ void pop() { mQ.pop(); }
+
+ private:
+ queue_type mQ;
+ };
+} // namespace LL
+
+
+/*****************************************************************************
+* LLThreadSafeQueue implementation
+*****************************************************************************/
template<typename ElementT, typename QueueT>
LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(U32 capacity) :
mCapacity(capacity),
@@ -185,24 +253,69 @@ LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(U32 capacity) :
}
-template<typename ElementT, typename QueueT>
-void LLThreadSafeQueue<ElementT, QueueT>::push(ElementT const & element)
+// if we're able to lock immediately, do so and run the passed callable, which
+// must accept lock_t& and return bool
+template <typename ElementT, typename QueueT>
+template <typename CALLABLE>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryLock(CALLABLE&& callable)
+{
+ lock_t lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock())
+ return false;
+
+ return std::forward<CALLABLE>(callable)(lock1);
+}
+
+
+// if we're able to lock before the passed time_point, do so and run the
+// passed callable, which must accept lock_t& and return bool
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration, typename CALLABLE>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryLockUntil(
+ const std::chrono::time_point<Clock, Duration>& until,
+ CALLABLE&& callable)
+{
+ lock_t lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock_until(until))
+ return false;
+
+ return std::forward<CALLABLE>(callable)(lock1);
+}
+
+
+// while lock is locked, really push the passed element, if we can
+template <typename ElementT, typename QueueT>
+template <typename T>
+bool LLThreadSafeQueue<ElementT, QueueT>::push_(lock_t& lock, T&& element)
+{
+ if (mStorage.size() >= mCapacity)
+ return false;
+
+ mStorage.push(std::forward<T>(element));
+ lock.unlock();
+ // now that we've pushed, if somebody's been waiting to pop, signal them
+ mEmptyCond.notify_one();
+ return true;
+}
+
+
+template <typename ElementT, typename QueueT>
+template<typename T>
+void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element)
{
lock_t lock1(mLock);
while (true)
{
+ // On the producer side, it doesn't matter whether the queue has been
+ // drained or not: the moment either end calls close(), further push()
+ // operations will fail.
if (mClosed)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
- if (mStorage.size() < mCapacity)
- {
- mStorage.push(element);
- lock1.unlock();
- mEmptyCond.notify_one();
+ if (push_(lock1, std::forward<T>(element)))
return;
- }
// Storage Full. Wait for signal.
mCapacityCond.wait(lock1);
@@ -210,71 +323,85 @@ void LLThreadSafeQueue<ElementT, QueueT>::push(ElementT const & element)
}
+template<typename ElementT, typename QueueT>
+template<typename T>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(T&& element)
+{
+ return tryLock(
+ [this, element=std::move(element)](lock_t& lock)
+ {
+ if (mClosed)
+ return false;
+ return push_(lock, std::move(element));
+ });
+}
+
+
template <typename ElementT, typename QueueT>
-template <typename Rep, typename Period>
+template <typename Rep, typename Period, typename T>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPushFor(
const std::chrono::duration<Rep, Period>& timeout,
- ElementT const & element)
+ T&& element)
{
// Convert duration to time_point: passing the same timeout duration to
// each of multiple calls is wrong.
- return tryPushUntil(std::chrono::steady_clock::now() + timeout, element);
+ return tryPushUntil(std::chrono::steady_clock::now() + timeout,
+ std::forward<T>(element));
}
template <typename ElementT, typename QueueT>
-template <typename Clock, typename Duration>
+template <typename Clock, typename Duration, typename T>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil(
- const std::chrono::time_point<Clock, Duration>& endpoint,
- ElementT const& element)
+ const std::chrono::time_point<Clock, Duration>& until,
+ T&& element)
{
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock_until(endpoint))
- return false;
-
- while (true)
- {
- if (mClosed)
+ return tryLockUntil(
+ until,
+ [this, until, element=std::move(element)](lock_t& lock)
{
- return false;
- }
-
- if (mStorage.size() < mCapacity)
- {
- mStorage.push(element);
- lock1.unlock();
- mEmptyCond.notify_one();
- return true;
- }
-
- // Storage Full. Wait for signal.
- if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint))
- {
- // timed out -- formally we might recheck both conditions above
- return false;
- }
- // If we didn't time out, we were notified for some reason. Loop back
- // to check.
- }
+ while (true)
+ {
+ if (mClosed)
+ {
+ return false;
+ }
+
+ if (push_(lock, std::move(element)))
+ return true;
+
+ // Storage Full. Wait for signal.
+ if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock, until))
+ {
+ // timed out -- formally we might recheck both conditions above
+ return false;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
+ });
}
-template<typename ElementT, typename QueueT>
-bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(ElementT const & element)
+// while lock is locked, really pop the head element, if we can
+template <typename ElementT, typename QueueT>
+template <typename PRED>
+bool LLThreadSafeQueue<ElementT, QueueT>::pop_(
+ lock_t& lock, ElementT& element, PRED&& pred)
{
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock())
- return false;
-
- if (mClosed)
- return false;
-
- if (mStorage.size() >= mCapacity)
+ // If mStorage is empty, there's no head element.
+ // If there's a head element, pass it to the predicate to see if caller
+ // considers it ready to pop.
+ // Unless both are satisfied, no point in continuing.
+ if (mStorage.empty() || ! std::forward<PRED>(pred)(mStorage.front()))
return false;
- mStorage.push(element);
- lock1.unlock();
- mEmptyCond.notify_one();
+ // std::queue::front() is the element about to pop()
+ element = mStorage.front();
+ mStorage.pop();
+ lock.unlock();
+ // now that we've popped, if somebody's been waiting to push, signal them
+ mCapacityCond.notify_one();
return true;
}
@@ -285,22 +412,20 @@ ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
lock_t lock1(mLock);
while (true)
{
- if (!mStorage.empty())
- {
- // std::queue::front() is the element about to pop()
- ElementT value = mStorage.front();
- mStorage.pop();
- lock1.unlock();
- mCapacityCond.notify_one();
- return value;
- }
-
+ // On the consumer side, we always try to pop before checking mClosed
+ // so we can finish draining the queue.
+ ElementT value;
+ if (pop_(lock1, value))
+ return std::move(value);
+
+ // Once the queue is empty, mClosed lets us know if there will ever be
+ // any more coming.
if (mClosed)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
- // Storage empty. Wait for signal.
+ // Storage empty, queue still open. Wait for signal.
mEmptyCond.wait(lock1);
}
}
@@ -309,21 +434,14 @@ ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
template<typename ElementT, typename QueueT>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)
{
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock())
- return false;
-
- // no need to check mClosed: tryPop() behavior when the queue is
- // closed is implemented by simple inability to push any new elements
- if (mStorage.empty())
- return false;
-
- // std::queue::front() is the element about to pop()
- element = mStorage.front();
- mStorage.pop();
- lock1.unlock();
- mCapacityCond.notify_one();
- return true;
+ return tryLock(
+ [this, &element](lock_t& lock)
+ {
+ // no need to check mClosed: tryPop() behavior when the queue is
+ // closed is implemented by simple inability to push any new
+ // elements
+ return pop_(lock, element);
+ });
}
@@ -342,39 +460,33 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopFor(
template <typename ElementT, typename QueueT>
template <typename Clock, typename Duration>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(
- const std::chrono::time_point<Clock, Duration>& endpoint,
+ const std::chrono::time_point<Clock, Duration>& until,
ElementT& element)
{
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock_until(endpoint))
- return false;
-
- while (true)
- {
- if (!mStorage.empty())
+ return tryLockUntil(
+ until,
+ [this, until, &element](lock_t& lock)
{
- // std::queue::front() is the element about to pop()
- element = mStorage.front();
- mStorage.pop();
- lock1.unlock();
- mCapacityCond.notify_one();
- return true;
- }
-
- if (mClosed)
- {
- return false;
- }
-
- // Storage empty. Wait for signal.
- if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock1, endpoint))
- {
- // timed out -- formally we might recheck both conditions above
- return false;
- }
- // If we didn't time out, we were notified for some reason. Loop back
- // to check.
- }
+ while (true)
+ {
+ if (pop_(lock, element))
+ return true;
+
+ if (mClosed)
+ {
+ return false;
+ }
+
+ // Storage empty. Wait for signal.
+ if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
+ {
+ // timed out -- formally we might recheck both conditions above
+ return false;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
+ });
}