diff options
Diffstat (limited to 'indra/llcommon')
-rw-r--r-- | indra/llcommon/CMakeLists.txt | 8 | ||||
-rw-r--r-- | indra/llcommon/chrono.h | 65 | ||||
-rw-r--r-- | indra/llcommon/llcond.h | 24 | ||||
-rw-r--r-- | indra/llcommon/llinstancetracker.h | 59 | ||||
-rw-r--r-- | indra/llcommon/llleaplistener.cpp | 2 | ||||
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 511 | ||||
-rw-r--r-- | indra/llcommon/tests/llinstancetracker_test.cpp | 14 | ||||
-rw-r--r-- | indra/llcommon/tests/threadsafeschedule_test.cpp | 69 | ||||
-rw-r--r-- | indra/llcommon/tests/tuple_test.cpp | 47 | ||||
-rw-r--r-- | indra/llcommon/tests/workqueue_test.cpp | 158 | ||||
-rw-r--r-- | indra/llcommon/threadsafeschedule.h | 373 | ||||
-rw-r--r-- | indra/llcommon/tuple.h | 84 | ||||
-rw-r--r-- | indra/llcommon/workqueue.cpp | 114 | ||||
-rw-r--r-- | indra/llcommon/workqueue.h | 325 |
14 files changed, 1684 insertions, 169 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt index 066d0404ac..fda43dd24c 100644 --- a/indra/llcommon/CMakeLists.txt +++ b/indra/llcommon/CMakeLists.txt @@ -123,12 +123,14 @@ set(llcommon_SOURCE_FILES llworkerthread.cpp timing.cpp u64.cpp + workqueue.cpp StackWalker.cpp ) set(llcommon_HEADER_FILES CMakeLists.txt + chrono.h ctype_workaround.h fix_macros.h indra_constants.h @@ -256,8 +258,11 @@ set(llcommon_HEADER_FILES lockstatic.h stdtypes.h stringize.h + threadsafeschedule.h timer.h + tuple.h u64.h + workqueue.h StackWalker.h ) @@ -362,6 +367,9 @@ if (LL_TESTS) LL_ADD_INTEGRATION_TEST(lluri "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llunits "" "${test_libs}") LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(threadsafeschedule "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(tuple "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(workqueue "" "${test_libs}") ## llexception_test.cpp isn't a regression test, and doesn't need to be run ## every build. It's to help a developer make implementation choices about diff --git a/indra/llcommon/chrono.h b/indra/llcommon/chrono.h new file mode 100644 index 0000000000..806e871892 --- /dev/null +++ b/indra/llcommon/chrono.h @@ -0,0 +1,65 @@ +/** + * @file chrono.h + * @author Nat Goodspeed + * @date 2021-10-05 + * @brief supplement <chrono> with utility functions + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_CHRONO_H) +#define LL_CHRONO_H + +#include <chrono> +#include <type_traits> // std::enable_if + +namespace LL +{ + +// time_point_cast() is derived from https://stackoverflow.com/a/35293183 +// without the iteration: we think errors in the ~1 microsecond range are +// probably acceptable. + +// This variant is for the optimal case when the source and dest use the same +// clock: that case is handled by std::chrono. +template <typename DestTimePoint, typename SrcTimePoint, + typename std::enable_if<std::is_same<typename DestTimePoint::clock, + typename SrcTimePoint::clock>::value, + bool>::type = true> +DestTimePoint time_point_cast(const SrcTimePoint& time) +{ + return std::chrono::time_point_cast<typename DestTimePoint::duration>(time); +} + +// This variant is for when the source and dest use different clocks -- see +// the linked StackOverflow answer, also Howard Hinnant's, for more context. +template <typename DestTimePoint, typename SrcTimePoint, + typename std::enable_if<! std::is_same<typename DestTimePoint::clock, + typename SrcTimePoint::clock>::value, + bool>::type = true> +DestTimePoint time_point_cast(const SrcTimePoint& time) +{ + // The basic idea is that we must adjust the passed time_point by the + // difference between the clocks' epochs. But since time_point doesn't + // expose its epoch, we fall back on what each of them thinks is now(). + // However, since we necessarily make sequential calls to those now() + // functions, the answers differ not only by the cycles spent executing + // those calls, but by potential OS interruptions between them. Try to + // reduce that error by capturing the source clock time both before and + // after the dest clock, and splitting the difference. Of course an + // interruption between two of these now() calls without a comparable + // interruption between the other two will skew the result, but better is + // more expensive. + const auto src_before = typename SrcTimePoint::clock::now(); + const auto dest_now = typename DestTimePoint::clock::now(); + const auto src_after = typename SrcTimePoint::clock::now(); + const auto src_diff = src_after - src_before; + const auto src_now = src_before + src_diff / 2; + return dest_now + (time - src_now); +} + +} // namespace LL + +#endif /* ! defined(LL_CHRONO_H) */ diff --git a/indra/llcommon/llcond.h b/indra/llcommon/llcond.h index e31b67d893..c08acb66a1 100644 --- a/indra/llcommon/llcond.h +++ b/indra/llcommon/llcond.h @@ -53,6 +53,8 @@ private: LLCoros::Mutex mMutex; // Use LLCoros::ConditionVariable for the same reason. LLCoros::ConditionVariable mCond; + using LockType = LLCoros::LockType; + using cv_status = LLCoros::cv_status; public: /// LLCond can be explicitly initialized with a specific value for mData if @@ -65,10 +67,14 @@ public: LLCond(const LLCond&) = delete; LLCond& operator=(const LLCond&) = delete; - /// get() returns a const reference to the stored DATA. The only way to - /// get a non-const reference -- to modify the stored DATA -- is via - /// update_one() or update_all(). - const value_type& get() const { return mData; } + /// get() returns the stored DATA by value -- so to use get(), DATA must + /// be copyable. The only way to get a non-const reference -- to modify + /// the stored DATA -- is via update_one() or update_all(). + value_type get() + { + LockType lk(mMutex); + return mData; + } /** * Pass update_one() an invocable accepting non-const (DATA&). The @@ -83,7 +89,7 @@ public: void update_one(MODIFY modify) { { // scope of lock can/should end before notify_one() - LLCoros::LockType lk(mMutex); + LockType lk(mMutex); modify(mData); } mCond.notify_one(); @@ -102,7 +108,7 @@ public: void update_all(MODIFY modify) { { // scope of lock can/should end before notify_all() - LLCoros::LockType lk(mMutex); + LockType lk(mMutex); modify(mData); } mCond.notify_all(); @@ -118,7 +124,7 @@ public: template <typename Pred> void wait(Pred pred) { - LLCoros::LockType lk(mMutex); + LockType lk(mMutex); // We must iterate explicitly since the predicate accepted by // condition_variable::wait() requires a different signature: // condition_variable::wait() calls its predicate with no arguments. @@ -205,14 +211,14 @@ private: template <typename Clock, typename Duration, typename Pred> bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time, Pred pred) { - LLCoros::LockType lk(mMutex); + LockType lk(mMutex); // We advise the caller to pass a predicate accepting (const DATA&). // But what if they instead pass a predicate accepting non-const // (DATA&)? Such a predicate could modify mData, which would be Bad. // Forbid that. while (! pred(const_cast<const value_type&>(mData))) { - if (LLCoros::cv_status::timeout == mCond.wait_until(lk, timeout_time)) + if (cv_status::timeout == mCond.wait_until(lk, timeout_time)) { // It's possible that wait_until() timed out AND the predicate // became true more or less simultaneously. Even though diff --git a/indra/llcommon/llinstancetracker.h b/indra/llcommon/llinstancetracker.h index 402333cca7..02535a59e7 100644 --- a/indra/llcommon/llinstancetracker.h +++ b/indra/llcommon/llinstancetracker.h @@ -83,13 +83,34 @@ class LLInstanceTracker typedef llthread::LockStatic<StaticData> LockStatic; public: + using ptr_t = std::shared_ptr<T>; + using weak_t = std::weak_ptr<T>; + + /** + * Storing a dumb T* somewhere external is a bad idea, since + * LLInstanceTracker subclasses are explicitly destroyed rather than + * managed by smart pointers. It's legal to declare stack instances of an + * LLInstanceTracker subclass. But it's reasonable to store a + * std::weak_ptr<T>, which will become invalid when the T instance is + * destroyed. + */ + weak_t getWeak() + { + return mSelf; + } + + static S32 instanceCount() + { + return LockStatic()->mMap.size(); + } + // snapshot of std::pair<const KEY, std::shared_ptr<T>> pairs class snapshot { // It's very important that what we store in this snapshot are // weak_ptrs, NOT shared_ptrs. That's how we discover whether any // instance has been deleted during the lifespan of a snapshot. - typedef std::vector<std::pair<const KEY, std::weak_ptr<T>>> VectorType; + typedef std::vector<std::pair<const KEY, weak_t>> VectorType; // Dereferencing our iterator produces a std::shared_ptr for each // instance that still exists. Since we store weak_ptrs, that involves // two chained transformations: @@ -98,7 +119,7 @@ public: // It is very important that we filter lazily, that is, during // traversal. Any one of our stored weak_ptrs might expire during // traversal. - typedef std::pair<const KEY, std::shared_ptr<T>> strong_pair; + typedef std::pair<const KEY, ptr_t> strong_pair; // Note for future reference: nat has not yet had any luck (up to // Boost 1.67) trying to use boost::transform_iterator with a hand- // coded functor, only with actual functions. In my experience, an @@ -202,17 +223,12 @@ public: iterator end() { return iterator(snapshot::end(), key_getter); } }; - static T* getInstance(const KEY& k) + static ptr_t getInstance(const KEY& k) { LockStatic lock; const InstanceMap& map(lock->mMap); typename InstanceMap::const_iterator found = map.find(k); - return (found == map.end()) ? NULL : found->second.get(); - } - - static S32 instanceCount() - { - return LockStatic()->mMap.size(); + return (found == map.end()) ? NULL : found->second; } protected: @@ -222,7 +238,9 @@ protected: // shared_ptr, so give it a no-op deleter. We store shared_ptrs in our // InstanceMap specifically so snapshot can store weak_ptrs so we can // detect deletions during traversals. - std::shared_ptr<T> ptr(static_cast<T*>(this), [](T*){}); + ptr_t ptr(static_cast<T*>(this), [](T*){}); + // save corresponding weak_ptr for future reference + mSelf = ptr; LockStatic lock; add_(lock, key, ptr); } @@ -257,7 +275,7 @@ private: static std::string report(const char* key) { return report(std::string(key)); } // caller must instantiate LockStatic - void add_(LockStatic& lock, const KEY& key, const std::shared_ptr<T>& ptr) + void add_(LockStatic& lock, const KEY& key, const ptr_t& ptr) { mInstanceKey = key; InstanceMap& map = lock->mMap; @@ -281,7 +299,7 @@ private: break; } } - std::shared_ptr<T> remove_(LockStatic& lock) + ptr_t remove_(LockStatic& lock) { InstanceMap& map = lock->mMap; typename InstanceMap::iterator iter = map.find(mInstanceKey); @@ -295,6 +313,9 @@ private: } private: + // Storing a weak_ptr to self is a bit like deriving from + // std::enable_shared_from_this(), except more explicit. + weak_t mSelf; KEY mInstanceKey; }; @@ -326,6 +347,9 @@ class LLInstanceTracker<T, void, KEY_COLLISION_BEHAVIOR> typedef llthread::LockStatic<StaticData> LockStatic; public: + using ptr_t = std::shared_ptr<T>; + using weak_t = std::weak_ptr<T>; + /** * Storing a dumb T* somewhere external is a bad idea, since * LLInstanceTracker subclasses are explicitly destroyed rather than @@ -334,12 +358,15 @@ public: * std::weak_ptr<T>, which will become invalid when the T instance is * destroyed. */ - std::weak_ptr<T> getWeak() + weak_t getWeak() { return mSelf; } - static S32 instanceCount() { return LockStatic()->mSet.size(); } + static S32 instanceCount() + { + return LockStatic()->mSet.size(); + } // snapshot of std::shared_ptr<T> pointers class snapshot @@ -347,7 +374,7 @@ public: // It's very important that what we store in this snapshot are // weak_ptrs, NOT shared_ptrs. That's how we discover whether any // instance has been deleted during the lifespan of a snapshot. - typedef std::vector<std::weak_ptr<T>> VectorType; + typedef std::vector<weak_t> VectorType; // Dereferencing our iterator produces a std::shared_ptr for each // instance that still exists. Since we store weak_ptrs, that involves // two chained transformations: @@ -453,7 +480,7 @@ protected: private: // Storing a weak_ptr to self is a bit like deriving from // std::enable_shared_from_this(), except more explicit. - std::weak_ptr<T> mSelf; + weak_t mSelf; }; #endif diff --git a/indra/llcommon/llleaplistener.cpp b/indra/llcommon/llleaplistener.cpp index 3e6ce9092c..11bfec1b31 100644 --- a/indra/llcommon/llleaplistener.cpp +++ b/indra/llcommon/llleaplistener.cpp @@ -220,7 +220,7 @@ void LLLeapListener::getAPI(const LLSD& request) const { Response reply(LLSD(), request); - LLEventAPI* found = LLEventAPI::getInstance(request["api"]); + auto found = LLEventAPI::getInstance(request["api"]); if (found) { reply["name"] = found->getName(); diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 26e0d71d31..719edcd579 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -1,6 +1,6 @@ /** * @file llthreadsafequeue.h - * @brief Base classes for thread, mutex and condition handling. + * @brief Queue protected with mutexes for cross-thread use * * $LicenseInfo:firstyear=2004&license=viewerlgpl$ * Second Life Viewer Source Code @@ -27,16 +27,19 @@ #ifndef LL_LLTHREADSAFEQUEUE_H #define LL_LLTHREADSAFEQUEUE_H -#include "llexception.h" -#include <deque> -#include <string> -#include <chrono> -#include "mutex.h" #include "llcoros.h" #include LLCOROS_MUTEX_HEADER #include <boost/fiber/timed_mutex.hpp> #include LLCOROS_CONDVAR_HEADER +#include "llexception.h" +#include "mutex.h" +#include <chrono> +#include <queue> +#include <string> +/***************************************************************************** +* LLThreadSafeQueue +*****************************************************************************/ // // A general queue exception. // @@ -66,70 +69,108 @@ public: } }; -// -// Implements a thread safe FIFO. -// -template<typename ElementT> +/** + * Implements a thread safe FIFO. + */ +// Let the default std::queue default to underlying std::deque. Override if +// desired. +template<typename ElementT, typename QueueT=std::queue<ElementT>> class LLThreadSafeQueue { public: typedef ElementT value_type; - - // If the pool is set to NULL one will be allocated and managed by this - // queue. + + // Limiting the number of pending items prevents unbounded growth of the + // underlying queue. LLThreadSafeQueue(U32 capacity = 1024); - - // Add an element to the front of queue (will block if the queue has + virtual ~LLThreadSafeQueue() {} + + // Add an element to the queue (will block if the queue has // reached capacity). // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. - void pushFront(ElementT const & element); - - // Try to add an element to the front of queue without blocking. Returns + template <typename T> + void push(T&& element); + // legacy name + void pushFront(ElementT const & element) { return push(element); } + + // Try to add an element to the queue without blocking. Returns // true only if the element was actually added. - bool tryPushFront(ElementT const & element); + template <typename T> + bool tryPush(T&& element); + // legacy name + bool tryPushFront(ElementT const & element) { return tryPush(element); } - // Try to add an element to the front of queue, blocking if full but with - // timeout. Returns true if the element was added. + // Try to add an element to the queue, blocking if full but with timeout + // after specified duration. Returns true if the element was added. // There are potentially two different timeouts involved: how long to try // to lock the mutex, versus how long to wait for the queue to stop being // full. Careful settings for each timeout might be orders of magnitude // apart. However, this method conflates them. + template <typename Rep, typename Period, typename T> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + T&& element); + // legacy name template <typename Rep, typename Period> bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout, - ElementT const & element); + ElementT const & element) { return tryPushFor(timeout, element); } - // Pop the element at the end of the queue (will block if the queue is + // Try to add an element to the queue, blocking if full but with + // timeout at specified time_point. Returns true if the element was added. + template <typename Clock, typename Duration, typename T> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + T&& element); + // no legacy name because this is a newer method + + // Pop the element at the head of the queue (will block if the queue is // empty). // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. - ElementT popBack(void); - - // Pop an element from the end of the queue if there is one available. + ElementT pop(void); + // legacy name + ElementT popBack(void) { return pop(); } + + // Pop an element from the head of the queue if there is one available. // Returns true only if an element was popped. - bool tryPopBack(ElementT & element); - + bool tryPop(ElementT & element); + // legacy name + bool tryPopBack(ElementT & element) { return tryPop(element); } + + // Pop the element at the head of the queue, blocking if empty, with + // timeout after specified duration. Returns true if an element was popped. + template <typename Rep, typename Period> + bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, ElementT& element); + // no legacy name because this is a newer method + + // Pop the element at the head of the queue, blocking if empty, with + // timeout at specified time_point. Returns true if an element was popped. + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + ElementT& element); + // no legacy name because this is a newer method + // Returns the size of the queue. size_t size(); // closes the queue: - // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt - // - every subsequent tryPushFront() call will return false - // - popBack() calls will return normally until the queue is drained, then - // every subsequent popBack() will throw LLThreadSafeQueueInterrupt - // - tryPopBack() calls will return normally until the queue is drained, - // then every subsequent tryPopBack() call will return false + // - every subsequent push() call will throw LLThreadSafeQueueInterrupt + // - every subsequent tryPush() call will return false + // - pop() calls will return normally until the queue is drained, then + // every subsequent pop() will throw LLThreadSafeQueueInterrupt + // - tryPop() calls will return normally until the queue is drained, + // then every subsequent tryPop() call will return false void close(); - // detect closed state + // producer end: are we prevented from pushing any additional items? bool isClosed(); - // inverse of isClosed() - explicit operator bool(); + // consumer end: are we done, is the queue entirely drained? + bool done(); -private: - std::deque< ElementT > mStorage; +protected: + typedef QueueT queue_type; + QueueT mStorage; U32 mCapacity; bool mClosed; @@ -137,37 +178,152 @@ private: typedef std::unique_lock<decltype(mLock)> lock_t; boost::fibers::condition_variable_any mCapacityCond; boost::fibers::condition_variable_any mEmptyCond; -}; -// LLThreadSafeQueue -//----------------------------------------------------------------------------- + enum pop_result { EMPTY, DONE, WAITING, POPPED }; + // implementation logic, suitable for passing to tryLockUntil() + template <typename Clock, typename Duration> + pop_result tryPopUntil_(lock_t& lock, + const std::chrono::time_point<Clock, Duration>& until, + ElementT& element); + // if we're able to lock immediately, do so and run the passed callable, + // which must accept lock_t& and return bool + template <typename CALLABLE> + bool tryLock(CALLABLE&& callable); + // if we're able to lock before the passed time_point, do so and run the + // passed callable, which must accept lock_t& and return bool + template <typename Clock, typename Duration, typename CALLABLE> + bool tryLockUntil(const std::chrono::time_point<Clock, Duration>& until, + CALLABLE&& callable); + // while lock is locked, really push the passed element, if we can + template <typename T> + bool push_(lock_t& lock, T&& element); + // while lock is locked, really pop the head element, if we can + pop_result pop_(lock_t& lock, ElementT& element); + // Is the current head element ready to pop? We say yes; subclass can + // override as needed. + virtual bool canPop(const ElementT& head) const { return true; } +}; -template<typename ElementT> -LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) : +/***************************************************************************** +* PriorityQueueAdapter +*****************************************************************************/ +namespace LL +{ + /** + * std::priority_queue's API is almost like std::queue, intentionally of + * course, but you must access the element about to pop() as top() rather + * than as front(). Make an adapter for use with LLThreadSafeQueue. + */ + template <typename T, typename Container=std::vector<T>, + typename Compare=std::less<typename Container::value_type>> + class PriorityQueueAdapter + { + public: + // publish all the same types + typedef std::priority_queue<T, Container, Compare> queue_type; + typedef typename queue_type::container_type container_type; + typedef typename queue_type::value_compare value_compare; + typedef typename queue_type::value_type value_type; + typedef typename queue_type::size_type size_type; + typedef typename queue_type::reference reference; + typedef typename queue_type::const_reference const_reference; + + // Although std::queue defines both const and non-const front() + // methods, std::priority_queue defines only const top(). + const_reference front() const { return mQ.top(); } + // std::priority_queue has no equivalent to back(), so it's good that + // LLThreadSafeQueue doesn't use it. + + // All the rest of these merely forward to the corresponding + // queue_type methods. + bool empty() const { return mQ.empty(); } + size_type size() const { return mQ.size(); } + void push(const value_type& value) { mQ.push(value); } + void push(value_type&& value) { mQ.push(std::move(value)); } + template <typename... Args> + void emplace(Args&&... args) { mQ.emplace(std::forward<Args>(args)...); } + void pop() { mQ.pop(); } + + private: + queue_type mQ; + }; +} // namespace LL + + +/***************************************************************************** +* LLThreadSafeQueue implementation +*****************************************************************************/ +template<typename ElementT, typename QueueT> +LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(U32 capacity) : mCapacity(capacity), mClosed(false) { } -template<typename ElementT> -void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) +// if we're able to lock immediately, do so and run the passed callable, which +// must accept lock_t& and return bool +template <typename ElementT, typename QueueT> +template <typename CALLABLE> +bool LLThreadSafeQueue<ElementT, QueueT>::tryLock(CALLABLE&& callable) +{ + lock_t lock1(mLock, std::defer_lock); + if (!lock1.try_lock()) + return false; + + return std::forward<CALLABLE>(callable)(lock1); +} + + +// if we're able to lock before the passed time_point, do so and run the +// passed callable, which must accept lock_t& and return bool +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration, typename CALLABLE> +bool LLThreadSafeQueue<ElementT, QueueT>::tryLockUntil( + const std::chrono::time_point<Clock, Duration>& until, + CALLABLE&& callable) +{ + lock_t lock1(mLock, std::defer_lock); + if (!lock1.try_lock_until(until)) + return false; + + return std::forward<CALLABLE>(callable)(lock1); +} + + +// while lock is locked, really push the passed element, if we can +template <typename ElementT, typename QueueT> +template <typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::push_(lock_t& lock, T&& element) +{ + if (mStorage.size() >= mCapacity) + return false; + + mStorage.push(std::forward<T>(element)); + lock.unlock(); + // now that we've pushed, if somebody's been waiting to pop, signal them + mEmptyCond.notify_one(); + return true; +} + + +template <typename ElementT, typename QueueT> +template<typename T> +void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element) { lock_t lock1(mLock); while (true) { + // On the producer side, it doesn't matter whether the queue has been + // drained or not: the moment either end calls close(), further push() + // operations will fail. if (mClosed) { LLTHROW(LLThreadSafeQueueInterrupt()); } - if (mStorage.size() < mCapacity) - { - mStorage.push_front(element); - lock1.unlock(); - mEmptyCond.notify_one(); + if (push_(lock1, std::forward<T>(element))) return; - } // Storage Full. Wait for signal. mCapacityCond.wait(lock1); @@ -175,142 +331,225 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) } -template <typename ElementT> -template <typename Rep, typename Period> -bool LLThreadSafeQueue<ElementT>::tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout, - ElementT const & element) +template<typename ElementT, typename QueueT> +template<typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(T&& element) { - // Convert duration to time_point: passing the same timeout duration to - // each of multiple calls is wrong. - auto endpoint = std::chrono::steady_clock::now() + timeout; + return tryLock( + [this, element=std::move(element)](lock_t& lock) + { + if (mClosed) + return false; + return push_(lock, std::move(element)); + }); +} - lock_t lock1(mLock, std::defer_lock); - if (!lock1.try_lock_until(endpoint)) - return false; - while (true) - { - if (mClosed) - { - return false; - } +template <typename ElementT, typename QueueT> +template <typename Rep, typename Period, typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPushFor( + const std::chrono::duration<Rep, Period>& timeout, + T&& element) +{ + // Convert duration to time_point: passing the same timeout duration to + // each of multiple calls is wrong. + return tryPushUntil(std::chrono::steady_clock::now() + timeout, + std::forward<T>(element)); +} - if (mStorage.size() < mCapacity) - { - mStorage.push_front(element); - lock1.unlock(); - mEmptyCond.notify_one(); - return true; - } - // Storage Full. Wait for signal. - if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint)) +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration, typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil( + const std::chrono::time_point<Clock, Duration>& until, + T&& element) +{ + return tryLockUntil( + until, + [this, until, element=std::move(element)](lock_t& lock) { - // timed out -- formally we might recheck both conditions above - return false; - } - // If we didn't time out, we were notified for some reason. Loop back - // to check. - } + while (true) + { + if (mClosed) + { + return false; + } + + if (push_(lock, std::move(element))) + return true; + + // Storage Full. Wait for signal. + if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock, until)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } + }); } -template<typename ElementT> -bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) +// while lock is locked, really pop the head element, if we can +template <typename ElementT, typename QueueT> +typename LLThreadSafeQueue<ElementT, QueueT>::pop_result +LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element) { - lock_t lock1(mLock, std::defer_lock); - if (!lock1.try_lock()) - return false; - - if (mClosed) - return false; + // If mStorage is empty, there's no head element. + if (mStorage.empty()) + return mClosed? DONE : EMPTY; - if (mStorage.size() >= mCapacity) - return false; + // If there's a head element, pass it to canPop() to see if it's ready to pop. + if (! canPop(mStorage.front())) + return WAITING; - mStorage.push_front(element); - lock1.unlock(); - mEmptyCond.notify_one(); - return true; + // std::queue::front() is the element about to pop() + element = mStorage.front(); + mStorage.pop(); + lock.unlock(); + // now that we've popped, if somebody's been waiting to push, signal them + mCapacityCond.notify_one(); + return POPPED; } -template<typename ElementT> -ElementT LLThreadSafeQueue<ElementT>::popBack(void) +template<typename ElementT, typename QueueT> +ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void) { lock_t lock1(mLock); + ElementT value; while (true) { - if (!mStorage.empty()) - { - ElementT value = mStorage.back(); - mStorage.pop_back(); - lock1.unlock(); - mCapacityCond.notify_one(); - return value; - } - - if (mClosed) + // On the consumer side, we always try to pop before checking mClosed + // so we can finish draining the queue. + pop_result popped = pop_(lock1, value); + if (popped == POPPED) + return std::move(value); + + // Once the queue is DONE, there will never be any more coming. + if (popped == DONE) { LLTHROW(LLThreadSafeQueueInterrupt()); } - // Storage empty. Wait for signal. + // If we didn't pop because WAITING, i.e. canPop() returned false, + // then even if the producer end has been closed, there's still at + // least one item to drain: wait for it. Or we might be EMPTY, with + // the queue still open. Either way, wait for signal. mEmptyCond.wait(lock1); } } -template<typename ElementT> -bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element) +template<typename ElementT, typename QueueT> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element) { - lock_t lock1(mLock, std::defer_lock); - if (!lock1.try_lock()) - return false; + return tryLock( + [this, &element](lock_t& lock) + { + // conflate EMPTY, DONE, WAITING: tryPop() behavior when the queue + // is closed is implemented by simple inability to push any new + // elements + return pop_(lock, element) == POPPED; + }); +} - // no need to check mClosed: tryPopBack() behavior when the queue is - // closed is implemented by simple inability to push any new elements - if (mStorage.empty()) - return false; - element = mStorage.back(); - mStorage.pop_back(); - lock1.unlock(); - mCapacityCond.notify_one(); - return true; +template <typename ElementT, typename QueueT> +template <typename Rep, typename Period> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPopFor( + const std::chrono::duration<Rep, Period>& timeout, + ElementT& element) +{ + // Convert duration to time_point: passing the same timeout duration to + // each of multiple calls is wrong. + return tryPopUntil(std::chrono::steady_clock::now() + timeout, element); } -template<typename ElementT> -size_t LLThreadSafeQueue<ElementT>::size(void) +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil( + const std::chrono::time_point<Clock, Duration>& until, + ElementT& element) +{ + return tryLockUntil( + until, + [this, until, &element](lock_t& lock) + { + // conflate EMPTY, DONE, WAITING + return tryPopUntil_(lock, until, element) == POPPED; + }); +} + + +// body of tryPopUntil(), called once we have the lock +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration> +typename LLThreadSafeQueue<ElementT, QueueT>::pop_result +LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_( + lock_t& lock, + const std::chrono::time_point<Clock, Duration>& until, + ElementT& element) +{ + while (true) + { + pop_result popped = pop_(lock, element); + if (popped == POPPED || popped == DONE) + { + // If we succeeded, great! If we've drained the last item, so be + // it. Either way, break the loop and tell caller. + return popped; + } + + // EMPTY or WAITING: wait for signal. + if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) + { + // timed out -- formally we might recheck + // as it is, break loop + return popped; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } +} + + +template<typename ElementT, typename QueueT> +size_t LLThreadSafeQueue<ElementT, QueueT>::size(void) { lock_t lock(mLock); return mStorage.size(); } -template<typename ElementT> -void LLThreadSafeQueue<ElementT>::close() + +template<typename ElementT, typename QueueT> +void LLThreadSafeQueue<ElementT, QueueT>::close() { lock_t lock(mLock); mClosed = true; lock.unlock(); - // wake up any blocked popBack() calls + // wake up any blocked pop() calls mEmptyCond.notify_all(); - // wake up any blocked pushFront() calls + // wake up any blocked push() calls mCapacityCond.notify_all(); } -template<typename ElementT> -bool LLThreadSafeQueue<ElementT>::isClosed() + +template<typename ElementT, typename QueueT> +bool LLThreadSafeQueue<ElementT, QueueT>::isClosed() { lock_t lock(mLock); - return mClosed && mStorage.size() == 0; + return mClosed; } -template<typename ElementT> -LLThreadSafeQueue<ElementT>::operator bool() + +template<typename ElementT, typename QueueT> +bool LLThreadSafeQueue<ElementT, QueueT>::done() { - return ! isClosed(); + lock_t lock(mLock); + return mClosed && mStorage.empty(); } #endif diff --git a/indra/llcommon/tests/llinstancetracker_test.cpp b/indra/llcommon/tests/llinstancetracker_test.cpp index 9b89159625..5daa29adf4 100644 --- a/indra/llcommon/tests/llinstancetracker_test.cpp +++ b/indra/llcommon/tests/llinstancetracker_test.cpp @@ -90,19 +90,19 @@ namespace tut { Keyed one("one"); ensure_equals(Keyed::instanceCount(), 1); - Keyed* found = Keyed::getInstance("one"); - ensure("couldn't find stack Keyed", found); - ensure_equals("found wrong Keyed instance", found, &one); + auto found = Keyed::getInstance("one"); + ensure("couldn't find stack Keyed", bool(found)); + ensure_equals("found wrong Keyed instance", found.get(), &one); { boost::scoped_ptr<Keyed> two(new Keyed("two")); ensure_equals(Keyed::instanceCount(), 2); - Keyed* found = Keyed::getInstance("two"); - ensure("couldn't find heap Keyed", found); - ensure_equals("found wrong Keyed instance", found, two.get()); + auto found = Keyed::getInstance("two"); + ensure("couldn't find heap Keyed", bool(found)); + ensure_equals("found wrong Keyed instance", found.get(), two.get()); } ensure_equals(Keyed::instanceCount(), 1); } - Keyed* found = Keyed::getInstance("one"); + auto found = Keyed::getInstance("one"); ensure("Keyed key lives too long", ! found); ensure_equals(Keyed::instanceCount(), 0); } diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp new file mode 100644 index 0000000000..af67b9f492 --- /dev/null +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -0,0 +1,69 @@ +/** + * @file threadsafeschedule_test.cpp + * @author Nat Goodspeed + * @date 2021-10-04 + * @brief Test for threadsafeschedule. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "threadsafeschedule.h" +// STL headers +// std headers +#include <chrono> +// external library headers +// other Linden headers +#include "../test/lltut.h" + +using namespace std::literals::chrono_literals; // ms suffix +using namespace std::literals::string_literals; // s suffix +using Queue = LL::ThreadSafeSchedule<std::string>; + +/***************************************************************************** +* TUT +*****************************************************************************/ +namespace tut +{ + struct threadsafeschedule_data + { + Queue queue; + }; + typedef test_group<threadsafeschedule_data> threadsafeschedule_group; + typedef threadsafeschedule_group::object object; + threadsafeschedule_group threadsafeschedulegrp("threadsafeschedule"); + + template<> template<> + void object::test<1>() + { + set_test_name("push"); + // Simply calling push() a few times might result in indeterminate + // delivery order if the resolution of steady_clock is coarser than + // the real time required for each push() call. Explicitly increment + // the timestamp for each one -- but since we're passing explicit + // timestamps, make the queue reorder them. + queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); + // Given the various push() overloads, you have to match the type + // exactly: conversions are ambiguous. + queue.push("abc"s); + queue.push(Queue::Clock::now() + 10ms, "def"); + queue.close(); + auto entry = queue.pop(); + ensure_equals("failed to pop first", std::get<0>(entry), "abc"s); + entry = queue.pop(); + ensure_equals("failed to pop second", std::get<0>(entry), "def"s); + ensure("queue not closed", queue.isClosed()); + ensure("queue prematurely done", ! queue.done()); + std::string s; + bool popped = queue.tryPopFor(1s, s); + ensure("failed to pop third", popped); + ensure_equals("third is wrong", s, "ghi"s); + popped = queue.tryPop(s); + ensure("queue not empty", ! popped); + ensure("queue not done", queue.done()); + } +} // namespace tut diff --git a/indra/llcommon/tests/tuple_test.cpp b/indra/llcommon/tests/tuple_test.cpp new file mode 100644 index 0000000000..af94e2086c --- /dev/null +++ b/indra/llcommon/tests/tuple_test.cpp @@ -0,0 +1,47 @@ +/** + * @file tuple_test.cpp + * @author Nat Goodspeed + * @date 2021-10-04 + * @brief Test for tuple. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "tuple.h" +// STL headers +// std headers +// external library headers +// other Linden headers +#include "../test/lltut.h" + +/***************************************************************************** +* TUT +*****************************************************************************/ +namespace tut +{ + struct tuple_data + { + }; + typedef test_group<tuple_data> tuple_group; + typedef tuple_group::object object; + tuple_group tuplegrp("tuple"); + + template<> template<> + void object::test<1>() + { + set_test_name("tuple"); + std::tuple<std::string, int> tup{ "abc", 17 }; + std::tuple<int, std::string, int> ptup{ tuple_cons(34, tup) }; + std::tuple<std::string, int> tup2; + int i; + std::tie(i, tup2) = tuple_split(ptup); + ensure_equals("tuple_car() fail", i, 34); + ensure_equals("tuple_cdr() (0) fail", std::get<0>(tup2), "abc"); + ensure_equals("tuple_cdr() (1) fail", std::get<1>(tup2), 17); + } +} // namespace tut diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp new file mode 100644 index 0000000000..ab1cae6c14 --- /dev/null +++ b/indra/llcommon/tests/workqueue_test.cpp @@ -0,0 +1,158 @@ +/** + * @file workqueue_test.cpp + * @author Nat Goodspeed + * @date 2021-10-07 + * @brief Test for workqueue. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "workqueue.h" +// STL headers +// std headers +#include <chrono> +#include <deque> +// external library headers +// other Linden headers +#include "../test/lltut.h" +#include "llcond.h" +#include "llstring.h" +#include "stringize.h" + +using namespace LL; +using namespace std::literals::chrono_literals; // ms suffix +using namespace std::literals::string_literals; // s suffix + +/***************************************************************************** +* TUT +*****************************************************************************/ +namespace tut +{ + struct workqueue_data + { + WorkQueue queue{"queue"}; + }; + typedef test_group<workqueue_data> workqueue_group; + typedef workqueue_group::object object; + workqueue_group workqueuegrp("workqueue"); + + template<> template<> + void object::test<1>() + { + set_test_name("name"); + ensure_equals("didn't capture name", queue.getKey(), "queue"); + ensure("not findable", WorkQueue::getInstance("queue") == queue.getWeak().lock()); + WorkQueue q2; + ensure("has no name", LLStringUtil::startsWith(q2.getKey(), "WorkQueue")); + } + + template<> template<> + void object::test<2>() + { + set_test_name("post"); + bool wasRun{ false }; + // We only get away with binding a simple bool because we're running + // the work on the same thread. + queue.post([&wasRun](){ wasRun = true; }); + queue.close(); + ensure("ran too soon", ! wasRun); + queue.runUntilClose(); + ensure("didn't run", wasRun); + } + + template<> template<> + void object::test<3>() + { + set_test_name("postEvery"); + // record of runs + using Shared = std::deque<WorkQueue::TimePoint>; + // This is an example of how to share data between the originator of + // postEvery(work) and the work item itself, since usually a WorkQueue + // is used to dispatch work to a different thread. Neither of them + // should call any of LLCond's wait methods: you don't want to stall + // either the worker thread or the originating thread (conventionally + // main). Use LLCond or a subclass even if all you want to do is + // signal the work item that it can quit; consider LLOneShotCond. + LLCond<Shared> data; + auto start = WorkQueue::TimePoint::clock::now(); + auto interval = 100ms; + queue.postEvery( + interval, + [&data, count = 0] + () mutable + { + // record the timestamp at which this instance is running + data.update_one( + [](Shared& data) + { + data.push_back(WorkQueue::TimePoint::clock::now()); + }); + // by the 3rd call, return false to stop + return (++count < 3); + }); + // no convenient way to close() our queue while we've got a + // postEvery() running, so run until we think we should have exhausted + // the iterations + queue.runFor(10*interval); + // Take a copy of the captured deque. + Shared result = data.get(); + ensure_equals("called wrong number of times", result.size(), 3); + // postEvery() assumes you want the first call to happen right away. + // Inject a fake start time that's (interval) earlier than that, to + // make our too early/too late tests uniform for all entries. + result.push_front(start - interval); + for (size_t i = 1; i < result.size(); ++i) + { + auto diff = (result[i] - result[i-1]); + try + { + ensure(STRINGIZE("call " << i << " too soon"), diff >= interval); + ensure(STRINGIZE("call " << i << " too late"), diff < interval*1.5); + } + catch (const tut::failure&) + { + auto interval_ms = interval / 1ms; + auto diff_ms = diff / 1ms; + std::cerr << "interval " << interval_ms + << "ms; diff " << diff_ms << "ms" << std::endl; + throw; + } + } + } + + template<> template<> + void object::test<4>() + { + set_test_name("postTo"); + WorkQueue main("main"); + auto qptr = WorkQueue::getInstance("queue"); + int result = 0; + main.postTo( + qptr, + [](){ return 17; }, + // Note that a postTo() *callback* can safely bind a reference to + // a variable on the invoking thread, because the callback is run + // on the invoking thread. + [&result](int i){ result = i; }); + // this should post the callback to main + qptr->runOne(); + // this should run the callback + main.runOne(); + ensure_equals("failed to run int callback", result, 17); + + std::string alpha; + // postTo() handles arbitrary return types + main.postTo( + qptr, + [](){ return "abc"s; }, + [&alpha](const std::string& s){ alpha = s; }); + qptr->runPending(); + main.runPending(); + ensure_equals("failed to run string callback", alpha, "abc"); + } +} // namespace tut diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h new file mode 100644 index 0000000000..c8ad23532b --- /dev/null +++ b/indra/llcommon/threadsafeschedule.h @@ -0,0 +1,373 @@ +/** + * @file threadsafeschedule.h + * @author Nat Goodspeed + * @date 2021-10-02 + * @brief ThreadSafeSchedule is an ordered queue in which every item has an + * associated timestamp. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_THREADSAFESCHEDULE_H) +#define LL_THREADSAFESCHEDULE_H + +#include "chrono.h" +#include "llexception.h" +#include "llthreadsafequeue.h" +#include "tuple.h" +#include <chrono> +#include <tuple> + +namespace LL +{ + namespace ThreadSafeSchedulePrivate + { + using TimePoint = std::chrono::steady_clock::time_point; + // Bundle consumer's data with a TimePoint to order items by timestamp. + template <typename... Args> + using TimestampedTuple = std::tuple<TimePoint, Args...>; + + // comparison functor for TimedTuples -- see TimedQueue comments + struct ReverseTupleOrder + { + template <typename Tuple> + bool operator()(const Tuple& left, const Tuple& right) const + { + return std::get<0>(left) > std::get<0>(right); + } + }; + + template <typename... Args> + using TimedQueue = PriorityQueueAdapter< + TimestampedTuple<Args...>, + // std::vector is the default storage for std::priority_queue, + // have to restate to specify comparison template parameter + std::vector<TimestampedTuple<Args...>>, + // std::priority_queue uses a counterintuitive comparison + // behavior: the default std::less comparator is used to present + // the *highest* value as top(). So to sort by earliest timestamp, + // we must invert by using >. + ReverseTupleOrder>; + } // namespace ThreadSafeSchedulePrivate + + /** + * ThreadSafeSchedule is an ordered LLThreadSafeQueue in which every item + * is given an associated timestamp. That is, TimePoint is implicitly + * prepended to the std::tuple with the specified types. + * + * Items are popped in increasing chronological order. Moreover, any item + * with a timestamp in the future is held back until + * std::chrono::steady_clock reaches that timestamp. + */ + template <typename... Args> + class ThreadSafeSchedule: + public LLThreadSafeQueue<ThreadSafeSchedulePrivate::TimestampedTuple<Args...>, + ThreadSafeSchedulePrivate::TimedQueue<Args...>> + { + public: + using DataTuple = std::tuple<Args...>; + using TimeTuple = ThreadSafeSchedulePrivate::TimestampedTuple<Args...>; + + private: + using super = LLThreadSafeQueue<TimeTuple, ThreadSafeSchedulePrivate::TimedQueue<Args...>>; + using lock_t = typename super::lock_t; + // VS 2017 needs this due to a bug: + // https://developercommunity.visualstudio.com/t/cannot-access-protected-enumerator-of-enclosing-cl/203430 + enum pop_result { EMPTY=super::EMPTY, DONE=super::DONE, WAITING=super::WAITING, POPPED=super::POPPED }; + + public: + using Closed = LLThreadSafeQueueInterrupt; + using TimePoint = ThreadSafeSchedulePrivate::TimePoint; + using Clock = TimePoint::clock; + + ThreadSafeSchedule(U32 capacity=1024): + super(capacity) + {} + + /*----------------------------- push() -----------------------------*/ + /// explicitly pass TimeTuple + using super::push; + + /// pass DataTuple with implicit now + // This could be ambiguous for Args with a single type. Unfortunately + // we can't enable_if an individual method with a condition based on + // the *class* template arguments, only on that method's template + // arguments. We could specialize this class for the single-Args case; + // we could minimize redundancy by breaking out a common base class... + void push(const DataTuple& tuple) + { + push(tuple_cons(Clock::now(), tuple)); + } + + /// individually pass each component of the TimeTuple + void push(const TimePoint& time, Args&&... args) + { + push(TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass every component except the TimePoint (implies now) + // This could be ambiguous if the first specified template parameter + // type is also TimePoint. We could try to disambiguate, but a simpler + // approach would be for the caller to explicitly construct DataTuple + // and call that overload. + void push(Args&&... args) + { + push(Clock::now(), std::forward<Args>(args)...); + } + + /*--------------------------- tryPush() ----------------------------*/ + /// explicit TimeTuple + using super::tryPush; + + /// DataTuple with implicit now + bool tryPush(const DataTuple& tuple) + { + return tryPush(tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + bool tryPush(const TimePoint& time, Args&&... args) + { + return tryPush(TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass components with implicit now + bool tryPush(Args&&... args) + { + return tryPush(Clock::now(), std::forward<Args>(args)...); + } + + /*-------------------------- tryPushFor() --------------------------*/ + /// explicit TimeTuple + using super::tryPushFor; + + /// DataTuple with implicit now + template <typename Rep, typename Period> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + const DataTuple& tuple) + { + return tryPushFor(timeout, tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + template <typename Rep, typename Period> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + const TimePoint& time, Args&&... args) + { + return tryPushFor(TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass components with implicit now + template <typename Rep, typename Period> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + Args&&... args) + { + return tryPushFor(Clock::now(), std::forward<Args>(args)...); + } + + /*------------------------- tryPushUntil() -------------------------*/ + /// explicit TimeTuple + using super::tryPushUntil; + + /// DataTuple with implicit now + template <typename Clock, typename Duration> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + const DataTuple& tuple) + { + return tryPushUntil(until, tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + template <typename Clock, typename Duration> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + const TimePoint& time, Args&&... args) + { + return tryPushUntil(until, TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass components with implicit now + template <typename Clock, typename Duration> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + Args&&... args) + { + return tryPushUntil(until, Clock::now(), std::forward<Args>(args)...); + } + + /*----------------------------- pop() ------------------------------*/ + // Our consumer may or may not care about the timestamp associated + // with each popped item, so we allow retrieving either DataTuple or + // TimeTuple. One potential use would be to observe, and possibly + // adjust for, the time lag between the item time and the actual + // current time. + + /// pop DataTuple by value + // It would be great to notice when sizeof...(Args) == 1 and directly + // return the first (only) value, instead of making pop()'s caller + // call std::get<0>(value). See push(DataTuple) remarks for why we + // haven't yet jumped through those hoops. + DataTuple pop() + { + return tuple_cdr(popWithTime()); + } + + /// pop TimeTuple by value + TimeTuple popWithTime() + { + lock_t lock(super::mLock); + // We can't just sit around waiting forever, given that there may + // be items in the queue that are not yet ready but will *become* + // ready in the near future. So in fact, with this class, every + // pop() becomes a tryPopUntil(), constrained to the timestamp of + // the head item. It almost doesn't matter what we specify for the + // caller's time constraint -- all we really care about is the + // head item's timestamp. Since pop() and popWithTime() are + // defined to wait until either an item becomes available or the + // queue is closed, loop until one of those things happens. The + // constraint we pass just determines how often we'll loop while + // waiting. + TimeTuple tt; + while (true) + { + // Pick a point suitably far into the future. + TimePoint until = TimePoint::clock::now() + std::chrono::hours(24); + pop_result popped = tryPopUntil_(lock, until, tt); + if (popped == POPPED) + return std::move(tt); + + // DONE: throw, just as super::pop() does + if (popped == DONE) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + // WAITING: we've still got items to drain. + // EMPTY: not closed, so it's worth waiting for more items. + // Either way, loop back to wait. + } + } + + // We can use tryPop(TimeTuple&) just as it stands; the only behavior + // difference is in our canPop() override method. + using super::tryPop; + + /// tryPop(DataTuple&) + bool tryPop(DataTuple& tuple) + { + TimeTuple tt; + if (! super::tryPop(tt)) + return false; + tuple = tuple_cdr(std::move(tt)); + return true; + } + + /// for when Args has exactly one type + bool tryPop(typename std::tuple_element<1, TimeTuple>::type& value) + { + TimeTuple tt; + if (! super::tryPop(tt)) + return false; + value = std::get<1>(std::move(tt)); + return true; + } + + /// tryPopFor() + template <typename Rep, typename Period, typename Tuple> + bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, Tuple& tuple) + { + // It's important to use OUR tryPopUntil() implementation, rather + // than delegating immediately to our base class. + return tryPopUntil(Clock::now() + timeout, tuple); + } + + /// tryPopUntil(TimeTuple&) + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + TimeTuple& tuple) + { + // super::tryPopUntil() wakes up when an item becomes available or + // we hit 'until', whichever comes first. Thing is, the current + // head of the queue could become ready sooner than either of + // those events, and we need to deliver it as soon as it does. + // Don't wait past the TimePoint of the head item. + // Naturally, lock the queue before peeking at mStorage. + return super::tryLockUntil( + until, + [this, until, &tuple](lock_t& lock) + { + // Use our time_point_cast to allow for 'until' that's a + // time_point type other than TimePoint. + return POPPED == + tryPopUntil_(lock, LL::time_point_cast<TimePoint>(until), tuple); + }); + } + + pop_result tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) + { + TimePoint adjusted = until; + if (! super::mStorage.empty()) + { + // use whichever is earlier: the head item's timestamp, or + // the caller's limit + adjusted = min(std::get<0>(super::mStorage.front()), adjusted); + } + // now delegate to base-class tryPopUntil_() + pop_result popped; + while ((popped = pop_result(super::tryPopUntil_(lock, adjusted, tuple))) == WAITING) + { + // If super::tryPopUntil_() returns WAITING, it means there's + // a head item, but it's not yet time. But it's worth looping + // back to recheck. + } + return popped; + } + + /// tryPopUntil(DataTuple&) + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + DataTuple& tuple) + { + TimeTuple tt; + if (! tryPopUntil(until, tt)) + return false; + tuple = tuple_cdr(std::move(tt)); + return true; + } + + /// for when Args has exactly one type + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + typename std::tuple_element<1, TimeTuple>::type& value) + { + TimeTuple tt; + if (! tryPopUntil(until, tt)) + return false; + value = std::get<1>(std::move(tt)); + return true; + } + + /*------------------------------ etc. ------------------------------*/ + // We can't hide items that aren't yet ready because we can't traverse + // the underlying priority_queue: it has no iterators, only top(). So + // a consumer could observe size() > 0 and yet tryPop() returns false. + // Shrug, in a multi-consumer scenario that would be expected behavior. + using super::size; + // open/closed state + using super::close; + using super::isClosed; + using super::done; + + private: + // this method is called by base class pop_() every time we're + // considering whether to deliver the current head element + bool canPop(const TimeTuple& head) const override + { + // an item with a future timestamp isn't yet ready to pop + // (should we add some slop for overhead?) + return std::get<0>(head) <= Clock::now(); + } + }; + +} // namespace LL + +#endif /* ! defined(LL_THREADSAFESCHEDULE_H) */ diff --git a/indra/llcommon/tuple.h b/indra/llcommon/tuple.h new file mode 100644 index 0000000000..bfe7e3c2ba --- /dev/null +++ b/indra/llcommon/tuple.h @@ -0,0 +1,84 @@ +/** + * @file tuple.h + * @author Nat Goodspeed + * @date 2021-10-04 + * @brief A couple tuple utilities + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_TUPLE_H) +#define LL_TUPLE_H + +#include <tuple> +#include <type_traits> // std::remove_reference +#include <utility> // std::pair + +/** + * tuple_cons() behaves like LISP cons: it uses std::tuple_cat() to prepend a + * new item of arbitrary type to an existing std::tuple. + */ +template <typename First, typename... Rest, typename Tuple_=std::tuple<Rest...>> +auto tuple_cons(First&& first, Tuple_&& rest) +{ + // All we need to do is make a tuple containing 'first', and let + // tuple_cat() do the hard part. + return std::tuple_cat(std::tuple<First>(std::forward<First>(first)), + std::forward<Tuple_>(rest)); +} + +/** + * tuple_car() behaves like LISP car: it extracts the first item from a + * std::tuple. + */ +template <typename... Args, typename Tuple_=std::tuple<Args...>> +auto tuple_car(Tuple_&& tuple) +{ + return std::get<0>(std::forward<Tuple_>(tuple)); +} + +/** + * tuple_cdr() behaves like LISP cdr: it returns a new tuple containing + * everything BUT the first item. + */ +// derived from https://stackoverflow.com/a/24046437 +template <typename Tuple, std::size_t... Indices> +auto tuple_cdr_(Tuple&& tuple, const std::index_sequence<Indices...>) +{ + // Given an index sequence from [0..N-1), extract tuple items [1..N) + return std::make_tuple(std::get<Indices+1u>(std::forward<Tuple>(tuple))...); +} + +template <typename Tuple> +auto tuple_cdr(Tuple&& tuple) +{ + return tuple_cdr_( + std::forward<Tuple>(tuple), + // Pass helper function an index sequence one item shorter than tuple + std::make_index_sequence< + std::tuple_size< + // tuple_size doesn't like reference types + typename std::remove_reference<Tuple>::type + >::value - 1u> + ()); +} + +/** + * tuple_split(), the opposite of tuple_cons(), has no direct analog in LISP. + * It returns a std::pair of tuple_car(), tuple_cdr(). We could call this + * function tuple_car_cdr(), or tuple_slice() or some such. But tuple_split() + * feels more descriptive. + */ +template <typename... Args, typename Tuple_=std::tuple<Args...>> +auto tuple_split(Tuple_&& tuple) +{ + // We're not really worried about forwarding multiple times a tuple that + // might contain move-only items, because the implementation above only + // applies std::get() exactly once to each item. + return std::make_pair(tuple_car(std::forward<Tuple_>(tuple)), + tuple_cdr(std::forward<Tuple_>(tuple))); +} + +#endif /* ! defined(LL_TUPLE_H) */ diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp new file mode 100644 index 0000000000..15e292fb43 --- /dev/null +++ b/indra/llcommon/workqueue.cpp @@ -0,0 +1,114 @@ +/** + * @file workqueue.cpp + * @author Nat Goodspeed + * @date 2021-10-06 + * @brief Implementation for WorkQueue. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "workqueue.h" +// STL headers +// std headers +// external library headers +// other Linden headers +#include "llerror.h" +#include "llexception.h" +#include "stringize.h" + +LL::WorkQueue::WorkQueue(const std::string& name): + super(makeName(name)) +{ + // TODO: register for "LLApp" events so we can implicitly close() on + // viewer shutdown. +} + +void LL::WorkQueue::close() +{ + mQueue.close(); +} + +void LL::WorkQueue::runUntilClose() +{ + try + { + for (;;) + { + callWork(mQueue.pop()); + } + } + catch (const Queue::Closed&) + { + } +} + +bool LL::WorkQueue::runPending() +{ + for (Work work; mQueue.tryPop(work); ) + { + callWork(work); + } + return ! mQueue.done(); +} + +bool LL::WorkQueue::runOne() +{ + Work work; + if (mQueue.tryPop(work)) + { + callWork(work); + } + return ! mQueue.done(); +} + +bool LL::WorkQueue::runUntil(const TimePoint& until) +{ + // Should we subtract some slop to allow for typical Work execution time? + // How much slop? + Work work; + while (TimePoint::clock::now() < until && mQueue.tryPopUntil(until, work)) + { + callWork(work); + } + return ! mQueue.done(); +} + +std::string LL::WorkQueue::makeName(const std::string& name) +{ + if (! name.empty()) + return name; + + static thread_local U32 discriminator = 0; + return STRINGIZE("WorkQueue" << discriminator++); +} + +void LL::WorkQueue::callWork(const Queue::DataTuple& work) +{ + // ThreadSafeSchedule::pop() always delivers a tuple, even when + // there's only one data field per item, as for us. + callWork(std::get<0>(work)); +} + +void LL::WorkQueue::callWork(const Work& work) +{ + try + { + work(); + } + catch (...) + { + // No matter what goes wrong with any individual work item, the worker + // thread must go on! Log our own instance name with the exception. + LOG_UNHANDLED_EXCEPTION(getKey()); + } +} + +void LL::WorkQueue::error(const std::string& msg) +{ + LL_ERRS("WorkQueue") << msg << LL_ENDL; +} diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h new file mode 100644 index 0000000000..a52f7b0e26 --- /dev/null +++ b/indra/llcommon/workqueue.h @@ -0,0 +1,325 @@ +/** + * @file workqueue.h + * @author Nat Goodspeed + * @date 2021-09-30 + * @brief Queue used for inter-thread work passing. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_WORKQUEUE_H) +#define LL_WORKQUEUE_H + +#include "llinstancetracker.h" +#include "threadsafeschedule.h" +#include <chrono> +#include <functional> // std::function +#include <queue> +#include <string> +#include <utility> // std::pair +#include <vector> + +namespace LL +{ + /** + * A typical WorkQueue has a string name that can be used to find it. + */ + class WorkQueue: public LLInstanceTracker<WorkQueue, std::string> + { + private: + using super = LLInstanceTracker<WorkQueue, std::string>; + + public: + using Work = std::function<void()>; + + private: + using Queue = ThreadSafeSchedule<Work>; + // helper for postEvery() + template <typename Rep, typename Period, typename CALLABLE> + class BackJack; + + public: + using TimePoint = Queue::TimePoint; + using TimedWork = Queue::TimeTuple; + using Closed = Queue::Closed; + + /** + * You may omit the WorkQueue name, in which case a unique name is + * synthesized; for practical purposes that makes it anonymous. + */ + WorkQueue(const std::string& name = std::string()); + + /** + * Since the point of WorkQueue is to pass work to some other worker + * thread(s) asynchronously, it's important that the WorkQueue continue + * to exist until the worker thread(s) have drained it. To communicate + * that it's time for them to quit, close() the queue. + */ + void close(); + + /*---------------------- fire and forget API -----------------------*/ + + /// fire-and-forget, but at a particular (future?) time + template <typename CALLABLE> + void post(const TimePoint& time, CALLABLE&& callable) + { + // Defer reifying an arbitrary CALLABLE until we hit this method. + // All other methods should accept CALLABLEs of arbitrary type to + // avoid multiple levels of std::function indirection. + mQueue.push(TimedWork(time, std::move(callable))); + } + + /// fire-and-forget + template <typename CALLABLE> + void post(CALLABLE&& callable) + { + // We use TimePoint::clock::now() instead of TimePoint's + // representation of the epoch because this WorkQueue may contain + // a mix of past-due TimedWork items and TimedWork items scheduled + // for the future. Sift this new item into the correct place. + post(TimePoint::clock::now(), std::move(callable)); + } + + /** + * Launch a callable returning bool that will trigger repeatedly at + * specified interval, until the callable returns false. + * + * If you need to signal that callable from outside, DO NOT bind a + * reference to a simple bool! That's not thread-safe. Instead, bind + * an LLCond variant, e.g. LLOneShotCond or LLBoolCond. + */ + template <typename Rep, typename Period, typename CALLABLE> + void postEvery(const std::chrono::duration<Rep, Period>& interval, + CALLABLE&& callable); + + /*------------------------- handshake API --------------------------*/ + + /** + * Post work to another WorkQueue to be run at a specified time, + * requesting a specific callback to be run on this WorkQueue on + * completion. + * + * Returns true if able to post, false if the other WorkQueue is + * inaccessible. + */ + template <typename CALLABLE, typename CALLBACK> + bool postTo(std::weak_ptr<WorkQueue> target, + const TimePoint& time, CALLABLE&& callable, CALLBACK&& callback) + { + // We're being asked to post to the WorkQueue at target. + // target is a weak_ptr: have to lock it to check it. + auto tptr = target.lock(); + if (! tptr) + // can't post() if the target WorkQueue has been destroyed + return false; + + // Here we believe target WorkQueue still exists. Post to it a + // lambda that packages our callable, our callback and a weak_ptr + // to this originating WorkQueue. + tptr->post( + time, + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () + { + // Call the callable in any case -- but to minimize + // copying the result, immediately bind it into a reply + // lambda. The reply lambda also binds the original + // callback, so that when we, the originating WorkQueue, + // finally receive and process the reply lambda, we'll + // call the bound callback with the bound result -- on the + // same thread that originally called postTo(). + auto rlambda = + [result = callable(), + callback = std::move(callback)] + () + { callback(std::move(result)); }; + // Check if this originating WorkQueue still exists. + // Remember, the outer lambda is now running on a thread + // servicing the target WorkQueue, and real time has + // elapsed since postTo()'s tptr->post() call. + // reply is a weak_ptr: have to lock it to check it. + auto rptr = reply.lock(); + if (rptr) + { + // Only post reply lambda if the originating WorkQueue + // still exists. If not -- who would we tell? Log it? + try + { + rptr->post(std::move(rlambda)); + } + catch (const Closed&) + { + // Originating WorkQueue might still exist, but + // might be Closed. Same thing: just discard the + // callback. + } + } + }); + // looks like we were able to post() + return true; + } + + /** + * Post work to another WorkQueue, requesting a specific callback to + * be run on this WorkQueue on completion. + * + * Returns true if able to post, false if the other WorkQueue is + * inaccessible. + */ + template <typename CALLABLE, typename CALLBACK> + bool postTo(std::weak_ptr<WorkQueue> target, + CALLABLE&& callable, CALLBACK&& callback) + { + return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); + } + + /*--------------------------- worker API ---------------------------*/ + + /** + * runUntilClose() pulls TimedWork items off this WorkQueue until the + * queue is closed, at which point it returns. This would be the + * typical entry point for a simple worker thread. + */ + void runUntilClose(); + + /** + * runPending() runs all TimedWork items that are ready to run. It + * returns true if the queue remains open, false if the queue has been + * closed. This could be used by a thread whose primary purpose is to + * serve the queue, but also wants to do other things with its idle time. + */ + bool runPending(); + + /** + * runOne() runs at most one ready TimedWork item -- zero if none are + * ready. It returns true if the queue remains open, false if the + * queue has been closed. + */ + bool runOne(); + + /** + * runFor() runs a subset of ready TimedWork items, until the + * timeslice has been exceeded. It returns true if the queue remains + * open, false if the queue has been closed. This could be used by a + * busy main thread to lend a bounded few CPU cycles to this WorkQueue + * without risking the WorkQueue blowing out the length of any one + * frame. + */ + template <typename Rep, typename Period> + bool runFor(const std::chrono::duration<Rep, Period>& timeslice) + { + return runUntil(TimePoint::clock::now() + timeslice); + } + + /** + * runUntil() is just like runFor(), only with a specific end time + * instead of a timeslice duration. + */ + bool runUntil(const TimePoint& until); + + private: + static void error(const std::string& msg); + static std::string makeName(const std::string& name); + void callWork(const Queue::DataTuple& work); + void callWork(const Work& work); + Queue mQueue; + }; + + /** + * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a + * CALLABLE that returns bool, a TimePoint and an interval at which to + * relaunch it. As long as the callable continues returning true, BackJack + * keeps resubmitting it to the target WorkQueue. + */ + // Why is BackJack a class and not a lambda? Because, unlike a lambda, a + // class method gets its own 'this' pointer -- which we need to resubmit + // the whole BackJack callable. + template <typename Rep, typename Period, typename CALLABLE> + class WorkQueue::BackJack + { + public: + // bind the desired data + BackJack(std::weak_ptr<WorkQueue> target, + const WorkQueue::TimePoint& start, + const std::chrono::duration<Rep, Period>& interval, + CALLABLE&& callable): + mTarget(target), + mStart(start), + mInterval(interval), + mCallable(std::move(callable)) + {} + + // Call by target WorkQueue -- note that although WE require a + // callable returning bool, WorkQueue wants a void callable. We + // consume the bool. + void operator()() + { + // If mCallable() throws an exception, don't catch it here: if it + // throws once, it's likely to throw every time, so it's a waste + // of time to arrange to call it again. + if (mCallable()) + { + // Modify mStart to the new start time we desire. If we simply + // added mInterval to now, we'd get actual timings of + // (mInterval + slop), where 'slop' is the latency between the + // previous mStart and the WorkQueue actually calling us. + // Instead, add mInterval to mStart so that at least we + // register our intent to fire at exact mIntervals. + mStart += mInterval; + + // We're being called at this moment by the target WorkQueue. + // Assume it still exists, rather than checking the result of + // lock(). + // Resubmit the whole *this callable: that's why we're a class + // rather than a lambda. Allow moving *this so we can carry a + // move-only callable; but naturally this statement must be + // the last time we reference this instance, which may become + // moved-from. + try + { + mTarget.lock()->post(mStart, std::move(*this)); + } + catch (const Closed&) + { + // Once this queue is closed, oh well, just stop + } + } + } + + private: + std::weak_ptr<WorkQueue> mTarget; + WorkQueue::TimePoint mStart; + std::chrono::duration<Rep, Period> mInterval; + CALLABLE mCallable; + }; + + template <typename Rep, typename Period, typename CALLABLE> + void WorkQueue::postEvery(const std::chrono::duration<Rep, Period>& interval, + CALLABLE&& callable) + { + if (interval.count() <= 0) + { + // It's essential that postEvery() be called with a positive + // interval, since each call to BackJack posts another instance of + // itself at (start + interval) and we order by target time. A + // zero or negative interval would result in that BackJack + // instance going to the head of the queue every time, immediately + // ready to run. Effectively that would produce an infinite loop, + // a denial of service on this WorkQueue. + error("postEvery(interval) may not be 0"); + } + // Instantiate and post a suitable BackJack, binding a weak_ptr to + // self, the current time, the desired interval and the desired + // callable. + post( + BackJack<Rep, Period, CALLABLE>( + getWeak(), TimePoint::clock::now(), interval, std::move(callable))); + } + +} // namespace LL + +#endif /* ! defined(LL_WORKQUEUE_H) */ |