diff options
author | Euclid Linden <euclid@lindenlab.com> | 2022-05-27 21:15:49 +0000 |
---|---|---|
committer | Euclid Linden <euclid@lindenlab.com> | 2022-05-27 21:15:49 +0000 |
commit | 3f98411c56f4daa06c9102346a8dd37af18d2cb6 (patch) | |
tree | 4255c2827cc3e07921275bfd7937f4f43f29d5fa /indra/llcommon/threadsafeschedule.h | |
parent | 3f58ec2fdfb76ce2160884a3e97be49f60b6ac90 (diff) | |
parent | 02c71b0ac2f99dd1c26a649ffce2182b2fc9a7d9 (diff) |
Merged in DV528-merge-6.6.1 (pull request #1000)
DRTVWR-528 merge up to 6.6.1
Diffstat (limited to 'indra/llcommon/threadsafeschedule.h')
-rw-r--r-- | indra/llcommon/threadsafeschedule.h | 399 |
1 files changed, 399 insertions, 0 deletions
diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h new file mode 100644 index 0000000000..3e0da94c02 --- /dev/null +++ b/indra/llcommon/threadsafeschedule.h @@ -0,0 +1,399 @@ +/** + * @file threadsafeschedule.h + * @author Nat Goodspeed + * @date 2021-10-02 + * @brief ThreadSafeSchedule is an ordered queue in which every item has an + * associated timestamp. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_THREADSAFESCHEDULE_H) +#define LL_THREADSAFESCHEDULE_H + +#include "chrono.h" +#include "llexception.h" +#include "llthreadsafequeue.h" +#include "tuple.h" +#include <chrono> +#include <tuple> + +namespace LL +{ + namespace ThreadSafeSchedulePrivate + { + using TimePoint = std::chrono::steady_clock::time_point; + // Bundle consumer's data with a TimePoint to order items by timestamp. + template <typename... Args> + using TimestampedTuple = std::tuple<TimePoint, Args...>; + + // comparison functor for TimedTuples -- see TimedQueue comments + struct ReverseTupleOrder + { + template <typename Tuple> + bool operator()(const Tuple& left, const Tuple& right) const + { + return std::get<0>(left) > std::get<0>(right); + } + }; + + template <typename... Args> + using TimedQueue = PriorityQueueAdapter< + TimestampedTuple<Args...>, + // std::vector is the default storage for std::priority_queue, + // have to restate to specify comparison template parameter + std::vector<TimestampedTuple<Args...>>, + // std::priority_queue uses a counterintuitive comparison + // behavior: the default std::less comparator is used to present + // the *highest* value as top(). So to sort by earliest timestamp, + // we must invert by using >. + ReverseTupleOrder>; + } // namespace ThreadSafeSchedulePrivate + + /** + * ThreadSafeSchedule is an ordered LLThreadSafeQueue in which every item + * is given an associated timestamp. That is, TimePoint is implicitly + * prepended to the std::tuple with the specified types. + * + * Items are popped in increasing chronological order. Moreover, any item + * with a timestamp in the future is held back until + * std::chrono::steady_clock reaches that timestamp. + */ + template <typename... Args> + class ThreadSafeSchedule: + public LLThreadSafeQueue<ThreadSafeSchedulePrivate::TimestampedTuple<Args...>, + ThreadSafeSchedulePrivate::TimedQueue<Args...>> + { + public: + using DataTuple = std::tuple<Args...>; + using TimeTuple = ThreadSafeSchedulePrivate::TimestampedTuple<Args...>; + + private: + using super = LLThreadSafeQueue<TimeTuple, ThreadSafeSchedulePrivate::TimedQueue<Args...>>; + using lock_t = typename super::lock_t; + // VS 2017 needs this due to a bug: + // https://developercommunity.visualstudio.com/t/cannot-access-protected-enumerator-of-enclosing-cl/203430 + enum pop_result { EMPTY=super::EMPTY, DONE=super::DONE, WAITING=super::WAITING, POPPED=super::POPPED }; + + public: + using Closed = LLThreadSafeQueueInterrupt; + using TimePoint = ThreadSafeSchedulePrivate::TimePoint; + using Clock = TimePoint::clock; + + ThreadSafeSchedule(U32 capacity=1024): + super(capacity) + {} + + /*----------------------------- push() -----------------------------*/ + /// explicitly pass TimeTuple + using super::push; + + /// pass DataTuple with implicit now + // This could be ambiguous for Args with a single type. Unfortunately + // we can't enable_if an individual method with a condition based on + // the *class* template arguments, only on that method's template + // arguments. We could specialize this class for the single-Args case; + // we could minimize redundancy by breaking out a common base class... + void push(const DataTuple& tuple) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + push(tuple_cons(Clock::now(), tuple)); + } + + /// individually pass each component of the TimeTuple + void push(const TimePoint& time, Args&&... args) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + push(TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass every component except the TimePoint (implies now) + // This could be ambiguous if the first specified template parameter + // type is also TimePoint. We could try to disambiguate, but a simpler + // approach would be for the caller to explicitly construct DataTuple + // and call that overload. + void push(Args&&... args) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + push(Clock::now(), std::forward<Args>(args)...); + } + + /*--------------------------- tryPush() ----------------------------*/ + /// explicit TimeTuple + using super::tryPush; + + /// DataTuple with implicit now + bool tryPush(const DataTuple& tuple) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryPush(tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + bool tryPush(const TimePoint& time, Args&&... args) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryPush(TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass components with implicit now + bool tryPush(Args&&... args) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryPush(Clock::now(), std::forward<Args>(args)...); + } + + /*-------------------------- tryPushFor() --------------------------*/ + /// explicit TimeTuple + using super::tryPushFor; + + /// DataTuple with implicit now + template <typename Rep, typename Period> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + const DataTuple& tuple) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryPushFor(timeout, tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + template <typename Rep, typename Period> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + const TimePoint& time, Args&&... args) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryPushFor(TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass components with implicit now + template <typename Rep, typename Period> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + Args&&... args) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryPushFor(Clock::now(), std::forward<Args>(args)...); + } + + /*------------------------- tryPushUntil() -------------------------*/ + /// explicit TimeTuple + using super::tryPushUntil; + + /// DataTuple with implicit now + template <typename Clock, typename Duration> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + const DataTuple& tuple) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryPushUntil(until, tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + template <typename Clock, typename Duration> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + const TimePoint& time, Args&&... args) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryPushUntil(until, TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass components with implicit now + template <typename Clock, typename Duration> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + Args&&... args) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tryPushUntil(until, Clock::now(), std::forward<Args>(args)...); + } + + /*----------------------------- pop() ------------------------------*/ + // Our consumer may or may not care about the timestamp associated + // with each popped item, so we allow retrieving either DataTuple or + // TimeTuple. One potential use would be to observe, and possibly + // adjust for, the time lag between the item time and the actual + // current time. + + /// pop DataTuple by value + // It would be great to notice when sizeof...(Args) == 1 and directly + // return the first (only) value, instead of making pop()'s caller + // call std::get<0>(value). See push(DataTuple) remarks for why we + // haven't yet jumped through those hoops. + DataTuple pop() + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + return tuple_cdr(popWithTime()); + } + + /// pop TimeTuple by value + TimeTuple popWithTime() + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + lock_t lock(super::mLock); + // We can't just sit around waiting forever, given that there may + // be items in the queue that are not yet ready but will *become* + // ready in the near future. So in fact, with this class, every + // pop() becomes a tryPopUntil(), constrained to the timestamp of + // the head item. It almost doesn't matter what we specify for the + // caller's time constraint -- all we really care about is the + // head item's timestamp. Since pop() and popWithTime() are + // defined to wait until either an item becomes available or the + // queue is closed, loop until one of those things happens. The + // constraint we pass just determines how often we'll loop while + // waiting. + TimeTuple tt; + while (true) + { + // Pick a point suitably far into the future. + TimePoint until = TimePoint::clock::now() + std::chrono::hours(24); + pop_result popped = tryPopUntil_(lock, until, tt); + if (popped == POPPED) + return std::move(tt); + + // DONE: throw, just as super::pop() does + if (popped == DONE) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + // WAITING: we've still got items to drain. + // EMPTY: not closed, so it's worth waiting for more items. + // Either way, loop back to wait. + } + } + + // We can use tryPop(TimeTuple&) just as it stands; the only behavior + // difference is in our canPop() override method. + using super::tryPop; + + /// tryPop(DataTuple&) + bool tryPop(DataTuple& tuple) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + TimeTuple tt; + if (! super::tryPop(tt)) + return false; + tuple = tuple_cdr(std::move(tt)); + return true; + } + + /// for when Args has exactly one type + bool tryPop(typename std::tuple_element<1, TimeTuple>::type& value) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + TimeTuple tt; + if (! super::tryPop(tt)) + return false; + value = std::get<1>(std::move(tt)); + return true; + } + + /// tryPopFor() + template <typename Rep, typename Period, typename Tuple> + bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, Tuple& tuple) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + // It's important to use OUR tryPopUntil() implementation, rather + // than delegating immediately to our base class. + return tryPopUntil(Clock::now() + timeout, tuple); + } + + /// tryPopUntil(TimeTuple&) + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + TimeTuple& tuple) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + // super::tryPopUntil() wakes up when an item becomes available or + // we hit 'until', whichever comes first. Thing is, the current + // head of the queue could become ready sooner than either of + // those events, and we need to deliver it as soon as it does. + // Don't wait past the TimePoint of the head item. + // Naturally, lock the queue before peeking at mStorage. + return super::tryLockUntil( + until, + [this, until, &tuple](lock_t& lock) + { + // Use our time_point_cast to allow for 'until' that's a + // time_point type other than TimePoint. + return POPPED == + tryPopUntil_(lock, LL::time_point_cast<TimePoint>(until), tuple); + }); + } + + pop_result tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + TimePoint adjusted = until; + if (! super::mStorage.empty()) + { + LL_PROFILE_ZONE_NAMED("tpu - adjust"); + // use whichever is earlier: the head item's timestamp, or + // the caller's limit + adjusted = min(std::get<0>(super::mStorage.front()), adjusted); + } + // now delegate to base-class tryPopUntil_() + pop_result popped; + { + LL_PROFILE_ZONE_NAMED("tpu - super"); + while ((popped = pop_result(super::tryPopUntil_(lock, adjusted, tuple))) == WAITING) + { + // If super::tryPopUntil_() returns WAITING, it means there's + // a head item, but it's not yet time. But it's worth looping + // back to recheck. + } + } + return popped; + } + + /// tryPopUntil(DataTuple&) + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + DataTuple& tuple) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + TimeTuple tt; + if (! tryPopUntil(until, tt)) + return false; + tuple = tuple_cdr(std::move(tt)); + return true; + } + + /// for when Args has exactly one type + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + typename std::tuple_element<1, TimeTuple>::type& value) + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + TimeTuple tt; + if (! tryPopUntil(until, tt)) + return false; + value = std::get<1>(std::move(tt)); + return true; + } + + /*------------------------------ etc. ------------------------------*/ + // We can't hide items that aren't yet ready because we can't traverse + // the underlying priority_queue: it has no iterators, only top(). So + // a consumer could observe size() > 0 and yet tryPop() returns false. + // Shrug, in a multi-consumer scenario that would be expected behavior. + using super::size; + // open/closed state + using super::close; + using super::isClosed; + using super::done; + + private: + // this method is called by base class pop_() every time we're + // considering whether to deliver the current head element + bool canPop(const TimeTuple& head) const override + { + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; + // an item with a future timestamp isn't yet ready to pop + // (should we add some slop for overhead?) + return std::get<0>(head) <= Clock::now(); + } + }; + +} // namespace LL + +#endif /* ! defined(LL_THREADSAFESCHEDULE_H) */ |