summaryrefslogtreecommitdiff
path: root/indra/llcommon/threadsafeschedule.h
diff options
context:
space:
mode:
authorDave Houlton <euclid@lindenlab.com>2022-05-26 11:18:44 -0600
committerDave Houlton <euclid@lindenlab.com>2022-05-26 11:18:44 -0600
commit02c71b0ac2f99dd1c26a649ffce2182b2fc9a7d9 (patch)
tree4255c2827cc3e07921275bfd7937f4f43f29d5fa /indra/llcommon/threadsafeschedule.h
parent3f58ec2fdfb76ce2160884a3e97be49f60b6ac90 (diff)
parentcdbd06e8ed6e3f4285a61f5c0b607a65dfdf8dfd (diff)
Merge branch 'master' v 6.6.1 into DRTVWR-528
Diffstat (limited to 'indra/llcommon/threadsafeschedule.h')
-rw-r--r--indra/llcommon/threadsafeschedule.h399
1 files changed, 399 insertions, 0 deletions
diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h
new file mode 100644
index 0000000000..3e0da94c02
--- /dev/null
+++ b/indra/llcommon/threadsafeschedule.h
@@ -0,0 +1,399 @@
+/**
+ * @file threadsafeschedule.h
+ * @author Nat Goodspeed
+ * @date 2021-10-02
+ * @brief ThreadSafeSchedule is an ordered queue in which every item has an
+ * associated timestamp.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_THREADSAFESCHEDULE_H)
+#define LL_THREADSAFESCHEDULE_H
+
+#include "chrono.h"
+#include "llexception.h"
+#include "llthreadsafequeue.h"
+#include "tuple.h"
+#include <chrono>
+#include <tuple>
+
+namespace LL
+{
+ namespace ThreadSafeSchedulePrivate
+ {
+ using TimePoint = std::chrono::steady_clock::time_point;
+ // Bundle consumer's data with a TimePoint to order items by timestamp.
+ template <typename... Args>
+ using TimestampedTuple = std::tuple<TimePoint, Args...>;
+
+ // comparison functor for TimedTuples -- see TimedQueue comments
+ struct ReverseTupleOrder
+ {
+ template <typename Tuple>
+ bool operator()(const Tuple& left, const Tuple& right) const
+ {
+ return std::get<0>(left) > std::get<0>(right);
+ }
+ };
+
+ template <typename... Args>
+ using TimedQueue = PriorityQueueAdapter<
+ TimestampedTuple<Args...>,
+ // std::vector is the default storage for std::priority_queue,
+ // have to restate to specify comparison template parameter
+ std::vector<TimestampedTuple<Args...>>,
+ // std::priority_queue uses a counterintuitive comparison
+ // behavior: the default std::less comparator is used to present
+ // the *highest* value as top(). So to sort by earliest timestamp,
+ // we must invert by using >.
+ ReverseTupleOrder>;
+ } // namespace ThreadSafeSchedulePrivate
+
+ /**
+ * ThreadSafeSchedule is an ordered LLThreadSafeQueue in which every item
+ * is given an associated timestamp. That is, TimePoint is implicitly
+ * prepended to the std::tuple with the specified types.
+ *
+ * Items are popped in increasing chronological order. Moreover, any item
+ * with a timestamp in the future is held back until
+ * std::chrono::steady_clock reaches that timestamp.
+ */
+ template <typename... Args>
+ class ThreadSafeSchedule:
+ public LLThreadSafeQueue<ThreadSafeSchedulePrivate::TimestampedTuple<Args...>,
+ ThreadSafeSchedulePrivate::TimedQueue<Args...>>
+ {
+ public:
+ using DataTuple = std::tuple<Args...>;
+ using TimeTuple = ThreadSafeSchedulePrivate::TimestampedTuple<Args...>;
+
+ private:
+ using super = LLThreadSafeQueue<TimeTuple, ThreadSafeSchedulePrivate::TimedQueue<Args...>>;
+ using lock_t = typename super::lock_t;
+ // VS 2017 needs this due to a bug:
+ // https://developercommunity.visualstudio.com/t/cannot-access-protected-enumerator-of-enclosing-cl/203430
+ enum pop_result { EMPTY=super::EMPTY, DONE=super::DONE, WAITING=super::WAITING, POPPED=super::POPPED };
+
+ public:
+ using Closed = LLThreadSafeQueueInterrupt;
+ using TimePoint = ThreadSafeSchedulePrivate::TimePoint;
+ using Clock = TimePoint::clock;
+
+ ThreadSafeSchedule(U32 capacity=1024):
+ super(capacity)
+ {}
+
+ /*----------------------------- push() -----------------------------*/
+ /// explicitly pass TimeTuple
+ using super::push;
+
+ /// pass DataTuple with implicit now
+ // This could be ambiguous for Args with a single type. Unfortunately
+ // we can't enable_if an individual method with a condition based on
+ // the *class* template arguments, only on that method's template
+ // arguments. We could specialize this class for the single-Args case;
+ // we could minimize redundancy by breaking out a common base class...
+ void push(const DataTuple& tuple)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ push(tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass each component of the TimeTuple
+ void push(const TimePoint& time, Args&&... args)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ push(TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass every component except the TimePoint (implies now)
+ // This could be ambiguous if the first specified template parameter
+ // type is also TimePoint. We could try to disambiguate, but a simpler
+ // approach would be for the caller to explicitly construct DataTuple
+ // and call that overload.
+ void push(Args&&... args)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ push(Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*--------------------------- tryPush() ----------------------------*/
+ /// explicit TimeTuple
+ using super::tryPush;
+
+ /// DataTuple with implicit now
+ bool tryPush(const DataTuple& tuple)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ return tryPush(tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass components
+ bool tryPush(const TimePoint& time, Args&&... args)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ return tryPush(TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass components with implicit now
+ bool tryPush(Args&&... args)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ return tryPush(Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*-------------------------- tryPushFor() --------------------------*/
+ /// explicit TimeTuple
+ using super::tryPushFor;
+
+ /// DataTuple with implicit now
+ template <typename Rep, typename Period>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ const DataTuple& tuple)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ return tryPushFor(timeout, tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass components
+ template <typename Rep, typename Period>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ const TimePoint& time, Args&&... args)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ return tryPushFor(TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass components with implicit now
+ template <typename Rep, typename Period>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ Args&&... args)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ return tryPushFor(Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*------------------------- tryPushUntil() -------------------------*/
+ /// explicit TimeTuple
+ using super::tryPushUntil;
+
+ /// DataTuple with implicit now
+ template <typename Clock, typename Duration>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ const DataTuple& tuple)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ return tryPushUntil(until, tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass components
+ template <typename Clock, typename Duration>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ const TimePoint& time, Args&&... args)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ return tryPushUntil(until, TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass components with implicit now
+ template <typename Clock, typename Duration>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ Args&&... args)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ return tryPushUntil(until, Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*----------------------------- pop() ------------------------------*/
+ // Our consumer may or may not care about the timestamp associated
+ // with each popped item, so we allow retrieving either DataTuple or
+ // TimeTuple. One potential use would be to observe, and possibly
+ // adjust for, the time lag between the item time and the actual
+ // current time.
+
+ /// pop DataTuple by value
+ // It would be great to notice when sizeof...(Args) == 1 and directly
+ // return the first (only) value, instead of making pop()'s caller
+ // call std::get<0>(value). See push(DataTuple) remarks for why we
+ // haven't yet jumped through those hoops.
+ DataTuple pop()
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ return tuple_cdr(popWithTime());
+ }
+
+ /// pop TimeTuple by value
+ TimeTuple popWithTime()
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ lock_t lock(super::mLock);
+ // We can't just sit around waiting forever, given that there may
+ // be items in the queue that are not yet ready but will *become*
+ // ready in the near future. So in fact, with this class, every
+ // pop() becomes a tryPopUntil(), constrained to the timestamp of
+ // the head item. It almost doesn't matter what we specify for the
+ // caller's time constraint -- all we really care about is the
+ // head item's timestamp. Since pop() and popWithTime() are
+ // defined to wait until either an item becomes available or the
+ // queue is closed, loop until one of those things happens. The
+ // constraint we pass just determines how often we'll loop while
+ // waiting.
+ TimeTuple tt;
+ while (true)
+ {
+ // Pick a point suitably far into the future.
+ TimePoint until = TimePoint::clock::now() + std::chrono::hours(24);
+ pop_result popped = tryPopUntil_(lock, until, tt);
+ if (popped == POPPED)
+ return std::move(tt);
+
+ // DONE: throw, just as super::pop() does
+ if (popped == DONE)
+ {
+ LLTHROW(LLThreadSafeQueueInterrupt());
+ }
+ // WAITING: we've still got items to drain.
+ // EMPTY: not closed, so it's worth waiting for more items.
+ // Either way, loop back to wait.
+ }
+ }
+
+ // We can use tryPop(TimeTuple&) just as it stands; the only behavior
+ // difference is in our canPop() override method.
+ using super::tryPop;
+
+ /// tryPop(DataTuple&)
+ bool tryPop(DataTuple& tuple)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ TimeTuple tt;
+ if (! super::tryPop(tt))
+ return false;
+ tuple = tuple_cdr(std::move(tt));
+ return true;
+ }
+
+ /// for when Args has exactly one type
+ bool tryPop(typename std::tuple_element<1, TimeTuple>::type& value)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ TimeTuple tt;
+ if (! super::tryPop(tt))
+ return false;
+ value = std::get<1>(std::move(tt));
+ return true;
+ }
+
+ /// tryPopFor()
+ template <typename Rep, typename Period, typename Tuple>
+ bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, Tuple& tuple)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ // It's important to use OUR tryPopUntil() implementation, rather
+ // than delegating immediately to our base class.
+ return tryPopUntil(Clock::now() + timeout, tuple);
+ }
+
+ /// tryPopUntil(TimeTuple&)
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
+ TimeTuple& tuple)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ // super::tryPopUntil() wakes up when an item becomes available or
+ // we hit 'until', whichever comes first. Thing is, the current
+ // head of the queue could become ready sooner than either of
+ // those events, and we need to deliver it as soon as it does.
+ // Don't wait past the TimePoint of the head item.
+ // Naturally, lock the queue before peeking at mStorage.
+ return super::tryLockUntil(
+ until,
+ [this, until, &tuple](lock_t& lock)
+ {
+ // Use our time_point_cast to allow for 'until' that's a
+ // time_point type other than TimePoint.
+ return POPPED ==
+ tryPopUntil_(lock, LL::time_point_cast<TimePoint>(until), tuple);
+ });
+ }
+
+ pop_result tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ TimePoint adjusted = until;
+ if (! super::mStorage.empty())
+ {
+ LL_PROFILE_ZONE_NAMED("tpu - adjust");
+ // use whichever is earlier: the head item's timestamp, or
+ // the caller's limit
+ adjusted = min(std::get<0>(super::mStorage.front()), adjusted);
+ }
+ // now delegate to base-class tryPopUntil_()
+ pop_result popped;
+ {
+ LL_PROFILE_ZONE_NAMED("tpu - super");
+ while ((popped = pop_result(super::tryPopUntil_(lock, adjusted, tuple))) == WAITING)
+ {
+ // If super::tryPopUntil_() returns WAITING, it means there's
+ // a head item, but it's not yet time. But it's worth looping
+ // back to recheck.
+ }
+ }
+ return popped;
+ }
+
+ /// tryPopUntil(DataTuple&)
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
+ DataTuple& tuple)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ TimeTuple tt;
+ if (! tryPopUntil(until, tt))
+ return false;
+ tuple = tuple_cdr(std::move(tt));
+ return true;
+ }
+
+ /// for when Args has exactly one type
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
+ typename std::tuple_element<1, TimeTuple>::type& value)
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ TimeTuple tt;
+ if (! tryPopUntil(until, tt))
+ return false;
+ value = std::get<1>(std::move(tt));
+ return true;
+ }
+
+ /*------------------------------ etc. ------------------------------*/
+ // We can't hide items that aren't yet ready because we can't traverse
+ // the underlying priority_queue: it has no iterators, only top(). So
+ // a consumer could observe size() > 0 and yet tryPop() returns false.
+ // Shrug, in a multi-consumer scenario that would be expected behavior.
+ using super::size;
+ // open/closed state
+ using super::close;
+ using super::isClosed;
+ using super::done;
+
+ private:
+ // this method is called by base class pop_() every time we're
+ // considering whether to deliver the current head element
+ bool canPop(const TimeTuple& head) const override
+ {
+ LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
+ // an item with a future timestamp isn't yet ready to pop
+ // (should we add some slop for overhead?)
+ return std::get<0>(head) <= Clock::now();
+ }
+ };
+
+} // namespace LL
+
+#endif /* ! defined(LL_THREADSAFESCHEDULE_H) */