summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--indra/llcommon/llthreadsafequeue.h229
1 files changed, 163 insertions, 66 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 26e0d71d31..04f51816d7 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -1,6 +1,6 @@
/**
* @file llthreadsafequeue.h
- * @brief Base classes for thread, mutex and condition handling.
+ * @brief Queue protected with mutexes for cross-thread use
*
* $LicenseInfo:firstyear=2004&license=viewerlgpl$
* Second Life Viewer Source Code
@@ -27,15 +27,15 @@
#ifndef LL_LLTHREADSAFEQUEUE_H
#define LL_LLTHREADSAFEQUEUE_H
-#include "llexception.h"
-#include <deque>
-#include <string>
-#include <chrono>
-#include "mutex.h"
#include "llcoros.h"
#include LLCOROS_MUTEX_HEADER
#include <boost/fiber/timed_mutex.hpp>
#include LLCOROS_CONDVAR_HEADER
+#include "llexception.h"
+#include "mutex.h"
+#include <chrono>
+#include <queue>
+#include <string>
//
// A general queue exception.
@@ -66,61 +66,95 @@ public:
}
};
-//
-// Implements a thread safe FIFO.
-//
-template<typename ElementT>
+/**
+ * Implements a thread safe FIFO.
+ */
+// Let the default std::queue default to underlying std::deque. Override if
+// desired.
+template<typename ElementT, typename QueueT=std::queue<ElementT>>
class LLThreadSafeQueue
{
public:
typedef ElementT value_type;
-
+
// If the pool is set to NULL one will be allocated and managed by this
// queue.
LLThreadSafeQueue(U32 capacity = 1024);
-
- // Add an element to the front of queue (will block if the queue has
+
+ // Add an element to the queue (will block if the queue has
// reached capacity).
//
// This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
- void pushFront(ElementT const & element);
-
- // Try to add an element to the front of queue without blocking. Returns
+ void push(ElementT const& element);
+ // legacy name
+ void pushFront(ElementT const & element) { return push(element); }
+
+ // Try to add an element to the queue without blocking. Returns
// true only if the element was actually added.
- bool tryPushFront(ElementT const & element);
+ bool tryPush(ElementT const& element);
+ // legacy name
+ bool tryPushFront(ElementT const & element) { return tryPush(element); }
- // Try to add an element to the front of queue, blocking if full but with
- // timeout. Returns true if the element was added.
+ // Try to add an element to the queue, blocking if full but with timeout
+ // after specified duration. Returns true if the element was added.
// There are potentially two different timeouts involved: how long to try
// to lock the mutex, versus how long to wait for the queue to stop being
// full. Careful settings for each timeout might be orders of magnitude
// apart. However, this method conflates them.
template <typename Rep, typename Period>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ ElementT const & element);
+ // legacy name
+ template <typename Rep, typename Period>
bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
- ElementT const & element);
+ ElementT const & element) { return tryPushFor(timeout, element); }
+
+ // Try to add an element to the queue, blocking if full but with
+ // timeout at specified time_point. Returns true if the element was added.
+ template <typename Clock, typename Duration>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& timeout,
+ ElementT const& element);
+ // no legacy name because this is a newer method
- // Pop the element at the end of the queue (will block if the queue is
+ // Pop the element at the head of the queue (will block if the queue is
// empty).
//
// This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
- ElementT popBack(void);
-
- // Pop an element from the end of the queue if there is one available.
+ ElementT pop(void);
+ // legacy name
+ ElementT popBack(void) { return pop(); }
+
+ // Pop an element from the head of the queue if there is one available.
// Returns true only if an element was popped.
- bool tryPopBack(ElementT & element);
-
+ bool tryPop(ElementT & element);
+ // legacy name
+ bool tryPopBack(ElementT & element) { return tryPop(element); }
+
+ // Pop the element at the head of the queue, blocking if empty, with
+ // timeout after specified duration. Returns true if an element was popped.
+ template <typename Rep, typename Period>
+ bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, ElementT& element);
+ // no legacy name because this is a newer method
+
+ // Pop the element at the head of the queue, blocking if empty, with
+ // timeout at specified time_point. Returns true if an element was popped.
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& timeout,
+ ElementT& element);
+ // no legacy name because this is a newer method
+
// Returns the size of the queue.
size_t size();
// closes the queue:
- // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt
- // - every subsequent tryPushFront() call will return false
- // - popBack() calls will return normally until the queue is drained, then
- // every subsequent popBack() will throw LLThreadSafeQueueInterrupt
- // - tryPopBack() calls will return normally until the queue is drained,
- // then every subsequent tryPopBack() call will return false
+ // - every subsequent push() call will throw LLThreadSafeQueueInterrupt
+ // - every subsequent tryPush() call will return false
+ // - pop() calls will return normally until the queue is drained, then
+ // every subsequent pop() will throw LLThreadSafeQueueInterrupt
+ // - tryPop() calls will return normally until the queue is drained,
+ // then every subsequent tryPop() call will return false
void close();
// detect closed state
@@ -128,8 +162,9 @@ public:
// inverse of isClosed()
explicit operator bool();
-private:
- std::deque< ElementT > mStorage;
+protected:
+ typedef QueueT queue_type;
+ QueueT mStorage;
U32 mCapacity;
bool mClosed;
@@ -142,16 +177,16 @@ private:
// LLThreadSafeQueue
//-----------------------------------------------------------------------------
-template<typename ElementT>
-LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) :
+template<typename ElementT, typename QueueT>
+LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(U32 capacity) :
mCapacity(capacity),
mClosed(false)
{
}
-template<typename ElementT>
-void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
+template<typename ElementT, typename QueueT>
+void LLThreadSafeQueue<ElementT, QueueT>::push(ElementT const & element)
{
lock_t lock1(mLock);
while (true)
@@ -163,7 +198,7 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
if (mStorage.size() < mCapacity)
{
- mStorage.push_front(element);
+ mStorage.push(element);
lock1.unlock();
mEmptyCond.notify_one();
return;
@@ -175,15 +210,24 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
}
-template <typename ElementT>
+template <typename ElementT, typename QueueT>
template <typename Rep, typename Period>
-bool LLThreadSafeQueue<ElementT>::tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
- ElementT const & element)
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPushFor(
+ const std::chrono::duration<Rep, Period>& timeout,
+ ElementT const & element)
{
// Convert duration to time_point: passing the same timeout duration to
// each of multiple calls is wrong.
- auto endpoint = std::chrono::steady_clock::now() + timeout;
+ return tryPushUntil(std::chrono::steady_clock::now() + timeout, element);
+}
+
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil(
+ const std::chrono::time_point<Clock, Duration>& endpoint,
+ ElementT const& element)
+{
lock_t lock1(mLock, std::defer_lock);
if (!lock1.try_lock_until(endpoint))
return false;
@@ -197,7 +241,7 @@ bool LLThreadSafeQueue<ElementT>::tryPushFrontFor(const std::chrono::duration<Re
if (mStorage.size() < mCapacity)
{
- mStorage.push_front(element);
+ mStorage.push(element);
lock1.unlock();
mEmptyCond.notify_one();
return true;
@@ -215,8 +259,8 @@ bool LLThreadSafeQueue<ElementT>::tryPushFrontFor(const std::chrono::duration<Re
}
-template<typename ElementT>
-bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
+template<typename ElementT, typename QueueT>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(ElementT const & element)
{
lock_t lock1(mLock, std::defer_lock);
if (!lock1.try_lock())
@@ -228,23 +272,24 @@ bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
if (mStorage.size() >= mCapacity)
return false;
- mStorage.push_front(element);
+ mStorage.push(element);
lock1.unlock();
mEmptyCond.notify_one();
return true;
}
-template<typename ElementT>
-ElementT LLThreadSafeQueue<ElementT>::popBack(void)
+template<typename ElementT, typename QueueT>
+ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
{
lock_t lock1(mLock);
while (true)
{
if (!mStorage.empty())
{
- ElementT value = mStorage.back();
- mStorage.pop_back();
+ // std::queue::front() is the element about to pop()
+ ElementT value = mStorage.front();
+ mStorage.pop();
lock1.unlock();
mCapacityCond.notify_one();
return value;
@@ -261,54 +306,106 @@ ElementT LLThreadSafeQueue<ElementT>::popBack(void)
}
-template<typename ElementT>
-bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
+template<typename ElementT, typename QueueT>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)
{
lock_t lock1(mLock, std::defer_lock);
if (!lock1.try_lock())
return false;
- // no need to check mClosed: tryPopBack() behavior when the queue is
+ // no need to check mClosed: tryPop() behavior when the queue is
// closed is implemented by simple inability to push any new elements
if (mStorage.empty())
return false;
- element = mStorage.back();
- mStorage.pop_back();
+ // std::queue::front() is the element about to pop()
+ element = mStorage.front();
+ mStorage.pop();
lock1.unlock();
mCapacityCond.notify_one();
return true;
}
-template<typename ElementT>
-size_t LLThreadSafeQueue<ElementT>::size(void)
+template <typename ElementT, typename QueueT>
+template <typename Rep, typename Period>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPopFor(
+ const std::chrono::duration<Rep, Period>& timeout,
+ ElementT& element)
+{
+ // Convert duration to time_point: passing the same timeout duration to
+ // each of multiple calls is wrong.
+ return tryPopUntil(std::chrono::steady_clock::now() + timeout, element);
+}
+
+
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(
+ const std::chrono::time_point<Clock, Duration>& endpoint,
+ ElementT& element)
+{
+ lock_t lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock_until(endpoint))
+ return false;
+
+ while (true)
+ {
+ if (!mStorage.empty())
+ {
+ // std::queue::front() is the element about to pop()
+ element = mStorage.front();
+ mStorage.pop();
+ lock1.unlock();
+ mCapacityCond.notify_one();
+ return true;
+ }
+
+ if (mClosed)
+ {
+ return false;
+ }
+
+ // Storage empty. Wait for signal.
+ if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock1, endpoint))
+ {
+ // timed out -- formally we might recheck both conditions above
+ return false;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
+}
+
+
+template<typename ElementT, typename QueueT>
+size_t LLThreadSafeQueue<ElementT, QueueT>::size(void)
{
lock_t lock(mLock);
return mStorage.size();
}
-template<typename ElementT>
-void LLThreadSafeQueue<ElementT>::close()
+template<typename ElementT, typename QueueT>
+void LLThreadSafeQueue<ElementT, QueueT>::close()
{
lock_t lock(mLock);
mClosed = true;
lock.unlock();
- // wake up any blocked popBack() calls
+ // wake up any blocked pop() calls
mEmptyCond.notify_all();
- // wake up any blocked pushFront() calls
+ // wake up any blocked push() calls
mCapacityCond.notify_all();
}
-template<typename ElementT>
-bool LLThreadSafeQueue<ElementT>::isClosed()
+template<typename ElementT, typename QueueT>
+bool LLThreadSafeQueue<ElementT, QueueT>::isClosed()
{
lock_t lock(mLock);
return mClosed && mStorage.size() == 0;
}
-template<typename ElementT>
-LLThreadSafeQueue<ElementT>::operator bool()
+template<typename ElementT, typename QueueT>
+LLThreadSafeQueue<ElementT, QueueT>::operator bool()
{
return ! isClosed();
}