diff options
author | Nat Goodspeed <nat@lindenlab.com> | 2021-10-05 17:31:53 -0400 |
---|---|---|
committer | Nat Goodspeed <nat@lindenlab.com> | 2021-10-05 17:31:53 -0400 |
commit | 955b967623983cb50ba09f7b82e5f01f2c6bcebb (patch) | |
tree | 069161d4519c50c807e943c9f57e833d7ccee6be /indra | |
parent | a35e266547e4d2c8dbd6b003c64b719d91eaaf87 (diff) |
SL-16024: Add ThreadSafeSchedule, a timestamped LLThreadSafeQueue.
ThreadSafeSchedule orders its items by timestamp, which can be passed either
implicitly or explicitly. The timestamp specifies earliest delivery time: an
item cannot be popped until that time.
Add initial tests.
Tweak the LLThreadSafeQueue base class to support ThreadSafeSchedule:
introduce virtual canPop() method to report whether the current head item is
available to pop. The base class unconditionally says yes, ThreadSafeSchedule
says it depends on whether its timestamp is still in the future.
This replaces the protected pop_() overload accepting a predicate. Rather than
explicitly passing a predicate through a couple levels of function call, use
canPop() at the level it matters. Runtime behavior that varies depending on
an object's leaf class is what virtual functions were invented for.
Give pop_() a three-state enum return so pop() can distinguish between "closed
and empty" (throws exception) versus "closed, not yet drained because we're
not yet ready to pop the head item" (waits).
Also break out protected tryPopUntil_() method, the body logic of
tryPopUntil(). The public method locks the data structure, the protected
method requires that its caller has already done so.
Add chrono.h with a more full-featured LL::time_point_cast() function than the
one found in <chrono>, which only converts between time_point durations, not
between time_points based on different clocks.
Diffstat (limited to 'indra')
-rw-r--r-- | indra/llcommon/CMakeLists.txt | 3 | ||||
-rw-r--r-- | indra/llcommon/chrono.h | 65 | ||||
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 121 | ||||
-rw-r--r-- | indra/llcommon/tests/threadsafeschedule_test.cpp | 65 | ||||
-rw-r--r-- | indra/llcommon/threadsafeschedule.h | 334 |
5 files changed, 535 insertions, 53 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt index 6558219462..5efcfabf24 100644 --- a/indra/llcommon/CMakeLists.txt +++ b/indra/llcommon/CMakeLists.txt @@ -127,6 +127,7 @@ set(llcommon_SOURCE_FILES set(llcommon_HEADER_FILES CMakeLists.txt + chrono.h ctype_workaround.h fix_macros.h indra_constants.h @@ -253,6 +254,7 @@ set(llcommon_HEADER_FILES lockstatic.h stdtypes.h stringize.h + threadsafeschedule.h timer.h tuple.h u64.h @@ -359,6 +361,7 @@ if (LL_TESTS) LL_ADD_INTEGRATION_TEST(lluri "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llunits "" "${test_libs}") LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(threadsafeschedule "" "${test_libs}") LL_ADD_INTEGRATION_TEST(tuple "" "${test_libs}") ## llexception_test.cpp isn't a regression test, and doesn't need to be run diff --git a/indra/llcommon/chrono.h b/indra/llcommon/chrono.h new file mode 100644 index 0000000000..806e871892 --- /dev/null +++ b/indra/llcommon/chrono.h @@ -0,0 +1,65 @@ +/** + * @file chrono.h + * @author Nat Goodspeed + * @date 2021-10-05 + * @brief supplement <chrono> with utility functions + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_CHRONO_H) +#define LL_CHRONO_H + +#include <chrono> +#include <type_traits> // std::enable_if + +namespace LL +{ + +// time_point_cast() is derived from https://stackoverflow.com/a/35293183 +// without the iteration: we think errors in the ~1 microsecond range are +// probably acceptable. + +// This variant is for the optimal case when the source and dest use the same +// clock: that case is handled by std::chrono. +template <typename DestTimePoint, typename SrcTimePoint, + typename std::enable_if<std::is_same<typename DestTimePoint::clock, + typename SrcTimePoint::clock>::value, + bool>::type = true> +DestTimePoint time_point_cast(const SrcTimePoint& time) +{ + return std::chrono::time_point_cast<typename DestTimePoint::duration>(time); +} + +// This variant is for when the source and dest use different clocks -- see +// the linked StackOverflow answer, also Howard Hinnant's, for more context. +template <typename DestTimePoint, typename SrcTimePoint, + typename std::enable_if<! std::is_same<typename DestTimePoint::clock, + typename SrcTimePoint::clock>::value, + bool>::type = true> +DestTimePoint time_point_cast(const SrcTimePoint& time) +{ + // The basic idea is that we must adjust the passed time_point by the + // difference between the clocks' epochs. But since time_point doesn't + // expose its epoch, we fall back on what each of them thinks is now(). + // However, since we necessarily make sequential calls to those now() + // functions, the answers differ not only by the cycles spent executing + // those calls, but by potential OS interruptions between them. Try to + // reduce that error by capturing the source clock time both before and + // after the dest clock, and splitting the difference. Of course an + // interruption between two of these now() calls without a comparable + // interruption between the other two will skew the result, but better is + // more expensive. + const auto src_before = typename SrcTimePoint::clock::now(); + const auto dest_now = typename DestTimePoint::clock::now(); + const auto src_after = typename SrcTimePoint::clock::now(); + const auto src_diff = src_after - src_before; + const auto src_now = src_before + src_diff / 2; + return dest_now + (time - src_now); +} + +} // namespace LL + +#endif /* ! defined(LL_CHRONO_H) */ diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 1dffad6b89..bd2d82d4c3 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -83,6 +83,7 @@ public: // Limiting the number of pending items prevents unbounded growth of the // underlying queue. LLThreadSafeQueue(U32 capacity = 1024); + virtual ~LLThreadSafeQueue() {} // Add an element to the queue (will block if the queue has // reached capacity). @@ -162,10 +163,10 @@ public: // then every subsequent tryPop() call will return false void close(); - // detect closed state + // producer end: are we prevented from pushing any additional items? bool isClosed(); - // inverse of isClosed() - explicit operator bool(); + // consumer end: are we done, is the queue entirely drained? + bool done(); protected: typedef QueueT queue_type; @@ -178,6 +179,11 @@ protected: boost::fibers::condition_variable_any mCapacityCond; boost::fibers::condition_variable_any mEmptyCond; + // implementation logic, suitable for passing to tryLockUntil() + template <typename Clock, typename Duration> + bool tryPopUntil_(lock_t& lock, + const std::chrono::time_point<Clock, Duration>& until, + ElementT& element); // if we're able to lock immediately, do so and run the passed callable, // which must accept lock_t& and return bool template <typename CALLABLE> @@ -191,11 +197,11 @@ protected: template <typename T> bool push_(lock_t& lock, T&& element); // while lock is locked, really pop the head element, if we can - bool pop_(lock_t& lock, ElementT& element); - // pop_() with an explicit predicate indicating whether the head element - // is ready to be popped - template <typename PRED> - bool pop_(lock_t& lock, ElementT& element, PRED&& pred); + enum pop_result { EMPTY, WAITING, POPPED }; + pop_result pop_(lock_t& lock, ElementT& element); + // Is the current head element ready to pop? We say yes; subclass can + // override as needed. + virtual bool canPop(const ElementT& head) const { return true; } }; /***************************************************************************** @@ -387,26 +393,16 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil( // while lock is locked, really pop the head element, if we can template <typename ElementT, typename QueueT> -bool LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element) -{ - // default predicate: head element, if present, is always ready to pop - return pop_(lock, element, [](const ElementT&){ return true; }); -} - - -// pop_() with an explicit predicate indicating whether the head element -// is ready to be popped -template <typename ElementT, typename QueueT> -template <typename PRED> -bool LLThreadSafeQueue<ElementT, QueueT>::pop_( - lock_t& lock, ElementT& element, PRED&& pred) +typename LLThreadSafeQueue<ElementT, QueueT>::pop_result +LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element) { // If mStorage is empty, there's no head element. - // If there's a head element, pass it to the predicate to see if caller - // considers it ready to pop. - // Unless both are satisfied, no point in continuing. - if (mStorage.empty() || ! std::forward<PRED>(pred)(mStorage.front())) - return false; + if (mStorage.empty()) + return EMPTY; + + // If there's a head element, pass it to canPop() to see if it's ready to pop. + if (! canPop(mStorage.front())) + return WAITING; // std::queue::front() is the element about to pop() element = mStorage.front(); @@ -414,7 +410,7 @@ bool LLThreadSafeQueue<ElementT, QueueT>::pop_( lock.unlock(); // now that we've popped, if somebody's been waiting to push, signal them mCapacityCond.notify_one(); - return true; + return POPPED; } @@ -422,17 +418,20 @@ template<typename ElementT, typename QueueT> ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void) { lock_t lock1(mLock); + ElementT value; while (true) { // On the consumer side, we always try to pop before checking mClosed // so we can finish draining the queue. - ElementT value; - if (pop_(lock1, value)) + pop_result popped = pop_(lock1, value); + if (popped == POPPED) return std::move(value); // Once the queue is empty, mClosed lets us know if there will ever be - // any more coming. - if (mClosed) + // any more coming. If we didn't pop because WAITING, i.e. canPop() + // returned false, then even if the producer end has been closed, + // there's still at least one item to drain: wait for it. + if (popped == EMPTY && mClosed) { LLTHROW(LLThreadSafeQueueInterrupt()); } @@ -452,7 +451,7 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element) // no need to check mClosed: tryPop() behavior when the queue is // closed is implemented by simple inability to push any new // elements - return pop_(lock, element); + return pop_(lock, element) == POPPED; }); } @@ -479,26 +478,38 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil( until, [this, until, &element](lock_t& lock) { - while (true) - { - if (pop_(lock, element)) - return true; + return tryPopUntil_(lock, until, element); + }); +} - if (mClosed) - { - return false; - } - // Storage empty. Wait for signal. - if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) - { - // timed out -- formally we might recheck both conditions above - return false; - } - // If we didn't time out, we were notified for some reason. Loop back - // to check. - } - }); +// body of tryPopUntil(), called once we have the lock +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_( + lock_t& lock, + const std::chrono::time_point<Clock, Duration>& until, + ElementT& element) +{ + while (true) + { + if (pop_(lock, element) == POPPED) + return true; + + if (mClosed) + { + return false; + } + + // Storage empty. Wait for signal. + if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } } @@ -509,6 +520,7 @@ size_t LLThreadSafeQueue<ElementT, QueueT>::size(void) return mStorage.size(); } + template<typename ElementT, typename QueueT> void LLThreadSafeQueue<ElementT, QueueT>::close() { @@ -521,17 +533,20 @@ void LLThreadSafeQueue<ElementT, QueueT>::close() mCapacityCond.notify_all(); } + template<typename ElementT, typename QueueT> bool LLThreadSafeQueue<ElementT, QueueT>::isClosed() { lock_t lock(mLock); - return mClosed && mStorage.size() == 0; + return mClosed; } + template<typename ElementT, typename QueueT> -LLThreadSafeQueue<ElementT, QueueT>::operator bool() +bool LLThreadSafeQueue<ElementT, QueueT>::done() { - return ! isClosed(); + lock_t lock(mLock); + return mClosed && mStorage.size() == 0; } #endif diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp new file mode 100644 index 0000000000..ec0fa0c928 --- /dev/null +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -0,0 +1,65 @@ +/** + * @file threadsafeschedule_test.cpp + * @author Nat Goodspeed + * @date 2021-10-04 + * @brief Test for threadsafeschedule. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "threadsafeschedule.h" +// STL headers +// std headers +#include <chrono> +// external library headers +// other Linden headers +#include "../test/lltut.h" + +using namespace std::literals::chrono_literals; // ms suffix +using namespace std::literals::string_literals; // s suffix +using Queue = LL::ThreadSafeSchedule<std::string>; + +/***************************************************************************** +* TUT +*****************************************************************************/ +namespace tut +{ + struct threadsafeschedule_data + { + Queue queue; + }; + typedef test_group<threadsafeschedule_data> threadsafeschedule_group; + typedef threadsafeschedule_group::object object; + threadsafeschedule_group threadsafeschedulegrp("threadsafeschedule"); + + template<> template<> + void object::test<1>() + { + set_test_name("push"); + // Simply calling push() a few times might result in indeterminate + // delivery order if the resolution of steady_clock is coarser than + // the real time required for each push() call. Explicitly increment + // the timestamp for each one -- but since we're passing explicit + // timestamps, make the queue reorder them. + queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); + queue.push("abc"s); + queue.push(Queue::Clock::now() + 10ms, "def"); + queue.close(); + auto entry = queue.pop(); + ensure_equals("failed to pop first", std::get<0>(entry), "abc"s); + entry = queue.pop(); + ensure_equals("failed to pop second", std::get<0>(entry), "def"s); + ensure("queue not closed", queue.isClosed()); + ensure("queue prematurely done", ! queue.done()); + entry = queue.pop(); + ensure_equals("failed to pop third", std::get<0>(entry), "ghi"s); + bool popped = queue.tryPop(entry); + ensure("queue not empty", ! popped); + ensure("queue not done", queue.done()); + } +} // namespace tut diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h new file mode 100644 index 0000000000..545c820f53 --- /dev/null +++ b/indra/llcommon/threadsafeschedule.h @@ -0,0 +1,334 @@ +/** + * @file threadsafeschedule.h + * @author Nat Goodspeed + * @date 2021-10-02 + * @brief ThreadSafeSchedule is an ordered queue in which every item has an + * associated timestamp. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_THREADSAFESCHEDULE_H) +#define LL_THREADSAFESCHEDULE_H + +#include "chrono.h" +#include "llexception.h" +#include "llthreadsafequeue.h" +#include "tuple.h" +#include <chrono> +#include <tuple> + +namespace LL +{ + namespace ThreadSafeSchedulePrivate + { + using TimePoint = std::chrono::steady_clock::time_point; + // Bundle consumer's data with a TimePoint to order items by timestamp. + template <typename... Args> + using TimestampedTuple = std::tuple<TimePoint, Args...>; + + // comparison functor for TimedTuples -- see TimedQueue comments + struct ReverseTupleOrder + { + template <typename Tuple> + bool operator()(const Tuple& left, const Tuple& right) const + { + return std::get<0>(left) > std::get<0>(right); + } + }; + + template <typename... Args> + using TimedQueue = PriorityQueueAdapter< + TimestampedTuple<Args...>, + // std::vector is the default storage for std::priority_queue, + // have to restate to specify comparison template parameter + std::vector<TimestampedTuple<Args...>>, + // std::priority_queue uses a counterintuitive comparison + // behavior: the default std::less comparator is used to present + // the *highest* value as top(). So to sort by earliest timestamp, + // we must invert by using >. + ReverseTupleOrder>; + } // namespace ThreadSafeSchedulePrivate + + /** + * ThreadSafeSchedule is an ordered LLThreadSafeQueue in which every item + * is given an associated timestamp. That is, TimePoint is implicitly + * prepended to the std::tuple with the specified types. + * + * Items are popped in increasing chronological order. Moreover, any item + * with a timestamp in the future is held back until + * std::chrono::steady_clock reaches that timestamp. + */ + template <typename... Args> + class ThreadSafeSchedule: + public LLThreadSafeQueue<ThreadSafeSchedulePrivate::TimestampedTuple<Args...>, + ThreadSafeSchedulePrivate::TimedQueue<Args...>> + { + public: + using DataTuple = std::tuple<Args...>; + using TimeTuple = ThreadSafeSchedulePrivate::TimestampedTuple<Args...>; + + private: + using super = LLThreadSafeQueue<TimeTuple, ThreadSafeSchedulePrivate::TimedQueue<Args...>>; + using lock_t = typename super::lock_t; + using super::pop_; + using super::push_; + using super::mClosed; + using super::mEmptyCond; + using super::mCapacityCond; + + public: + using TimePoint = ThreadSafeSchedulePrivate::TimePoint; + using Clock = TimePoint::clock; + + ThreadSafeSchedule(U32 capacity=1024): + super(capacity) + {} + + /*----------------------------- push() -----------------------------*/ + /// explicitly pass TimeTuple + using super::push; + + /// pass DataTuple with implicit now + void push(const DataTuple& tuple) + { + push(tuple_cons(Clock::now(), tuple)); + } + + /// individually pass each component of the TimeTuple + void push(const TimePoint& time, Args&&... args) + { + push(TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass every component except the TimePoint (implies + /// now) -- could be ambiguous if the first specified template + /// parameter type is also TimePoint -- we could try to disambiguate, + /// but a simpler approach would be for the caller to explicitly + /// construct DataTuple and call that overload + void push(Args&&... args) + { + push(Clock::now(), std::forward<Args>(args)...); + } + + /*--------------------------- tryPush() ----------------------------*/ + /// explicit TimeTuple + using super::tryPush; + + /// DataTuple with implicit now + bool tryPush(const DataTuple& tuple) + { + return tryPush(tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + bool tryPush(const TimePoint& time, Args&&... args) + { + return tryPush(TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass components with implicit now + bool tryPush(Args&&... args) + { + return tryPush(Clock::now(), std::forward<Args>(args)...); + } + + /*-------------------------- tryPushFor() --------------------------*/ + /// explicit TimeTuple + using super::tryPushFor; + + /// DataTuple with implicit now + template <typename Rep, typename Period> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + const DataTuple& tuple) + { + return tryPushFor(timeout, tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + template <typename Rep, typename Period> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + const TimePoint& time, Args&&... args) + { + return tryPushFor(TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass components with implicit now + template <typename Rep, typename Period> + bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, + Args&&... args) + { + return tryPushFor(Clock::now(), std::forward<Args>(args)...); + } + + /*------------------------- tryPushUntil() -------------------------*/ + /// explicit TimeTuple + using super::tryPushUntil; + + /// DataTuple with implicit now + template <typename Clock, typename Duration> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + const DataTuple& tuple) + { + return tryPushUntil(until, tuple_cons(Clock::now(), tuple)); + } + + /// individually pass components + template <typename Clock, typename Duration> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + const TimePoint& time, Args&&... args) + { + return tryPushUntil(until, TimeTuple(time, std::forward<Args>(args)...)); + } + + /// individually pass components with implicit now + template <typename Clock, typename Duration> + bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, + Args&&... args) + { + return tryPushUntil(until, Clock::now(), std::forward<Args>(args)...); + } + + /*----------------------------- pop() ------------------------------*/ + // Our consumer may or may not care about the timestamp associated + // with each popped item, so we allow retrieving either DataTuple or + // TimeTuple. One potential use would be to observe, and possibly + // adjust for, the time lag between the item time and the actual + // current time. + + /// pop DataTuple by value + DataTuple pop() + { + return tuple_cdr(popWithTime()); + } + + /// pop TimeTuple by value + TimeTuple popWithTime() + { + lock_t lock(super::mLock); + // We can't just sit around waiting forever, given that there may + // be items in the queue that are not yet ready but will *become* + // ready in the near future. So in fact, with this class, every + // pop() becomes a tryPopUntil(), constrained to the timestamp of + // the head item. It almost doesn't matter what we specify for the + // caller's time constraint -- all we really care about is the + // head item's timestamp. Since pop() and popWithTime() are + // defined to wait until either an item becomes available or the + // queue is closed, loop until one of those things happens. The + // constraint we pass just determines how often we'll loop while + // waiting. + TimeTuple tt; + while (true) + { + // Pick a point suitably far into the future. + TimePoint until = TimePoint::clock::now() + std::chrono::hours(24); + if (tryPopUntil_(lock, until, tt)) + return std::move(tt); + + // empty and closed: throw, just as super::pop() does + if (super::mStorage.empty() && super::mClosed) + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } + // If not empty, we've still got items to drain. + // If not closed, it's worth waiting for more items. + // Either way, loop back to wait. + } + } + + // We can use tryPop(TimeTuple&) just as it stands; the only behavior + // difference is in our canPop() override method. + using super::tryPop; + + /// tryPop(DataTuple&) + bool tryPop(DataTuple& tuple) + { + TimeTuple tt; + if (! super::tryPop(tt)) + return false; + tuple = tuple_cdr(std::move(tt)); + return true; + } + + /// tryPopFor() + template <typename Rep, typename Period, typename Tuple> + bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, Tuple& tuple) + { + // It's important to use OUR tryPopUntil() implementation, rather + // than delegating immediately to our base class. + return tryPopUntil(Clock::now() + timeout, tuple); + } + + /// tryPopUntil(TimeTuple&) + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + TimeTuple& tuple) + { + // super::tryPopUntil() wakes up when an item becomes available or + // we hit 'until', whichever comes first. Thing is, the current + // head of the queue could become ready sooner than either of + // those events, and we need to deliver it as soon as it does. + // Don't wait past the TimePoint of the head item. + // Naturally, lock the queue before peeking at mStorage. + return super::tryLockUntil( + until, + [this, until, &tuple](lock_t& lock) + { + // Use our time_point_cast to allow for 'until' that's a + // time_point type other than TimePoint. + return tryPopUntil_(lock, time_point_cast<TimePoint>(until), tuple); + }); + } + + bool tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) + { + TimePoint adjusted = until; + if (! super::mStorage.empty()) + { + // use whichever is earlier: the head item's timestamp, or + // the caller's limit + adjusted = min(std::get<0>(super::mStorage.front()), adjusted); + } + // now delegate to base-class tryPopUntil_() + return super::tryPopUntil_(lock, adjusted, tuple); + } + + /// tryPopUntil(DataTuple&) + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + DataTuple& tuple) + { + TimeTuple tt; + if (! tryPopUntil(until, tt)) + return false; + tuple = tuple_cdr(std::move(tt)); + return true; + } + + /*------------------------------ etc. ------------------------------*/ + // We can't hide items that aren't yet ready because we can't traverse + // the underlying priority_queue: it has no iterators, only top(). So + // a consumer could observe size() > 0 and yet tryPop() returns false. + // Shrug, in a multi-consumer scenario that would be expected behavior. + using super::size; + // open/closed state + using super::close; + using super::isClosed; + using super::done; + + private: + // this method is called by base class pop_() every time we're + // considering whether to deliver the current head element + bool canPop(const TimeTuple& head) const override + { + // an item with a future timestamp isn't yet ready to pop + // (should we add some slop for overhead?) + return std::get<0>(head) <= Clock::now(); + } + }; + +} // namespace LL + +#endif /* ! defined(LL_THREADSAFESCHEDULE_H) */ |