summaryrefslogtreecommitdiff
path: root/indra/llcommon/llthreadsafequeue.h
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llcommon/llthreadsafequeue.h')
-rw-r--r--indra/llcommon/llthreadsafequeue.h121
1 files changed, 68 insertions, 53 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 1dffad6b89..bd2d82d4c3 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -83,6 +83,7 @@ public:
// Limiting the number of pending items prevents unbounded growth of the
// underlying queue.
LLThreadSafeQueue(U32 capacity = 1024);
+ virtual ~LLThreadSafeQueue() {}
// Add an element to the queue (will block if the queue has
// reached capacity).
@@ -162,10 +163,10 @@ public:
// then every subsequent tryPop() call will return false
void close();
- // detect closed state
+ // producer end: are we prevented from pushing any additional items?
bool isClosed();
- // inverse of isClosed()
- explicit operator bool();
+ // consumer end: are we done, is the queue entirely drained?
+ bool done();
protected:
typedef QueueT queue_type;
@@ -178,6 +179,11 @@ protected:
boost::fibers::condition_variable_any mCapacityCond;
boost::fibers::condition_variable_any mEmptyCond;
+ // implementation logic, suitable for passing to tryLockUntil()
+ template <typename Clock, typename Duration>
+ bool tryPopUntil_(lock_t& lock,
+ const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element);
// if we're able to lock immediately, do so and run the passed callable,
// which must accept lock_t& and return bool
template <typename CALLABLE>
@@ -191,11 +197,11 @@ protected:
template <typename T>
bool push_(lock_t& lock, T&& element);
// while lock is locked, really pop the head element, if we can
- bool pop_(lock_t& lock, ElementT& element);
- // pop_() with an explicit predicate indicating whether the head element
- // is ready to be popped
- template <typename PRED>
- bool pop_(lock_t& lock, ElementT& element, PRED&& pred);
+ enum pop_result { EMPTY, WAITING, POPPED };
+ pop_result pop_(lock_t& lock, ElementT& element);
+ // Is the current head element ready to pop? We say yes; subclass can
+ // override as needed.
+ virtual bool canPop(const ElementT& head) const { return true; }
};
/*****************************************************************************
@@ -387,26 +393,16 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil(
// while lock is locked, really pop the head element, if we can
template <typename ElementT, typename QueueT>
-bool LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element)
-{
- // default predicate: head element, if present, is always ready to pop
- return pop_(lock, element, [](const ElementT&){ return true; });
-}
-
-
-// pop_() with an explicit predicate indicating whether the head element
-// is ready to be popped
-template <typename ElementT, typename QueueT>
-template <typename PRED>
-bool LLThreadSafeQueue<ElementT, QueueT>::pop_(
- lock_t& lock, ElementT& element, PRED&& pred)
+typename LLThreadSafeQueue<ElementT, QueueT>::pop_result
+LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element)
{
// If mStorage is empty, there's no head element.
- // If there's a head element, pass it to the predicate to see if caller
- // considers it ready to pop.
- // Unless both are satisfied, no point in continuing.
- if (mStorage.empty() || ! std::forward<PRED>(pred)(mStorage.front()))
- return false;
+ if (mStorage.empty())
+ return EMPTY;
+
+ // If there's a head element, pass it to canPop() to see if it's ready to pop.
+ if (! canPop(mStorage.front()))
+ return WAITING;
// std::queue::front() is the element about to pop()
element = mStorage.front();
@@ -414,7 +410,7 @@ bool LLThreadSafeQueue<ElementT, QueueT>::pop_(
lock.unlock();
// now that we've popped, if somebody's been waiting to push, signal them
mCapacityCond.notify_one();
- return true;
+ return POPPED;
}
@@ -422,17 +418,20 @@ template<typename ElementT, typename QueueT>
ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
{
lock_t lock1(mLock);
+ ElementT value;
while (true)
{
// On the consumer side, we always try to pop before checking mClosed
// so we can finish draining the queue.
- ElementT value;
- if (pop_(lock1, value))
+ pop_result popped = pop_(lock1, value);
+ if (popped == POPPED)
return std::move(value);
// Once the queue is empty, mClosed lets us know if there will ever be
- // any more coming.
- if (mClosed)
+ // any more coming. If we didn't pop because WAITING, i.e. canPop()
+ // returned false, then even if the producer end has been closed,
+ // there's still at least one item to drain: wait for it.
+ if (popped == EMPTY && mClosed)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
@@ -452,7 +451,7 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)
// no need to check mClosed: tryPop() behavior when the queue is
// closed is implemented by simple inability to push any new
// elements
- return pop_(lock, element);
+ return pop_(lock, element) == POPPED;
});
}
@@ -479,26 +478,38 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(
until,
[this, until, &element](lock_t& lock)
{
- while (true)
- {
- if (pop_(lock, element))
- return true;
+ return tryPopUntil_(lock, until, element);
+ });
+}
- if (mClosed)
- {
- return false;
- }
- // Storage empty. Wait for signal.
- if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
- {
- // timed out -- formally we might recheck both conditions above
- return false;
- }
- // If we didn't time out, we were notified for some reason. Loop back
- // to check.
- }
- });
+// body of tryPopUntil(), called once we have the lock
+template <typename ElementT, typename QueueT>
+template <typename Clock, typename Duration>
+bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_(
+ lock_t& lock,
+ const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element)
+{
+ while (true)
+ {
+ if (pop_(lock, element) == POPPED)
+ return true;
+
+ if (mClosed)
+ {
+ return false;
+ }
+
+ // Storage empty. Wait for signal.
+ if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
+ {
+ // timed out -- formally we might recheck both conditions above
+ return false;
+ }
+ // If we didn't time out, we were notified for some reason. Loop back
+ // to check.
+ }
}
@@ -509,6 +520,7 @@ size_t LLThreadSafeQueue<ElementT, QueueT>::size(void)
return mStorage.size();
}
+
template<typename ElementT, typename QueueT>
void LLThreadSafeQueue<ElementT, QueueT>::close()
{
@@ -521,17 +533,20 @@ void LLThreadSafeQueue<ElementT, QueueT>::close()
mCapacityCond.notify_all();
}
+
template<typename ElementT, typename QueueT>
bool LLThreadSafeQueue<ElementT, QueueT>::isClosed()
{
lock_t lock(mLock);
- return mClosed && mStorage.size() == 0;
+ return mClosed;
}
+
template<typename ElementT, typename QueueT>
-LLThreadSafeQueue<ElementT, QueueT>::operator bool()
+bool LLThreadSafeQueue<ElementT, QueueT>::done()
{
- return ! isClosed();
+ lock_t lock(mLock);
+ return mClosed && mStorage.size() == 0;
}
#endif