From 5c92047e827a0e997b726aa9f516ace124cc277f Mon Sep 17 00:00:00 2001
From: Nat Goodspeed <nat@lindenlab.com>
Date: Thu, 19 Dec 2019 10:17:30 -0500
Subject: DRTVWR-476: Introduce LLThreadSafeQueue::close().

Also isClosed() and explicit operator bool() to detect closed state.

close() causes every subsequent pushFront() to throw
LLThreadSafeQueueInterrupt. Once the queue is drained, it causes popBack() to
throw likewise.
---
 indra/llcommon/llthreadsafequeue.h | 78 ++++++++++++++++++++++++++++++++++----
 1 file changed, 70 insertions(+), 8 deletions(-)

diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 3f49dbccc2..bac536f7ee 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -78,7 +78,7 @@ public:
 	// Add an element to the front of queue (will block if the queue has
 	// reached capacity).
 	//
-	// This call will raise an interrupt error if the queue is deleted while
+	// This call will raise an interrupt error if the queue is closed while
 	// the caller is blocked.
 	void pushFront(ElementT const & element);
 	
@@ -89,7 +89,7 @@ public:
 	// Pop the element at the end of the queue (will block if the queue is
 	// empty).
 	//
-	// This call will raise an interrupt error if the queue is deleted while
+	// This call will raise an interrupt error if the queue is closed while
 	// the caller is blocked.
 	ElementT popBack(void);
 	
@@ -100,11 +100,27 @@ public:
 	// Returns the size of the queue.
 	size_t size();
 
+	// closes the queue:
+	// - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt
+	// - every subsequent tryPushFront() call will return false
+	// - popBack() calls will return normally until the queue is drained, then
+	//   every subsequent popBack() will throw LLThreadSafeQueueInterrupt
+	// - tryPopBack() calls will return normally until the queue is drained,
+	//   then every subsequent tryPopBack() call will return false
+	void close();
+
+	// detect closed state
+	bool isClosed();
+	// inverse of isClosed()
+	explicit operator bool();
+
 private:
 	std::deque< ElementT > mStorage;
 	U32 mCapacity;
+	bool mClosed;
 
 	boost::fibers::mutex mLock;
+	typedef std::unique_lock<decltype(mLock)> lock_t;
 	boost::fibers::condition_variable mCapacityCond;
 	boost::fibers::condition_variable mEmptyCond;
 };
@@ -114,7 +130,8 @@ private:
 
 template<typename ElementT>
 LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) :
-mCapacity(capacity)
+    mCapacity(capacity),
+    mClosed(false)
 {
 }
 
@@ -122,12 +139,18 @@ mCapacity(capacity)
 template<typename ElementT>
 void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
 {
-    std::unique_lock<decltype(mLock)> lock1(mLock);
+    lock_t lock1(mLock);
     while (true)
     {
+        if (mClosed)
+        {
+            LLTHROW(LLThreadSafeQueueInterrupt());
+        }
+
         if (mStorage.size() < mCapacity)
         {
             mStorage.push_front(element);
+            lock1.unlock();
             mEmptyCond.notify_one();
             return;
         }
@@ -141,14 +164,18 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
 template<typename ElementT>
 bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
 {
-    std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock);
+    lock_t lock1(mLock, std::defer_lock);
     if (!lock1.try_lock())
         return false;
 
+    if (mClosed)
+        return false;
+
     if (mStorage.size() >= mCapacity)
         return false;
 
     mStorage.push_front(element);
+    lock1.unlock();
     mEmptyCond.notify_one();
     return true;
 }
@@ -157,17 +184,23 @@ bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
 template<typename ElementT>
 ElementT LLThreadSafeQueue<ElementT>::popBack(void)
 {
-    std::unique_lock<decltype(mLock)> lock1(mLock);
+    lock_t lock1(mLock);
     while (true)
     {
         if (!mStorage.empty())
         {
             ElementT value = mStorage.back();
             mStorage.pop_back();
+            lock1.unlock();
             mCapacityCond.notify_one();
             return value;
         }
 
+        if (mClosed)
+        {
+            LLTHROW(LLThreadSafeQueueInterrupt());
+        }
+
         // Storage empty. Wait for signal.
         mEmptyCond.wait(lock1);
     }
@@ -177,15 +210,18 @@ ElementT LLThreadSafeQueue<ElementT>::popBack(void)
 template<typename ElementT>
 bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
 {
-    std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock);
+    lock_t lock1(mLock, std::defer_lock);
     if (!lock1.try_lock())
         return false;
 
+    // no need to check mClosed: tryPopBack() behavior when the queue is
+    // closed is implemented by simple inability to push any new elements
     if (mStorage.empty())
         return false;
 
     element = mStorage.back();
     mStorage.pop_back();
+    lock1.unlock();
     mCapacityCond.notify_one();
     return true;
 }
@@ -194,8 +230,34 @@ bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
 template<typename ElementT>
 size_t LLThreadSafeQueue<ElementT>::size(void)
 {
-    std::lock_guard<decltype(mLock)> lock(mLock);
+    lock_t lock(mLock);
     return mStorage.size();
 }
 
+template<typename ElementT>
+void LLThreadSafeQueue<ElementT>::close()
+{
+    lock_t lock(mLock);
+    mClosed = true;
+    lock.unlock();
+    // wake up any blocked popBack() calls
+    mEmptyCond.notify_all();
+    // wake up any blocked pushFront() calls
+    mCapacityCond.notify_all();
+}
+
+template<typename ElementT>
+bool LLThreadSafeQueue<ElementT>::isClosed()
+{
+    lock_t lock(mLock);
+    return mClosed;
+}
+
+template<typename ElementT>
+LLThreadSafeQueue<ElementT>::operator bool()
+{
+    lock_t lock(mLock);
+    return ! mClosed;
+}
+
 #endif
-- 
cgit v1.2.3