From b080b06b422db6405982bee603118ee68e6c2500 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 3 Dec 2019 11:45:14 -0500 Subject: DRTVWR-494: Encapsulate redundant VS boilerplate around . --- indra/llcommon/llthreadsafequeue.h | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index b0bddac8e5..2cee7a3141 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -30,19 +30,9 @@ #include "llexception.h" #include #include - -#if LL_WINDOWS -#pragma warning (push) -#pragma warning (disable:4265) -#endif -// 'std::_Pad' : class has virtual functions, but destructor is not virtual -#include +#include "mutex.h" #include -#if LL_WINDOWS -#pragma warning (pop) -#endif - // // A general queue exception. // -- cgit v1.2.3 From af353911147b338359b3ab659bfb271e6c9a6383 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 14 Nov 2019 16:40:51 -0500 Subject: DRTVWR-476: Make LLThreadSafeQueue coroutine-safe as well. --- indra/llcommon/llthreadsafequeue.h | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 2cee7a3141..3f49dbccc2 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -31,7 +31,7 @@ #include #include #include "mutex.h" -#include +#include // // A general queue exception. @@ -104,9 +104,9 @@ private: std::deque< ElementT > mStorage; U32 mCapacity; - std::mutex mLock; - std::condition_variable mCapacityCond; - std::condition_variable mEmptyCond; + boost::fibers::mutex mLock; + boost::fibers::condition_variable mCapacityCond; + boost::fibers::condition_variable mEmptyCond; }; // LLThreadSafeQueue @@ -122,10 +122,9 @@ mCapacity(capacity) template void LLThreadSafeQueue::pushFront(ElementT const & element) { + std::unique_lock lock1(mLock); while (true) { - std::unique_lock lock1(mLock); - if (mStorage.size() < mCapacity) { mStorage.push_front(element); @@ -142,7 +141,7 @@ void LLThreadSafeQueue::pushFront(ElementT const & element) template bool LLThreadSafeQueue::tryPushFront(ElementT const & element) { - std::unique_lock lock1(mLock, std::defer_lock); + std::unique_lock lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; @@ -158,10 +157,9 @@ bool LLThreadSafeQueue::tryPushFront(ElementT const & element) template ElementT LLThreadSafeQueue::popBack(void) { + std::unique_lock lock1(mLock); while (true) { - std::unique_lock lock1(mLock); - if (!mStorage.empty()) { ElementT value = mStorage.back(); @@ -179,7 +177,7 @@ ElementT LLThreadSafeQueue::popBack(void) template bool LLThreadSafeQueue::tryPopBack(ElementT & element) { - std::unique_lock lock1(mLock, std::defer_lock); + std::unique_lock lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; @@ -196,7 +194,7 @@ bool LLThreadSafeQueue::tryPopBack(ElementT & element) template size_t LLThreadSafeQueue::size(void) { - std::lock_guard lock(mLock); + std::lock_guard lock(mLock); return mStorage.size(); } -- cgit v1.2.3 From 5c92047e827a0e997b726aa9f516ace124cc277f Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 19 Dec 2019 10:17:30 -0500 Subject: DRTVWR-476: Introduce LLThreadSafeQueue::close(). Also isClosed() and explicit operator bool() to detect closed state. close() causes every subsequent pushFront() to throw LLThreadSafeQueueInterrupt. Once the queue is drained, it causes popBack() to throw likewise. --- indra/llcommon/llthreadsafequeue.h | 78 ++++++++++++++++++++++++++++++++++---- 1 file changed, 70 insertions(+), 8 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 3f49dbccc2..bac536f7ee 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -78,7 +78,7 @@ public: // Add an element to the front of queue (will block if the queue has // reached capacity). // - // This call will raise an interrupt error if the queue is deleted while + // This call will raise an interrupt error if the queue is closed while // the caller is blocked. void pushFront(ElementT const & element); @@ -89,7 +89,7 @@ public: // Pop the element at the end of the queue (will block if the queue is // empty). // - // This call will raise an interrupt error if the queue is deleted while + // This call will raise an interrupt error if the queue is closed while // the caller is blocked. ElementT popBack(void); @@ -100,11 +100,27 @@ public: // Returns the size of the queue. size_t size(); + // closes the queue: + // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt + // - every subsequent tryPushFront() call will return false + // - popBack() calls will return normally until the queue is drained, then + // every subsequent popBack() will throw LLThreadSafeQueueInterrupt + // - tryPopBack() calls will return normally until the queue is drained, + // then every subsequent tryPopBack() call will return false + void close(); + + // detect closed state + bool isClosed(); + // inverse of isClosed() + explicit operator bool(); + private: std::deque< ElementT > mStorage; U32 mCapacity; + bool mClosed; boost::fibers::mutex mLock; + typedef std::unique_lock lock_t; boost::fibers::condition_variable mCapacityCond; boost::fibers::condition_variable mEmptyCond; }; @@ -114,7 +130,8 @@ private: template LLThreadSafeQueue::LLThreadSafeQueue(U32 capacity) : -mCapacity(capacity) + mCapacity(capacity), + mClosed(false) { } @@ -122,12 +139,18 @@ mCapacity(capacity) template void LLThreadSafeQueue::pushFront(ElementT const & element) { - std::unique_lock lock1(mLock); + lock_t lock1(mLock); while (true) { + if (mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + if (mStorage.size() < mCapacity) { mStorage.push_front(element); + lock1.unlock(); mEmptyCond.notify_one(); return; } @@ -141,14 +164,18 @@ void LLThreadSafeQueue::pushFront(ElementT const & element) template bool LLThreadSafeQueue::tryPushFront(ElementT const & element) { - std::unique_lock lock1(mLock, std::defer_lock); + lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; + if (mClosed) + return false; + if (mStorage.size() >= mCapacity) return false; mStorage.push_front(element); + lock1.unlock(); mEmptyCond.notify_one(); return true; } @@ -157,17 +184,23 @@ bool LLThreadSafeQueue::tryPushFront(ElementT const & element) template ElementT LLThreadSafeQueue::popBack(void) { - std::unique_lock lock1(mLock); + lock_t lock1(mLock); while (true) { if (!mStorage.empty()) { ElementT value = mStorage.back(); mStorage.pop_back(); + lock1.unlock(); mCapacityCond.notify_one(); return value; } + if (mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + // Storage empty. Wait for signal. mEmptyCond.wait(lock1); } @@ -177,15 +210,18 @@ ElementT LLThreadSafeQueue::popBack(void) template bool LLThreadSafeQueue::tryPopBack(ElementT & element) { - std::unique_lock lock1(mLock, std::defer_lock); + lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; + // no need to check mClosed: tryPopBack() behavior when the queue is + // closed is implemented by simple inability to push any new elements if (mStorage.empty()) return false; element = mStorage.back(); mStorage.pop_back(); + lock1.unlock(); mCapacityCond.notify_one(); return true; } @@ -194,8 +230,34 @@ bool LLThreadSafeQueue::tryPopBack(ElementT & element) template size_t LLThreadSafeQueue::size(void) { - std::lock_guard lock(mLock); + lock_t lock(mLock); return mStorage.size(); } +template +void LLThreadSafeQueue::close() +{ + lock_t lock(mLock); + mClosed = true; + lock.unlock(); + // wake up any blocked popBack() calls + mEmptyCond.notify_all(); + // wake up any blocked pushFront() calls + mCapacityCond.notify_all(); +} + +template +bool LLThreadSafeQueue::isClosed() +{ + lock_t lock(mLock); + return mClosed; +} + +template +LLThreadSafeQueue::operator bool() +{ + lock_t lock(mLock); + return ! mClosed; +} + #endif -- cgit v1.2.3 From 98dfba0d2f24aeb92e023df9d48b23fef8253024 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 14 May 2020 14:51:52 -0400 Subject: DRTVWR-476: Wrap boost::fibers::mutex et al. with LLCoros aliases. Specifically: LLCoros::Mutex means boost::fibers::mutex LLCoros::LockType means std::unique_lock LLCoros::ConditionVariable means boost::fibers::condition_variable LLCoros::cv_status means boost::fibers::cv_status So as not to drag in all of boost::fibers::mutex.hpp or condition_variable.hpp for each consumer of llcoros.h, instead #define LLCOROS_MUTEX_HEADER and LLCOROS_CONDVAR_HEADER. Those who need them can #include the relevant macro. Update llcond.h and llthreadsafequeue.h accordingly. --- indra/llcommon/llthreadsafequeue.h | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index bac536f7ee..8f5e0f3bf3 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -31,7 +31,8 @@ #include #include #include "mutex.h" -#include +#include "llcoros.h" +#include LLCOROS_CONDVAR_HEADER // // A general queue exception. @@ -119,10 +120,10 @@ private: U32 mCapacity; bool mClosed; - boost::fibers::mutex mLock; - typedef std::unique_lock lock_t; - boost::fibers::condition_variable mCapacityCond; - boost::fibers::condition_variable mEmptyCond; + LLCoros::Mutex mLock; + typedef LLCoros::LockType lock_t; + LLCoros::ConditionVariable mCapacityCond; + LLCoros::ConditionVariable mEmptyCond; }; // LLThreadSafeQueue -- cgit v1.2.3 From a07553c2247c16d69bab40de7e61fc460953e450 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 20 May 2020 10:32:30 -0400 Subject: DRTVWR-476: Add LLThreadSafeQueue::tryPushFrontFor(). tryPushFrontFor() is pushFront() with a std::chrono::duration timeout. --- indra/llcommon/llthreadsafequeue.h | 65 ++++++++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 6 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 8f5e0f3bf3..30dd507f73 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -30,8 +30,11 @@ #include "llexception.h" #include #include +#include #include "mutex.h" #include "llcoros.h" +#include LLCOROS_MUTEX_HEADER +#include #include LLCOROS_CONDVAR_HEADER // @@ -83,10 +86,20 @@ public: // the caller is blocked. void pushFront(ElementT const & element); - // Try to add an element to the front ofqueue without blocking. Returns + // Try to add an element to the front of queue without blocking. Returns // true only if the element was actually added. bool tryPushFront(ElementT const & element); - + + // Try to add an element to the front of queue, blocking if full but with + // timeout. Returns true if the element was added. + // There are potentially two different timeouts involved: how long to try + // to lock the mutex, versus how long to wait for the queue to stop being + // full. Careful settings for each timeout might be orders of magnitude + // apart. However, this method conflates them. + template + bool tryPushFrontFor(const std::chrono::duration& timeout, + ElementT const & element); + // Pop the element at the end of the queue (will block if the queue is // empty). // @@ -120,10 +133,10 @@ private: U32 mCapacity; bool mClosed; - LLCoros::Mutex mLock; - typedef LLCoros::LockType lock_t; - LLCoros::ConditionVariable mCapacityCond; - LLCoros::ConditionVariable mEmptyCond; + boost::fibers::timed_mutex mLock; + typedef std::unique_lock lock_t; + boost::fibers::condition_variable_any mCapacityCond; + boost::fibers::condition_variable_any mEmptyCond; }; // LLThreadSafeQueue @@ -162,6 +175,46 @@ void LLThreadSafeQueue::pushFront(ElementT const & element) } +template +template +bool LLThreadSafeQueue::tryPushFrontFor(const std::chrono::duration& timeout, + ElementT const & element) +{ + // Convert duration to time_point: passing the same timeout duration to + // each of multiple calls is wrong. + auto endpoint = std::chrono::steady_clock::now() + timeout; + + lock_t lock1(mLock, std::defer_lock); + if (!lock1.try_lock_until(endpoint)) + return false; + + while (true) + { + if (mClosed) + { + return false; + } + + if (mStorage.size() < mCapacity) + { + mStorage.push_front(element); + lock1.unlock(); + mEmptyCond.notify_one(); + return true; + } + + // Storage Full. Wait for signal. + if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } +} + + template bool LLThreadSafeQueue::tryPushFront(ElementT const & element) { -- cgit v1.2.3