summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--indra/llcommon/CMakeLists.txt3
-rw-r--r--indra/llcommon/chrono.h65
-rw-r--r--indra/llcommon/llthreadsafequeue.h121
-rw-r--r--indra/llcommon/tests/threadsafeschedule_test.cpp65
-rw-r--r--indra/llcommon/threadsafeschedule.h334
5 files changed, 535 insertions, 53 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt
index 6558219462..5efcfabf24 100644
--- a/indra/llcommon/CMakeLists.txt
+++ b/indra/llcommon/CMakeLists.txt
@@ -127,6 +127,7 @@ set(llcommon_SOURCE_FILES
set(llcommon_HEADER_FILES
CMakeLists.txt
+ chrono.h
ctype_workaround.h
fix_macros.h
indra_constants.h
@@ -253,6 +254,7 @@ set(llcommon_HEADER_FILES
lockstatic.h
stdtypes.h
stringize.h
+ threadsafeschedule.h
timer.h
tuple.h
u64.h
@@ -359,6 +361,7 @@ if (LL_TESTS)
LL_ADD_INTEGRATION_TEST(lluri "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llunits "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}")
+ LL_ADD_INTEGRATION_TEST(threadsafeschedule "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(tuple "" "${test_libs}")
## llexception_test.cpp isn't a regression test, and doesn't need to be run
diff --git a/indra/llcommon/chrono.h b/indra/llcommon/chrono.h
new file mode 100644
index 0000000000..806e871892
--- /dev/null
+++ b/indra/llcommon/chrono.h
@@ -0,0 +1,65 @@
+/**
+ * @file chrono.h
+ * @author Nat Goodspeed
+ * @date 2021-10-05
+ * @brief supplement <chrono> with utility functions
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_CHRONO_H)
+#define LL_CHRONO_H
+
+#include <chrono>
+#include <type_traits> // std::enable_if
+
+namespace LL
+{
+
+// time_point_cast() is derived from https://stackoverflow.com/a/35293183
+// without the iteration: we think errors in the ~1 microsecond range are
+// probably acceptable.
+
+// This variant is for the optimal case when the source and dest use the same
+// clock: that case is handled by std::chrono.
+template <typename DestTimePoint, typename SrcTimePoint,
+ typename std::enable_if<std::is_same<typename DestTimePoint::clock,
+ typename SrcTimePoint::clock>::value,
+ bool>::type = true>
+DestTimePoint time_point_cast(const SrcTimePoint& time)
+{
+ return std::chrono::time_point_cast<typename DestTimePoint::duration>(time);
+}
+
+// This variant is for when the source and dest use different clocks -- see
+// the linked StackOverflow answer, also Howard Hinnant's, for more context.
+template <typename DestTimePoint, typename SrcTimePoint,
+ typename std::enable_if<! std::is_same<typename DestTimePoint::clock,
+ typename SrcTimePoint::clock>::value,
+ bool>::type = true>
+DestTimePoint time_point_cast(const SrcTimePoint& time)
+{
+ // The basic idea is that we must adjust the passed time_point by the
+ // difference between the clocks' epochs. But since time_point doesn't
+ // expose its epoch, we fall back on what each of them thinks is now().
+ // However, since we necessarily make sequential calls to those now()
+ // functions, the answers differ not only by the cycles spent executing
+ // those calls, but by potential OS interruptions between them. Try to
+ // reduce that error by capturing the source clock time both before and
+ // after the dest clock, and splitting the difference. Of course an
+ // interruption between two of these now() calls without a comparable
+ // interruption between the other two will skew the result, but better is
+ // more expensive.
+ const auto src_before = typename SrcTimePoint::clock::now();
+ const auto dest_now = typename DestTimePoint::clock::now();
+ const auto src_after = typename SrcTimePoint::clock::now();
+ const auto src_diff = src_after - src_before;
+ const auto src_now = src_before + src_diff / 2;
+ return dest_now + (time - src_now);
+}
+
+} // namespace LL
+
+#endif /* ! defined(LL_CHRONO_H) */
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 1dffad6b89..bd2d82d4c3 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -83,6 +83,7 @@ public:
// Limiting the number of pending items prevents unbounded growth of the
// underlying queue.
LLThreadSafeQueue(U32 capacity = 1024);
+ virtual ~LLThreadSafeQueue() {}
// Add an element to the queue (will block if the queue has
// reached capacity).
@@ -162,10 +163,10 @@ public:
// then every subsequent tryPop() call will return false
void close();
- // detect closed state
+ // producer end: are we prevented from pushing any additional items?
bool isClosed();
- // inverse of isClosed()
- explicit operator bool();
+ // consumer end: are we done, is the queue entirely drained?
+ bool done();
protected:
typedef QueueT queue_type;
@@ -178,6 +179,11 @@ protected:
boost::fibers::condition_variable_any mCapacityCond;
boost::fibers::condition_variable_any mEmptyCond;
+ // implementation logic, suitable for passing to tryLockUntil()
+ template <typename Clock, typename Duration>
+ bool tryPopUntil_(lock_t& lock,
+ const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element);
// if we're able to lock immediately, do so and run the passed callable,
// which must accept lock_t& and return bool
template <typename CALLABLE>
@@ -191,11 +197,11 @@ protected:
template <typename T>
bool push_(lock_t& lock, T&& element);
// while lock is locked, really pop the head element, if we can
- bool pop_(lock_t& lock, ElementT& element);
- // pop_() with an explicit predicate indicating whether the head element
- // is ready to be popped
- template <typename PRED>
- bool pop_(lock_t& lock, ElementT& element, PRED&& pred);
+ enum pop_result { EMPTY, WAITING, POPPED };
+ pop_result pop_(lock_t& lock, ElementT& element);
+ // Is the current head element ready to pop? We say yes; subclass can
+ // override as needed.
+ virtual bool canPop(const ElementT& head) const { return true; }
};
/*****************************************************************************
@@ -387,26 +393,16 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil(
// while lock is locked, really pop the head element, if we can
template <typename ElementT, typename QueueT>
-bool LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element)
-{
- // default predicate: head element, if present, is always ready to pop
- return pop_(lock, element, [](const ElementT&){ return true; });
-}
-
-
-// pop_() with an explicit predicate indicating whether the head element
-// is ready to be popped
-template <typename ElementT, typename QueueT>
-template <typename PRED>
-bool LLThreadSafeQueue<ElementT, QueueT>::pop_(
- lock_t& lock, ElementT& element, PRED&& pred)
+typename LLThreadSafeQueue<ElementT, QueueT>::pop_result
+LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element)
{
// If mStorage is empty, there's no head element.
- // If there's a head element, pass it to the predicate to see if caller
- // considers it ready to pop.
- // Unless both are satisfied, no point in continuing.
- if (mStorage.empty() || ! std::forward<PRED>(pred)(mStorage.front()))
- return false;
+ if (mStorage.empty())
+ return EMPTY;
+
+ // If there's a head element, pass it to canPop() to see if it's ready to pop.
+ if (! canPop(mStorage.front()))
+ return WAITING;
// std::queue::front() is the element about to pop()
element = mStorage.front();
@@ -414,7 +410,7 @@ bool LLThreadSafeQueue<ElementT, QueueT>::pop_(
lock.unlock();
// now that we've popped, if somebody's been waiting to push, signal them
mCapacityCond.notify_one();
- return true;
+ return POPPED;
}
@@ -422,17 +418,20 @@ template<typename ElementT, typename QueueT>
ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
{
lock_t lock1(mLock);
+ ElementT value;
while (true)
{
// On the consumer side, we always try to pop before checking mClosed
// so we can finish draining the queue.
- ElementT value;
- if (pop_(lock1, value))
+ pop_result popped = pop_(lock1, value);
+ if (popped == POPPED)
return std::move(value);
// Once the queue is empty, mClosed lets us know if there will ever be
- // any more coming.
- if (mClosed)
+ // any more coming. If we didn't pop because WAITING, i.e. canPop()
+ // returned false, then even if the producer end has been closed,
+ // there's still at least one item to drain: wait for it.
+ if (popped == EMPTY && mClosed)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
@@ -452,7 +451,7 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)
// no need to check mClosed: tryPop() behavior when the queue is
// closed is implemented by simple inability to push any new
// elements
- return pop_(lock, element);
+ return pop_(lock, element) == POPPED;
});
}
@@ -479,26 +478,38 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(
until,
[this, until, &element](lock_t& lock)
{
- while (true)
- {
- if (pop_(lock, element))
- return true;
+ return tryPopUntil_(lock, until, element);
+ });
+}
- if (mClosed)
- {
- return false;
- }
- // Storage empty. Wait for signal.
- if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
- {
- // timed out -- formally we might recheck both conditions above
- return false;
- }
- // If we didn't time out, we were notified for some reason. Loop back
- // to check.
- }
- });
+// body of tryPopUntil(), called once we have the lock
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_(
+ lock_t& lock,
+ const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element)
+{
+ while (true)
+ {
+ if (pop_(lock, element) == POPPED)
+ return true;
+
+ if (mClosed)
+ {
+ return false;
+ }
+
+ // Storage empty. Wait for signal.
+ if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
+ {
+ // timed out -- formally we might recheck both conditions above
+ return false;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
}
@@ -509,6 +520,7 @@ size_t LLThreadSafeQueue<ElementT, QueueT>::size(void)
return mStorage.size();
}
+
template<typename ElementT, typename QueueT>
void LLThreadSafeQueue<ElementT, QueueT>::close()
{
@@ -521,17 +533,20 @@ void LLThreadSafeQueue<ElementT, QueueT>::close()
mCapacityCond.notify_all();
}
+
template<typename ElementT, typename QueueT>
bool LLThreadSafeQueue<ElementT, QueueT>::isClosed()
{
lock_t lock(mLock);
- return mClosed && mStorage.size() == 0;
+ return mClosed;
}
+
template<typename ElementT, typename QueueT>
-LLThreadSafeQueue<ElementT, QueueT>::operator bool()
+bool LLThreadSafeQueue<ElementT, QueueT>::done()
{
- return ! isClosed();
+ lock_t lock(mLock);
+ return mClosed && mStorage.size() == 0;
}
#endif
diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp
new file mode 100644
index 0000000000..ec0fa0c928
--- /dev/null
+++ b/indra/llcommon/tests/threadsafeschedule_test.cpp
@@ -0,0 +1,65 @@
+/**
+ * @file threadsafeschedule_test.cpp
+ * @author Nat Goodspeed
+ * @date 2021-10-04
+ * @brief Test for threadsafeschedule.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "threadsafeschedule.h"
+// STL headers
+// std headers
+#include <chrono>
+// external library headers
+// other Linden headers
+#include "../test/lltut.h"
+
+using namespace std::literals::chrono_literals; // ms suffix
+using namespace std::literals::string_literals; // s suffix
+using Queue = LL::ThreadSafeSchedule<std::string>;
+
+/*****************************************************************************
+* TUT
+*****************************************************************************/
+namespace tut
+{
+ struct threadsafeschedule_data
+ {
+ Queue queue;
+ };
+ typedef test_group<threadsafeschedule_data> threadsafeschedule_group;
+ typedef threadsafeschedule_group::object object;
+ threadsafeschedule_group threadsafeschedulegrp("threadsafeschedule");
+
+ template<> template<>
+ void object::test<1>()
+ {
+ set_test_name("push");
+ // Simply calling push() a few times might result in indeterminate
+ // delivery order if the resolution of steady_clock is coarser than
+ // the real time required for each push() call. Explicitly increment
+ // the timestamp for each one -- but since we're passing explicit
+ // timestamps, make the queue reorder them.
+ queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi"));
+ queue.push("abc"s);
+ queue.push(Queue::Clock::now() + 10ms, "def");
+ queue.close();
+ auto entry = queue.pop();
+ ensure_equals("failed to pop first", std::get<0>(entry), "abc"s);
+ entry = queue.pop();
+ ensure_equals("failed to pop second", std::get<0>(entry), "def"s);
+ ensure("queue not closed", queue.isClosed());
+ ensure("queue prematurely done", ! queue.done());
+ entry = queue.pop();
+ ensure_equals("failed to pop third", std::get<0>(entry), "ghi"s);
+ bool popped = queue.tryPop(entry);
+ ensure("queue not empty", ! popped);
+ ensure("queue not done", queue.done());
+ }
+} // namespace tut
diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h
new file mode 100644
index 0000000000..545c820f53
--- /dev/null
+++ b/indra/llcommon/threadsafeschedule.h
@@ -0,0 +1,334 @@
+/**
+ * @file threadsafeschedule.h
+ * @author Nat Goodspeed
+ * @date 2021-10-02
+ * @brief ThreadSafeSchedule is an ordered queue in which every item has an
+ * associated timestamp.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_THREADSAFESCHEDULE_H)
+#define LL_THREADSAFESCHEDULE_H
+
+#include "chrono.h"
+#include "llexception.h"
+#include "llthreadsafequeue.h"
+#include "tuple.h"
+#include <chrono>
+#include <tuple>
+
+namespace LL
+{
+ namespace ThreadSafeSchedulePrivate
+ {
+ using TimePoint = std::chrono::steady_clock::time_point;
+ // Bundle consumer's data with a TimePoint to order items by timestamp.
+ template <typename... Args>
+ using TimestampedTuple = std::tuple<TimePoint, Args...>;
+
+ // comparison functor for TimedTuples -- see TimedQueue comments
+ struct ReverseTupleOrder
+ {
+ template <typename Tuple>
+ bool operator()(const Tuple& left, const Tuple& right) const
+ {
+ return std::get<0>(left) > std::get<0>(right);
+ }
+ };
+
+ template <typename... Args>
+ using TimedQueue = PriorityQueueAdapter<
+ TimestampedTuple<Args...>,
+ // std::vector is the default storage for std::priority_queue,
+ // have to restate to specify comparison template parameter
+ std::vector<TimestampedTuple<Args...>>,
+ // std::priority_queue uses a counterintuitive comparison
+ // behavior: the default std::less comparator is used to present
+ // the *highest* value as top(). So to sort by earliest timestamp,
+ // we must invert by using >.
+ ReverseTupleOrder>;
+ } // namespace ThreadSafeSchedulePrivate
+
+ /**
+ * ThreadSafeSchedule is an ordered LLThreadSafeQueue in which every item
+ * is given an associated timestamp. That is, TimePoint is implicitly
+ * prepended to the std::tuple with the specified types.
+ *
+ * Items are popped in increasing chronological order. Moreover, any item
+ * with a timestamp in the future is held back until
+ * std::chrono::steady_clock reaches that timestamp.
+ */
+ template <typename... Args>
+ class ThreadSafeSchedule:
+ public LLThreadSafeQueue<ThreadSafeSchedulePrivate::TimestampedTuple<Args...>,
+ ThreadSafeSchedulePrivate::TimedQueue<Args...>>
+ {
+ public:
+ using DataTuple = std::tuple<Args...>;
+ using TimeTuple = ThreadSafeSchedulePrivate::TimestampedTuple<Args...>;
+
+ private:
+ using super = LLThreadSafeQueue<TimeTuple, ThreadSafeSchedulePrivate::TimedQueue<Args...>>;
+ using lock_t = typename super::lock_t;
+ using super::pop_;
+ using super::push_;
+ using super::mClosed;
+ using super::mEmptyCond;
+ using super::mCapacityCond;
+
+ public:
+ using TimePoint = ThreadSafeSchedulePrivate::TimePoint;
+ using Clock = TimePoint::clock;
+
+ ThreadSafeSchedule(U32 capacity=1024):
+ super(capacity)
+ {}
+
+ /*----------------------------- push() -----------------------------*/
+ /// explicitly pass TimeTuple
+ using super::push;
+
+ /// pass DataTuple with implicit now
+ void push(const DataTuple& tuple)
+ {
+ push(tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass each component of the TimeTuple
+ void push(const TimePoint& time, Args&&... args)
+ {
+ push(TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass every component except the TimePoint (implies
+ /// now) -- could be ambiguous if the first specified template
+ /// parameter type is also TimePoint -- we could try to disambiguate,
+ /// but a simpler approach would be for the caller to explicitly
+ /// construct DataTuple and call that overload
+ void push(Args&&... args)
+ {
+ push(Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*--------------------------- tryPush() ----------------------------*/
+ /// explicit TimeTuple
+ using super::tryPush;
+
+ /// DataTuple with implicit now
+ bool tryPush(const DataTuple& tuple)
+ {
+ return tryPush(tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass components
+ bool tryPush(const TimePoint& time, Args&&... args)
+ {
+ return tryPush(TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass components with implicit now
+ bool tryPush(Args&&... args)
+ {
+ return tryPush(Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*-------------------------- tryPushFor() --------------------------*/
+ /// explicit TimeTuple
+ using super::tryPushFor;
+
+ /// DataTuple with implicit now
+ template <typename Rep, typename Period>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ const DataTuple& tuple)
+ {
+ return tryPushFor(timeout, tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass components
+ template <typename Rep, typename Period>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ const TimePoint& time, Args&&... args)
+ {
+ return tryPushFor(TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass components with implicit now
+ template <typename Rep, typename Period>
+ bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
+ Args&&... args)
+ {
+ return tryPushFor(Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*------------------------- tryPushUntil() -------------------------*/
+ /// explicit TimeTuple
+ using super::tryPushUntil;
+
+ /// DataTuple with implicit now
+ template <typename Clock, typename Duration>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ const DataTuple& tuple)
+ {
+ return tryPushUntil(until, tuple_cons(Clock::now(), tuple));
+ }
+
+ /// individually pass components
+ template <typename Clock, typename Duration>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ const TimePoint& time, Args&&... args)
+ {
+ return tryPushUntil(until, TimeTuple(time, std::forward<Args>(args)...));
+ }
+
+ /// individually pass components with implicit now
+ template <typename Clock, typename Duration>
+ bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
+ Args&&... args)
+ {
+ return tryPushUntil(until, Clock::now(), std::forward<Args>(args)...);
+ }
+
+ /*----------------------------- pop() ------------------------------*/
+ // Our consumer may or may not care about the timestamp associated
+ // with each popped item, so we allow retrieving either DataTuple or
+ // TimeTuple. One potential use would be to observe, and possibly
+ // adjust for, the time lag between the item time and the actual
+ // current time.
+
+ /// pop DataTuple by value
+ DataTuple pop()
+ {
+ return tuple_cdr(popWithTime());
+ }
+
+ /// pop TimeTuple by value
+ TimeTuple popWithTime()
+ {
+ lock_t lock(super::mLock);
+ // We can't just sit around waiting forever, given that there may
+ // be items in the queue that are not yet ready but will *become*
+ // ready in the near future. So in fact, with this class, every
+ // pop() becomes a tryPopUntil(), constrained to the timestamp of
+ // the head item. It almost doesn't matter what we specify for the
+ // caller's time constraint -- all we really care about is the
+ // head item's timestamp. Since pop() and popWithTime() are
+ // defined to wait until either an item becomes available or the
+ // queue is closed, loop until one of those things happens. The
+ // constraint we pass just determines how often we'll loop while
+ // waiting.
+ TimeTuple tt;
+ while (true)
+ {
+ // Pick a point suitably far into the future.
+ TimePoint until = TimePoint::clock::now() + std::chrono::hours(24);
+ if (tryPopUntil_(lock, until, tt))
+ return std::move(tt);
+
+ // empty and closed: throw, just as super::pop() does
+ if (super::mStorage.empty() && super::mClosed)
+ {
+ LLTHROW(LLThreadSafeQueueInterrupt());
+ }
+ // If not empty, we've still got items to drain.
+ // If not closed, it's worth waiting for more items.
+ // Either way, loop back to wait.
+ }
+ }
+
+ // We can use tryPop(TimeTuple&) just as it stands; the only behavior
+ // difference is in our canPop() override method.
+ using super::tryPop;
+
+ /// tryPop(DataTuple&)
+ bool tryPop(DataTuple& tuple)
+ {
+ TimeTuple tt;
+ if (! super::tryPop(tt))
+ return false;
+ tuple = tuple_cdr(std::move(tt));
+ return true;
+ }
+
+ /// tryPopFor()
+ template <typename Rep, typename Period, typename Tuple>
+ bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, Tuple& tuple)
+ {
+ // It's important to use OUR tryPopUntil() implementation, rather
+ // than delegating immediately to our base class.
+ return tryPopUntil(Clock::now() + timeout, tuple);
+ }
+
+ /// tryPopUntil(TimeTuple&)
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
+ TimeTuple& tuple)
+ {
+ // super::tryPopUntil() wakes up when an item becomes available or
+ // we hit 'until', whichever comes first. Thing is, the current
+ // head of the queue could become ready sooner than either of
+ // those events, and we need to deliver it as soon as it does.
+ // Don't wait past the TimePoint of the head item.
+ // Naturally, lock the queue before peeking at mStorage.
+ return super::tryLockUntil(
+ until,
+ [this, until, &tuple](lock_t& lock)
+ {
+ // Use our time_point_cast to allow for 'until' that's a
+ // time_point type other than TimePoint.
+ return tryPopUntil_(lock, time_point_cast<TimePoint>(until), tuple);
+ });
+ }
+
+ bool tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple)
+ {
+ TimePoint adjusted = until;
+ if (! super::mStorage.empty())
+ {
+ // use whichever is earlier: the head item's timestamp, or
+ // the caller's limit
+ adjusted = min(std::get<0>(super::mStorage.front()), adjusted);
+ }
+ // now delegate to base-class tryPopUntil_()
+ return super::tryPopUntil_(lock, adjusted, tuple);
+ }
+
+ /// tryPopUntil(DataTuple&)
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
+ DataTuple& tuple)
+ {
+ TimeTuple tt;
+ if (! tryPopUntil(until, tt))
+ return false;
+ tuple = tuple_cdr(std::move(tt));
+ return true;
+ }
+
+ /*------------------------------ etc. ------------------------------*/
+ // We can't hide items that aren't yet ready because we can't traverse
+ // the underlying priority_queue: it has no iterators, only top(). So
+ // a consumer could observe size() > 0 and yet tryPop() returns false.
+ // Shrug, in a multi-consumer scenario that would be expected behavior.
+ using super::size;
+ // open/closed state
+ using super::close;
+ using super::isClosed;
+ using super::done;
+
+ private:
+ // this method is called by base class pop_() every time we're
+ // considering whether to deliver the current head element
+ bool canPop(const TimeTuple& head) const override
+ {
+ // an item with a future timestamp isn't yet ready to pop
+ // (should we add some slop for overhead?)
+ return std::get<0>(head) <= Clock::now();
+ }
+ };
+
+} // namespace LL
+
+#endif /* ! defined(LL_THREADSAFESCHEDULE_H) */