From 7c9aeed97d4ba3641971b9a1a92d334ec0adbb09 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 1 Oct 2021 16:05:23 -0400 Subject: SL-16024: Enhance LLThreadSafeQueue for use with WorkQueue. First, parameterize LLThreadSafeQueue's queue type. This allows us to substitute (e.g.) a std::priority_queue for a particular instance. Use std::queue for the default queue type, changing the operations invoked on the queue type from std::deque methods to std::queue methods. Rename published methods from (e.g.) pushFront() and popBack() to simple push() and pop(), retaining legacy names as aliases. Not only are the overt Front and Back unnecessary; they're the opposite of how std::queue uses std::deque or std::list, so they only confuse the reader. Break out tryPushUntil() method. We already use that logic internally to tryPushFor(), so it's just as easy to publish it as its own entry point. Add tryPopFor() and tryPopUntil() to allow limiting the time we'll wait for a queue item to become available. --- indra/llcommon/llthreadsafequeue.h | 229 ++++++++++++++++++++++++++----------- 1 file changed, 163 insertions(+), 66 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 26e0d71d31..04f51816d7 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -1,6 +1,6 @@ /** * @file llthreadsafequeue.h - * @brief Base classes for thread, mutex and condition handling. + * @brief Queue protected with mutexes for cross-thread use * * $LicenseInfo:firstyear=2004&license=viewerlgpl$ * Second Life Viewer Source Code @@ -27,15 +27,15 @@ #ifndef LL_LLTHREADSAFEQUEUE_H #define LL_LLTHREADSAFEQUEUE_H -#include "llexception.h" -#include -#include -#include -#include "mutex.h" #include "llcoros.h" #include LLCOROS_MUTEX_HEADER #include #include LLCOROS_CONDVAR_HEADER +#include "llexception.h" +#include "mutex.h" +#include +#include +#include // // A general queue exception. @@ -66,61 +66,95 @@ public: } }; -// -// Implements a thread safe FIFO. -// -template +/** + * Implements a thread safe FIFO. + */ +// Let the default std::queue default to underlying std::deque. Override if +// desired. +template> class LLThreadSafeQueue { public: typedef ElementT value_type; - + // If the pool is set to NULL one will be allocated and managed by this // queue. LLThreadSafeQueue(U32 capacity = 1024); - - // Add an element to the front of queue (will block if the queue has + + // Add an element to the queue (will block if the queue has // reached capacity). // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. - void pushFront(ElementT const & element); - - // Try to add an element to the front of queue without blocking. Returns + void push(ElementT const& element); + // legacy name + void pushFront(ElementT const & element) { return push(element); } + + // Try to add an element to the queue without blocking. Returns // true only if the element was actually added. - bool tryPushFront(ElementT const & element); + bool tryPush(ElementT const& element); + // legacy name + bool tryPushFront(ElementT const & element) { return tryPush(element); } - // Try to add an element to the front of queue, blocking if full but with - // timeout. Returns true if the element was added. + // Try to add an element to the queue, blocking if full but with timeout + // after specified duration. Returns true if the element was added. // There are potentially two different timeouts involved: how long to try // to lock the mutex, versus how long to wait for the queue to stop being // full. Careful settings for each timeout might be orders of magnitude // apart. However, this method conflates them. template + bool tryPushFor(const std::chrono::duration& timeout, + ElementT const & element); + // legacy name + template bool tryPushFrontFor(const std::chrono::duration& timeout, - ElementT const & element); + ElementT const & element) { return tryPushFor(timeout, element); } + + // Try to add an element to the queue, blocking if full but with + // timeout at specified time_point. Returns true if the element was added. + template + bool tryPushUntil(const std::chrono::time_point& timeout, + ElementT const& element); + // no legacy name because this is a newer method - // Pop the element at the end of the queue (will block if the queue is + // Pop the element at the head of the queue (will block if the queue is // empty). // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. - ElementT popBack(void); - - // Pop an element from the end of the queue if there is one available. + ElementT pop(void); + // legacy name + ElementT popBack(void) { return pop(); } + + // Pop an element from the head of the queue if there is one available. // Returns true only if an element was popped. - bool tryPopBack(ElementT & element); - + bool tryPop(ElementT & element); + // legacy name + bool tryPopBack(ElementT & element) { return tryPop(element); } + + // Pop the element at the head of the queue, blocking if empty, with + // timeout after specified duration. Returns true if an element was popped. + template + bool tryPopFor(const std::chrono::duration& timeout, ElementT& element); + // no legacy name because this is a newer method + + // Pop the element at the head of the queue, blocking if empty, with + // timeout at specified time_point. Returns true if an element was popped. + template + bool tryPopUntil(const std::chrono::time_point& timeout, + ElementT& element); + // no legacy name because this is a newer method + // Returns the size of the queue. size_t size(); // closes the queue: - // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt - // - every subsequent tryPushFront() call will return false - // - popBack() calls will return normally until the queue is drained, then - // every subsequent popBack() will throw LLThreadSafeQueueInterrupt - // - tryPopBack() calls will return normally until the queue is drained, - // then every subsequent tryPopBack() call will return false + // - every subsequent push() call will throw LLThreadSafeQueueInterrupt + // - every subsequent tryPush() call will return false + // - pop() calls will return normally until the queue is drained, then + // every subsequent pop() will throw LLThreadSafeQueueInterrupt + // - tryPop() calls will return normally until the queue is drained, + // then every subsequent tryPop() call will return false void close(); // detect closed state @@ -128,8 +162,9 @@ public: // inverse of isClosed() explicit operator bool(); -private: - std::deque< ElementT > mStorage; +protected: + typedef QueueT queue_type; + QueueT mStorage; U32 mCapacity; bool mClosed; @@ -142,16 +177,16 @@ private: // LLThreadSafeQueue //----------------------------------------------------------------------------- -template -LLThreadSafeQueue::LLThreadSafeQueue(U32 capacity) : +template +LLThreadSafeQueue::LLThreadSafeQueue(U32 capacity) : mCapacity(capacity), mClosed(false) { } -template -void LLThreadSafeQueue::pushFront(ElementT const & element) +template +void LLThreadSafeQueue::push(ElementT const & element) { lock_t lock1(mLock); while (true) @@ -163,7 +198,7 @@ void LLThreadSafeQueue::pushFront(ElementT const & element) if (mStorage.size() < mCapacity) { - mStorage.push_front(element); + mStorage.push(element); lock1.unlock(); mEmptyCond.notify_one(); return; @@ -175,15 +210,24 @@ void LLThreadSafeQueue::pushFront(ElementT const & element) } -template +template template -bool LLThreadSafeQueue::tryPushFrontFor(const std::chrono::duration& timeout, - ElementT const & element) +bool LLThreadSafeQueue::tryPushFor( + const std::chrono::duration& timeout, + ElementT const & element) { // Convert duration to time_point: passing the same timeout duration to // each of multiple calls is wrong. - auto endpoint = std::chrono::steady_clock::now() + timeout; + return tryPushUntil(std::chrono::steady_clock::now() + timeout, element); +} + +template +template +bool LLThreadSafeQueue::tryPushUntil( + const std::chrono::time_point& endpoint, + ElementT const& element) +{ lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock_until(endpoint)) return false; @@ -197,7 +241,7 @@ bool LLThreadSafeQueue::tryPushFrontFor(const std::chrono::duration::tryPushFrontFor(const std::chrono::duration -bool LLThreadSafeQueue::tryPushFront(ElementT const & element) +template +bool LLThreadSafeQueue::tryPush(ElementT const & element) { lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) @@ -228,23 +272,24 @@ bool LLThreadSafeQueue::tryPushFront(ElementT const & element) if (mStorage.size() >= mCapacity) return false; - mStorage.push_front(element); + mStorage.push(element); lock1.unlock(); mEmptyCond.notify_one(); return true; } -template -ElementT LLThreadSafeQueue::popBack(void) +template +ElementT LLThreadSafeQueue::pop(void) { lock_t lock1(mLock); while (true) { if (!mStorage.empty()) { - ElementT value = mStorage.back(); - mStorage.pop_back(); + // std::queue::front() is the element about to pop() + ElementT value = mStorage.front(); + mStorage.pop(); lock1.unlock(); mCapacityCond.notify_one(); return value; @@ -261,54 +306,106 @@ ElementT LLThreadSafeQueue::popBack(void) } -template -bool LLThreadSafeQueue::tryPopBack(ElementT & element) +template +bool LLThreadSafeQueue::tryPop(ElementT & element) { lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; - // no need to check mClosed: tryPopBack() behavior when the queue is + // no need to check mClosed: tryPop() behavior when the queue is // closed is implemented by simple inability to push any new elements if (mStorage.empty()) return false; - element = mStorage.back(); - mStorage.pop_back(); + // std::queue::front() is the element about to pop() + element = mStorage.front(); + mStorage.pop(); lock1.unlock(); mCapacityCond.notify_one(); return true; } -template -size_t LLThreadSafeQueue::size(void) +template +template +bool LLThreadSafeQueue::tryPopFor( + const std::chrono::duration& timeout, + ElementT& element) +{ + // Convert duration to time_point: passing the same timeout duration to + // each of multiple calls is wrong. + return tryPopUntil(std::chrono::steady_clock::now() + timeout, element); +} + + +template +template +bool LLThreadSafeQueue::tryPopUntil( + const std::chrono::time_point& endpoint, + ElementT& element) +{ + lock_t lock1(mLock, std::defer_lock); + if (!lock1.try_lock_until(endpoint)) + return false; + + while (true) + { + if (!mStorage.empty()) + { + // std::queue::front() is the element about to pop() + element = mStorage.front(); + mStorage.pop(); + lock1.unlock(); + mCapacityCond.notify_one(); + return true; + } + + if (mClosed) + { + return false; + } + + // Storage empty. Wait for signal. + if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock1, endpoint)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } +} + + +template +size_t LLThreadSafeQueue::size(void) { lock_t lock(mLock); return mStorage.size(); } -template -void LLThreadSafeQueue::close() +template +void LLThreadSafeQueue::close() { lock_t lock(mLock); mClosed = true; lock.unlock(); - // wake up any blocked popBack() calls + // wake up any blocked pop() calls mEmptyCond.notify_all(); - // wake up any blocked pushFront() calls + // wake up any blocked push() calls mCapacityCond.notify_all(); } -template -bool LLThreadSafeQueue::isClosed() +template +bool LLThreadSafeQueue::isClosed() { lock_t lock(mLock); return mClosed && mStorage.size() == 0; } -template -LLThreadSafeQueue::operator bool() +template +LLThreadSafeQueue::operator bool() { return ! isClosed(); } -- cgit v1.2.3 From ca60fbe72ce086fbdf0821043ad3be6aad06857c Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 4 Oct 2021 16:19:59 -0400 Subject: SL-16024: LLThreadSafeQueue enhancements Add LL::PriorityQueueAdapter, a wrapper for std::priority_queue to make its API more closely resemble std::queue for drop-in use as LLThreadSafeQueue's underlying QueueT container. Support move-only element types. Factor out some implementation redundancy: wrap actual push semantics as push_(), actual pop semantics as pop_(). push(), tryPush() and tryPushUntil() now call push_(); pop(), tryPop() and tryPopUntil() now call pop_(). Break out tryLock() and tryLockUntil() methods that, if they can lock, run the passed callable. Then tryPush(), tryPushUntil(), tryPop() and tryPopUntil() pass lambdas containing the meat of the original method body to tryLock() or tryLockUntil(), as appropriate. --- indra/llcommon/llthreadsafequeue.h | 358 ++++++++++++++++++++++++------------- 1 file changed, 235 insertions(+), 123 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 04f51816d7..c57520c01f 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -37,6 +37,9 @@ #include #include +/***************************************************************************** +* LLThreadSafeQueue +*****************************************************************************/ // // A general queue exception. // @@ -77,8 +80,8 @@ class LLThreadSafeQueue public: typedef ElementT value_type; - // If the pool is set to NULL one will be allocated and managed by this - // queue. + // Limiting the number of pending items prevents unbounded growth of the + // underlying queue. LLThreadSafeQueue(U32 capacity = 1024); // Add an element to the queue (will block if the queue has @@ -86,13 +89,15 @@ public: // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. - void push(ElementT const& element); + template + void push(T&& element); // legacy name void pushFront(ElementT const & element) { return push(element); } // Try to add an element to the queue without blocking. Returns // true only if the element was actually added. - bool tryPush(ElementT const& element); + template + bool tryPush(T&& element); // legacy name bool tryPushFront(ElementT const & element) { return tryPush(element); } @@ -102,9 +107,9 @@ public: // to lock the mutex, versus how long to wait for the queue to stop being // full. Careful settings for each timeout might be orders of magnitude // apart. However, this method conflates them. - template + template bool tryPushFor(const std::chrono::duration& timeout, - ElementT const & element); + T&& element); // legacy name template bool tryPushFrontFor(const std::chrono::duration& timeout, @@ -112,9 +117,9 @@ public: // Try to add an element to the queue, blocking if full but with // timeout at specified time_point. Returns true if the element was added. - template - bool tryPushUntil(const std::chrono::time_point& timeout, - ElementT const& element); + template + bool tryPushUntil(const std::chrono::time_point& until, + T&& element); // no legacy name because this is a newer method // Pop the element at the head of the queue (will block if the queue is @@ -141,7 +146,7 @@ public: // Pop the element at the head of the queue, blocking if empty, with // timeout at specified time_point. Returns true if an element was popped. template - bool tryPopUntil(const std::chrono::time_point& timeout, + bool tryPopUntil(const std::chrono::time_point& until, ElementT& element); // no legacy name because this is a newer method @@ -172,11 +177,74 @@ protected: typedef std::unique_lock lock_t; boost::fibers::condition_variable_any mCapacityCond; boost::fibers::condition_variable_any mEmptyCond; -}; -// LLThreadSafeQueue -//----------------------------------------------------------------------------- + // if we're able to lock immediately, do so and run the passed callable, + // which must accept lock_t& and return bool + template + bool tryLock(CALLABLE&& callable); + // if we're able to lock before the passed time_point, do so and run the + // passed callable, which must accept lock_t& and return bool + template + bool tryLockUntil(const std::chrono::time_point& until, + CALLABLE&& callable); + // while lock is locked, really push the passed element, if we can + template + bool push_(lock_t& lock, T&& element); + // while lock is locked, really pop the head element, if we can + template + bool pop_(lock_t& lock, ElementT& element, + PRED&& pred=[](const ElementT&){ return true; }); +}; +/***************************************************************************** +* PriorityQueueAdapter +*****************************************************************************/ +namespace LL +{ + /** + * std::priority_queue's API is almost like std::queue, intentionally of + * course, but you must access the element about to pop() as top() rather + * than as front(). Make an adapter for use with LLThreadSafeQueue. + */ + template , + typename Compare=std::less> + class PriorityQueueAdapter + { + public: + // publish all the same types + typedef std::priority_queue queue_type; + typedef typename queue_type::container_type container_type; + typedef typename queue_type::value_compare value_compare; + typedef typename queue_type::value_type value_type; + typedef typename queue_type::size_type size_type; + typedef typename queue_type::reference reference; + typedef typename queue_type::const_reference const_reference; + + // Although std::queue defines both const and non-const front() + // methods, std::priority_queue defines only const top(). + const_reference front() const { return mQ.top(); } + // std::priority_queue has no equivalent to back(), so it's good that + // LLThreadSafeQueue doesn't use it. + + // All the rest of these merely forward to the corresponding + // queue_type methods. + bool empty() const { return mQ.empty(); } + size_type size() const { return mQ.size(); } + void push(const value_type& value) { mQ.push(value); } + void push(value_type&& value) { mQ.push(std::move(value)); } + template + void emplace(Args&&... args) { mQ.emplace(std::forward(args)...); } + void pop() { mQ.pop(); } + + private: + queue_type mQ; + }; +} // namespace LL + + +/***************************************************************************** +* LLThreadSafeQueue implementation +*****************************************************************************/ template LLThreadSafeQueue::LLThreadSafeQueue(U32 capacity) : mCapacity(capacity), @@ -185,24 +253,69 @@ LLThreadSafeQueue::LLThreadSafeQueue(U32 capacity) : } -template -void LLThreadSafeQueue::push(ElementT const & element) +// if we're able to lock immediately, do so and run the passed callable, which +// must accept lock_t& and return bool +template +template +bool LLThreadSafeQueue::tryLock(CALLABLE&& callable) +{ + lock_t lock1(mLock, std::defer_lock); + if (!lock1.try_lock()) + return false; + + return std::forward(callable)(lock1); +} + + +// if we're able to lock before the passed time_point, do so and run the +// passed callable, which must accept lock_t& and return bool +template +template +bool LLThreadSafeQueue::tryLockUntil( + const std::chrono::time_point& until, + CALLABLE&& callable) +{ + lock_t lock1(mLock, std::defer_lock); + if (!lock1.try_lock_until(until)) + return false; + + return std::forward(callable)(lock1); +} + + +// while lock is locked, really push the passed element, if we can +template +template +bool LLThreadSafeQueue::push_(lock_t& lock, T&& element) +{ + if (mStorage.size() >= mCapacity) + return false; + + mStorage.push(std::forward(element)); + lock.unlock(); + // now that we've pushed, if somebody's been waiting to pop, signal them + mEmptyCond.notify_one(); + return true; +} + + +template +template +void LLThreadSafeQueue::push(T&& element) { lock_t lock1(mLock); while (true) { + // On the producer side, it doesn't matter whether the queue has been + // drained or not: the moment either end calls close(), further push() + // operations will fail. if (mClosed) { LLTHROW(LLThreadSafeQueueInterrupt()); } - if (mStorage.size() < mCapacity) - { - mStorage.push(element); - lock1.unlock(); - mEmptyCond.notify_one(); + if (push_(lock1, std::forward(element))) return; - } // Storage Full. Wait for signal. mCapacityCond.wait(lock1); @@ -210,71 +323,85 @@ void LLThreadSafeQueue::push(ElementT const & element) } +template +template +bool LLThreadSafeQueue::tryPush(T&& element) +{ + return tryLock( + [this, element=std::move(element)](lock_t& lock) + { + if (mClosed) + return false; + return push_(lock, std::move(element)); + }); +} + + template -template +template bool LLThreadSafeQueue::tryPushFor( const std::chrono::duration& timeout, - ElementT const & element) + T&& element) { // Convert duration to time_point: passing the same timeout duration to // each of multiple calls is wrong. - return tryPushUntil(std::chrono::steady_clock::now() + timeout, element); + return tryPushUntil(std::chrono::steady_clock::now() + timeout, + std::forward(element)); } template -template +template bool LLThreadSafeQueue::tryPushUntil( - const std::chrono::time_point& endpoint, - ElementT const& element) + const std::chrono::time_point& until, + T&& element) { - lock_t lock1(mLock, std::defer_lock); - if (!lock1.try_lock_until(endpoint)) - return false; - - while (true) - { - if (mClosed) + return tryLockUntil( + until, + [this, until, element=std::move(element)](lock_t& lock) { - return false; - } - - if (mStorage.size() < mCapacity) - { - mStorage.push(element); - lock1.unlock(); - mEmptyCond.notify_one(); - return true; - } - - // Storage Full. Wait for signal. - if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint)) - { - // timed out -- formally we might recheck both conditions above - return false; - } - // If we didn't time out, we were notified for some reason. Loop back - // to check. - } + while (true) + { + if (mClosed) + { + return false; + } + + if (push_(lock, std::move(element))) + return true; + + // Storage Full. Wait for signal. + if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock, until)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } + }); } -template -bool LLThreadSafeQueue::tryPush(ElementT const & element) +// while lock is locked, really pop the head element, if we can +template +template +bool LLThreadSafeQueue::pop_( + lock_t& lock, ElementT& element, PRED&& pred) { - lock_t lock1(mLock, std::defer_lock); - if (!lock1.try_lock()) - return false; - - if (mClosed) - return false; - - if (mStorage.size() >= mCapacity) + // If mStorage is empty, there's no head element. + // If there's a head element, pass it to the predicate to see if caller + // considers it ready to pop. + // Unless both are satisfied, no point in continuing. + if (mStorage.empty() || ! std::forward(pred)(mStorage.front())) return false; - mStorage.push(element); - lock1.unlock(); - mEmptyCond.notify_one(); + // std::queue::front() is the element about to pop() + element = mStorage.front(); + mStorage.pop(); + lock.unlock(); + // now that we've popped, if somebody's been waiting to push, signal them + mCapacityCond.notify_one(); return true; } @@ -285,22 +412,20 @@ ElementT LLThreadSafeQueue::pop(void) lock_t lock1(mLock); while (true) { - if (!mStorage.empty()) - { - // std::queue::front() is the element about to pop() - ElementT value = mStorage.front(); - mStorage.pop(); - lock1.unlock(); - mCapacityCond.notify_one(); - return value; - } - + // On the consumer side, we always try to pop before checking mClosed + // so we can finish draining the queue. + ElementT value; + if (pop_(lock1, value)) + return std::move(value); + + // Once the queue is empty, mClosed lets us know if there will ever be + // any more coming. if (mClosed) { LLTHROW(LLThreadSafeQueueInterrupt()); } - // Storage empty. Wait for signal. + // Storage empty, queue still open. Wait for signal. mEmptyCond.wait(lock1); } } @@ -309,21 +434,14 @@ ElementT LLThreadSafeQueue::pop(void) template bool LLThreadSafeQueue::tryPop(ElementT & element) { - lock_t lock1(mLock, std::defer_lock); - if (!lock1.try_lock()) - return false; - - // no need to check mClosed: tryPop() behavior when the queue is - // closed is implemented by simple inability to push any new elements - if (mStorage.empty()) - return false; - - // std::queue::front() is the element about to pop() - element = mStorage.front(); - mStorage.pop(); - lock1.unlock(); - mCapacityCond.notify_one(); - return true; + return tryLock( + [this, &element](lock_t& lock) + { + // no need to check mClosed: tryPop() behavior when the queue is + // closed is implemented by simple inability to push any new + // elements + return pop_(lock, element); + }); } @@ -342,39 +460,33 @@ bool LLThreadSafeQueue::tryPopFor( template template bool LLThreadSafeQueue::tryPopUntil( - const std::chrono::time_point& endpoint, + const std::chrono::time_point& until, ElementT& element) { - lock_t lock1(mLock, std::defer_lock); - if (!lock1.try_lock_until(endpoint)) - return false; - - while (true) - { - if (!mStorage.empty()) + return tryLockUntil( + until, + [this, until, &element](lock_t& lock) { - // std::queue::front() is the element about to pop() - element = mStorage.front(); - mStorage.pop(); - lock1.unlock(); - mCapacityCond.notify_one(); - return true; - } - - if (mClosed) - { - return false; - } - - // Storage empty. Wait for signal. - if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock1, endpoint)) - { - // timed out -- formally we might recheck both conditions above - return false; - } - // If we didn't time out, we were notified for some reason. Loop back - // to check. - } + while (true) + { + if (pop_(lock, element)) + return true; + + if (mClosed) + { + return false; + } + + // Storage empty. Wait for signal. + if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } + }); } -- cgit v1.2.3 From a35e266547e4d2c8dbd6b003c64b719d91eaaf87 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 4 Oct 2021 17:21:39 -0400 Subject: SL-16024: Don't use a lambda as default arg for universal reference. Instead, break out a separate pop_() method that explicitly provides the lambda to the real pop_() implementation. --- indra/llcommon/llthreadsafequeue.h | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index c57520c01f..1dffad6b89 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -191,9 +191,11 @@ protected: template bool push_(lock_t& lock, T&& element); // while lock is locked, really pop the head element, if we can + bool pop_(lock_t& lock, ElementT& element); + // pop_() with an explicit predicate indicating whether the head element + // is ready to be popped template - bool pop_(lock_t& lock, ElementT& element, - PRED&& pred=[](const ElementT&){ return true; }); + bool pop_(lock_t& lock, ElementT& element, PRED&& pred); }; /***************************************************************************** @@ -385,6 +387,16 @@ bool LLThreadSafeQueue::tryPushUntil( // while lock is locked, really pop the head element, if we can template +bool LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) +{ + // default predicate: head element, if present, is always ready to pop + return pop_(lock, element, [](const ElementT&){ return true; }); +} + + +// pop_() with an explicit predicate indicating whether the head element +// is ready to be popped +template template bool LLThreadSafeQueue::pop_( lock_t& lock, ElementT& element, PRED&& pred) -- cgit v1.2.3 From 955b967623983cb50ba09f7b82e5f01f2c6bcebb Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 5 Oct 2021 17:31:53 -0400 Subject: SL-16024: Add ThreadSafeSchedule, a timestamped LLThreadSafeQueue. ThreadSafeSchedule orders its items by timestamp, which can be passed either implicitly or explicitly. The timestamp specifies earliest delivery time: an item cannot be popped until that time. Add initial tests. Tweak the LLThreadSafeQueue base class to support ThreadSafeSchedule: introduce virtual canPop() method to report whether the current head item is available to pop. The base class unconditionally says yes, ThreadSafeSchedule says it depends on whether its timestamp is still in the future. This replaces the protected pop_() overload accepting a predicate. Rather than explicitly passing a predicate through a couple levels of function call, use canPop() at the level it matters. Runtime behavior that varies depending on an object's leaf class is what virtual functions were invented for. Give pop_() a three-state enum return so pop() can distinguish between "closed and empty" (throws exception) versus "closed, not yet drained because we're not yet ready to pop the head item" (waits). Also break out protected tryPopUntil_() method, the body logic of tryPopUntil(). The public method locks the data structure, the protected method requires that its caller has already done so. Add chrono.h with a more full-featured LL::time_point_cast() function than the one found in , which only converts between time_point durations, not between time_points based on different clocks. --- indra/llcommon/llthreadsafequeue.h | 121 +++++++++++++++++++++---------------- 1 file changed, 68 insertions(+), 53 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 1dffad6b89..bd2d82d4c3 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -83,6 +83,7 @@ public: // Limiting the number of pending items prevents unbounded growth of the // underlying queue. LLThreadSafeQueue(U32 capacity = 1024); + virtual ~LLThreadSafeQueue() {} // Add an element to the queue (will block if the queue has // reached capacity). @@ -162,10 +163,10 @@ public: // then every subsequent tryPop() call will return false void close(); - // detect closed state + // producer end: are we prevented from pushing any additional items? bool isClosed(); - // inverse of isClosed() - explicit operator bool(); + // consumer end: are we done, is the queue entirely drained? + bool done(); protected: typedef QueueT queue_type; @@ -178,6 +179,11 @@ protected: boost::fibers::condition_variable_any mCapacityCond; boost::fibers::condition_variable_any mEmptyCond; + // implementation logic, suitable for passing to tryLockUntil() + template + bool tryPopUntil_(lock_t& lock, + const std::chrono::time_point& until, + ElementT& element); // if we're able to lock immediately, do so and run the passed callable, // which must accept lock_t& and return bool template @@ -191,11 +197,11 @@ protected: template bool push_(lock_t& lock, T&& element); // while lock is locked, really pop the head element, if we can - bool pop_(lock_t& lock, ElementT& element); - // pop_() with an explicit predicate indicating whether the head element - // is ready to be popped - template - bool pop_(lock_t& lock, ElementT& element, PRED&& pred); + enum pop_result { EMPTY, WAITING, POPPED }; + pop_result pop_(lock_t& lock, ElementT& element); + // Is the current head element ready to pop? We say yes; subclass can + // override as needed. + virtual bool canPop(const ElementT& head) const { return true; } }; /***************************************************************************** @@ -387,26 +393,16 @@ bool LLThreadSafeQueue::tryPushUntil( // while lock is locked, really pop the head element, if we can template -bool LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) -{ - // default predicate: head element, if present, is always ready to pop - return pop_(lock, element, [](const ElementT&){ return true; }); -} - - -// pop_() with an explicit predicate indicating whether the head element -// is ready to be popped -template -template -bool LLThreadSafeQueue::pop_( - lock_t& lock, ElementT& element, PRED&& pred) +typename LLThreadSafeQueue::pop_result +LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) { // If mStorage is empty, there's no head element. - // If there's a head element, pass it to the predicate to see if caller - // considers it ready to pop. - // Unless both are satisfied, no point in continuing. - if (mStorage.empty() || ! std::forward(pred)(mStorage.front())) - return false; + if (mStorage.empty()) + return EMPTY; + + // If there's a head element, pass it to canPop() to see if it's ready to pop. + if (! canPop(mStorage.front())) + return WAITING; // std::queue::front() is the element about to pop() element = mStorage.front(); @@ -414,7 +410,7 @@ bool LLThreadSafeQueue::pop_( lock.unlock(); // now that we've popped, if somebody's been waiting to push, signal them mCapacityCond.notify_one(); - return true; + return POPPED; } @@ -422,17 +418,20 @@ template ElementT LLThreadSafeQueue::pop(void) { lock_t lock1(mLock); + ElementT value; while (true) { // On the consumer side, we always try to pop before checking mClosed // so we can finish draining the queue. - ElementT value; - if (pop_(lock1, value)) + pop_result popped = pop_(lock1, value); + if (popped == POPPED) return std::move(value); // Once the queue is empty, mClosed lets us know if there will ever be - // any more coming. - if (mClosed) + // any more coming. If we didn't pop because WAITING, i.e. canPop() + // returned false, then even if the producer end has been closed, + // there's still at least one item to drain: wait for it. + if (popped == EMPTY && mClosed) { LLTHROW(LLThreadSafeQueueInterrupt()); } @@ -452,7 +451,7 @@ bool LLThreadSafeQueue::tryPop(ElementT & element) // no need to check mClosed: tryPop() behavior when the queue is // closed is implemented by simple inability to push any new // elements - return pop_(lock, element); + return pop_(lock, element) == POPPED; }); } @@ -479,26 +478,38 @@ bool LLThreadSafeQueue::tryPopUntil( until, [this, until, &element](lock_t& lock) { - while (true) - { - if (pop_(lock, element)) - return true; + return tryPopUntil_(lock, until, element); + }); +} - if (mClosed) - { - return false; - } - // Storage empty. Wait for signal. - if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) - { - // timed out -- formally we might recheck both conditions above - return false; - } - // If we didn't time out, we were notified for some reason. Loop back - // to check. - } - }); +// body of tryPopUntil(), called once we have the lock +template +template +bool LLThreadSafeQueue::tryPopUntil_( + lock_t& lock, + const std::chrono::time_point& until, + ElementT& element) +{ + while (true) + { + if (pop_(lock, element) == POPPED) + return true; + + if (mClosed) + { + return false; + } + + // Storage empty. Wait for signal. + if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } } @@ -509,6 +520,7 @@ size_t LLThreadSafeQueue::size(void) return mStorage.size(); } + template void LLThreadSafeQueue::close() { @@ -521,17 +533,20 @@ void LLThreadSafeQueue::close() mCapacityCond.notify_all(); } + template bool LLThreadSafeQueue::isClosed() { lock_t lock(mLock); - return mClosed && mStorage.size() == 0; + return mClosed; } + template -LLThreadSafeQueue::operator bool() +bool LLThreadSafeQueue::done() { - return ! isClosed(); + lock_t lock(mLock); + return mClosed && mStorage.size() == 0; } #endif -- cgit v1.2.3 From cf70766b4504f7ee745822926c526ed9c86c9339 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 6 Oct 2021 12:54:29 -0400 Subject: SL-16024: Fix ThreadSafeSchedule::tryPopFor(), tryPopUntil(). ThreadSafeSchedule::tryPopUntil() (and therefore tryPopFor()) was simply delegating to LLThreadSafeQueue::tryPopUntil(), with an adjusted timeout since we want to wake up as soon as the head item, if any, becomes ready. But then we have to loop back to retry the pop to actually deal with that head item. In addition, ThreadSafeSchedule::popWithTime() was spinning rather than properly blocking on a timed condition variable. Fixed. --- indra/llcommon/llthreadsafequeue.h | 51 ++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 24 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index bd2d82d4c3..719edcd579 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -179,11 +179,12 @@ protected: boost::fibers::condition_variable_any mCapacityCond; boost::fibers::condition_variable_any mEmptyCond; + enum pop_result { EMPTY, DONE, WAITING, POPPED }; // implementation logic, suitable for passing to tryLockUntil() template - bool tryPopUntil_(lock_t& lock, - const std::chrono::time_point& until, - ElementT& element); + pop_result tryPopUntil_(lock_t& lock, + const std::chrono::time_point& until, + ElementT& element); // if we're able to lock immediately, do so and run the passed callable, // which must accept lock_t& and return bool template @@ -197,7 +198,6 @@ protected: template bool push_(lock_t& lock, T&& element); // while lock is locked, really pop the head element, if we can - enum pop_result { EMPTY, WAITING, POPPED }; pop_result pop_(lock_t& lock, ElementT& element); // Is the current head element ready to pop? We say yes; subclass can // override as needed. @@ -398,7 +398,7 @@ LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) { // If mStorage is empty, there's no head element. if (mStorage.empty()) - return EMPTY; + return mClosed? DONE : EMPTY; // If there's a head element, pass it to canPop() to see if it's ready to pop. if (! canPop(mStorage.front())) @@ -427,16 +427,16 @@ ElementT LLThreadSafeQueue::pop(void) if (popped == POPPED) return std::move(value); - // Once the queue is empty, mClosed lets us know if there will ever be - // any more coming. If we didn't pop because WAITING, i.e. canPop() - // returned false, then even if the producer end has been closed, - // there's still at least one item to drain: wait for it. - if (popped == EMPTY && mClosed) + // Once the queue is DONE, there will never be any more coming. + if (popped == DONE) { LLTHROW(LLThreadSafeQueueInterrupt()); } - // Storage empty, queue still open. Wait for signal. + // If we didn't pop because WAITING, i.e. canPop() returned false, + // then even if the producer end has been closed, there's still at + // least one item to drain: wait for it. Or we might be EMPTY, with + // the queue still open. Either way, wait for signal. mEmptyCond.wait(lock1); } } @@ -448,8 +448,8 @@ bool LLThreadSafeQueue::tryPop(ElementT & element) return tryLock( [this, &element](lock_t& lock) { - // no need to check mClosed: tryPop() behavior when the queue is - // closed is implemented by simple inability to push any new + // conflate EMPTY, DONE, WAITING: tryPop() behavior when the queue + // is closed is implemented by simple inability to push any new // elements return pop_(lock, element) == POPPED; }); @@ -478,7 +478,8 @@ bool LLThreadSafeQueue::tryPopUntil( until, [this, until, &element](lock_t& lock) { - return tryPopUntil_(lock, until, element); + // conflate EMPTY, DONE, WAITING + return tryPopUntil_(lock, until, element) == POPPED; }); } @@ -486,26 +487,28 @@ bool LLThreadSafeQueue::tryPopUntil( // body of tryPopUntil(), called once we have the lock template template -bool LLThreadSafeQueue::tryPopUntil_( +typename LLThreadSafeQueue::pop_result +LLThreadSafeQueue::tryPopUntil_( lock_t& lock, const std::chrono::time_point& until, ElementT& element) { while (true) { - if (pop_(lock, element) == POPPED) - return true; - - if (mClosed) + pop_result popped = pop_(lock, element); + if (popped == POPPED || popped == DONE) { - return false; + // If we succeeded, great! If we've drained the last item, so be + // it. Either way, break the loop and tell caller. + return popped; } - // Storage empty. Wait for signal. + // EMPTY or WAITING: wait for signal. if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) { - // timed out -- formally we might recheck both conditions above - return false; + // timed out -- formally we might recheck + // as it is, break loop + return popped; } // If we didn't time out, we were notified for some reason. Loop back // to check. @@ -546,7 +549,7 @@ template bool LLThreadSafeQueue::done() { lock_t lock(mLock); - return mClosed && mStorage.size() == 0; + return mClosed && mStorage.empty(); } #endif -- cgit v1.2.3 From d00272e0cc9974f35a46f0c313ee2c0e11cddbda Mon Sep 17 00:00:00 2001 From: Dave Parks Date: Mon, 11 Oct 2021 16:03:40 +0000 Subject: SL-16099 Multi-threaded OpenGL usage on Windows, enable Core Profile and VAOs by default. --- indra/llcommon/llthreadsafequeue.h | 3 +++ 1 file changed, 3 insertions(+) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 719edcd579..06e8d8f609 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -154,6 +154,9 @@ public: // Returns the size of the queue. size_t size(); + //Returns the capacity of the queue. + U32 capacity() { return mCapacity; } + // closes the queue: // - every subsequent push() call will throw LLThreadSafeQueueInterrupt // - every subsequent tryPush() call will return false -- cgit v1.2.3 From 89f2169e9d2c03ed92810689563ca110886abf16 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 4 Nov 2021 16:43:11 -0400 Subject: SL-16202: Add postIfOpen() methods to WorkQueue, LLThreadSafeQueue. postIfOpen() provides a no-exception alternative to post(), which blocks if full but throws if closed. postIfOpen() likewise blocks if full, but returns true if able to post and false if the queue was closed. --- indra/llcommon/llthreadsafequeue.h | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 06e8d8f609..5c934791fe 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -85,8 +85,8 @@ public: LLThreadSafeQueue(U32 capacity = 1024); virtual ~LLThreadSafeQueue() {} - // Add an element to the queue (will block if the queue has - // reached capacity). + // Add an element to the queue (will block if the queue has reached + // capacity). // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. @@ -95,6 +95,11 @@ public: // legacy name void pushFront(ElementT const & element) { return push(element); } + // Add an element to the queue (will block if the queue has reached + // capacity). Return false if the queue is closed before push is possible. + template + bool pushIfOpen(T&& element); + // Try to add an element to the queue without blocking. Returns // true only if the element was actually added. template @@ -311,8 +316,8 @@ bool LLThreadSafeQueue::push_(lock_t& lock, T&& element) template -template -void LLThreadSafeQueue::push(T&& element) +template +bool LLThreadSafeQueue::pushIfOpen(T&& element) { lock_t lock1(mLock); while (true) @@ -321,12 +326,10 @@ void LLThreadSafeQueue::push(T&& element) // drained or not: the moment either end calls close(), further push() // operations will fail. if (mClosed) - { - LLTHROW(LLThreadSafeQueueInterrupt()); - } + return false; if (push_(lock1, std::forward(element))) - return; + return true; // Storage Full. Wait for signal. mCapacityCond.wait(lock1); @@ -334,6 +337,17 @@ void LLThreadSafeQueue::push(T&& element) } +template +template +void LLThreadSafeQueue::push(T&& element) +{ + if (! pushIfOpen(std::forward(element))) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } +} + + template template bool LLThreadSafeQueue::tryPush(T&& element) -- cgit v1.2.3 From 029b41c0419e975bbb28454538b46dc69ce5d2ba Mon Sep 17 00:00:00 2001 From: Dave Houlton Date: Mon, 15 Nov 2021 09:25:35 -0700 Subject: Revert "SL-16220: Merge branch 'origin/DRTVWR-546' into glthread" This reverts commit 5188a26a8521251dda07ac0140bb129f28417e49, reversing changes made to 819088563e13f1d75e048311fbaf0df4a79b7e19. --- indra/llcommon/llthreadsafequeue.h | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 5c934791fe..06e8d8f609 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -85,8 +85,8 @@ public: LLThreadSafeQueue(U32 capacity = 1024); virtual ~LLThreadSafeQueue() {} - // Add an element to the queue (will block if the queue has reached - // capacity). + // Add an element to the queue (will block if the queue has + // reached capacity). // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. @@ -95,11 +95,6 @@ public: // legacy name void pushFront(ElementT const & element) { return push(element); } - // Add an element to the queue (will block if the queue has reached - // capacity). Return false if the queue is closed before push is possible. - template - bool pushIfOpen(T&& element); - // Try to add an element to the queue without blocking. Returns // true only if the element was actually added. template @@ -316,8 +311,8 @@ bool LLThreadSafeQueue::push_(lock_t& lock, T&& element) template -template -bool LLThreadSafeQueue::pushIfOpen(T&& element) +template +void LLThreadSafeQueue::push(T&& element) { lock_t lock1(mLock); while (true) @@ -326,10 +321,12 @@ bool LLThreadSafeQueue::pushIfOpen(T&& element) // drained or not: the moment either end calls close(), further push() // operations will fail. if (mClosed) - return false; + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } if (push_(lock1, std::forward(element))) - return true; + return; // Storage Full. Wait for signal. mCapacityCond.wait(lock1); @@ -337,17 +334,6 @@ bool LLThreadSafeQueue::pushIfOpen(T&& element) } -template -template -void LLThreadSafeQueue::push(T&& element) -{ - if (! pushIfOpen(std::forward(element))) - { - LLTHROW(LLThreadSafeQueueInterrupt()); - } -} - - template template bool LLThreadSafeQueue::tryPush(T&& element) -- cgit v1.2.3 From 9b0d8c7e629597fd8e6dfb91a6b8f625b34ab274 Mon Sep 17 00:00:00 2001 From: Runitai Linden Date: Mon, 22 Nov 2021 18:42:56 -0600 Subject: SL-16094 More profile hooks for threading code, remove redundant wglCreateContextAttribs call --- indra/llcommon/llthreadsafequeue.h | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 5c934791fe..2806506550 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -275,6 +275,7 @@ template template bool LLThreadSafeQueue::tryLock(CALLABLE&& callable) { + LL_PROFILE_ZONE_SCOPED; lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; @@ -291,6 +292,7 @@ bool LLThreadSafeQueue::tryLockUntil( const std::chrono::time_point& until, CALLABLE&& callable) { + LL_PROFILE_ZONE_SCOPED; lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock_until(until)) return false; @@ -304,6 +306,7 @@ template template bool LLThreadSafeQueue::push_(lock_t& lock, T&& element) { + LL_PROFILE_ZONE_SCOPED; if (mStorage.size() >= mCapacity) return false; @@ -319,6 +322,7 @@ template template bool LLThreadSafeQueue::pushIfOpen(T&& element) { + LL_PROFILE_ZONE_SCOPED; lock_t lock1(mLock); while (true) { @@ -341,6 +345,7 @@ template template void LLThreadSafeQueue::push(T&& element) { + LL_PROFILE_ZONE_SCOPED; if (! pushIfOpen(std::forward(element))) { LLTHROW(LLThreadSafeQueueInterrupt()); @@ -352,6 +357,7 @@ template template bool LLThreadSafeQueue::tryPush(T&& element) { + LL_PROFILE_ZONE_SCOPED; return tryLock( [this, element=std::move(element)](lock_t& lock) { @@ -368,6 +374,7 @@ bool LLThreadSafeQueue::tryPushFor( const std::chrono::duration& timeout, T&& element) { + LL_PROFILE_ZONE_SCOPED; // Convert duration to time_point: passing the same timeout duration to // each of multiple calls is wrong. return tryPushUntil(std::chrono::steady_clock::now() + timeout, @@ -381,6 +388,7 @@ bool LLThreadSafeQueue::tryPushUntil( const std::chrono::time_point& until, T&& element) { + LL_PROFILE_ZONE_SCOPED; return tryLockUntil( until, [this, until, element=std::move(element)](lock_t& lock) @@ -413,6 +421,7 @@ template typename LLThreadSafeQueue::pop_result LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) { + LL_PROFILE_ZONE_SCOPED; // If mStorage is empty, there's no head element. if (mStorage.empty()) return mClosed? DONE : EMPTY; @@ -434,6 +443,7 @@ LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) template ElementT LLThreadSafeQueue::pop(void) { + LL_PROFILE_ZONE_SCOPED; lock_t lock1(mLock); ElementT value; while (true) @@ -462,6 +472,7 @@ ElementT LLThreadSafeQueue::pop(void) template bool LLThreadSafeQueue::tryPop(ElementT & element) { + LL_PROFILE_ZONE_SCOPED; return tryLock( [this, &element](lock_t& lock) { @@ -479,6 +490,7 @@ bool LLThreadSafeQueue::tryPopFor( const std::chrono::duration& timeout, ElementT& element) { + LL_PROFILE_ZONE_SCOPED; // Convert duration to time_point: passing the same timeout duration to // each of multiple calls is wrong. return tryPopUntil(std::chrono::steady_clock::now() + timeout, element); @@ -491,6 +503,7 @@ bool LLThreadSafeQueue::tryPopUntil( const std::chrono::time_point& until, ElementT& element) { + LL_PROFILE_ZONE_SCOPED; return tryLockUntil( until, [this, until, &element](lock_t& lock) @@ -510,6 +523,7 @@ LLThreadSafeQueue::tryPopUntil_( const std::chrono::time_point& until, ElementT& element) { + LL_PROFILE_ZONE_SCOPED; while (true) { pop_result popped = pop_(lock, element); @@ -536,6 +550,7 @@ LLThreadSafeQueue::tryPopUntil_( template size_t LLThreadSafeQueue::size(void) { + LL_PROFILE_ZONE_SCOPED; lock_t lock(mLock); return mStorage.size(); } @@ -544,6 +559,7 @@ size_t LLThreadSafeQueue::size(void) template void LLThreadSafeQueue::close() { + LL_PROFILE_ZONE_SCOPED; lock_t lock(mLock); mClosed = true; lock.unlock(); @@ -557,6 +573,7 @@ void LLThreadSafeQueue::close() template bool LLThreadSafeQueue::isClosed() { + LL_PROFILE_ZONE_SCOPED; lock_t lock(mLock); return mClosed; } @@ -565,6 +582,7 @@ bool LLThreadSafeQueue::isClosed() template bool LLThreadSafeQueue::done() { + LL_PROFILE_ZONE_SCOPED; lock_t lock(mLock); return mClosed && mStorage.empty(); } -- cgit v1.2.3 From 877a02dba1df8a5d7d9f40b04d6be834ed9864da Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 24 Nov 2021 09:38:56 -0500 Subject: SL-16094: Fix merge glitches from previous revert. --- indra/llcommon/llthreadsafequeue.h | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index a588175074..2806506550 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -85,8 +85,8 @@ public: LLThreadSafeQueue(U32 capacity = 1024); virtual ~LLThreadSafeQueue() {} - // Add an element to the queue (will block if the queue has - // reached capacity). + // Add an element to the queue (will block if the queue has reached + // capacity). // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. @@ -95,6 +95,11 @@ public: // legacy name void pushFront(ElementT const & element) { return push(element); } + // Add an element to the queue (will block if the queue has reached + // capacity). Return false if the queue is closed before push is possible. + template + bool pushIfOpen(T&& element); + // Try to add an element to the queue without blocking. Returns // true only if the element was actually added. template @@ -314,8 +319,8 @@ bool LLThreadSafeQueue::push_(lock_t& lock, T&& element) template -template -void LLThreadSafeQueue::push(T&& element) +template +bool LLThreadSafeQueue::pushIfOpen(T&& element) { LL_PROFILE_ZONE_SCOPED; lock_t lock1(mLock); @@ -325,12 +330,10 @@ void LLThreadSafeQueue::push(T&& element) // drained or not: the moment either end calls close(), further push() // operations will fail. if (mClosed) - { - LLTHROW(LLThreadSafeQueueInterrupt()); - } + return false; if (push_(lock1, std::forward(element))) - return; + return true; // Storage Full. Wait for signal. mCapacityCond.wait(lock1); -- cgit v1.2.3 From b504c692554d492113a10ef45427fe0ab0d8a85d Mon Sep 17 00:00:00 2001 From: Ptolemy Date: Thu, 13 Jan 2022 12:55:53 -0800 Subject: SL-16606: Add profiler category THREAD --- indra/llcommon/llthreadsafequeue.h | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) (limited to 'indra/llcommon/llthreadsafequeue.h') diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 2806506550..68d79cdd12 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -275,7 +275,7 @@ template template bool LLThreadSafeQueue::tryLock(CALLABLE&& callable) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock()) return false; @@ -292,7 +292,7 @@ bool LLThreadSafeQueue::tryLockUntil( const std::chrono::time_point& until, CALLABLE&& callable) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock1(mLock, std::defer_lock); if (!lock1.try_lock_until(until)) return false; @@ -306,7 +306,7 @@ template template bool LLThreadSafeQueue::push_(lock_t& lock, T&& element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; if (mStorage.size() >= mCapacity) return false; @@ -322,7 +322,7 @@ template template bool LLThreadSafeQueue::pushIfOpen(T&& element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock1(mLock); while (true) { @@ -345,7 +345,7 @@ template template void LLThreadSafeQueue::push(T&& element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; if (! pushIfOpen(std::forward(element))) { LLTHROW(LLThreadSafeQueueInterrupt()); @@ -357,7 +357,7 @@ template template bool LLThreadSafeQueue::tryPush(T&& element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; return tryLock( [this, element=std::move(element)](lock_t& lock) { @@ -374,7 +374,7 @@ bool LLThreadSafeQueue::tryPushFor( const std::chrono::duration& timeout, T&& element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; // Convert duration to time_point: passing the same timeout duration to // each of multiple calls is wrong. return tryPushUntil(std::chrono::steady_clock::now() + timeout, @@ -388,7 +388,7 @@ bool LLThreadSafeQueue::tryPushUntil( const std::chrono::time_point& until, T&& element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; return tryLockUntil( until, [this, until, element=std::move(element)](lock_t& lock) @@ -421,7 +421,7 @@ template typename LLThreadSafeQueue::pop_result LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; // If mStorage is empty, there's no head element. if (mStorage.empty()) return mClosed? DONE : EMPTY; @@ -443,7 +443,7 @@ LLThreadSafeQueue::pop_(lock_t& lock, ElementT& element) template ElementT LLThreadSafeQueue::pop(void) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock1(mLock); ElementT value; while (true) @@ -472,7 +472,7 @@ ElementT LLThreadSafeQueue::pop(void) template bool LLThreadSafeQueue::tryPop(ElementT & element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; return tryLock( [this, &element](lock_t& lock) { @@ -490,7 +490,7 @@ bool LLThreadSafeQueue::tryPopFor( const std::chrono::duration& timeout, ElementT& element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; // Convert duration to time_point: passing the same timeout duration to // each of multiple calls is wrong. return tryPopUntil(std::chrono::steady_clock::now() + timeout, element); @@ -503,7 +503,7 @@ bool LLThreadSafeQueue::tryPopUntil( const std::chrono::time_point& until, ElementT& element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; return tryLockUntil( until, [this, until, &element](lock_t& lock) @@ -523,7 +523,7 @@ LLThreadSafeQueue::tryPopUntil_( const std::chrono::time_point& until, ElementT& element) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; while (true) { pop_result popped = pop_(lock, element); @@ -550,7 +550,7 @@ LLThreadSafeQueue::tryPopUntil_( template size_t LLThreadSafeQueue::size(void) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock(mLock); return mStorage.size(); } @@ -559,7 +559,7 @@ size_t LLThreadSafeQueue::size(void) template void LLThreadSafeQueue::close() { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock(mLock); mClosed = true; lock.unlock(); @@ -573,7 +573,7 @@ void LLThreadSafeQueue::close() template bool LLThreadSafeQueue::isClosed() { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock(mLock); return mClosed; } @@ -582,7 +582,7 @@ bool LLThreadSafeQueue::isClosed() template bool LLThreadSafeQueue::done() { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; lock_t lock(mLock); return mClosed && mStorage.empty(); } -- cgit v1.2.3