From 5c92047e827a0e997b726aa9f516ace124cc277f Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 19 Dec 2019 10:17:30 -0500 Subject: DRTVWR-476: Introduce LLThreadSafeQueue::close(). Also isClosed() and explicit operator bool() to detect closed state. close() causes every subsequent pushFront() to throw LLThreadSafeQueueInterrupt. Once the queue is drained, it causes popBack() to throw likewise. --- indra/llcommon/llthreadsafequeue.h | 78 ++++++++++++++++++++++++++++++++++---- 1 file changed, 70 insertions(+), 8 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 3f49dbccc2..bac536f7ee 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -78,7 +78,7 @@ public: // Add an element to the front of queue (will block if the queue has // reached capacity). // - // This call will raise an interrupt error if the queue is deleted while + // This call will raise an interrupt error if the queue is closed while // the caller is blocked. void pushFront(ElementT const & element); @@ -89,7 +89,7 @@ public: // Pop the element at the end of the queue (will block if the queue is // empty). // - // This call will raise an interrupt error if the queue is deleted while + // This call will raise an interrupt error if the queue is closed while // the caller is blocked. ElementT popBack(void); @@ -100,11 +100,27 @@ public: // Returns the size of the queue. size_t size(); + // closes the queue: + // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt + // - every subsequent tryPushFront() call will return false + // - popBack() calls will return normally until the queue is drained, then + // every subsequent popBack() will throw LLThreadSafeQueueInterrupt + // - tryPopBack() calls will return normally until the queue is drained, + // then every subsequent tryPopBack() call will return false + void close(); + + // detect closed state + bool isClosed(); + // inverse of isClosed() + explicit operator bool(); + private: std::deque< ElementT > mStorage; U32 mCapacity; + bool mClosed; boost::fibers::mutex mLock; + typedef std::unique_lock lock_t; boost::fibers::condition_variable mCapacityCond; boost::fibers::condition_variable mEmptyCond; }; @@ -114,7 +130,8 @@ private: template LLThreadSafeQueue::LLThreadSafeQueue(U32 capacity) : -mCapacity(capacity) + mCapacity(capacity), + mClosed(false) { } @@ -122,12 +139,18 @@ mCapacity(capacity) template void LLThreadSafeQueue::pushFront(ElementT const & element) { - std::unique_lock lock1(mLock); + lock_t lock1(mLock); while (true) { + if (mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + if (mStorage.size() < mCapacity) { mStorage.push_front(element); + lock1.unlock(); mEmptyCond.notify_one(); return; } @@ -141,14 +164,18 @@ void LLThreadSafeQueue::pushFront(ElementT const & element) template bool LLThreadSafeQueue::tryPushFront(ElementT const & element) { - std::unique_lock lock1(mLock, std::defer_lock); + lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; + if (mClosed) + return false; + if (mStorage.size() >= mCapacity) return false; mStorage.push_front(element); + lock1.unlock(); mEmptyCond.notify_one(); return true; } @@ -157,17 +184,23 @@ bool LLThreadSafeQueue::tryPushFront(ElementT const & element) template ElementT LLThreadSafeQueue::popBack(void) { - std::unique_lock lock1(mLock); + lock_t lock1(mLock); while (true) { if (!mStorage.empty()) { ElementT value = mStorage.back(); mStorage.pop_back(); + lock1.unlock(); mCapacityCond.notify_one(); return value; } + if (mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + // Storage empty. Wait for signal. mEmptyCond.wait(lock1); } @@ -177,15 +210,18 @@ ElementT LLThreadSafeQueue::popBack(void) template bool LLThreadSafeQueue::tryPopBack(ElementT & element) { - std::unique_lock lock1(mLock, std::defer_lock); + lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; + // no need to check mClosed: tryPopBack() behavior when the queue is + // closed is implemented by simple inability to push any new elements if (mStorage.empty()) return false; element = mStorage.back(); mStorage.pop_back(); + lock1.unlock(); mCapacityCond.notify_one(); return true; } @@ -194,8 +230,34 @@ bool LLThreadSafeQueue::tryPopBack(ElementT & element) template size_t LLThreadSafeQueue::size(void) { - std::lock_guard lock(mLock); + lock_t lock(mLock); return mStorage.size(); } +template +void LLThreadSafeQueue::close() +{ + lock_t lock(mLock); + mClosed = true; + lock.unlock(); + // wake up any blocked popBack() calls + mEmptyCond.notify_all(); + // wake up any blocked pushFront() calls + mCapacityCond.notify_all(); +} + +template +bool LLThreadSafeQueue::isClosed() +{ + lock_t lock(mLock); + return mClosed; +} + +template +LLThreadSafeQueue::operator bool() +{ + lock_t lock(mLock); + return ! mClosed; +} + #endif -- cgit v1.2.3