summaryrefslogtreecommitdiff
path: root/indra/llcommon/llthreadsafequeue.h
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2021-10-05 17:31:53 -0400
committerNat Goodspeed <nat@lindenlab.com>2021-10-05 17:31:53 -0400
commit955b967623983cb50ba09f7b82e5f01f2c6bcebb (patch)
tree069161d4519c50c807e943c9f57e833d7ccee6be /indra/llcommon/llthreadsafequeue.h
parenta35e266547e4d2c8dbd6b003c64b719d91eaaf87 (diff)
SL-16024: Add ThreadSafeSchedule, a timestamped LLThreadSafeQueue.
ThreadSafeSchedule orders its items by timestamp, which can be passed either implicitly or explicitly. The timestamp specifies earliest delivery time: an item cannot be popped until that time. Add initial tests. Tweak the LLThreadSafeQueue base class to support ThreadSafeSchedule: introduce virtual canPop() method to report whether the current head item is available to pop. The base class unconditionally says yes, ThreadSafeSchedule says it depends on whether its timestamp is still in the future. This replaces the protected pop_() overload accepting a predicate. Rather than explicitly passing a predicate through a couple levels of function call, use canPop() at the level it matters. Runtime behavior that varies depending on an object's leaf class is what virtual functions were invented for. Give pop_() a three-state enum return so pop() can distinguish between "closed and empty" (throws exception) versus "closed, not yet drained because we're not yet ready to pop the head item" (waits). Also break out protected tryPopUntil_() method, the body logic of tryPopUntil(). The public method locks the data structure, the protected method requires that its caller has already done so. Add chrono.h with a more full-featured LL::time_point_cast() function than the one found in <chrono>, which only converts between time_point durations, not between time_points based on different clocks.
Diffstat (limited to 'indra/llcommon/llthreadsafequeue.h')
-rw-r--r--indra/llcommon/llthreadsafequeue.h121
1 files changed, 68 insertions, 53 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 1dffad6b89..bd2d82d4c3 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -83,6 +83,7 @@ public:
// Limiting the number of pending items prevents unbounded growth of the
// underlying queue.
LLThreadSafeQueue(U32 capacity = 1024);
+ virtual ~LLThreadSafeQueue() {}
// Add an element to the queue (will block if the queue has
// reached capacity).
@@ -162,10 +163,10 @@ public:
// then every subsequent tryPop() call will return false
void close();
- // detect closed state
+ // producer end: are we prevented from pushing any additional items?
bool isClosed();
- // inverse of isClosed()
- explicit operator bool();
+ // consumer end: are we done, is the queue entirely drained?
+ bool done();
protected:
typedef QueueT queue_type;
@@ -178,6 +179,11 @@ protected:
boost::fibers::condition_variable_any mCapacityCond;
boost::fibers::condition_variable_any mEmptyCond;
+ // implementation logic, suitable for passing to tryLockUntil()
+ template <typename Clock, typename Duration>
+ bool tryPopUntil_(lock_t& lock,
+ const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element);
// if we're able to lock immediately, do so and run the passed callable,
// which must accept lock_t& and return bool
template <typename CALLABLE>
@@ -191,11 +197,11 @@ protected:
template <typename T>
bool push_(lock_t& lock, T&& element);
// while lock is locked, really pop the head element, if we can
- bool pop_(lock_t& lock, ElementT& element);
- // pop_() with an explicit predicate indicating whether the head element
- // is ready to be popped
- template <typename PRED>
- bool pop_(lock_t& lock, ElementT& element, PRED&& pred);
+ enum pop_result { EMPTY, WAITING, POPPED };
+ pop_result pop_(lock_t& lock, ElementT& element);
+ // Is the current head element ready to pop? We say yes; subclass can
+ // override as needed.
+ virtual bool canPop(const ElementT& head) const { return true; }
};
/*****************************************************************************
@@ -387,26 +393,16 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil(
// while lock is locked, really pop the head element, if we can
template <typename ElementT, typename QueueT>
-bool LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element)
-{
- // default predicate: head element, if present, is always ready to pop
- return pop_(lock, element, [](const ElementT&){ return true; });
-}
-
-
-// pop_() with an explicit predicate indicating whether the head element
-// is ready to be popped
-template <typename ElementT, typename QueueT>
-template <typename PRED>
-bool LLThreadSafeQueue<ElementT, QueueT>::pop_(
- lock_t& lock, ElementT& element, PRED&& pred)
+typename LLThreadSafeQueue<ElementT, QueueT>::pop_result
+LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element)
{
// If mStorage is empty, there's no head element.
- // If there's a head element, pass it to the predicate to see if caller
- // considers it ready to pop.
- // Unless both are satisfied, no point in continuing.
- if (mStorage.empty() || ! std::forward<PRED>(pred)(mStorage.front()))
- return false;
+ if (mStorage.empty())
+ return EMPTY;
+
+ // If there's a head element, pass it to canPop() to see if it's ready to pop.
+ if (! canPop(mStorage.front()))
+ return WAITING;
// std::queue::front() is the element about to pop()
element = mStorage.front();
@@ -414,7 +410,7 @@ bool LLThreadSafeQueue<ElementT, QueueT>::pop_(
lock.unlock();
// now that we've popped, if somebody's been waiting to push, signal them
mCapacityCond.notify_one();
- return true;
+ return POPPED;
}
@@ -422,17 +418,20 @@ template<typename ElementT, typename QueueT>
ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
{
lock_t lock1(mLock);
+ ElementT value;
while (true)
{
// On the consumer side, we always try to pop before checking mClosed
// so we can finish draining the queue.
- ElementT value;
- if (pop_(lock1, value))
+ pop_result popped = pop_(lock1, value);
+ if (popped == POPPED)
return std::move(value);
// Once the queue is empty, mClosed lets us know if there will ever be
- // any more coming.
- if (mClosed)
+ // any more coming. If we didn't pop because WAITING, i.e. canPop()
+ // returned false, then even if the producer end has been closed,
+ // there's still at least one item to drain: wait for it.
+ if (popped == EMPTY && mClosed)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
@@ -452,7 +451,7 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)
// no need to check mClosed: tryPop() behavior when the queue is
// closed is implemented by simple inability to push any new
// elements
- return pop_(lock, element);
+ return pop_(lock, element) == POPPED;
});
}
@@ -479,26 +478,38 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(
until,
[this, until, &element](lock_t& lock)
{
- while (true)
- {
- if (pop_(lock, element))
- return true;
+ return tryPopUntil_(lock, until, element);
+ });
+}
- if (mClosed)
- {
- return false;
- }
- // Storage empty. Wait for signal.
- if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
- {
- // timed out -- formally we might recheck both conditions above
- return false;
- }
- // If we didn't time out, we were notified for some reason. Loop back
- // to check.
- }
- });
+// body of tryPopUntil(), called once we have the lock
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_(
+ lock_t& lock,
+ const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element)
+{
+ while (true)
+ {
+ if (pop_(lock, element) == POPPED)
+ return true;
+
+ if (mClosed)
+ {
+ return false;
+ }
+
+ // Storage empty. Wait for signal.
+ if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
+ {
+ // timed out -- formally we might recheck both conditions above
+ return false;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
}
@@ -509,6 +520,7 @@ size_t LLThreadSafeQueue<ElementT, QueueT>::size(void)
return mStorage.size();
}
+
template<typename ElementT, typename QueueT>
void LLThreadSafeQueue<ElementT, QueueT>::close()
{
@@ -521,17 +533,20 @@ void LLThreadSafeQueue<ElementT, QueueT>::close()
mCapacityCond.notify_all();
}
+
template<typename ElementT, typename QueueT>
bool LLThreadSafeQueue<ElementT, QueueT>::isClosed()
{
lock_t lock(mLock);
- return mClosed && mStorage.size() == 0;
+ return mClosed;
}
+
template<typename ElementT, typename QueueT>
-LLThreadSafeQueue<ElementT, QueueT>::operator bool()
+bool LLThreadSafeQueue<ElementT, QueueT>::done()
{
- return ! isClosed();
+ lock_t lock(mLock);
+ return mClosed && mStorage.size() == 0;
}
#endif