summaryrefslogtreecommitdiff
path: root/indra/llcommon/llthreadsafequeue.h
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2019-12-19 10:17:30 -0500
committerNat Goodspeed <nat@lindenlab.com>2020-03-25 19:25:42 -0400
commit5c92047e827a0e997b726aa9f516ace124cc277f (patch)
tree11ef39419d6da25f1394cc12bcb61ad9e4d01aff /indra/llcommon/llthreadsafequeue.h
parent0756885eadafbda9763769186f14d728f0887ead (diff)
DRTVWR-476: Introduce LLThreadSafeQueue::close().
Also isClosed() and explicit operator bool() to detect closed state. close() causes every subsequent pushFront() to throw LLThreadSafeQueueInterrupt. Once the queue is drained, it causes popBack() to throw likewise.
Diffstat (limited to 'indra/llcommon/llthreadsafequeue.h')
-rw-r--r--indra/llcommon/llthreadsafequeue.h78
1 files changed, 70 insertions, 8 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 3f49dbccc2..bac536f7ee 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -78,7 +78,7 @@ public:
// Add an element to the front of queue (will block if the queue has
// reached capacity).
//
- // This call will raise an interrupt error if the queue is deleted while
+ // This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
void pushFront(ElementT const & element);
@@ -89,7 +89,7 @@ public:
// Pop the element at the end of the queue (will block if the queue is
// empty).
//
- // This call will raise an interrupt error if the queue is deleted while
+ // This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
ElementT popBack(void);
@@ -100,11 +100,27 @@ public:
// Returns the size of the queue.
size_t size();
+ // closes the queue:
+ // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt
+ // - every subsequent tryPushFront() call will return false
+ // - popBack() calls will return normally until the queue is drained, then
+ // every subsequent popBack() will throw LLThreadSafeQueueInterrupt
+ // - tryPopBack() calls will return normally until the queue is drained,
+ // then every subsequent tryPopBack() call will return false
+ void close();
+
+ // detect closed state
+ bool isClosed();
+ // inverse of isClosed()
+ explicit operator bool();
+
private:
std::deque< ElementT > mStorage;
U32 mCapacity;
+ bool mClosed;
boost::fibers::mutex mLock;
+ typedef std::unique_lock<decltype(mLock)> lock_t;
boost::fibers::condition_variable mCapacityCond;
boost::fibers::condition_variable mEmptyCond;
};
@@ -114,7 +130,8 @@ private:
template<typename ElementT>
LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) :
-mCapacity(capacity)
+ mCapacity(capacity),
+ mClosed(false)
{
}
@@ -122,12 +139,18 @@ mCapacity(capacity)
template<typename ElementT>
void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
{
- std::unique_lock<decltype(mLock)> lock1(mLock);
+ lock_t lock1(mLock);
while (true)
{
+ if (mClosed)
+ {
+ LLTHROW(LLThreadSafeQueueInterrupt());
+ }
+
if (mStorage.size() < mCapacity)
{
mStorage.push_front(element);
+ lock1.unlock();
mEmptyCond.notify_one();
return;
}
@@ -141,14 +164,18 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
{
- std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock);
+ lock_t lock1(mLock, std::defer_lock);
if (!lock1.try_lock())
return false;
+ if (mClosed)
+ return false;
+
if (mStorage.size() >= mCapacity)
return false;
mStorage.push_front(element);
+ lock1.unlock();
mEmptyCond.notify_one();
return true;
}
@@ -157,17 +184,23 @@ bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
template<typename ElementT>
ElementT LLThreadSafeQueue<ElementT>::popBack(void)
{
- std::unique_lock<decltype(mLock)> lock1(mLock);
+ lock_t lock1(mLock);
while (true)
{
if (!mStorage.empty())
{
ElementT value = mStorage.back();
mStorage.pop_back();
+ lock1.unlock();
mCapacityCond.notify_one();
return value;
}
+ if (mClosed)
+ {
+ LLTHROW(LLThreadSafeQueueInterrupt());
+ }
+
// Storage empty. Wait for signal.
mEmptyCond.wait(lock1);
}
@@ -177,15 +210,18 @@ ElementT LLThreadSafeQueue<ElementT>::popBack(void)
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
{
- std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock);
+ lock_t lock1(mLock, std::defer_lock);
if (!lock1.try_lock())
return false;
+ // no need to check mClosed: tryPopBack() behavior when the queue is
+ // closed is implemented by simple inability to push any new elements
if (mStorage.empty())
return false;
element = mStorage.back();
mStorage.pop_back();
+ lock1.unlock();
mCapacityCond.notify_one();
return true;
}
@@ -194,8 +230,34 @@ bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
template<typename ElementT>
size_t LLThreadSafeQueue<ElementT>::size(void)
{
- std::lock_guard<decltype(mLock)> lock(mLock);
+ lock_t lock(mLock);
return mStorage.size();
}
+template<typename ElementT>
+void LLThreadSafeQueue<ElementT>::close()
+{
+ lock_t lock(mLock);
+ mClosed = true;
+ lock.unlock();
+ // wake up any blocked popBack() calls
+ mEmptyCond.notify_all();
+ // wake up any blocked pushFront() calls
+ mCapacityCond.notify_all();
+}
+
+template<typename ElementT>
+bool LLThreadSafeQueue<ElementT>::isClosed()
+{
+ lock_t lock(mLock);
+ return mClosed;
+}
+
+template<typename ElementT>
+LLThreadSafeQueue<ElementT>::operator bool()
+{
+ lock_t lock(mLock);
+ return ! mClosed;
+}
+
#endif