-rw-r--r--   indra/llcommon/CMakeLists.txt                        3
-rw-r--r--   indra/llcommon/chrono.h                             65
-rw-r--r--   indra/llcommon/llthreadsafequeue.h                 121
-rw-r--r--   indra/llcommon/tests/threadsafeschedule_test.cpp    65
-rw-r--r--   indra/llcommon/threadsafeschedule.h                334
5 files changed, 535 insertions, 53 deletions
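The llthreadsafequeue.h changes below split the old isClosed() / operator bool() pair into isClosed() (producer side: no further pushes are accepted) and done() (consumer side: closed and fully drained), and replace the boolean pop_() with a three-state pop_result (EMPTY, WAITING, POPPED) gated by a new virtual canPop() hook. A minimal consumer-side sketch against that revised interface; the element type, the drain() wrapper, and the reliance on the default std::queue storage are illustrative, not part of the patch:

#include "llthreadsafequeue.h"
#include <iostream>
#include <string>

// Drain a queue of strings filled by a producer on another thread. pop()
// blocks until an item is available; once the producer has called close()
// and the last queued item has been popped, pop() throws
// LLThreadSafeQueueInterrupt instead of blocking forever, which ends the loop.
void drain(LLThreadSafeQueue<std::string>& queue)
{
    try
    {
        while (true)
        {
            std::cout << queue.pop() << '\n';
        }
    }
    catch (const LLThreadSafeQueueInterrupt&)
    {
        // Closed and fully drained: queue.done() is now true. Note that
        // queue.isClosed() alone becomes true as soon as the producer calls
        // close(), even while items are still pending.
    }
}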
| diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt index 6558219462..5efcfabf24 100644 --- a/indra/llcommon/CMakeLists.txt +++ b/indra/llcommon/CMakeLists.txt @@ -127,6 +127,7 @@ set(llcommon_SOURCE_FILES  set(llcommon_HEADER_FILES      CMakeLists.txt +    chrono.h      ctype_workaround.h      fix_macros.h      indra_constants.h @@ -253,6 +254,7 @@ set(llcommon_HEADER_FILES      lockstatic.h      stdtypes.h      stringize.h +    threadsafeschedule.h      timer.h      tuple.h      u64.h @@ -359,6 +361,7 @@ if (LL_TESTS)    LL_ADD_INTEGRATION_TEST(lluri "" "${test_libs}")    LL_ADD_INTEGRATION_TEST(llunits "" "${test_libs}")    LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}") +  LL_ADD_INTEGRATION_TEST(threadsafeschedule "" "${test_libs}")    LL_ADD_INTEGRATION_TEST(tuple "" "${test_libs}")  ## llexception_test.cpp isn't a regression test, and doesn't need to be run diff --git a/indra/llcommon/chrono.h b/indra/llcommon/chrono.h new file mode 100644 index 0000000000..806e871892 --- /dev/null +++ b/indra/llcommon/chrono.h @@ -0,0 +1,65 @@ +/** + * @file   chrono.h + * @author Nat Goodspeed + * @date   2021-10-05 + * @brief  supplement <chrono> with utility functions + *  + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_CHRONO_H) +#define LL_CHRONO_H + +#include <chrono> +#include <type_traits>              // std::enable_if + +namespace LL +{ + +// time_point_cast() is derived from https://stackoverflow.com/a/35293183 +// without the iteration: we think errors in the ~1 microsecond range are +// probably acceptable. + +// This variant is for the optimal case when the source and dest use the same +// clock: that case is handled by std::chrono. +template <typename DestTimePoint, typename SrcTimePoint, +          typename std::enable_if<std::is_same<typename DestTimePoint::clock, +                                               typename SrcTimePoint::clock>::value, +                                  bool>::type = true> +DestTimePoint time_point_cast(const SrcTimePoint& time) +{ +    return std::chrono::time_point_cast<typename DestTimePoint::duration>(time); +} + +// This variant is for when the source and dest use different clocks -- see +// the linked StackOverflow answer, also Howard Hinnant's, for more context. +template <typename DestTimePoint, typename SrcTimePoint, +          typename std::enable_if<! std::is_same<typename DestTimePoint::clock, +                                                 typename SrcTimePoint::clock>::value, +                                  bool>::type = true> +DestTimePoint time_point_cast(const SrcTimePoint& time) +{ +    // The basic idea is that we must adjust the passed time_point by the +    // difference between the clocks' epochs. But since time_point doesn't +    // expose its epoch, we fall back on what each of them thinks is now(). +    // However, since we necessarily make sequential calls to those now() +    // functions, the answers differ not only by the cycles spent executing +    // those calls, but by potential OS interruptions between them. Try to +    // reduce that error by capturing the source clock time both before and +    // after the dest clock, and splitting the difference. Of course an +    // interruption between two of these now() calls without a comparable +    // interruption between the other two will skew the result, but better is +    // more expensive. 
+    const auto src_before = typename SrcTimePoint::clock::now(); +    const auto dest_now   = typename DestTimePoint::clock::now(); +    const auto src_after  = typename SrcTimePoint::clock::now(); +    const auto src_diff   = src_after - src_before; +    const auto src_now    = src_before + src_diff / 2; +    return dest_now + (time - src_now); +} + +} // namespace LL + +#endif /* ! defined(LL_CHRONO_H) */ diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 1dffad6b89..bd2d82d4c3 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -83,6 +83,7 @@ public:  	// Limiting the number of pending items prevents unbounded growth of the  	// underlying queue.  	LLThreadSafeQueue(U32 capacity = 1024); +	virtual ~LLThreadSafeQueue() {}  	// Add an element to the queue (will block if the queue has  	// reached capacity). @@ -162,10 +163,10 @@ public:  	//   then every subsequent tryPop() call will return false  	void close(); -	// detect closed state +	// producer end: are we prevented from pushing any additional items?  	bool isClosed(); -	// inverse of isClosed() -	explicit operator bool(); +	// consumer end: are we done, is the queue entirely drained? +	bool done();  protected:  	typedef QueueT queue_type; @@ -178,6 +179,11 @@ protected:  	boost::fibers::condition_variable_any mCapacityCond;  	boost::fibers::condition_variable_any mEmptyCond; +	// implementation logic, suitable for passing to tryLockUntil() +	template <typename Clock, typename Duration> +	bool tryPopUntil_(lock_t& lock, +					  const std::chrono::time_point<Clock, Duration>& until, +					  ElementT& element);  	// if we're able to lock immediately, do so and run the passed callable,  	// which must accept lock_t& and return bool  	template <typename CALLABLE> @@ -191,11 +197,11 @@ protected:  	template <typename T>  	bool push_(lock_t& lock, T&& element);  	// while lock is locked, really pop the head element, if we can -	bool pop_(lock_t& lock, ElementT& element); -	// pop_() with an explicit predicate indicating whether the head element -	// is ready to be popped -	template <typename PRED> -	bool pop_(lock_t& lock, ElementT& element, PRED&& pred); +	enum pop_result { EMPTY, WAITING, POPPED }; +	pop_result pop_(lock_t& lock, ElementT& element); +	// Is the current head element ready to pop? We say yes; subclass can +	// override as needed. +	virtual bool canPop(const ElementT& head) const { return true; }  };  /***************************************************************************** @@ -387,26 +393,16 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil(  // while lock is locked, really pop the head element, if we can  template <typename ElementT, typename QueueT> -bool LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element) -{ -    // default predicate: head element, if present, is always ready to pop -    return pop_(lock, element, [](const ElementT&){ return true; }); -} - - -// pop_() with an explicit predicate indicating whether the head element -// is ready to be popped -template <typename ElementT, typename QueueT> -template <typename PRED> -bool LLThreadSafeQueue<ElementT, QueueT>::pop_( -    lock_t& lock, ElementT& element, PRED&& pred) +typename LLThreadSafeQueue<ElementT, QueueT>::pop_result +LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element)  {      // If mStorage is empty, there's no head element. 
-    // If there's a head element, pass it to the predicate to see if caller -    // considers it ready to pop. -    // Unless both are satisfied, no point in continuing. -    if (mStorage.empty() || ! std::forward<PRED>(pred)(mStorage.front())) -        return false; +    if (mStorage.empty()) +        return EMPTY; + +    // If there's a head element, pass it to canPop() to see if it's ready to pop.  +    if (! canPop(mStorage.front())) +        return WAITING;      // std::queue::front() is the element about to pop()      element = mStorage.front(); @@ -414,7 +410,7 @@ bool LLThreadSafeQueue<ElementT, QueueT>::pop_(      lock.unlock();      // now that we've popped, if somebody's been waiting to push, signal them      mCapacityCond.notify_one(); -    return true; +    return POPPED;  } @@ -422,17 +418,20 @@ template<typename ElementT, typename QueueT>  ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)  {      lock_t lock1(mLock); +    ElementT value;      while (true)      {          // On the consumer side, we always try to pop before checking mClosed          // so we can finish draining the queue. -        ElementT value; -        if (pop_(lock1, value)) +        pop_result popped = pop_(lock1, value); +        if (popped == POPPED)              return std::move(value);          // Once the queue is empty, mClosed lets us know if there will ever be -        // any more coming. -        if (mClosed) +        // any more coming. If we didn't pop because WAITING, i.e. canPop() +        // returned false, then even if the producer end has been closed, +        // there's still at least one item to drain: wait for it. +        if (popped == EMPTY && mClosed)          {              LLTHROW(LLThreadSafeQueueInterrupt());          } @@ -452,7 +451,7 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)              // no need to check mClosed: tryPop() behavior when the queue is              // closed is implemented by simple inability to push any new              // elements -            return pop_(lock, element); +            return pop_(lock, element) == POPPED;          });  } @@ -479,26 +478,38 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(          until,          [this, until, &element](lock_t& lock)          { -            while (true) -            { -                if (pop_(lock, element)) -                    return true; +            return tryPopUntil_(lock, until, element); +        }); +} -                if (mClosed) -                { -                    return false; -                } -                // Storage empty. Wait for signal. -                if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) -                { -                    // timed out -- formally we might recheck both conditions above -                    return false; -                } -                // If we didn't time out, we were notified for some reason. Loop back -                // to check. -            } -        }); +// body of tryPopUntil(), called once we have the lock +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_( +    lock_t& lock, +    const std::chrono::time_point<Clock, Duration>& until, +    ElementT& element) +{ +    while (true) +    { +        if (pop_(lock, element) == POPPED) +            return true; + +        if (mClosed) +        { +            return false; +        } + +        // Storage empty. Wait for signal. 
+        if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) +        { +            // timed out -- formally we might recheck both conditions above +            return false; +        } +        // If we didn't time out, we were notified for some reason. Loop back +        // to check. +    }  } @@ -509,6 +520,7 @@ size_t LLThreadSafeQueue<ElementT, QueueT>::size(void)      return mStorage.size();  } +  template<typename ElementT, typename QueueT>  void LLThreadSafeQueue<ElementT, QueueT>::close()  { @@ -521,17 +533,20 @@ void LLThreadSafeQueue<ElementT, QueueT>::close()      mCapacityCond.notify_all();  } +  template<typename ElementT, typename QueueT>  bool LLThreadSafeQueue<ElementT, QueueT>::isClosed()  {      lock_t lock(mLock); -    return mClosed && mStorage.size() == 0; +    return mClosed;  } +  template<typename ElementT, typename QueueT> -LLThreadSafeQueue<ElementT, QueueT>::operator bool() +bool LLThreadSafeQueue<ElementT, QueueT>::done()  { -    return ! isClosed(); +    lock_t lock(mLock); +    return mClosed && mStorage.size() == 0;  }  #endif diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp new file mode 100644 index 0000000000..ec0fa0c928 --- /dev/null +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -0,0 +1,65 @@ +/** + * @file   threadsafeschedule_test.cpp + * @author Nat Goodspeed + * @date   2021-10-04 + * @brief  Test for threadsafeschedule. + *  + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "threadsafeschedule.h" +// STL headers +// std headers +#include <chrono> +// external library headers +// other Linden headers +#include "../test/lltut.h" + +using namespace std::literals::chrono_literals; // ms suffix +using namespace std::literals::string_literals; // s suffix +using Queue = LL::ThreadSafeSchedule<std::string>; + +/***************************************************************************** +*   TUT +*****************************************************************************/ +namespace tut +{ +    struct threadsafeschedule_data +    { +        Queue queue; +    }; +    typedef test_group<threadsafeschedule_data> threadsafeschedule_group; +    typedef threadsafeschedule_group::object object; +    threadsafeschedule_group threadsafeschedulegrp("threadsafeschedule"); + +    template<> template<> +    void object::test<1>() +    { +        set_test_name("push"); +        // Simply calling push() a few times might result in indeterminate +        // delivery order if the resolution of steady_clock is coarser than +        // the real time required for each push() call. Explicitly increment +        // the timestamp for each one -- but since we're passing explicit +        // timestamps, make the queue reorder them. +        queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); +        queue.push("abc"s); +        queue.push(Queue::Clock::now() + 10ms, "def"); +        queue.close(); +        auto entry = queue.pop(); +        ensure_equals("failed to pop first", std::get<0>(entry), "abc"s); +        entry = queue.pop(); +        ensure_equals("failed to pop second", std::get<0>(entry), "def"s); +        ensure("queue not closed", queue.isClosed()); +        ensure("queue prematurely done", ! 
queue.done()); +        entry = queue.pop(); +        ensure_equals("failed to pop third", std::get<0>(entry), "ghi"s); +        bool popped = queue.tryPop(entry); +        ensure("queue not empty", ! popped); +        ensure("queue not done", queue.done()); +    } +} // namespace tut diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h new file mode 100644 index 0000000000..545c820f53 --- /dev/null +++ b/indra/llcommon/threadsafeschedule.h @@ -0,0 +1,334 @@ +/** + * @file   threadsafeschedule.h + * @author Nat Goodspeed + * @date   2021-10-02 + * @brief  ThreadSafeSchedule is an ordered queue in which every item has an + *         associated timestamp. + *  + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_THREADSAFESCHEDULE_H) +#define LL_THREADSAFESCHEDULE_H + +#include "chrono.h" +#include "llexception.h" +#include "llthreadsafequeue.h" +#include "tuple.h" +#include <chrono> +#include <tuple>   + +namespace LL +{ +    namespace ThreadSafeSchedulePrivate +    { +        using TimePoint = std::chrono::steady_clock::time_point; +        // Bundle consumer's data with a TimePoint to order items by timestamp. +        template <typename... Args> +        using TimestampedTuple = std::tuple<TimePoint, Args...>; + +        // comparison functor for TimedTuples -- see TimedQueue comments +        struct ReverseTupleOrder +        { +            template <typename Tuple> +            bool operator()(const Tuple& left, const Tuple& right) const +            { +                return std::get<0>(left) > std::get<0>(right); +            } +        }; + +        template <typename... Args> +        using TimedQueue = PriorityQueueAdapter< +            TimestampedTuple<Args...>, +            // std::vector is the default storage for std::priority_queue, +            // have to restate to specify comparison template parameter +            std::vector<TimestampedTuple<Args...>>, +            // std::priority_queue uses a counterintuitive comparison +            // behavior: the default std::less comparator is used to present +            // the *highest* value as top(). So to sort by earliest timestamp, +            // we must invert by using >. +            ReverseTupleOrder>; +    } // namespace ThreadSafeSchedulePrivate + +    /** +     * ThreadSafeSchedule is an ordered LLThreadSafeQueue in which every item +     * is given an associated timestamp. That is, TimePoint is implicitly +     * prepended to the std::tuple with the specified types. +     * +     * Items are popped in increasing chronological order. Moreover, any item +     * with a timestamp in the future is held back until +     * std::chrono::steady_clock reaches that timestamp. +     */ +    template <typename... 
Args> +    class ThreadSafeSchedule: +        public LLThreadSafeQueue<ThreadSafeSchedulePrivate::TimestampedTuple<Args...>, +                                 ThreadSafeSchedulePrivate::TimedQueue<Args...>> +    { +    public: +        using DataTuple = std::tuple<Args...>; +        using TimeTuple = ThreadSafeSchedulePrivate::TimestampedTuple<Args...>; + +    private: +        using super = LLThreadSafeQueue<TimeTuple, ThreadSafeSchedulePrivate::TimedQueue<Args...>>; +        using lock_t = typename super::lock_t; +        using super::pop_; +        using super::push_; +        using super::mClosed; +        using super::mEmptyCond; +        using super::mCapacityCond; + +    public: +        using TimePoint = ThreadSafeSchedulePrivate::TimePoint; +        using Clock = TimePoint::clock; + +        ThreadSafeSchedule(U32 capacity=1024): +            super(capacity) +        {} + +        /*----------------------------- push() -----------------------------*/ +        /// explicitly pass TimeTuple +        using super::push; + +        /// pass DataTuple with implicit now +        void push(const DataTuple& tuple) +        { +            push(tuple_cons(Clock::now(), tuple)); +        } + +        /// individually pass each component of the TimeTuple +        void push(const TimePoint& time, Args&&... args) +        { +            push(TimeTuple(time, std::forward<Args>(args)...)); +        } + +        /// individually pass every component except the TimePoint (implies +        /// now) -- could be ambiguous if the first specified template +        /// parameter type is also TimePoint -- we could try to disambiguate, +        /// but a simpler approach would be for the caller to explicitly +        /// construct DataTuple and call that overload +        void push(Args&&... args) +        { +            push(Clock::now(), std::forward<Args>(args)...); +        } + +        /*--------------------------- tryPush() ----------------------------*/ +        /// explicit TimeTuple +        using super::tryPush; + +        /// DataTuple with implicit now +        bool tryPush(const DataTuple& tuple) +        { +            return tryPush(tuple_cons(Clock::now(), tuple)); +        } + +        /// individually pass components +        bool tryPush(const TimePoint& time, Args&&... args) +        { +            return tryPush(TimeTuple(time, std::forward<Args>(args)...)); +        } + +        /// individually pass components with implicit now +        bool tryPush(Args&&... args) +        { +            return tryPush(Clock::now(), std::forward<Args>(args)...); +        } + +        /*-------------------------- tryPushFor() --------------------------*/ +        /// explicit TimeTuple +        using super::tryPushFor; + +        /// DataTuple with implicit now +        template <typename Rep, typename Period> +        bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, +                        const DataTuple& tuple) +        { +            return tryPushFor(timeout, tuple_cons(Clock::now(), tuple)); +        } + +        /// individually pass components +        template <typename Rep, typename Period> +        bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, +                        const TimePoint& time, Args&&... 
args) +        { +            return tryPushFor(TimeTuple(time, std::forward<Args>(args)...)); +        } + +        /// individually pass components with implicit now +        template <typename Rep, typename Period> +        bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, +                        Args&&... args) +        { +            return tryPushFor(Clock::now(), std::forward<Args>(args)...); +        } + +        /*------------------------- tryPushUntil() -------------------------*/ +        /// explicit TimeTuple +        using super::tryPushUntil; + +        /// DataTuple with implicit now +        template <typename Clock, typename Duration> +        bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, +                          const DataTuple& tuple) +        { +            return tryPushUntil(until, tuple_cons(Clock::now(), tuple)); +        } + +        /// individually pass components +        template <typename Clock, typename Duration> +        bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, +                          const TimePoint& time, Args&&... args) +        { +            return tryPushUntil(until, TimeTuple(time, std::forward<Args>(args)...)); +        } + +        /// individually pass components with implicit now +        template <typename Clock, typename Duration> +        bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, +                          Args&&... args) +        { +            return tryPushUntil(until, Clock::now(), std::forward<Args>(args)...); +        } + +        /*----------------------------- pop() ------------------------------*/ +        // Our consumer may or may not care about the timestamp associated +        // with each popped item, so we allow retrieving either DataTuple or +        // TimeTuple. One potential use would be to observe, and possibly +        // adjust for, the time lag between the item time and the actual +        // current time. + +        /// pop DataTuple by value +        DataTuple pop() +        { +            return tuple_cdr(popWithTime()); +        } + +        /// pop TimeTuple by value +        TimeTuple popWithTime() +        { +            lock_t lock(super::mLock); +            // We can't just sit around waiting forever, given that there may +            // be items in the queue that are not yet ready but will *become* +            // ready in the near future. So in fact, with this class, every +            // pop() becomes a tryPopUntil(), constrained to the timestamp of +            // the head item. It almost doesn't matter what we specify for the +            // caller's time constraint -- all we really care about is the +            // head item's timestamp. Since pop() and popWithTime() are +            // defined to wait until either an item becomes available or the +            // queue is closed, loop until one of those things happens. The +            // constraint we pass just determines how often we'll loop while +            // waiting. +            TimeTuple tt; +            while (true) +            { +                // Pick a point suitably far into the future. 
+                TimePoint until = TimePoint::clock::now() + std::chrono::hours(24); +                if (tryPopUntil_(lock, until, tt)) +                    return std::move(tt); + +                // empty and closed: throw, just as super::pop() does +                if (super::mStorage.empty() && super::mClosed) +                { +                    LLTHROW(LLThreadSafeQueueInterrupt()); +                } +                // If not empty, we've still got items to drain. +                // If not closed, it's worth waiting for more items. +                // Either way, loop back to wait. +            } +        } + +        // We can use tryPop(TimeTuple&) just as it stands; the only behavior +        // difference is in our canPop() override method. +        using super::tryPop; + +        /// tryPop(DataTuple&) +        bool tryPop(DataTuple& tuple) +        { +            TimeTuple tt; +            if (! super::tryPop(tt)) +                return false; +            tuple = tuple_cdr(std::move(tt)); +            return true; +        } + +        /// tryPopFor() +        template <typename Rep, typename Period, typename Tuple> +        bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, Tuple& tuple) +        { +            // It's important to use OUR tryPopUntil() implementation, rather +            // than delegating immediately to our base class. +            return tryPopUntil(Clock::now() + timeout, tuple); +        } + +        /// tryPopUntil(TimeTuple&) +        template <typename Clock, typename Duration> +        bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, +                         TimeTuple& tuple) +        { +            // super::tryPopUntil() wakes up when an item becomes available or +            // we hit 'until', whichever comes first. Thing is, the current +            // head of the queue could become ready sooner than either of +            // those events, and we need to deliver it as soon as it does. +            // Don't wait past the TimePoint of the head item. +            // Naturally, lock the queue before peeking at mStorage. +            return super::tryLockUntil( +                until, +                [this, until, &tuple](lock_t& lock) +                { +                    // Use our time_point_cast to allow for 'until' that's a +                    // time_point type other than TimePoint. +                    return tryPopUntil_(lock, time_point_cast<TimePoint>(until), tuple); +                }); +        } + +        bool tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) +        { +            TimePoint adjusted = until; +            if (! super::mStorage.empty()) +            { +                // use whichever is earlier: the head item's timestamp, or +                // the caller's limit +                adjusted = min(std::get<0>(super::mStorage.front()), adjusted); +            } +            // now delegate to base-class tryPopUntil_() +            return super::tryPopUntil_(lock, adjusted, tuple); +        } + +        /// tryPopUntil(DataTuple&) +        template <typename Clock, typename Duration> +        bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, +                         DataTuple& tuple) +        { +            TimeTuple tt; +            if (! tryPopUntil(until, tt)) +                return false; +            tuple = tuple_cdr(std::move(tt)); +            return true; +        } + +        /*------------------------------ etc. 
------------------------------*/ +        // We can't hide items that aren't yet ready because we can't traverse +        // the underlying priority_queue: it has no iterators, only top(). So +        // a consumer could observe size() > 0 and yet tryPop() returns false. +        // Shrug, in a multi-consumer scenario that would be expected behavior. +        using super::size; +        // open/closed state +        using super::close; +        using super::isClosed; +        using super::done; + +    private: +        // this method is called by base class pop_() every time we're +        // considering whether to deliver the current head element +        bool canPop(const TimeTuple& head) const override +        { +            // an item with a future timestamp isn't yet ready to pop +            // (should we add some slop for overhead?) +            return std::get<0>(head) <= Clock::now(); +        } +    }; + +} // namespace LL + +#endif /* ! defined(LL_THREADSAFESCHEDULE_H) */
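The integration test above maps onto this usage pattern for the new LL::ThreadSafeSchedule; a small sketch with illustrative payloads and variable names. Each pushed item is stored as std::tuple<TimePoint, std::string>, with the TimePoint defaulting to Clock::now() when not given:

#include "threadsafeschedule.h"
#include <chrono>
#include <iostream>
#include <string>

using namespace std::literals;  // ms and s suffixes

int main()
{
    using Schedule = LL::ThreadSafeSchedule<std::string>;
    Schedule schedule;

    schedule.push("now"s);                                   // implicit timestamp Clock::now()
    schedule.push(Schedule::Clock::now() + 10ms, "later"s);  // explicit future timestamp
    schedule.close();                                        // producer side: no more pushes

    // pop() delivers items in timestamp order; an item whose timestamp is
    // still in the future is held back until steady_clock reaches it.
    std::cout << std::get<0>(schedule.pop()) << '\n';        // "now"
    std::cout << std::get<0>(schedule.pop()) << '\n';        // "later", after ~10ms

    // closed and fully drained
    return schedule.done() ? 0 : 1;
}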

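The LL::time_point_cast() helper in chrono.h lets callers pass deadlines measured on a different clock; ThreadSafeSchedule::tryPopUntil() uses it to convert the caller's time_point to its own steady_clock-based TimePoint. A small sketch converting a wall-clock deadline onto the steady_clock timeline; the one-second offset and variable names are illustrative, and as the header's comments note, the cross-clock conversion is only accurate to roughly the cost of a few now() calls:

#include "chrono.h"
#include <chrono>

int main()
{
    using SysTime    = std::chrono::system_clock::time_point;
    using SteadyTime = std::chrono::steady_clock::time_point;

    // A wall-clock deadline one second from now...
    SysTime sys_deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);

    // ...re-expressed on the steady_clock timeline. When source and dest share
    // a clock, the helper delegates to std::chrono::time_point_cast; when they
    // differ (as here), the epoch offset is estimated by sampling both clocks'
    // now() and splitting the difference.
    SteadyTime steady_deadline = LL::time_point_cast<SteadyTime>(sys_deadline);

    return steady_deadline > std::chrono::steady_clock::now() ? 0 : 1;
}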