summaryrefslogtreecommitdiff
path: root/indra/llcommon/llthreadsafequeue.h
diff options
context:
space:
mode:
authorAndrey Kleshchev <andreykproductengine@lindenlab.com>2020-08-11 00:37:42 +0300
committerAndrey Kleshchev <andreykproductengine@lindenlab.com>2020-08-11 00:37:42 +0300
commit1a390cb0dfeed66b3fcf8edd89baf7577e6ee1c5 (patch)
tree4ed2107c5e2649c26d1c85ffe30c0a56f7a21164 /indra/llcommon/llthreadsafequeue.h
parent7a9a114e168ea68345b54e80b3cfbac7deb4764a (diff)
parent89cde15fb8c52071805af78e61848e743f2ab2f1 (diff)
Merged master into DRTVWR-482
Diffstat (limited to 'indra/llcommon/llthreadsafequeue.h')
-rw-r--r--indra/llcommon/llthreadsafequeue.h156
1 files changed, 130 insertions, 26 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index b0bddac8e5..30dd507f73 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -30,18 +30,12 @@
#include "llexception.h"
#include <deque>
#include <string>
-
-#if LL_WINDOWS
-#pragma warning (push)
-#pragma warning (disable:4265)
-#endif
-// 'std::_Pad' : class has virtual functions, but destructor is not virtual
-#include <mutex>
-#include <condition_variable>
-
-#if LL_WINDOWS
-#pragma warning (pop)
-#endif
+#include <chrono>
+#include "mutex.h"
+#include "llcoros.h"
+#include LLCOROS_MUTEX_HEADER
+#include <boost/fiber/timed_mutex.hpp>
+#include LLCOROS_CONDVAR_HEADER
//
// A general queue exception.
@@ -88,18 +82,28 @@ public:
// Add an element to the front of queue (will block if the queue has
// reached capacity).
//
- // This call will raise an interrupt error if the queue is deleted while
+ // This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
void pushFront(ElementT const & element);
- // Try to add an element to the front ofqueue without blocking. Returns
+ // Try to add an element to the front of queue without blocking. Returns
// true only if the element was actually added.
bool tryPushFront(ElementT const & element);
-
+
+ // Try to add an element to the front of queue, blocking if full but with
+ // timeout. Returns true if the element was added.
+ // There are potentially two different timeouts involved: how long to try
+ // to lock the mutex, versus how long to wait for the queue to stop being
+ // full. Careful settings for each timeout might be orders of magnitude
+ // apart. However, this method conflates them.
+ template <typename Rep, typename Period>
+ bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
+ ElementT const & element);
+
// Pop the element at the end of the queue (will block if the queue is
// empty).
//
- // This call will raise an interrupt error if the queue is deleted while
+ // This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
ElementT popBack(void);
@@ -110,13 +114,29 @@ public:
// Returns the size of the queue.
size_t size();
+ // closes the queue:
+ // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt
+ // - every subsequent tryPushFront() call will return false
+ // - popBack() calls will return normally until the queue is drained, then
+ // every subsequent popBack() will throw LLThreadSafeQueueInterrupt
+ // - tryPopBack() calls will return normally until the queue is drained,
+ // then every subsequent tryPopBack() call will return false
+ void close();
+
+ // detect closed state
+ bool isClosed();
+ // inverse of isClosed()
+ explicit operator bool();
+
private:
std::deque< ElementT > mStorage;
U32 mCapacity;
+ bool mClosed;
- std::mutex mLock;
- std::condition_variable mCapacityCond;
- std::condition_variable mEmptyCond;
+ boost::fibers::timed_mutex mLock;
+ typedef std::unique_lock<decltype(mLock)> lock_t;
+ boost::fibers::condition_variable_any mCapacityCond;
+ boost::fibers::condition_variable_any mEmptyCond;
};
// LLThreadSafeQueue
@@ -124,7 +144,8 @@ private:
template<typename ElementT>
LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) :
-mCapacity(capacity)
+ mCapacity(capacity),
+ mClosed(false)
{
}
@@ -132,13 +153,18 @@ mCapacity(capacity)
template<typename ElementT>
void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
{
+ lock_t lock1(mLock);
while (true)
{
- std::unique_lock<std::mutex> lock1(mLock);
+ if (mClosed)
+ {
+ LLTHROW(LLThreadSafeQueueInterrupt());
+ }
if (mStorage.size() < mCapacity)
{
mStorage.push_front(element);
+ lock1.unlock();
mEmptyCond.notify_one();
return;
}
@@ -149,17 +175,61 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
}
+template <typename ElementT>
+template <typename Rep, typename Period>
+bool LLThreadSafeQueue<ElementT>::tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
+ ElementT const & element)
+{
+ // Convert duration to time_point: passing the same timeout duration to
+ // each of multiple calls is wrong.
+ auto endpoint = std::chrono::steady_clock::now() + timeout;
+
+ lock_t lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock_until(endpoint))
+ return false;
+
+ while (true)
+ {
+ if (mClosed)
+ {
+ return false;
+ }
+
+ if (mStorage.size() < mCapacity)
+ {
+ mStorage.push_front(element);
+ lock1.unlock();
+ mEmptyCond.notify_one();
+ return true;
+ }
+
+ // Storage Full. Wait for signal.
+ if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint))
+ {
+ // timed out -- formally we might recheck both conditions above
+ return false;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
+}
+
+
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
{
- std::unique_lock<std::mutex> lock1(mLock, std::defer_lock);
+ lock_t lock1(mLock, std::defer_lock);
if (!lock1.try_lock())
return false;
+ if (mClosed)
+ return false;
+
if (mStorage.size() >= mCapacity)
return false;
mStorage.push_front(element);
+ lock1.unlock();
mEmptyCond.notify_one();
return true;
}
@@ -168,18 +238,23 @@ bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
template<typename ElementT>
ElementT LLThreadSafeQueue<ElementT>::popBack(void)
{
+ lock_t lock1(mLock);
while (true)
{
- std::unique_lock<std::mutex> lock1(mLock);
-
if (!mStorage.empty())
{
ElementT value = mStorage.back();
mStorage.pop_back();
+ lock1.unlock();
mCapacityCond.notify_one();
return value;
}
+ if (mClosed)
+ {
+ LLTHROW(LLThreadSafeQueueInterrupt());
+ }
+
// Storage empty. Wait for signal.
mEmptyCond.wait(lock1);
}
@@ -189,15 +264,18 @@ ElementT LLThreadSafeQueue<ElementT>::popBack(void)
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
{
- std::unique_lock<std::mutex> lock1(mLock, std::defer_lock);
+ lock_t lock1(mLock, std::defer_lock);
if (!lock1.try_lock())
return false;
+ // no need to check mClosed: tryPopBack() behavior when the queue is
+ // closed is implemented by simple inability to push any new elements
if (mStorage.empty())
return false;
element = mStorage.back();
mStorage.pop_back();
+ lock1.unlock();
mCapacityCond.notify_one();
return true;
}
@@ -206,8 +284,34 @@ bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
template<typename ElementT>
size_t LLThreadSafeQueue<ElementT>::size(void)
{
- std::lock_guard<std::mutex> lock(mLock);
+ lock_t lock(mLock);
return mStorage.size();
}
+template<typename ElementT>
+void LLThreadSafeQueue<ElementT>::close()
+{
+ lock_t lock(mLock);
+ mClosed = true;
+ lock.unlock();
+ // wake up any blocked popBack() calls
+ mEmptyCond.notify_all();
+ // wake up any blocked pushFront() calls
+ mCapacityCond.notify_all();
+}
+
+template<typename ElementT>
+bool LLThreadSafeQueue<ElementT>::isClosed()
+{
+ lock_t lock(mLock);
+ return mClosed;
+}
+
+template<typename ElementT>
+LLThreadSafeQueue<ElementT>::operator bool()
+{
+ lock_t lock(mLock);
+ return ! mClosed;
+}
+
#endif