summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--indra/llcommon/llthreadsafequeue.h51
-rw-r--r--indra/llcommon/tests/threadsafeschedule_test.cpp10
-rw-r--r--indra/llcommon/threadsafeschedule.h72
3 files changed, 88 insertions, 45 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index bd2d82d4c3..719edcd579 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -179,11 +179,12 @@ protected:
boost::fibers::condition_variable_any mCapacityCond;
boost::fibers::condition_variable_any mEmptyCond;
+ enum pop_result { EMPTY, DONE, WAITING, POPPED };
// implementation logic, suitable for passing to tryLockUntil()
template <typename Clock, typename Duration>
- bool tryPopUntil_(lock_t& lock,
- const std::chrono::time_point<Clock, Duration>& until,
- ElementT& element);
+ pop_result tryPopUntil_(lock_t& lock,
+ const std::chrono::time_point<Clock, Duration>& until,
+ ElementT& element);
// if we're able to lock immediately, do so and run the passed callable,
// which must accept lock_t& and return bool
template <typename CALLABLE>
@@ -197,7 +198,6 @@ protected:
template <typename T>
bool push_(lock_t& lock, T&& element);
// while lock is locked, really pop the head element, if we can
- enum pop_result { EMPTY, WAITING, POPPED };
pop_result pop_(lock_t& lock, ElementT& element);
// Is the current head element ready to pop? We say yes; subclass can
// override as needed.
@@ -398,7 +398,7 @@ LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element)
{
// If mStorage is empty, there's no head element.
if (mStorage.empty())
- return EMPTY;
+ return mClosed? DONE : EMPTY;
// If there's a head element, pass it to canPop() to see if it's ready to pop.
if (! canPop(mStorage.front()))
@@ -427,16 +427,16 @@ ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
if (popped == POPPED)
return std::move(value);
- // Once the queue is empty, mClosed lets us know if there will ever be
- // any more coming. If we didn't pop because WAITING, i.e. canPop()
- // returned false, then even if the producer end has been closed,
- // there's still at least one item to drain: wait for it.
- if (popped == EMPTY && mClosed)
+ // Once the queue is DONE, there will never be any more coming.
+ if (popped == DONE)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
- // Storage empty, queue still open. Wait for signal.
+ // If we didn't pop because WAITING, i.e. canPop() returned false,
+ // then even if the producer end has been closed, there's still at
+ // least one item to drain: wait for it. Or we might be EMPTY, with
+ // the queue still open. Either way, wait for signal.
mEmptyCond.wait(lock1);
}
}
@@ -448,8 +448,8 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)
return tryLock(
[this, &element](lock_t& lock)
{
- // no need to check mClosed: tryPop() behavior when the queue is
- // closed is implemented by simple inability to push any new
+ // conflate EMPTY, DONE, WAITING: tryPop() behavior when the queue
+ // is closed is implemented by simple inability to push any new
// elements
return pop_(lock, element) == POPPED;
});
@@ -478,7 +478,8 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(
until,
[this, until, &element](lock_t& lock)
{
- return tryPopUntil_(lock, until, element);
+ // conflate EMPTY, DONE, WAITING
+ return tryPopUntil_(lock, until, element) == POPPED;
});
}
@@ -486,26 +487,28 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(
// body of tryPopUntil(), called once we have the lock
template <typename ElementT, typename QueueT>
template <typename Clock, typename Duration>
-bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_(
+typename LLThreadSafeQueue<ElementT, QueueT>::pop_result
+LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_(
lock_t& lock,
const std::chrono::time_point<Clock, Duration>& until,
ElementT& element)
{
while (true)
{
- if (pop_(lock, element) == POPPED)
- return true;
-
- if (mClosed)
+ pop_result popped = pop_(lock, element);
+ if (popped == POPPED || popped == DONE)
{
- return false;
+ // If we succeeded, great! If we've drained the last item, so be
+ // it. Either way, break the loop and tell caller.
+ return popped;
}
- // Storage empty. Wait for signal.
+ // EMPTY or WAITING: wait for signal.
if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
{
- // timed out -- formally we might recheck both conditions above
- return false;
+ // timed out -- formally we might recheck
+ // as it is, break loop
+ return popped;
}
// If we didn't time out, we were notified for some reason. Loop back
// to check.
@@ -546,7 +549,7 @@ template<typename ElementT, typename QueueT>
bool LLThreadSafeQueue<ElementT, QueueT>::done()
{
lock_t lock(mLock);
- return mClosed && mStorage.size() == 0;
+ return mClosed && mStorage.empty();
}
#endif
diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp
index ec0fa0c928..af67b9f492 100644
--- a/indra/llcommon/tests/threadsafeschedule_test.cpp
+++ b/indra/llcommon/tests/threadsafeschedule_test.cpp
@@ -47,6 +47,8 @@ namespace tut
// the timestamp for each one -- but since we're passing explicit
// timestamps, make the queue reorder them.
queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi"));
+ // Given the various push() overloads, you have to match the type
+ // exactly: conversions are ambiguous.
queue.push("abc"s);
queue.push(Queue::Clock::now() + 10ms, "def");
queue.close();
@@ -56,9 +58,11 @@ namespace tut
ensure_equals("failed to pop second", std::get<0>(entry), "def"s);
ensure("queue not closed", queue.isClosed());
ensure("queue prematurely done", ! queue.done());
- entry = queue.pop();
- ensure_equals("failed to pop third", std::get<0>(entry), "ghi"s);
- bool popped = queue.tryPop(entry);
+ std::string s;
+ bool popped = queue.tryPopFor(1s, s);
+ ensure("failed to pop third", popped);
+ ensure_equals("third is wrong", s, "ghi"s);
+ popped = queue.tryPop(s);
ensure("queue not empty", ! popped);
ensure("queue not done", queue.done());
}
diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h
index 545c820f53..8ab4311ca1 100644
--- a/indra/llcommon/threadsafeschedule.h
+++ b/indra/llcommon/threadsafeschedule.h
@@ -73,11 +73,7 @@ namespace LL
private:
using super = LLThreadSafeQueue<TimeTuple, ThreadSafeSchedulePrivate::TimedQueue<Args...>>;
using lock_t = typename super::lock_t;
- using super::pop_;
- using super::push_;
- using super::mClosed;
- using super::mEmptyCond;
- using super::mCapacityCond;
+ using pop_result = typename super::pop_result;
public:
using TimePoint = ThreadSafeSchedulePrivate::TimePoint;
@@ -92,6 +88,11 @@ namespace LL
using super::push;
/// pass DataTuple with implicit now
+ // This could be ambiguous for Args with a single type. Unfortunately
+ // we can't enable_if an individual method with a condition based on
+ // the *class* template arguments, only on that method's template
+ // arguments. We could specialize this class for the single-Args case;
+ // we could minimize redundancy by breaking out a common base class...
void push(const DataTuple& tuple)
{
push(tuple_cons(Clock::now(), tuple));
@@ -103,11 +104,11 @@ namespace LL
push(TimeTuple(time, std::forward<Args>(args)...));
}
- /// individually pass every component except the TimePoint (implies
- /// now) -- could be ambiguous if the first specified template
- /// parameter type is also TimePoint -- we could try to disambiguate,
- /// but a simpler approach would be for the caller to explicitly
- /// construct DataTuple and call that overload
+ /// individually pass every component except the TimePoint (implies now)
+ // This could be ambiguous if the first specified template parameter
+ // type is also TimePoint. We could try to disambiguate, but a simpler
+ // approach would be for the caller to explicitly construct DataTuple
+ // and call that overload.
void push(Args&&... args)
{
push(Clock::now(), std::forward<Args>(args)...);
@@ -199,6 +200,10 @@ namespace LL
// current time.
/// pop DataTuple by value
+ // It would be great to notice when sizeof...(Args) == 1 and directly
+ // return the first (only) value, instead of making pop()'s caller
+ // call std::get<0>(value). See push(DataTuple) remarks for why we
+ // haven't yet jumped through those hoops.
DataTuple pop()
{
return tuple_cdr(popWithTime());
@@ -224,16 +229,17 @@ namespace LL
{
// Pick a point suitably far into the future.
TimePoint until = TimePoint::clock::now() + std::chrono::hours(24);
- if (tryPopUntil_(lock, until, tt))
+ pop_result popped = tryPopUntil_(lock, until, tt);
+ if (popped == super::POPPED)
return std::move(tt);
- // empty and closed: throw, just as super::pop() does
- if (super::mStorage.empty() && super::mClosed)
+ // DONE: throw, just as super::pop() does
+ if (popped == super::DONE)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
- // If not empty, we've still got items to drain.
- // If not closed, it's worth waiting for more items.
+ // WAITING: we've still got items to drain.
+ // EMPTY: not closed, so it's worth waiting for more items.
// Either way, loop back to wait.
}
}
@@ -252,6 +258,16 @@ namespace LL
return true;
}
+ /// for when Args has exactly one type
+ bool tryPop(typename std::tuple_element<1, TimeTuple>::type& value)
+ {
+ TimeTuple tt;
+ if (! super::tryPop(tt))
+ return false;
+ value = std::get<1>(std::move(tt));
+ return true;
+ }
+
/// tryPopFor()
template <typename Rep, typename Period, typename Tuple>
bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, Tuple& tuple)
@@ -278,11 +294,12 @@ namespace LL
{
// Use our time_point_cast to allow for 'until' that's a
// time_point type other than TimePoint.
- return tryPopUntil_(lock, time_point_cast<TimePoint>(until), tuple);
+ return super::POPPED ==
+ tryPopUntil_(lock, LL::time_point_cast<TimePoint>(until), tuple);
});
}
- bool tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple)
+ pop_result tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple)
{
TimePoint adjusted = until;
if (! super::mStorage.empty())
@@ -292,7 +309,14 @@ namespace LL
adjusted = min(std::get<0>(super::mStorage.front()), adjusted);
}
// now delegate to base-class tryPopUntil_()
- return super::tryPopUntil_(lock, adjusted, tuple);
+ pop_result popped;
+ while ((popped = super::tryPopUntil_(lock, adjusted, tuple)) == super::WAITING)
+ {
+ // If super::tryPopUntil_() returns WAITING, it means there's
+ // a head item, but it's not yet time. But it's worth looping
+ // back to recheck.
+ }
+ return popped;
}
/// tryPopUntil(DataTuple&)
@@ -307,6 +331,18 @@ namespace LL
return true;
}
+ /// for when Args has exactly one type
+ template <typename Clock, typename Duration>
+ bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
+ typename std::tuple_element<1, TimeTuple>::type& value)
+ {
+ TimeTuple tt;
+ if (! tryPopUntil(until, tt))
+ return false;
+ value = std::get<1>(std::move(tt));
+ return true;
+ }
+
/*------------------------------ etc. ------------------------------*/
// We can't hide items that aren't yet ready because we can't traverse
// the underlying priority_queue: it has no iterators, only top(). So