summaryrefslogtreecommitdiff
path: root/indra/llcommon
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llcommon')
-rw-r--r--indra/llcommon/CMakeLists.txt8
-rw-r--r--indra/llcommon/chrono.h65
-rw-r--r--indra/llcommon/llcond.h24
-rw-r--r--indra/llcommon/llinstancetracker.h59
-rw-r--r--indra/llcommon/llleaplistener.cpp2
-rw-r--r--indra/llcommon/llthreadsafequeue.h511
-rw-r--r--indra/llcommon/tests/llinstancetracker_test.cpp14
-rw-r--r--indra/llcommon/tests/threadsafeschedule_test.cpp69
-rw-r--r--indra/llcommon/tests/tuple_test.cpp47
-rw-r--r--indra/llcommon/tests/workqueue_test.cpp158
-rw-r--r--indra/llcommon/threadsafeschedule.h373
-rw-r--r--indra/llcommon/tuple.h84
-rw-r--r--indra/llcommon/workqueue.cpp114
-rw-r--r--indra/llcommon/workqueue.h325
14 files changed, 1684 insertions, 169 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt
index 066d0404ac..fda43dd24c 100644
--- a/indra/llcommon/CMakeLists.txt
+++ b/indra/llcommon/CMakeLists.txt
@@ -123,12 +123,14 @@ set(llcommon_SOURCE_FILES
llworkerthread.cpp
timing.cpp
u64.cpp
+ workqueue.cpp
StackWalker.cpp
)
set(llcommon_HEADER_FILES
CMakeLists.txt
+ chrono.h
ctype_workaround.h
fix_macros.h
indra_constants.h
@@ -256,8 +258,11 @@ set(llcommon_HEADER_FILES
lockstatic.h
stdtypes.h
stringize.h
+ threadsafeschedule.h
timer.h
+ tuple.h
u64.h
+ workqueue.h
StackWalker.h
)
@@ -362,6 +367,9 @@ if (LL_TESTS)
LL_ADD_INTEGRATION_TEST(lluri "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llunits "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}")
+ LL_ADD_INTEGRATION_TEST(threadsafeschedule "" "${test_libs}")
+ LL_ADD_INTEGRATION_TEST(tuple "" "${test_libs}")
+ LL_ADD_INTEGRATION_TEST(workqueue "" "${test_libs}")
## llexception_test.cpp isn't a regression test, and doesn't need to be run
## every build. It's to help a developer make implementation choices about
diff --git a/indra/llcommon/chrono.h b/indra/llcommon/chrono.h
new file mode 100644
index 0000000000..806e871892
--- /dev/null
+++ b/indra/llcommon/chrono.h
@@ -0,0 +1,65 @@
+/**
+ * @file chrono.h
+ * @author Nat Goodspeed
+ * @date 2021-10-05
+ * @brief supplement <chrono> with utility functions
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_CHRONO_H)
+#define LL_CHRONO_H
+
+#include <chrono>
+#include <type_traits> // std::enable_if
+
+namespace LL
+{
+
+// time_point_cast() is derived from https://stackoverflow.com/a/35293183
+// without the iteration: we think errors in the ~1 microsecond range are
+// probably acceptable.
+
+// This variant is for the optimal case when the source and dest use the same
+// clock: that case is handled by std::chrono.
+template <typename DestTimePoint, typename SrcTimePoint,
+ typename std::enable_if<std::is_same<typename DestTimePoint::clock,
+ typename SrcTimePoint::clock>::value,
+ bool>::type = true>
+DestTimePoint time_point_cast(const SrcTimePoint& time)
+{
+ return std::chrono::time_point_cast<typename DestTimePoint::duration>(time);
+}
+
+// This variant is for when the source and dest use different clocks -- see
+// the linked StackOverflow answer, also Howard Hinnant's, for more context.
+template <typename DestTimePoint, typename SrcTimePoint,
+ typename std::enable_if<! std::is_same<typename DestTimePoint::clock,
+ typename SrcTimePoint::clock>::value,
+ bool>::type = true>
+DestTimePoint time_point_cast(const SrcTimePoint& time)
+{
+ // The basic idea is that we must adjust the passed time_point by the
+ // difference between the clocks' epochs. But since time_point doesn't
+ // expose its epoch, we fall back on what each of them thinks is now().
+ // However, since we necessarily make sequential calls to those now()
+ // functions, the answers differ not only by the cycles spent executing
+ // those calls, but by potential OS interruptions between them. Try to
+ // reduce that error by capturing the source clock time both before and
+ // after the dest clock, and splitting the difference. Of course an
+ // interruption between two of these now() calls without a comparable
+ // interruption between the other two will skew the result, but better is
+ // more expensive.
+ const auto src_before = typename SrcTimePoint::clock::now();
+ const auto dest_now = typename DestTimePoint::clock::now();
+ const auto src_after = typename SrcTimePoint::clock::now();
+ const auto src_diff = src_after - src_before;
+ const auto src_now = src_before + src_diff / 2;
+ return dest_now + (time - src_now);
+}
+
+} // namespace LL
+
+#endif /* ! defined(LL_CHRONO_H) */
diff --git a/indra/llcommon/llcond.h b/indra/llcommon/llcond.h
index e31b67d893..c08acb66a1 100644
--- a/indra/llcommon/llcond.h
+++ b/indra/llcommon/llcond.h
@@ -53,6 +53,8 @@ private:
LLCoros::Mutex mMutex;
// Use LLCoros::ConditionVariable for the same reason.
LLCoros::ConditionVariable mCond;
+ using LockType = LLCoros::LockType;
+ using cv_status = LLCoros::cv_status;
public:
/// LLCond can be explicitly initialized with a specific value for mData if
@@ -65,10 +67,14 @@ public:
LLCond(const LLCond&) = delete;
LLCond& operator=(const LLCond&) = delete;
- /// get() returns a const reference to the stored DATA. The only way to
- /// get a non-const reference -- to modify the stored DATA -- is via
- /// update_one() or update_all().
- const value_type& get() const { return mData; }
+ /// get() returns the stored DATA by value -- so to use get(), DATA must
+ /// be copyable. The only way to get a non-const reference -- to modify
+ /// the stored DATA -- is via update_one() or update_all().
+ value_type get()
+ {
+ LockType lk(mMutex);
+ return mData;
+ }
/**
* Pass update_one() an invocable accepting non-const (DATA&). The
@@ -83,7 +89,7 @@ public:
void update_one(MODIFY modify)
{
{ // scope of lock can/should end before notify_one()
- LLCoros::LockType lk(mMutex);
+ LockType lk(mMutex);
modify(mData);
}
mCond.notify_one();
@@ -102,7 +108,7 @@ public:
void update_all(MODIFY modify)
{
{ // scope of lock can/should end before notify_all()
- LLCoros::LockType lk(mMutex);
+ LockType lk(mMutex);
modify(mData);
}
mCond.notify_all();
@@ -118,7 +124,7 @@ public:
template <typename Pred>
void wait(Pred pred)
{
- LLCoros::LockType lk(mMutex);
+ LockType lk(mMutex);
// We must iterate explicitly since the predicate accepted by
// condition_variable::wait() requires a different signature:
// condition_variable::wait() calls its predicate with no arguments.
@@ -205,14 +211,14 @@ private:
template <typename Clock, typename Duration, typename Pred>
bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time, Pred pred)
{
- LLCoros::LockType lk(mMutex);
+ LockType lk(mMutex);
// We advise the caller to pass a predicate accepting (const DATA&).
// But what if they instead pass a predicate accepting non-const
// (DATA&)? Such a predicate could modify mData, which would be Bad.
// Forbid that.
while (! pred(const_cast<const value_type&>(mData)))
{
- if (LLCoros::cv_status::timeout == mCond.wait_until(lk, timeout_time))
+ if (cv_status::timeout == mCond.wait_until(lk, timeout_time))
{
// It's possible that wait_until() timed out AND the predicate
// became true more or less simultaneously. Even though
diff --git a/indra/llcommon/llinstancetracker.h b/indra/llcommon/llinstancetracker.h
index 402333cca7..02535a59e7 100644
--- a/indra/llcommon/llinstancetracker.h
+++ b/indra/llcommon/llinstancetracker.h
@@ -83,13 +83,34 @@ class LLInstanceTracker
typedef llthread::LockStatic<StaticData> LockStatic;
public:
+ using ptr_t = std::shared_ptr<T>;
+ using weak_t = std::weak_ptr<T>;
+
+ /**
+ * Storing a dumb T* somewhere external is a bad idea, since
+ * LLInstanceTracker subclasses are explicitly destroyed rather than
+ * managed by smart pointers. It's legal to declare stack instances of an
+ * LLInstanceTracker subclass. But it's reasonable to store a
+ * std::weak_ptr<T>, which will become invalid when the T instance is
+ * destroyed.
+ */
+ weak_t getWeak()
+ {
+ return mSelf;
+ }
+
+ static S32 instanceCount()
+ {
+ return LockStatic()->mMap.size();
+ }
+
// snapshot of std::pair<const KEY, std::shared_ptr<T>> pairs
class snapshot
{
// It's very important that what we store in this snapshot are
// weak_ptrs, NOT shared_ptrs. That's how we discover whether any
// instance has been deleted during the lifespan of a snapshot.
- typedef std::vector<std::pair<const KEY, std::weak_ptr<T>>> VectorType;
+ typedef std::vector<std::pair<const KEY, weak_t>> VectorType;
// Dereferencing our iterator produces a std::shared_ptr for each
// instance that still exists. Since we store weak_ptrs, that involves
// two chained transformations:
@@ -98,7 +119,7 @@ public:
// It is very important that we filter lazily, that is, during
// traversal. Any one of our stored weak_ptrs might expire during
// traversal.
- typedef std::pair<const KEY, std::shared_ptr<T>> strong_pair;
+ typedef std::pair<const KEY, ptr_t> strong_pair;
// Note for future reference: nat has not yet had any luck (up to
// Boost 1.67) trying to use boost::transform_iterator with a hand-
// coded functor, only with actual functions. In my experience, an
@@ -202,17 +223,12 @@ public:
iterator end() { return iterator(snapshot::end(), key_getter); }
};
- static T* getInstance(const KEY& k)
+ static ptr_t getInstance(const KEY& k)
{
LockStatic lock;
const InstanceMap& map(lock->mMap);
typename InstanceMap::const_iterator found = map.find(k);
- return (found == map.end()) ? NULL : found->second.get();
- }
-
- static S32 instanceCount()
- {
- return LockStatic()->mMap.size();
+ return (found == map.end()) ? NULL : found->second;
}
protected:
@@ -222,7 +238,9 @@ protected:
// shared_ptr, so give it a no-op deleter. We store shared_ptrs in our
// InstanceMap specifically so snapshot can store weak_ptrs so we can
// detect deletions during traversals.
- std::shared_ptr<T> ptr(static_cast<T*>(this), [](T*){});
+ ptr_t ptr(static_cast<T*>(this), [](T*){});
+ // save corresponding weak_ptr for future reference
+ mSelf = ptr;
LockStatic lock;
add_(lock, key, ptr);
}
@@ -257,7 +275,7 @@ private:
static std::string report(const char* key) { return report(std::string(key)); }
// caller must instantiate LockStatic
- void add_(LockStatic& lock, const KEY& key, const std::shared_ptr<T>& ptr)
+ void add_(LockStatic& lock, const KEY& key, const ptr_t& ptr)
{
mInstanceKey = key;
InstanceMap& map = lock->mMap;
@@ -281,7 +299,7 @@ private:
break;
}
}
- std::shared_ptr<T> remove_(LockStatic& lock)
+ ptr_t remove_(LockStatic& lock)
{
InstanceMap& map = lock->mMap;
typename InstanceMap::iterator iter = map.find(mInstanceKey);
@@ -295,6 +313,9 @@ private:
}
private:
+ // Storing a weak_ptr to self is a bit like deriving from
+ // std::enable_shared_from_this(), except more explicit.
+ weak_t mSelf;
KEY mInstanceKey;
};
@@ -326,6 +347,9 @@ class LLInstanceTracker<T, void, KEY_COLLISION_BEHAVIOR>
typedef llthread::LockStatic<StaticData> LockStatic;
public:
+ using ptr_t = std::shared_ptr<T>;
+ using weak_t = std::weak_ptr<T>;
+
/**
* Storing a dumb T* somewhere external is a bad idea, since
* LLInstanceTracker subclasses are explicitly destroyed rather than
@@ -334,12 +358,15 @@ public:
* std::weak_ptr<T>, which will become invalid when the T instance is
* destroyed.
*/
- std::weak_ptr<T> getWeak()
+ weak_t getWeak()
{
return mSelf;
}
- static S32 instanceCount() { return LockStatic()->mSet.size(); }
+ static S32 instanceCount()
+ {
+ return LockStatic()->mSet.size();
+ }
// snapshot of std::shared_ptr<T> pointers
class snapshot
@@ -347,7 +374,7 @@ public:
// It's very important that what we store in this snapshot are
// weak_ptrs, NOT shared_ptrs. That's how we discover whether any
// instance has been deleted during the lifespan of a snapshot.
- typedef std::vector<std::weak_ptr<T>> VectorType;
+ typedef std::vector<weak_t> VectorType;
// Dereferencing our iterator produces a std::shared_ptr for each
// instance that still exists. Since we store weak_ptrs, that involves
// two chained transformations:
@@ -453,7 +480,7 @@ protected:
private:
// Storing a weak_ptr to self is a bit like deriving from
// std::enable_shared_from_this(), except more explicit.
- std::weak_ptr<T> mSelf;
+ weak_t mSelf;
};
#endif
diff --git a/indra/llcommon/llleaplistener.cpp b/indra/llcommon/llleaplistener.cpp
index 3e6ce9092c..11bfec1b31 100644
--- a/indra/llcommon/llleaplistener.cpp
+++ b/indra/llcommon/llleaplistener.cpp
@@ -220,7 +220,7 @@ void LLLeapListener::getAPI(const LLSD& request) const
{
Response reply(LLSD(), request);
- LLEventAPI* found = LLEventAPI::getInstance(request["api"]);
+ auto found = LLEventAPI::getInstance(request["api"]);
if (found)
{
reply["name"] = found->getName();
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 26e0d71d31..719edcd579 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -1,6 +1,6 @@
/**
* @file llthreadsafequeue.h
- * @brief Base classes for thread, mutex and condition handling.
+ * @brief Queue protected with mutexes for cross-thread use
*
* $LicenseInfo:firstyear=2004&license=viewerlgpl$
* Second Life Viewer Source Code
@@ -27,16 +27,19 @@
#ifndef LL_LLTHREADSAFEQUEUE_H
#define LL_LLTHREADSAFEQUEUE_H
-#include "llexception.h"
-#include <deque>
-#include <string>
-#include <chrono>
-#include "mutex.h"
#include "llcoros.h"
#include LLCOROS_MUTEX_HEADER
#include <boost/fiber/timed_mutex.hpp>
#include LLCOROS_CONDVAR_HEADER
+#include "llexception.h"
+#include "mutex.h"
+#include <chrono>
+#include <queue>
+#include <string>
+/*****************************************************************************
+* LLThreadSafeQueue
+*****************************************************************************/
//
// A general queue exception.
//
@@ -66,70 +69,108 @@ public:
}
};
-//
-// Implements a thread safe FIFO.
-//
-template<typename ElementT>
+/**
+ * Implements a thread safe FIFO.
+ */
+// Let the default std::queue default to underlying std::deque. Override if
+// desired.
+template<typename ElementT, typename QueueT=std::queue<ElementT>>
class LLThreadSafeQueue
{
public:
typedef ElementT value_type;
-
- // If the pool is set to NULL one will be allocated and managed by this
- // queue.
+
+ // Limiting the number of pending items prevents unbounded growth of the
+ // underlying queue.
LLThreadSafeQueue(U32 capacity = 1024);
-
- // Add an element to the front of queue (will block if the queue has
+ virtual ~LLThreadSafeQueue() {}
+
+ // Add an element to the queue (will block if the queue has
// reached capacity).
//
// This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
- void pushFront(ElementT const & element);
-
- // Try to add an element to the front of queue without blocking. Returns
+ template <typename T>
+ void push(T&& element);
+ // legacy name
+ void pushFront(ElementT const & element) { return push(element); }
+
+ // Try to add an element to the queue without blocking. Returns
// true only if the element was actually added.
- bool tryPushFront(ElementT const & element);
+ template <typename T>
+ bool tryPush(T&& element);
+ // legacy name
+ bool tryPushFront(ElementT const & element) { return tryPush(element); }
- // Try to add an element to the front of queue, blocking if full but with
- // timeout. Returns true if the element was added.
+ // Try to add an element to the queue, blocking if full but with timeout
+ // after specified duration. Returns true if the element was added.
// There are potentially two different timeouts involved: how long to try
// to lock the mutex, versus how long to wait for the queue to stop being
// full. Careful settings for each timeout might be orders of magnitude
// apart. However, this method conflates them.
+ template <typename Rep, typename Period, typename T>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ T&& element);
+ // legacy name
template <typename Rep, typename Period>
bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
- ElementT const & element);
+ ElementT const & element) { return tryPushFor(timeout, element); }
- // Pop the element at the end of the queue (will block if the queue is
+ // Try to add an element to the queue, blocking if full but with
+ // timeout at specified time_point. Returns true if the element was added.
+ template <typename Clock, typename Duration, typename T>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ T&& element);
+ // no legacy name because this is a newer method
+
+ // Pop the element at the head of the queue (will block if the queue is
// empty).
//
// This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
- ElementT popBack(void);
-
- // Pop an element from the end of the queue if there is one available.
+ ElementT pop(void);
+ // legacy name
+ ElementT popBack(void) { return pop(); }
+
+ // Pop an element from the head of the queue if there is one available.
// Returns true only if an element was popped.
- bool tryPopBack(ElementT & element);
-
+ bool tryPop(ElementT & element);
+ // legacy name
+ bool tryPopBack(ElementT & element) { return tryPop(element); }
+
+ // Pop the element at the head of the queue, blocking if empty, with
+ // timeout after specified duration. Returns true if an element was popped.
+ template <typename Rep, typename Period>
+ bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, ElementT& element);
+ // no legacy name because this is a newer method
+
+ // Pop the element at the head of the queue, blocking if empty, with
+ // timeout at specified time_point. Returns true if an element was popped.
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element);
+ // no legacy name because this is a newer method
+
// Returns the size of the queue.
size_t size();
// closes the queue:
- // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt
- // - every subsequent tryPushFront() call will return false
- // - popBack() calls will return normally until the queue is drained, then
- // every subsequent popBack() will throw LLThreadSafeQueueInterrupt
- // - tryPopBack() calls will return normally until the queue is drained,
- // then every subsequent tryPopBack() call will return false
+ // - every subsequent push() call will throw LLThreadSafeQueueInterrupt
+ // - every subsequent tryPush() call will return false
+ // - pop() calls will return normally until the queue is drained, then
+ // every subsequent pop() will throw LLThreadSafeQueueInterrupt
+ // - tryPop() calls will return normally until the queue is drained,
+ // then every subsequent tryPop() call will return false
void close();
- // detect closed state
+ // producer end: are we prevented from pushing any additional items?
bool isClosed();
- // inverse of isClosed()
- explicit operator bool();
+ // consumer end: are we done, is the queue entirely drained?
+ bool done();
-private:
- std::deque< ElementT > mStorage;
+protected:
+ typedef QueueT queue_type;
+ QueueT mStorage;
U32 mCapacity;
bool mClosed;
@@ -137,37 +178,152 @@ private:
typedef std::unique_lock<decltype(mLock)> lock_t;
boost::fibers::condition_variable_any mCapacityCond;
boost::fibers::condition_variable_any mEmptyCond;
-};
-// LLThreadSafeQueue
-//-----------------------------------------------------------------------------
+ enum pop_result { EMPTY, DONE, WAITING, POPPED };
+ // implementation logic, suitable for passing to tryLockUntil()
+ template <typename Clock, typename Duration>
+ pop_result tryPopUntil_(lock_t& lock,
+ const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element);
+ // if we're able to lock immediately, do so and run the passed callable,
+ // which must accept lock_t& and return bool
+ template <typename CALLABLE>
+ bool tryLock(CALLABLE&& callable);
+ // if we're able to lock before the passed time_point, do so and run the
+ // passed callable, which must accept lock_t& and return bool
+ template <typename Clock, typename Duration, typename CALLABLE>
+ bool tryLockUntil(const std::chrono::time_point<Clock, Duration>& until,
+ CALLABLE&& callable);
+ // while lock is locked, really push the passed element, if we can
+ template <typename T>
+ bool push_(lock_t& lock, T&& element);
+ // while lock is locked, really pop the head element, if we can
+ pop_result pop_(lock_t& lock, ElementT& element);
+ // Is the current head element ready to pop? We say yes; subclass can
+ // override as needed.
+ virtual bool canPop(const ElementT& head) const { return true; }
+};
-template<typename ElementT>
-LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) :
+/*****************************************************************************
+* PriorityQueueAdapter
+*****************************************************************************/
+namespace LL
+{
+ /**
+ * std::priority_queue's API is almost like std::queue, intentionally of
+ * course, but you must access the element about to pop() as top() rather
+ * than as front(). Make an adapter for use with LLThreadSafeQueue.
+ */
+ template <typename T, typename Container=std::vector<T>,
+ typename Compare=std::less<typename Container::value_type>>
+ class PriorityQueueAdapter
+ {
+ public:
+ // publish all the same types
+ typedef std::priority_queue<T, Container, Compare> queue_type;
+ typedef typename queue_type::container_type container_type;
+ typedef typename queue_type::value_compare value_compare;
+ typedef typename queue_type::value_type value_type;
+ typedef typename queue_type::size_type size_type;
+ typedef typename queue_type::reference reference;
+ typedef typename queue_type::const_reference const_reference;
+
+ // Although std::queue defines both const and non-const front()
+ // methods, std::priority_queue defines only const top().
+ const_reference front() const { return mQ.top(); }
+ // std::priority_queue has no equivalent to back(), so it's good that
+ // LLThreadSafeQueue doesn't use it.
+
+ // All the rest of these merely forward to the corresponding
+ // queue_type methods.
+ bool empty() const { return mQ.empty(); }
+ size_type size() const { return mQ.size(); }
+ void push(const value_type& value) { mQ.push(value); }
+ void push(value_type&& value) { mQ.push(std::move(value)); }
+ template <typename... Args>
+ void emplace(Args&&... args) { mQ.emplace(std::forward<Args>(args)...); }
+ void pop() { mQ.pop(); }
+
+ private:
+ queue_type mQ;
+ };
+} // namespace LL
+
+
+/*****************************************************************************
+* LLThreadSafeQueue implementation
+*****************************************************************************/
+template<typename ElementT, typename QueueT>
+LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(U32 capacity) :
mCapacity(capacity),
mClosed(false)
{
}
-template<typename ElementT>
-void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
+// if we're able to lock immediately, do so and run the passed callable, which
+// must accept lock_t& and return bool
+template <typename ElementT, typename QueueT>
+template <typename CALLABLE>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryLock(CALLABLE&& callable)
+{
+ lock_t lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock())
+ return false;
+
+ return std::forward<CALLABLE>(callable)(lock1);
+}
+
+
+// if we're able to lock before the passed time_point, do so and run the
+// passed callable, which must accept lock_t& and return bool
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration, typename CALLABLE>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryLockUntil(
+ const std::chrono::time_point<Clock, Duration>& until,
+ CALLABLE&& callable)
+{
+ lock_t lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock_until(until))
+ return false;
+
+ return std::forward<CALLABLE>(callable)(lock1);
+}
+
+
+// while lock is locked, really push the passed element, if we can
+template <typename ElementT, typename QueueT>
+template <typename T>
+bool LLThreadSafeQueue<ElementT, QueueT>::push_(lock_t& lock, T&& element)
+{
+ if (mStorage.size() >= mCapacity)
+ return false;
+
+ mStorage.push(std::forward<T>(element));
+ lock.unlock();
+ // now that we've pushed, if somebody's been waiting to pop, signal them
+ mEmptyCond.notify_one();
+ return true;
+}
+
+
+template <typename ElementT, typename QueueT>
+template<typename T>
+void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element)
{
lock_t lock1(mLock);
while (true)
{
+ // On the producer side, it doesn't matter whether the queue has been
+ // drained or not: the moment either end calls close(), further push()
+ // operations will fail.
if (mClosed)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
- if (mStorage.size() < mCapacity)
- {
- mStorage.push_front(element);
- lock1.unlock();
- mEmptyCond.notify_one();
+ if (push_(lock1, std::forward<T>(element)))
return;
- }
// Storage Full. Wait for signal.
mCapacityCond.wait(lock1);
@@ -175,142 +331,225 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
}
-template <typename ElementT>
-template <typename Rep, typename Period>
-bool LLThreadSafeQueue<ElementT>::tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
- ElementT const & element)
+template<typename ElementT, typename QueueT>
+template<typename T>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(T&& element)
{
- // Convert duration to time_point: passing the same timeout duration to
- // each of multiple calls is wrong.
- auto endpoint = std::chrono::steady_clock::now() + timeout;
+ return tryLock(
+ [this, element=std::move(element)](lock_t& lock)
+ {
+ if (mClosed)
+ return false;
+ return push_(lock, std::move(element));
+ });
+}
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock_until(endpoint))
- return false;
- while (true)
- {
- if (mClosed)
- {
- return false;
- }
+template <typename ElementT, typename QueueT>
+template <typename Rep, typename Period, typename T>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPushFor(
+ const std::chrono::duration<Rep, Period>& timeout,
+ T&& element)
+{
+ // Convert duration to time_point: passing the same timeout duration to
+ // each of multiple calls is wrong.
+ return tryPushUntil(std::chrono::steady_clock::now() + timeout,
+ std::forward<T>(element));
+}
- if (mStorage.size() < mCapacity)
- {
- mStorage.push_front(element);
- lock1.unlock();
- mEmptyCond.notify_one();
- return true;
- }
- // Storage Full. Wait for signal.
- if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint))
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration, typename T>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil(
+ const std::chrono::time_point<Clock, Duration>& until,
+ T&& element)
+{
+ return tryLockUntil(
+ until,
+ [this, until, element=std::move(element)](lock_t& lock)
{
- // timed out -- formally we might recheck both conditions above
- return false;
- }
- // If we didn't time out, we were notified for some reason. Loop back
- // to check.
- }
+ while (true)
+ {
+ if (mClosed)
+ {
+ return false;
+ }
+
+ if (push_(lock, std::move(element)))
+ return true;
+
+ // Storage Full. Wait for signal.
+ if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock, until))
+ {
+ // timed out -- formally we might recheck both conditions above
+ return false;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
+ });
}
-template<typename ElementT>
-bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
+// while lock is locked, really pop the head element, if we can
+template <typename ElementT, typename QueueT>
+typename LLThreadSafeQueue<ElementT, QueueT>::pop_result
+LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element)
{
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock())
- return false;
-
- if (mClosed)
- return false;
+ // If mStorage is empty, there's no head element.
+ if (mStorage.empty())
+ return mClosed? DONE : EMPTY;
- if (mStorage.size() >= mCapacity)
- return false;
+ // If there's a head element, pass it to canPop() to see if it's ready to pop.
+ if (! canPop(mStorage.front()))
+ return WAITING;
- mStorage.push_front(element);
- lock1.unlock();
- mEmptyCond.notify_one();
- return true;
+ // std::queue::front() is the element about to pop()
+ element = mStorage.front();
+ mStorage.pop();
+ lock.unlock();
+ // now that we've popped, if somebody's been waiting to push, signal them
+ mCapacityCond.notify_one();
+ return POPPED;
}
-template<typename ElementT>
-ElementT LLThreadSafeQueue<ElementT>::popBack(void)
+template<typename ElementT, typename QueueT>
+ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
{
lock_t lock1(mLock);
+ ElementT value;
while (true)
{
- if (!mStorage.empty())
- {
- ElementT value = mStorage.back();
- mStorage.pop_back();
- lock1.unlock();
- mCapacityCond.notify_one();
- return value;
- }
-
- if (mClosed)
+ // On the consumer side, we always try to pop before checking mClosed
+ // so we can finish draining the queue.
+ pop_result popped = pop_(lock1, value);
+ if (popped == POPPED)
+ return std::move(value);
+
+ // Once the queue is DONE, there will never be any more coming.
+ if (popped == DONE)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
- // Storage empty. Wait for signal.
+ // If we didn't pop because WAITING, i.e. canPop() returned false,
+ // then even if the producer end has been closed, there's still at
+ // least one item to drain: wait for it. Or we might be EMPTY, with
+ // the queue still open. Either way, wait for signal.
mEmptyCond.wait(lock1);
}
}
-template<typename ElementT>
-bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
+template<typename ElementT, typename QueueT>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)
{
- lock_t lock1(mLock, std::defer_lock);
- if (!lock1.try_lock())
- return false;
+ return tryLock(
+ [this, &element](lock_t& lock)
+ {
+ // conflate EMPTY, DONE, WAITING: tryPop() behavior when the queue
+ // is closed is implemented by simple inability to push any new
+ // elements
+ return pop_(lock, element) == POPPED;
+ });
+}
- // no need to check mClosed: tryPopBack() behavior when the queue is
- // closed is implemented by simple inability to push any new elements
- if (mStorage.empty())
- return false;
- element = mStorage.back();
- mStorage.pop_back();
- lock1.unlock();
- mCapacityCond.notify_one();
- return true;
+template <typename ElementT, typename QueueT>
+template <typename Rep, typename Period>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPopFor(
+ const std::chrono::duration<Rep, Period>& timeout,
+ ElementT& element)
+{
+ // Convert duration to time_point: passing the same timeout duration to
+ // each of multiple calls is wrong.
+ return tryPopUntil(std::chrono::steady_clock::now() + timeout, element);
}
-template<typename ElementT>
-size_t LLThreadSafeQueue<ElementT>::size(void)
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(
+ const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element)
+{
+ return tryLockUntil(
+ until,
+ [this, until, &element](lock_t& lock)
+ {
+ // conflate EMPTY, DONE, WAITING
+ return tryPopUntil_(lock, until, element) == POPPED;
+ });
+}
+
+
+// body of tryPopUntil(), called once we have the lock
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration>
+typename LLThreadSafeQueue<ElementT, QueueT>::pop_result
+LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_(
+ lock_t& lock,
+ const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element)
+{
+ while (true)
+ {
+ pop_result popped = pop_(lock, element);
+ if (popped == POPPED || popped == DONE)
+ {
+ // If we succeeded, great! If we've drained the last item, so be
+ // it. Either way, break the loop and tell caller.
+ return popped;
+ }
+
+ // EMPTY or WAITING: wait for signal.
+ if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
+ {
+ // timed out -- formally we might recheck
+ // as it is, break loop
+ return popped;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
+}
+
+
+template<typename ElementT, typename QueueT>
+size_t LLThreadSafeQueue<ElementT, QueueT>::size(void)
{
lock_t lock(mLock);
return mStorage.size();
}
-template<typename ElementT>
-void LLThreadSafeQueue<ElementT>::close()
+
+template<typename ElementT, typename QueueT>
+void LLThreadSafeQueue<ElementT, QueueT>::close()
{
lock_t lock(mLock);
mClosed = true;
lock.unlock();
- // wake up any blocked popBack() calls
+ // wake up any blocked pop() calls
mEmptyCond.notify_all();
- // wake up any blocked pushFront() calls
+ // wake up any blocked push() calls
mCapacityCond.notify_all();
}
-template<typename ElementT>
-bool LLThreadSafeQueue<ElementT>::isClosed()
+
+template<typename ElementT, typename QueueT>
+bool LLThreadSafeQueue<ElementT, QueueT>::isClosed()
{
lock_t lock(mLock);
- return mClosed && mStorage.size() == 0;
+ return mClosed;
}
-template<typename ElementT>
-LLThreadSafeQueue<ElementT>::operator bool()
+
+template<typename ElementT, typename QueueT>
+bool LLThreadSafeQueue<ElementT, QueueT>::done()
{
- return ! isClosed();
+ lock_t lock(mLock);
+ return mClosed && mStorage.empty();
}
#endif
diff --git a/indra/llcommon/tests/llinstancetracker_test.cpp b/indra/llcommon/tests/llinstancetracker_test.cpp
index 9b89159625..5daa29adf4 100644
--- a/indra/llcommon/tests/llinstancetracker_test.cpp
+++ b/indra/llcommon/tests/llinstancetracker_test.cpp
@@ -90,19 +90,19 @@ namespace tut
{
Keyed one("one");
ensure_equals(Keyed::instanceCount(), 1);
- Keyed* found = Keyed::getInstance("one");
- ensure("couldn't find stack Keyed", found);
- ensure_equals("found wrong Keyed instance", found, &one);
+ auto found = Keyed::getInstance("one");
+ ensure("couldn't find stack Keyed", bool(found));
+ ensure_equals("found wrong Keyed instance", found.get(), &one);
{
boost::scoped_ptr<Keyed> two(new Keyed("two"));
ensure_equals(Keyed::instanceCount(), 2);
- Keyed* found = Keyed::getInstance("two");
- ensure("couldn't find heap Keyed", found);
- ensure_equals("found wrong Keyed instance", found, two.get());
+ auto found = Keyed::getInstance("two");
+ ensure("couldn't find heap Keyed", bool(found));
+ ensure_equals("found wrong Keyed instance", found.get(), two.get());
}
ensure_equals(Keyed::instanceCount(), 1);
}
- Keyed* found = Keyed::getInstance("one");
+ auto found = Keyed::getInstance("one");
ensure("Keyed key lives too long", ! found);
ensure_equals(Keyed::instanceCount(), 0);
}
diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp
new file mode 100644
index 0000000000..af67b9f492
--- /dev/null
+++ b/indra/llcommon/tests/threadsafeschedule_test.cpp
@@ -0,0 +1,69 @@
+/**
+ * @file threadsafeschedule_test.cpp
+ * @author Nat Goodspeed
+ * @date 2021-10-04
+ * @brief Test for threadsafeschedule.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "threadsafeschedule.h"
+// STL headers
+// std headers
+#include <chrono>
+// external library headers
+// other Linden headers
+#include "../test/lltut.h"
+
+using namespace std::literals::chrono_literals; // ms suffix
+using namespace std::literals::string_literals; // s suffix
+using Queue = LL::ThreadSafeSchedule<std::string>;
+
+/*****************************************************************************
+* TUT
+*****************************************************************************/
+namespace tut
+{
+ struct threadsafeschedule_data
+ {
+ Queue queue;
+ };
+ typedef test_group<threadsafeschedule_data> threadsafeschedule_group;
+ typedef threadsafeschedule_group::object object;
+ threadsafeschedule_group threadsafeschedulegrp("threadsafeschedule");
+
+ template<> template<>
+ void object::test<1>()
+ {
+ set_test_name("push");
+ // Simply calling push() a few times might result in indeterminate
+ // delivery order if the resolution of steady_clock is coarser than
+ // the real time required for each push() call. Explicitly increment
+ // the timestamp for each one -- but since we're passing explicit
+ // timestamps, make the queue reorder them.
+ queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi"));
+ // Given the various push() overloads, you have to match the type
+ // exactly: conversions are ambiguous.
+ queue.push("abc"s);
+ queue.push(Queue::Clock::now() + 10ms, "def");
+ queue.close();
+ auto entry = queue.pop();
+ ensure_equals("failed to pop first", std::get<0>(entry), "abc"s);
+ entry = queue.pop();
+ ensure_equals("failed to pop second", std::get<0>(entry), "def"s);
+ ensure("queue not closed", queue.isClosed());
+ ensure("queue prematurely done", ! queue.done());
+ std::string s;
+ bool popped = queue.tryPopFor(1s, s);
+ ensure("failed to pop third", popped);
+ ensure_equals("third is wrong", s, "ghi"s);
+ popped = queue.tryPop(s);
+ ensure("queue not empty", ! popped);
+ ensure("queue not done", queue.done());
+ }
+} // namespace tut
diff --git a/indra/llcommon/tests/tuple_test.cpp b/indra/llcommon/tests/tuple_test.cpp
new file mode 100644
index 0000000000..af94e2086c
--- /dev/null
+++ b/indra/llcommon/tests/tuple_test.cpp
@@ -0,0 +1,47 @@
+/**
+ * @file tuple_test.cpp
+ * @author Nat Goodspeed
+ * @date 2021-10-04
+ * @brief Test for tuple.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "tuple.h"
+// STL headers
+// std headers
+// external library headers
+// other Linden headers
+#include "../test/lltut.h"
+
+/*****************************************************************************
+* TUT
+*****************************************************************************/
+namespace tut
+{
+ struct tuple_data
+ {
+ };
+ typedef test_group<tuple_data> tuple_group;
+ typedef tuple_group::object object;
+ tuple_group tuplegrp("tuple");
+
+ template<> template<>
+ void object::test<1>()
+ {
+ set_test_name("tuple");
+ std::tuple<std::string, int> tup{ "abc", 17 };
+ std::tuple<int, std::string, int> ptup{ tuple_cons(34, tup) };
+ std::tuple<std::string, int> tup2;
+ int i;
+ std::tie(i, tup2) = tuple_split(ptup);
+ ensure_equals("tuple_car() fail", i, 34);
+ ensure_equals("tuple_cdr() (0) fail", std::get<0>(tup2), "abc");
+ ensure_equals("tuple_cdr() (1) fail", std::get<1>(tup2), 17);
+ }
+} // namespace tut
diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp
new file mode 100644
index 0000000000..ab1cae6c14
--- /dev/null
+++ b/indra/llcommon/tests/workqueue_test.cpp
@@ -0,0 +1,158 @@
+/**
+ * @file workqueue_test.cpp
+ * @author Nat Goodspeed
+ * @date 2021-10-07
+ * @brief Test for workqueue.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "workqueue.h"
+// STL headers
+// std headers
+#include <chrono>
+#include <deque>
+// external library headers
+// other Linden headers
+#include "../test/lltut.h"
+#include "llcond.h"
+#include "llstring.h"
+#include "stringize.h"
+
+using namespace LL;
+using namespace std::literals::chrono_literals; // ms suffix
+using namespace std::literals::string_literals; // s suffix
+
+/*****************************************************************************
+* TUT
+*****************************************************************************/
+namespace tut
+{
+ struct workqueue_data
+ {
+ WorkQueue queue{"queue"};
+ };
+ typedef test_group<workqueue_data> workqueue_group;
+ typedef workqueue_group::object object;
+ workqueue_group workqueuegrp("workqueue");
+
+ template<> template<>
+ void object::test<1>()
+ {
+ set_test_name("name");
+ ensure_equals("didn't capture name", queue.getKey(), "queue");
+ ensure("not findable", WorkQueue::getInstance("queue") == queue.getWeak().lock());
+ WorkQueue q2;
+ ensure("has no name", LLStringUtil::startsWith(q2.getKey(), "WorkQueue"));
+ }
+
+ template<> template<>
+ void object::test<2>()
+ {
+ set_test_name("post");
+ bool wasRun{ false };
+ // We only get away with binding a simple bool because we're running
+ // the work on the same thread.
+ queue.post([&wasRun](){ wasRun = true; });
+ queue.close();
+ ensure("ran too soon", ! wasRun);
+ queue.runUntilClose();
+ ensure("didn't run", wasRun);
+ }
+
+ template<> template<>
+ void object::test<3>()
+ {
+ set_test_name("postEvery");
+ // record of runs
+ using Shared = std::deque<WorkQueue::TimePoint>;
+ // This is an example of how to share data between the originator of
+ // postEvery(work) and the work item itself, since usually a WorkQueue
+ // is used to dispatch work to a different thread. Neither of them
+ // should call any of LLCond's wait methods: you don't want to stall
+ // either the worker thread or the originating thread (conventionally
+ // main). Use LLCond or a subclass even if all you want to do is
+ // signal the work item that it can quit; consider LLOneShotCond.
+ LLCond<Shared> data;
+ auto start = WorkQueue::TimePoint::clock::now();
+ auto interval = 100ms;
+ queue.postEvery(
+ interval,
+ [&data, count = 0]
+ () mutable
+ {
+ // record the timestamp at which this instance is running
+ data.update_one(
+ [](Shared& data)
+ {
+ data.push_back(WorkQueue::TimePoint::clock::now());
+ });
+ // by the 3rd call, return false to stop
+ return (++count < 3);
+ });
+ // no convenient way to close() our queue while we've got a
+ // postEvery() running, so run until we think we should have exhausted
+ // the iterations
+ queue.runFor(10*interval);
+ // Take a copy of the captured deque.
+ Shared result = data.get();
+ ensure_equals("called wrong number of times", result.size(), 3);
+ // postEvery() assumes you want the first call to happen right away.
+ // Inject a fake start time that's (interval) earlier than that, to
+ // make our too early/too late tests uniform for all entries.
+ result.push_front(start - interval);
+ for (size_t i = 1; i < result.size(); ++i)
+ {
+ auto diff = (result[i] - result[i-1]);
+ try
+ {
+ ensure(STRINGIZE("call " << i << " too soon"), diff >= interval);
+ ensure(STRINGIZE("call " << i << " too late"), diff < interval*1.5);
+ }
+ catch (const tut::failure&)
+ {
+ auto interval_ms = interval / 1ms;
+ auto diff_ms = diff / 1ms;
+ std::cerr << "interval " << interval_ms
+ << "ms; diff " << diff_ms << "ms" << std::endl;
+ throw;
+ }
+ }
+ }
+
+ template<> template<>
+ void object::test<4>()
+ {
+ set_test_name("postTo");
+ WorkQueue main("main");
+ auto qptr = WorkQueue::getInstance("queue");
+ int result = 0;
+ main.postTo(
+ qptr,
+ [](){ return 17; },
+ // Note that a postTo() *callback* can safely bind a reference to
+ // a variable on the invoking thread, because the callback is run
+ // on the invoking thread.
+ [&result](int i){ result = i; });
+ // this should post the callback to main
+ qptr->runOne();
+ // this should run the callback
+ main.runOne();
+ ensure_equals("failed to run int callback", result, 17);
+
+ std::string alpha;
+ // postTo() handles arbitrary return types
+ main.postTo(
+ qptr,
+ [](){ return "abc"s; },
+ [&alpha](const std::string& s){ alpha = s; });
+ qptr->runPending();
+ main.runPending();
+ ensure_equals("failed to run string callback", alpha, "abc");
+ }
+} // namespace tut
diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h
new file mode 100644
index 0000000000..c8ad23532b
--- /dev/null
+++ b/indra/llcommon/threadsafeschedule.h
@@ -0,0 +1,373 @@
+/**
+ * @file threadsafeschedule.h
+ * @author Nat Goodspeed
+ * @date 2021-10-02
+ * @brief ThreadSafeSchedule is an ordered queue in which every item has an
+ * associated timestamp.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_THREADSAFESCHEDULE_H)
+#define LL_THREADSAFESCHEDULE_H
+
+#include "chrono.h"
+#include "llexception.h"
+#include "llthreadsafequeue.h"
+#include "tuple.h"
+#include <chrono>
+#include <tuple>
+
+namespace LL
+{
+ namespace ThreadSafeSchedulePrivate
+ {
+ using TimePoint = std::chrono::steady_clock::time_point;
+ // Bundle consumer's data with a TimePoint to order items by timestamp.
+ template <typename... Args>
+ using TimestampedTuple = std::tuple<TimePoint, Args...>;
+
+ // comparison functor for TimedTuples -- see TimedQueue comments
+ struct ReverseTupleOrder
+ {
+ template <typename Tuple>
+ bool operator()(const Tuple& left, const Tuple& right) const
+ {
+ return std::get<0>(left) > std::get<0>(right);
+ }
+ };
+
+ template <typename... Args>
+ using TimedQueue = PriorityQueueAdapter<
+ TimestampedTuple<Args...>,
+ // std::vector is the default storage for std::priority_queue,
+ // have to restate to specify comparison template parameter
+ std::vector<TimestampedTuple<Args...>>,
+ // std::priority_queue uses a counterintuitive comparison
+ // behavior: the default std::less comparator is used to present
+ // the *highest* value as top(). So to sort by earliest timestamp,
+ // we must invert by using >.
+ ReverseTupleOrder>;
+ } // namespace ThreadSafeSchedulePrivate
+
+ /**
+ * ThreadSafeSchedule is an ordered LLThreadSafeQueue in which every item
+ * is given an associated timestamp. That is, TimePoint is implicitly
+ * prepended to the std::tuple with the specified types.
+ *
+ * Items are popped in increasing chronological order. Moreover, any item
+ * with a timestamp in the future is held back until
+ * std::chrono::steady_clock reaches that timestamp.
+ */
+ template <typename... Args>
+ class ThreadSafeSchedule:
+ public LLThreadSafeQueue<ThreadSafeSchedulePrivate::TimestampedTuple<Args...>,
+ ThreadSafeSchedulePrivate::TimedQueue<Args...>>
+ {
+ public:
+ using DataTuple = std::tuple<Args...>;
+ using TimeTuple = ThreadSafeSchedulePrivate::TimestampedTuple<Args...>;
+
+ private:
+ using super = LLThreadSafeQueue<TimeTuple, ThreadSafeSchedulePrivate::TimedQueue<Args...>>;
+ using lock_t = typename super::lock_t;
+ // VS 2017 needs this due to a bug:
+ // https://developercommunity.visualstudio.com/t/cannot-access-protected-enumerator-of-enclosing-cl/203430
+ enum pop_result { EMPTY=super::EMPTY, DONE=super::DONE, WAITING=super::WAITING, POPPED=super::POPPED };
+
+ public:
+ using Closed = LLThreadSafeQueueInterrupt;
+ using TimePoint = ThreadSafeSchedulePrivate::TimePoint;
+ using Clock = TimePoint::clock;
+
+ ThreadSafeSchedule(U32 capacity=1024):
+ super(capacity)
+ {}
+
+ /*----------------------------- push() -----------------------------*/
+ /// explicitly pass TimeTuple
+ using super::push;
+
+ /// pass DataTuple with implicit now
+ // This could be ambiguous for Args with a single type. Unfortunately
+ // we can't enable_if an individual method with a condition based on
+ // the *class* template arguments, only on that method's template
+ // arguments. We could specialize this class for the single-Args case;
+ // we could minimize redundancy by breaking out a common base class...
+ void push(const DataTuple& tuple)
+ {
+ push(tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass each component of the TimeTuple
+ void push(const TimePoint& time, Args&&... args)
+ {
+ push(TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass every component except the TimePoint (implies now)
+ // This could be ambiguous if the first specified template parameter
+ // type is also TimePoint. We could try to disambiguate, but a simpler
+ // approach would be for the caller to explicitly construct DataTuple
+ // and call that overload.
+ void push(Args&&... args)
+ {
+ push(Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*--------------------------- tryPush() ----------------------------*/
+ /// explicit TimeTuple
+ using super::tryPush;
+
+ /// DataTuple with implicit now
+ bool tryPush(const DataTuple& tuple)
+ {
+ return tryPush(tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass components
+ bool tryPush(const TimePoint& time, Args&&... args)
+ {
+ return tryPush(TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass components with implicit now
+ bool tryPush(Args&&... args)
+ {
+ return tryPush(Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*-------------------------- tryPushFor() --------------------------*/
+ /// explicit TimeTuple
+ using super::tryPushFor;
+
+ /// DataTuple with implicit now
+ template <typename Rep, typename Period>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ const DataTuple& tuple)
+ {
+ return tryPushFor(timeout, tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass components
+ template <typename Rep, typename Period>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ const TimePoint& time, Args&&... args)
+ {
+ return tryPushFor(TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass components with implicit now
+ template <typename Rep, typename Period>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ Args&&... args)
+ {
+ return tryPushFor(Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*------------------------- tryPushUntil() -------------------------*/
+ /// explicit TimeTuple
+ using super::tryPushUntil;
+
+ /// DataTuple with implicit now
+ template <typename Clock, typename Duration>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ const DataTuple& tuple)
+ {
+ return tryPushUntil(until, tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass components
+ template <typename Clock, typename Duration>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ const TimePoint& time, Args&&... args)
+ {
+ return tryPushUntil(until, TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass components with implicit now
+ template <typename Clock, typename Duration>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ Args&&... args)
+ {
+ return tryPushUntil(until, Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*----------------------------- pop() ------------------------------*/
+ // Our consumer may or may not care about the timestamp associated
+ // with each popped item, so we allow retrieving either DataTuple or
+ // TimeTuple. One potential use would be to observe, and possibly
+ // adjust for, the time lag between the item time and the actual
+ // current time.
+
+ /// pop DataTuple by value
+ // It would be great to notice when sizeof...(Args) == 1 and directly
+ // return the first (only) value, instead of making pop()'s caller
+ // call std::get<0>(value). See push(DataTuple) remarks for why we
+ // haven't yet jumped through those hoops.
+ DataTuple pop()
+ {
+ return tuple_cdr(popWithTime());
+ }
+
+ /// pop TimeTuple by value
+ TimeTuple popWithTime()
+ {
+ lock_t lock(super::mLock);
+ // We can't just sit around waiting forever, given that there may
+ // be items in the queue that are not yet ready but will *become*
+ // ready in the near future. So in fact, with this class, every
+ // pop() becomes a tryPopUntil(), constrained to the timestamp of
+ // the head item. It almost doesn't matter what we specify for the
+ // caller's time constraint -- all we really care about is the
+ // head item's timestamp. Since pop() and popWithTime() are
+ // defined to wait until either an item becomes available or the
+ // queue is closed, loop until one of those things happens. The
+ // constraint we pass just determines how often we'll loop while
+ // waiting.
+ TimeTuple tt;
+ while (true)
+ {
+ // Pick a point suitably far into the future.
+ TimePoint until = TimePoint::clock::now() + std::chrono::hours(24);
+ pop_result popped = tryPopUntil_(lock, until, tt);
+ if (popped == POPPED)
+ return std::move(tt);
+
+ // DONE: throw, just as super::pop() does
+ if (popped == DONE)
+ {
+ LLTHROW(LLThreadSafeQueueInterrupt());
+ }
+ // WAITING: we've still got items to drain.
+ // EMPTY: not closed, so it's worth waiting for more items.
+ // Either way, loop back to wait.
+ }
+ }
+
+ // We can use tryPop(TimeTuple&) just as it stands; the only behavior
+ // difference is in our canPop() override method.
+ using super::tryPop;
+
+ /// tryPop(DataTuple&)
+ bool tryPop(DataTuple& tuple)
+ {
+ TimeTuple tt;
+ if (! super::tryPop(tt))
+ return false;
+ tuple = tuple_cdr(std::move(tt));
+ return true;
+ }
+
+ /// for when Args has exactly one type
+ bool tryPop(typename std::tuple_element<1, TimeTuple>::type& value)
+ {
+ TimeTuple tt;
+ if (! super::tryPop(tt))
+ return false;
+ value = std::get<1>(std::move(tt));
+ return true;
+ }
+
+ /// tryPopFor()
+ template <typename Rep, typename Period, typename Tuple>
+ bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, Tuple& tuple)
+ {
+ // It's important to use OUR tryPopUntil() implementation, rather
+ // than delegating immediately to our base class.
+ return tryPopUntil(Clock::now() + timeout, tuple);
+ }
+
+ /// tryPopUntil(TimeTuple&)
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
+ TimeTuple& tuple)
+ {
+ // super::tryPopUntil() wakes up when an item becomes available or
+ // we hit 'until', whichever comes first. Thing is, the current
+ // head of the queue could become ready sooner than either of
+ // those events, and we need to deliver it as soon as it does.
+ // Don't wait past the TimePoint of the head item.
+ // Naturally, lock the queue before peeking at mStorage.
+ return super::tryLockUntil(
+ until,
+ [this, until, &tuple](lock_t& lock)
+ {
+ // Use our time_point_cast to allow for 'until' that's a
+ // time_point type other than TimePoint.
+ return POPPED ==
+ tryPopUntil_(lock, LL::time_point_cast<TimePoint>(until), tuple);
+ });
+ }
+
+ pop_result tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple)
+ {
+ TimePoint adjusted = until;
+ if (! super::mStorage.empty())
+ {
+ // use whichever is earlier: the head item's timestamp, or
+ // the caller's limit
+ adjusted = min(std::get<0>(super::mStorage.front()), adjusted);
+ }
+ // now delegate to base-class tryPopUntil_()
+ pop_result popped;
+ while ((popped = pop_result(super::tryPopUntil_(lock, adjusted, tuple))) == WAITING)
+ {
+ // If super::tryPopUntil_() returns WAITING, it means there's
+ // a head item, but it's not yet time. But it's worth looping
+ // back to recheck.
+ }
+ return popped;
+ }
+
+ /// tryPopUntil(DataTuple&)
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
+ DataTuple& tuple)
+ {
+ TimeTuple tt;
+ if (! tryPopUntil(until, tt))
+ return false;
+ tuple = tuple_cdr(std::move(tt));
+ return true;
+ }
+
+ /// for when Args has exactly one type
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
+ typename std::tuple_element<1, TimeTuple>::type& value)
+ {
+ TimeTuple tt;
+ if (! tryPopUntil(until, tt))
+ return false;
+ value = std::get<1>(std::move(tt));
+ return true;
+ }
+
+ /*------------------------------ etc. ------------------------------*/
+ // We can't hide items that aren't yet ready because we can't traverse
+ // the underlying priority_queue: it has no iterators, only top(). So
+ // a consumer could observe size() > 0 and yet tryPop() returns false.
+ // Shrug, in a multi-consumer scenario that would be expected behavior.
+ using super::size;
+ // open/closed state
+ using super::close;
+ using super::isClosed;
+ using super::done;
+
+ private:
+ // this method is called by base class pop_() every time we're
+ // considering whether to deliver the current head element
+ bool canPop(const TimeTuple& head) const override
+ {
+ // an item with a future timestamp isn't yet ready to pop
+ // (should we add some slop for overhead?)
+ return std::get<0>(head) <= Clock::now();
+ }
+ };
+
+} // namespace LL
+
+#endif /* ! defined(LL_THREADSAFESCHEDULE_H) */
diff --git a/indra/llcommon/tuple.h b/indra/llcommon/tuple.h
new file mode 100644
index 0000000000..bfe7e3c2ba
--- /dev/null
+++ b/indra/llcommon/tuple.h
@@ -0,0 +1,84 @@
+/**
+ * @file tuple.h
+ * @author Nat Goodspeed
+ * @date 2021-10-04
+ * @brief A couple tuple utilities
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_TUPLE_H)
+#define LL_TUPLE_H
+
+#include <tuple>
+#include <type_traits> // std::remove_reference
+#include <utility> // std::pair
+
+/**
+ * tuple_cons() behaves like LISP cons: it uses std::tuple_cat() to prepend a
+ * new item of arbitrary type to an existing std::tuple.
+ */
+template <typename First, typename... Rest, typename Tuple_=std::tuple<Rest...>>
+auto tuple_cons(First&& first, Tuple_&& rest)
+{
+ // All we need to do is make a tuple containing 'first', and let
+ // tuple_cat() do the hard part.
+ return std::tuple_cat(std::tuple<First>(std::forward<First>(first)),
+ std::forward<Tuple_>(rest));
+}
+
+/**
+ * tuple_car() behaves like LISP car: it extracts the first item from a
+ * std::tuple.
+ */
+template <typename... Args, typename Tuple_=std::tuple<Args...>>
+auto tuple_car(Tuple_&& tuple)
+{
+ return std::get<0>(std::forward<Tuple_>(tuple));
+}
+
+/**
+ * tuple_cdr() behaves like LISP cdr: it returns a new tuple containing
+ * everything BUT the first item.
+ */
+// derived from https://stackoverflow.com/a/24046437
+template <typename Tuple, std::size_t... Indices>
+auto tuple_cdr_(Tuple&& tuple, const std::index_sequence<Indices...>)
+{
+ // Given an index sequence from [0..N-1), extract tuple items [1..N)
+ return std::make_tuple(std::get<Indices+1u>(std::forward<Tuple>(tuple))...);
+}
+
+template <typename Tuple>
+auto tuple_cdr(Tuple&& tuple)
+{
+ return tuple_cdr_(
+ std::forward<Tuple>(tuple),
+ // Pass helper function an index sequence one item shorter than tuple
+ std::make_index_sequence<
+ std::tuple_size<
+ // tuple_size doesn't like reference types
+ typename std::remove_reference<Tuple>::type
+ >::value - 1u>
+ ());
+}
+
+/**
+ * tuple_split(), the opposite of tuple_cons(), has no direct analog in LISP.
+ * It returns a std::pair of tuple_car(), tuple_cdr(). We could call this
+ * function tuple_car_cdr(), or tuple_slice() or some such. But tuple_split()
+ * feels more descriptive.
+ */
+template <typename... Args, typename Tuple_=std::tuple<Args...>>
+auto tuple_split(Tuple_&& tuple)
+{
+ // We're not really worried about forwarding multiple times a tuple that
+ // might contain move-only items, because the implementation above only
+ // applies std::get() exactly once to each item.
+ return std::make_pair(tuple_car(std::forward<Tuple_>(tuple)),
+ tuple_cdr(std::forward<Tuple_>(tuple)));
+}
+
+#endif /* ! defined(LL_TUPLE_H) */
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp
new file mode 100644
index 0000000000..15e292fb43
--- /dev/null
+++ b/indra/llcommon/workqueue.cpp
@@ -0,0 +1,114 @@
+/**
+ * @file workqueue.cpp
+ * @author Nat Goodspeed
+ * @date 2021-10-06
+ * @brief Implementation for WorkQueue.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "workqueue.h"
+// STL headers
+// std headers
+// external library headers
+// other Linden headers
+#include "llerror.h"
+#include "llexception.h"
+#include "stringize.h"
+
+LL::WorkQueue::WorkQueue(const std::string& name):
+ super(makeName(name))
+{
+ // TODO: register for "LLApp" events so we can implicitly close() on
+ // viewer shutdown.
+}
+
+void LL::WorkQueue::close()
+{
+ mQueue.close();
+}
+
+void LL::WorkQueue::runUntilClose()
+{
+ try
+ {
+ for (;;)
+ {
+ callWork(mQueue.pop());
+ }
+ }
+ catch (const Queue::Closed&)
+ {
+ }
+}
+
+bool LL::WorkQueue::runPending()
+{
+ for (Work work; mQueue.tryPop(work); )
+ {
+ callWork(work);
+ }
+ return ! mQueue.done();
+}
+
+bool LL::WorkQueue::runOne()
+{
+ Work work;
+ if (mQueue.tryPop(work))
+ {
+ callWork(work);
+ }
+ return ! mQueue.done();
+}
+
+bool LL::WorkQueue::runUntil(const TimePoint& until)
+{
+ // Should we subtract some slop to allow for typical Work execution time?
+ // How much slop?
+ Work work;
+ while (TimePoint::clock::now() < until && mQueue.tryPopUntil(until, work))
+ {
+ callWork(work);
+ }
+ return ! mQueue.done();
+}
+
+std::string LL::WorkQueue::makeName(const std::string& name)
+{
+ if (! name.empty())
+ return name;
+
+ static thread_local U32 discriminator = 0;
+ return STRINGIZE("WorkQueue" << discriminator++);
+}
+
+void LL::WorkQueue::callWork(const Queue::DataTuple& work)
+{
+ // ThreadSafeSchedule::pop() always delivers a tuple, even when
+ // there's only one data field per item, as for us.
+ callWork(std::get<0>(work));
+}
+
+void LL::WorkQueue::callWork(const Work& work)
+{
+ try
+ {
+ work();
+ }
+ catch (...)
+ {
+ // No matter what goes wrong with any individual work item, the worker
+ // thread must go on! Log our own instance name with the exception.
+ LOG_UNHANDLED_EXCEPTION(getKey());
+ }
+}
+
+void LL::WorkQueue::error(const std::string& msg)
+{
+ LL_ERRS("WorkQueue") << msg << LL_ENDL;
+}
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
new file mode 100644
index 0000000000..a52f7b0e26
--- /dev/null
+++ b/indra/llcommon/workqueue.h
@@ -0,0 +1,325 @@
+/**
+ * @file workqueue.h
+ * @author Nat Goodspeed
+ * @date 2021-09-30
+ * @brief Queue used for inter-thread work passing.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_WORKQUEUE_H)
+#define LL_WORKQUEUE_H
+
+#include "llinstancetracker.h"
+#include "threadsafeschedule.h"
+#include <chrono>
+#include <functional> // std::function
+#include <queue>
+#include <string>
+#include <utility> // std::pair
+#include <vector>
+
+namespace LL
+{
+ /**
+ * A typical WorkQueue has a string name that can be used to find it.
+ */
+ class WorkQueue: public LLInstanceTracker<WorkQueue, std::string>
+ {
+ private:
+ using super = LLInstanceTracker<WorkQueue, std::string>;
+
+ public:
+ using Work = std::function<void()>;
+
+ private:
+ using Queue = ThreadSafeSchedule<Work>;
+ // helper for postEvery()
+ template <typename Rep, typename Period, typename CALLABLE>
+ class BackJack;
+
+ public:
+ using TimePoint = Queue::TimePoint;
+ using TimedWork = Queue::TimeTuple;
+ using Closed = Queue::Closed;
+
+ /**
+ * You may omit the WorkQueue name, in which case a unique name is
+ * synthesized; for practical purposes that makes it anonymous.
+ */
+ WorkQueue(const std::string& name = std::string());
+
+ /**
+ * Since the point of WorkQueue is to pass work to some other worker
+ * thread(s) asynchronously, it's important that the WorkQueue continue
+ * to exist until the worker thread(s) have drained it. To communicate
+ * that it's time for them to quit, close() the queue.
+ */
+ void close();
+
+ /*---------------------- fire and forget API -----------------------*/
+
+ /// fire-and-forget, but at a particular (future?) time
+ template <typename CALLABLE>
+ void post(const TimePoint& time, CALLABLE&& callable)
+ {
+ // Defer reifying an arbitrary CALLABLE until we hit this method.
+ // All other methods should accept CALLABLEs of arbitrary type to
+ // avoid multiple levels of std::function indirection.
+ mQueue.push(TimedWork(time, std::move(callable)));
+ }
+
+ /// fire-and-forget
+ template <typename CALLABLE>
+ void post(CALLABLE&& callable)
+ {
+ // We use TimePoint::clock::now() instead of TimePoint's
+ // representation of the epoch because this WorkQueue may contain
+ // a mix of past-due TimedWork items and TimedWork items scheduled
+ // for the future. Sift this new item into the correct place.
+ post(TimePoint::clock::now(), std::move(callable));
+ }
+
+ /**
+ * Launch a callable returning bool that will trigger repeatedly at
+ * specified interval, until the callable returns false.
+ *
+ * If you need to signal that callable from outside, DO NOT bind a
+ * reference to a simple bool! That's not thread-safe. Instead, bind
+ * an LLCond variant, e.g. LLOneShotCond or LLBoolCond.
+ */
+ template <typename Rep, typename Period, typename CALLABLE>
+ void postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable);
+
+ /*------------------------- handshake API --------------------------*/
+
+ /**
+ * Post work to another WorkQueue to be run at a specified time,
+ * requesting a specific callback to be run on this WorkQueue on
+ * completion.
+ *
+ * Returns true if able to post, false if the other WorkQueue is
+ * inaccessible.
+ */
+ template <typename CALLABLE, typename CALLBACK>
+ bool postTo(std::weak_ptr<WorkQueue> target,
+ const TimePoint& time, CALLABLE&& callable, CALLBACK&& callback)
+ {
+ // We're being asked to post to the WorkQueue at target.
+ // target is a weak_ptr: have to lock it to check it.
+ auto tptr = target.lock();
+ if (! tptr)
+ // can't post() if the target WorkQueue has been destroyed
+ return false;
+
+ // Here we believe target WorkQueue still exists. Post to it a
+ // lambda that packages our callable, our callback and a weak_ptr
+ // to this originating WorkQueue.
+ tptr->post(
+ time,
+ [reply = super::getWeak(),
+ callable = std::move(callable),
+ callback = std::move(callback)]
+ ()
+ {
+ // Call the callable in any case -- but to minimize
+ // copying the result, immediately bind it into a reply
+ // lambda. The reply lambda also binds the original
+ // callback, so that when we, the originating WorkQueue,
+ // finally receive and process the reply lambda, we'll
+ // call the bound callback with the bound result -- on the
+ // same thread that originally called postTo().
+ auto rlambda =
+ [result = callable(),
+ callback = std::move(callback)]
+ ()
+ { callback(std::move(result)); };
+ // Check if this originating WorkQueue still exists.
+ // Remember, the outer lambda is now running on a thread
+ // servicing the target WorkQueue, and real time has
+ // elapsed since postTo()'s tptr->post() call.
+ // reply is a weak_ptr: have to lock it to check it.
+ auto rptr = reply.lock();
+ if (rptr)
+ {
+ // Only post reply lambda if the originating WorkQueue
+ // still exists. If not -- who would we tell? Log it?
+ try
+ {
+ rptr->post(std::move(rlambda));
+ }
+ catch (const Closed&)
+ {
+ // Originating WorkQueue might still exist, but
+ // might be Closed. Same thing: just discard the
+ // callback.
+ }
+ }
+ });
+ // looks like we were able to post()
+ return true;
+ }
+
+ /**
+ * Post work to another WorkQueue, requesting a specific callback to
+ * be run on this WorkQueue on completion.
+ *
+ * Returns true if able to post, false if the other WorkQueue is
+ * inaccessible.
+ */
+ template <typename CALLABLE, typename CALLBACK>
+ bool postTo(std::weak_ptr<WorkQueue> target,
+ CALLABLE&& callable, CALLBACK&& callback)
+ {
+ return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback));
+ }
+
+ /*--------------------------- worker API ---------------------------*/
+
+ /**
+ * runUntilClose() pulls TimedWork items off this WorkQueue until the
+ * queue is closed, at which point it returns. This would be the
+ * typical entry point for a simple worker thread.
+ */
+ void runUntilClose();
+
+ /**
+ * runPending() runs all TimedWork items that are ready to run. It
+ * returns true if the queue remains open, false if the queue has been
+ * closed. This could be used by a thread whose primary purpose is to
+ * serve the queue, but also wants to do other things with its idle time.
+ */
+ bool runPending();
+
+ /**
+ * runOne() runs at most one ready TimedWork item -- zero if none are
+ * ready. It returns true if the queue remains open, false if the
+ * queue has been closed.
+ */
+ bool runOne();
+
+ /**
+ * runFor() runs a subset of ready TimedWork items, until the
+ * timeslice has been exceeded. It returns true if the queue remains
+ * open, false if the queue has been closed. This could be used by a
+ * busy main thread to lend a bounded few CPU cycles to this WorkQueue
+ * without risking the WorkQueue blowing out the length of any one
+ * frame.
+ */
+ template <typename Rep, typename Period>
+ bool runFor(const std::chrono::duration<Rep, Period>& timeslice)
+ {
+ return runUntil(TimePoint::clock::now() + timeslice);
+ }
+
+ /**
+ * runUntil() is just like runFor(), only with a specific end time
+ * instead of a timeslice duration.
+ */
+ bool runUntil(const TimePoint& until);
+
+ private:
+ static void error(const std::string& msg);
+ static std::string makeName(const std::string& name);
+ void callWork(const Queue::DataTuple& work);
+ void callWork(const Work& work);
+ Queue mQueue;
+ };
+
+ /**
+ * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a
+ * CALLABLE that returns bool, a TimePoint and an interval at which to
+ * relaunch it. As long as the callable continues returning true, BackJack
+ * keeps resubmitting it to the target WorkQueue.
+ */
+ // Why is BackJack a class and not a lambda? Because, unlike a lambda, a
+ // class method gets its own 'this' pointer -- which we need to resubmit
+ // the whole BackJack callable.
+ template <typename Rep, typename Period, typename CALLABLE>
+ class WorkQueue::BackJack
+ {
+ public:
+ // bind the desired data
+ BackJack(std::weak_ptr<WorkQueue> target,
+ const WorkQueue::TimePoint& start,
+ const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable):
+ mTarget(target),
+ mStart(start),
+ mInterval(interval),
+ mCallable(std::move(callable))
+ {}
+
+ // Call by target WorkQueue -- note that although WE require a
+ // callable returning bool, WorkQueue wants a void callable. We
+ // consume the bool.
+ void operator()()
+ {
+ // If mCallable() throws an exception, don't catch it here: if it
+ // throws once, it's likely to throw every time, so it's a waste
+ // of time to arrange to call it again.
+ if (mCallable())
+ {
+ // Modify mStart to the new start time we desire. If we simply
+ // added mInterval to now, we'd get actual timings of
+ // (mInterval + slop), where 'slop' is the latency between the
+ // previous mStart and the WorkQueue actually calling us.
+ // Instead, add mInterval to mStart so that at least we
+ // register our intent to fire at exact mIntervals.
+ mStart += mInterval;
+
+ // We're being called at this moment by the target WorkQueue.
+ // Assume it still exists, rather than checking the result of
+ // lock().
+ // Resubmit the whole *this callable: that's why we're a class
+ // rather than a lambda. Allow moving *this so we can carry a
+ // move-only callable; but naturally this statement must be
+ // the last time we reference this instance, which may become
+ // moved-from.
+ try
+ {
+ mTarget.lock()->post(mStart, std::move(*this));
+ }
+ catch (const Closed&)
+ {
+ // Once this queue is closed, oh well, just stop
+ }
+ }
+ }
+
+ private:
+ std::weak_ptr<WorkQueue> mTarget;
+ WorkQueue::TimePoint mStart;
+ std::chrono::duration<Rep, Period> mInterval;
+ CALLABLE mCallable;
+ };
+
+ template <typename Rep, typename Period, typename CALLABLE>
+ void WorkQueue::postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable)
+ {
+ if (interval.count() <= 0)
+ {
+ // It's essential that postEvery() be called with a positive
+ // interval, since each call to BackJack posts another instance of
+ // itself at (start + interval) and we order by target time. A
+ // zero or negative interval would result in that BackJack
+ // instance going to the head of the queue every time, immediately
+ // ready to run. Effectively that would produce an infinite loop,
+ // a denial of service on this WorkQueue.
+ error("postEvery(interval) may not be 0");
+ }
+ // Instantiate and post a suitable BackJack, binding a weak_ptr to
+ // self, the current time, the desired interval and the desired
+ // callable.
+ post(
+ BackJack<Rep, Period, CALLABLE>(
+ getWeak(), TimePoint::clock::now(), interval, std::move(callable)));
+ }
+
+} // namespace LL
+
+#endif /* ! defined(LL_WORKQUEUE_H) */