Diffstat (limited to 'indra/llcommon')
-rw-r--r--  indra/llcommon/llthreadsafequeue.h  358
1 file changed, 235 insertions(+), 123 deletions(-)
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 04f51816d7..c57520c01f 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -37,6 +37,9 @@
#include <queue>
#include <string>
+/*****************************************************************************
+* LLThreadSafeQueue
+*****************************************************************************/
//
// A general queue exception.
//
@@ -77,8 +80,8 @@ class LLThreadSafeQueue
public:
typedef ElementT value_type;
- // If the pool is set to NULL one will be allocated and managed by this
- // queue.
+ // Limiting the number of pending items prevents unbounded growth of the
+ // underlying queue.
LLThreadSafeQueue(U32 capacity = 1024);
// Add an element to the queue (will block if the queue has
@@ -86,13 +89,15 @@ public:
//
// This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
- void push(ElementT const& element);
+ template <typename T>
+ void push(T&& element);
// legacy name
void pushFront(ElementT const & element) { return push(element); }
// Try to add an element to the queue without blocking. Returns
// true only if the element was actually added.
- bool tryPush(ElementT const& element);
+ template <typename T>
+ bool tryPush(T&& element);
// legacy name
bool tryPushFront(ElementT const & element) { return tryPush(element); }
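
// A minimal usage sketch (example_forwarding_push is a hypothetical caller,
// not part of this header), assuming a queue of std::string: the templated
// push()/tryPush() overloads above perfect-forward their argument, so an
// rvalue is moved into the queue instead of copied.

void example_forwarding_push(LLThreadSafeQueue<std::string>& queue)
{
    std::string msg("hello");
    queue.push(msg);                    // lvalue: copied into the queue
    queue.push(std::move(msg));         // rvalue: moved into the queue
    if (! queue.tryPush(std::string("world")))
    {
        // queue was full or already closed; nothing was added
    }
}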
@@ -102,9 +107,9 @@ public:
// to lock the mutex, versus how long to wait for the queue to stop being
// full. Careful settings for each timeout might be orders of magnitude
// apart. However, this method conflates them.
- template <typename Rep, typename Period>
+ template <typename Rep, typename Period, typename T>
bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
- ElementT const & element);
+ T&& element);
// legacy name
template <typename Rep, typename Period>
bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
@@ -112,9 +117,9 @@ public:
// Try to add an element to the queue, blocking if full but with
// timeout at specified time_point. Returns true if the element was added.
- template <typename Clock, typename Duration>
- bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& timeout,
- ElementT const& element);
+ template <typename Clock, typename Duration, typename T>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ T&& element);
// no legacy name because this is a newer method
// Pop the element at the head of the queue (will block if the queue is
@@ -141,7 +146,7 @@ public:
// Pop the element at the head of the queue, blocking if empty, with
// timeout at specified time_point. Returns true if an element was popped.
template <typename Clock, typename Duration>
- bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& timeout,
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
ElementT& element);
// no legacy name because this is a newer method
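
// A minimal sketch of the timed variants (example_timed_ops is a
// hypothetical caller), assuming a queue of int: tryPushFor()/tryPushUntil()
// and tryPopFor()/tryPopUntil() return false on timeout, or when the
// operation cannot succeed (e.g. pushing to a closed queue), rather than
// blocking indefinitely.

void example_timed_ops(LLThreadSafeQueue<int>& queue)
{
    if (! queue.tryPushFor(std::chrono::milliseconds(10), 42))
    {
        // still full (or closed) after waiting 10ms
    }
    int value;
    auto deadline = std::chrono::steady_clock::now()
                    + std::chrono::milliseconds(10);
    if (queue.tryPopUntil(deadline, value))
    {
        // got an element before the deadline
    }
}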
@@ -172,11 +177,74 @@ protected:
typedef std::unique_lock<decltype(mLock)> lock_t;
boost::fibers::condition_variable_any mCapacityCond;
boost::fibers::condition_variable_any mEmptyCond;
-};
-// LLThreadSafeQueue
-//-----------------------------------------------------------------------------
+ // if we're able to lock immediately, do so and run the passed callable,
+ // which must accept lock_t& and return bool
+ template <typename CALLABLE>
+ bool tryLock(CALLABLE&& callable);
+ // if we're able to lock before the passed time_point, do so and run the
+ // passed callable, which must accept lock_t& and return bool
+ template <typename Clock, typename Duration, typename CALLABLE>
+ bool tryLockUntil(const std::chrono::time_point<Clock, Duration>& until,
+ CALLABLE&& callable);
+ // while lock is locked, really push the passed element, if we can
+ template <typename T>
+ bool push_(lock_t& lock, T&& element);
+ // while lock is locked, really pop the head element, if we can
+ template <typename PRED>
+ bool pop_(lock_t& lock, ElementT& element,
+ PRED&& pred=[](const ElementT&){ return true; });
+};
+/*****************************************************************************
+* PriorityQueueAdapter
+*****************************************************************************/
+namespace LL
+{
+ /**
+ * std::priority_queue's API is almost like std::queue, intentionally of
+ * course, but you must access the element about to pop() as top() rather
+ * than as front(). Make an adapter for use with LLThreadSafeQueue.
+ */
+ template <typename T, typename Container=std::vector<T>,
+ typename Compare=std::less<typename Container::value_type>>
+ class PriorityQueueAdapter
+ {
+ public:
+ // publish all the same types
+ typedef std::priority_queue<T, Container, Compare> queue_type;
+ typedef typename queue_type::container_type container_type;
+ typedef typename queue_type::value_compare value_compare;
+ typedef typename queue_type::value_type value_type;
+ typedef typename queue_type::size_type size_type;
+ typedef typename queue_type::reference reference;
+ typedef typename queue_type::const_reference const_reference;
+
+ // Although std::queue defines both const and non-const front()
+ // methods, std::priority_queue defines only const top().
+ const_reference front() const { return mQ.top(); }
+ // std::priority_queue has no equivalent to back(), so it's good that
+ // LLThreadSafeQueue doesn't use it.
+
+ // All the rest of these merely forward to the corresponding
+ // queue_type methods.
+ bool empty() const { return mQ.empty(); }
+ size_type size() const { return mQ.size(); }
+ void push(const value_type& value) { mQ.push(value); }
+ void push(value_type&& value) { mQ.push(std::move(value)); }
+ template <typename... Args>
+ void emplace(Args&&... args) { mQ.emplace(std::forward<Args>(args)...); }
+ void pop() { mQ.pop(); }
+
+ private:
+ queue_type mQ;
+ };
+} // namespace LL
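
// A minimal sketch of combining the adapter with LLThreadSafeQueue
// (example_priority_queue is a hypothetical caller), assuming int elements:
// the adapter exposes priority_queue::top() as front(), which is the only
// head accessor LLThreadSafeQueue needs from its QueueT.

void example_priority_queue()
{
    // With the default std::less compare, the largest element pops first.
    LLThreadSafeQueue<int, LL::PriorityQueueAdapter<int>> queue;
    queue.push(1);
    queue.push(3);
    queue.push(2);
    int value;
    if (queue.tryPop(value))
    {
        // value == 3: elements come out in priority order, not FIFO order
    }
}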
+
+
+/*****************************************************************************
+* LLThreadSafeQueue implementation
+*****************************************************************************/
template<typename ElementT, typename QueueT>
LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(U32 capacity) :
mCapacity(capacity),
@@ -185,24 +253,69 @@ LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(U32 capacity) :
}
-template<typename ElementT, typename QueueT>
-void LLThreadSafeQueue<ElementT, QueueT>::push(ElementT const & element)
+// if we're able to lock immediately, do so and run the passed callable, which
+// must accept lock_t& and return bool
+template <typename ElementT, typename QueueT>
+template <typename CALLABLE>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryLock(CALLABLE&& callable)
+{
+ lock_t lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock())
+ return false;
+
+ return std::forward<CALLABLE>(callable)(lock1);
+}
+
+
+// if we're able to lock before the passed time_point, do so and run the
+// passed callable, which must accept lock_t& and return bool
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration, typename CALLABLE>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryLockUntil(
+ const std::chrono::time_point<Clock, Duration>& until,
+ CALLABLE&& callable)
+{
+ lock_t lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock_until(until))
+ return false;
+
+ return std::forward<CALLABLE>(callable)(lock1);
+}
+
+
+// while lock is locked, really push the passed element, if we can
+template <typename ElementT, typename QueueT>
+template <typename T>
+bool LLThreadSafeQueue<ElementT, QueueT>::push_(lock_t& lock, T&& element)
+{
+ if (mStorage.size() >= mCapacity)
+ return false;
+
+ mStorage.push(std::forward<T>(element));
+ lock.unlock();
+ // now that we've pushed, if somebody's been waiting to pop, signal them
+ mEmptyCond.notify_one();
+ return true;
+}
+
+
+template <typename ElementT, typename QueueT>
+template<typename T>
+void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element)
{
lock_t lock1(mLock);
while (true)
{
+ // On the producer side, it doesn't matter whether the queue has been
+ // drained or not: the moment either end calls close(), further push()
+ // operations will fail.
if (mClosed)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
- if (mStorage.size() < mCapacity)
- {
- mStorage.push(element);
- lock1.unlock();
- mEmptyCond.notify_one();
+ if (push_(lock1, std::forward<T>(element)))
return;
- }
// Storage Full. Wait for signal.
mCapacityCond.wait(lock1);
@@ -210,71 +323,85 @@ void LLThreadSafeQueue<ElementT, QueueT>::push(ElementT const & element)
}
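
// A minimal producer-side sketch (example_producer is a hypothetical
// caller), assuming int elements: once either end calls close(), push()
// raises LLThreadSafeQueueInterrupt instead of blocking forever on a queue
// nobody will drain.

void example_producer(LLThreadSafeQueue<int>& queue)
{
    try
    {
        for (int i = 0; i < 100; ++i)
        {
            queue.push(i);      // blocks while full, throws once closed
        }
    }
    catch (const LLThreadSafeQueueInterrupt&)
    {
        // consumer closed the queue; stop producing
    }
}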
+template<typename ElementT, typename QueueT>
+template<typename T>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(T&& element)
+{
+ return tryLock(
+ [this, element=std::move(element)](lock_t& lock)
+ {
+ if (mClosed)
+ return false;
+ return push_(lock, std::move(element));
+ });
+}
+
+
template <typename ElementT, typename QueueT>
-template <typename Rep, typename Period>
+template <typename Rep, typename Period, typename T>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPushFor(
const std::chrono::duration<Rep, Period>& timeout,
- ElementT const & element)
+ T&& element)
{
// Convert duration to time_point: passing the same timeout duration to
// each of multiple calls is wrong.
- return tryPushUntil(std::chrono::steady_clock::now() + timeout, element);
+ return tryPushUntil(std::chrono::steady_clock::now() + timeout,
+ std::forward<T>(element));
}
template <typename ElementT, typename QueueT>
-template <typename Clock, typename Duration>
+template <typename Clock, typename Duration, typename T>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil(
- const std::chrono::time_point<Clock, Duration>& endpoint,
- ElementT const& element)
+ const std::chrono::time_point<Clock, Duration>& until,
+ T&& element)
{
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock_until(endpoint))
- return false;
-
- while (true)
- {
- if (mClosed)
+ return tryLockUntil(
+ until,
+ [this, until, element=std::move(element)](lock_t& lock)
{
- return false;
- }
-
- if (mStorage.size() < mCapacity)
- {
- mStorage.push(element);
- lock1.unlock();
- mEmptyCond.notify_one();
- return true;
- }
-
- // Storage Full. Wait for signal.
- if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint))
- {
- // timed out -- formally we might recheck both conditions above
- return false;
- }
- // If we didn't time out, we were notified for some reason. Loop back
- // to check.
- }
+ while (true)
+ {
+ if (mClosed)
+ {
+ return false;
+ }
+
+ if (push_(lock, std::move(element)))
+ return true;
+
+ // Storage Full. Wait for signal.
+ if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock, until))
+ {
+ // timed out -- formally we might recheck both conditions above
+ return false;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
+ });
}
-template<typename ElementT, typename QueueT>
-bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(ElementT const & element)
+// while lock is locked, really pop the head element, if we can
+template <typename ElementT, typename QueueT>
+template <typename PRED>
+bool LLThreadSafeQueue<ElementT, QueueT>::pop_(
+ lock_t& lock, ElementT& element, PRED&& pred)
{
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock())
- return false;
-
- if (mClosed)
- return false;
-
- if (mStorage.size() >= mCapacity)
+ // If mStorage is empty, there's no head element.
+ // If there's a head element, pass it to the predicate to see if caller
+ // considers it ready to pop.
+ // Unless both are satisfied, no point in continuing.
+ if (mStorage.empty() || ! std::forward<PRED>(pred)(mStorage.front()))
return false;
- mStorage.push(element);
- lock1.unlock();
- mEmptyCond.notify_one();
+ // std::queue::front() is the element about to pop()
+ element = mStorage.front();
+ mStorage.pop();
+ lock.unlock();
+ // now that we've popped, if somebody's been waiting to push, signal them
+ mCapacityCond.notify_one();
return true;
}
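
// A minimal sketch of how a subclass might use the pop_() predicate
// (ExampleFilteredQueue and tryPopIf are hypothetical, not part of this
// header): the predicate sees the head element while the lock is held and
// can veto the pop without disturbing the queue.

template <typename ElementT>
class ExampleFilteredQueue: public LLThreadSafeQueue<ElementT>
{
    using super = LLThreadSafeQueue<ElementT>;
public:
    using super::super;

    // pop the head element only if ready(head) returns true
    template <typename PRED>
    bool tryPopIf(ElementT& element, PRED&& ready)
    {
        return this->tryLock(
            [this, &element, &ready](typename super::lock_t& lock)
            {
                return this->pop_(lock, element, std::forward<PRED>(ready));
            });
    }
};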
@@ -285,22 +412,20 @@ ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
lock_t lock1(mLock);
while (true)
{
- if (!mStorage.empty())
- {
- // std::queue::front() is the element about to pop()
- ElementT value = mStorage.front();
- mStorage.pop();
- lock1.unlock();
- mCapacityCond.notify_one();
- return value;
- }
-
+ // On the consumer side, we always try to pop before checking mClosed
+ // so we can finish draining the queue.
+ ElementT value;
+ if (pop_(lock1, value))
+ return std::move(value);
+
+ // Once the queue is empty, mClosed lets us know if there will ever be
+ // any more coming.
if (mClosed)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
- // Storage empty. Wait for signal.
+ // Storage empty, queue still open. Wait for signal.
mEmptyCond.wait(lock1);
}
}
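
// A minimal consumer-side sketch (example_consumer is a hypothetical
// caller), assuming the elements are std::function<void()> work items:
// because pop() drains whatever remains before honoring close(), the
// consumer can simply loop until LLThreadSafeQueueInterrupt signals
// "closed and empty".

void example_consumer(LLThreadSafeQueue<std::function<void()>>& queue)
{
    try
    {
        for (;;)
        {
            auto work = queue.pop();    // blocks while empty but still open
            work();
        }
    }
    catch (const LLThreadSafeQueueInterrupt&)
    {
        // queue was closed and fully drained: normal shutdown
    }
}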
@@ -309,21 +434,14 @@ ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
template<typename ElementT, typename QueueT>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)
{
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock())
- return false;
-
- // no need to check mClosed: tryPop() behavior when the queue is
- // closed is implemented by simple inability to push any new elements
- if (mStorage.empty())
- return false;
-
- // std::queue::front() is the element about to pop()
- element = mStorage.front();
- mStorage.pop();
- lock1.unlock();
- mCapacityCond.notify_one();
- return true;
+ return tryLock(
+ [this, &element](lock_t& lock)
+ {
+ // no need to check mClosed: tryPop() behavior when the queue is
+ // closed is implemented by simple inability to push any new
+ // elements
+ return pop_(lock, element);
+ });
}
@@ -342,39 +460,33 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopFor(
template <typename ElementT, typename QueueT>
template <typename Clock, typename Duration>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(
- const std::chrono::time_point<Clock, Duration>& endpoint,
+ const std::chrono::time_point<Clock, Duration>& until,
ElementT& element)
{
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock_until(endpoint))
- return false;
-
- while (true)
- {
- if (!mStorage.empty())
+ return tryLockUntil(
+ until,
+ [this, until, &element](lock_t& lock)
{
- // std::queue::front() is the element about to pop()
- element = mStorage.front();
- mStorage.pop();
- lock1.unlock();
- mCapacityCond.notify_one();
- return true;
- }
-
- if (mClosed)
- {
- return false;
- }
-
- // Storage empty. Wait for signal.
- if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock1, endpoint))
- {
- // timed out -- formally we might recheck both conditions above
- return false;
- }
- // If we didn't time out, we were notified for some reason. Loop back
- // to check.
- }
+ while (true)
+ {
+ if (pop_(lock, element))
+ return true;
+
+ if (mClosed)
+ {
+ return false;
+ }
+
+ // Storage empty. Wait for signal.
+ if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
+ {
+ // timed out -- formally we might recheck both conditions above
+ return false;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
+ });
}
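
// A minimal sketch of deadline-bounded draining (example_run_until is a
// hypothetical caller), assuming std::function<void()> work items: keep
// popping and running items until tryPopUntil() times out or the queue is
// closed and empty.

size_t example_run_until(LLThreadSafeQueue<std::function<void()>>& queue,
                         const std::chrono::steady_clock::time_point& until)
{
    size_t ran = 0;
    std::function<void()> work;
    while (queue.tryPopUntil(until, work))
    {
        work();
        ++ran;
    }
    return ran;
}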