diff options
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 51 | ||||
-rw-r--r-- | indra/llcommon/tests/threadsafeschedule_test.cpp | 10 | ||||
-rw-r--r-- | indra/llcommon/threadsafeschedule.h | 72 |
3 files changed, 88 insertions, 45 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index bd2d82d4c3..719edcd579 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -179,11 +179,12 @@ protected: boost::fibers::condition_variable_any mCapacityCond; boost::fibers::condition_variable_any mEmptyCond; + enum pop_result { EMPTY, DONE, WAITING, POPPED }; // implementation logic, suitable for passing to tryLockUntil() template <typename Clock, typename Duration> - bool tryPopUntil_(lock_t& lock, - const std::chrono::time_point<Clock, Duration>& until, - ElementT& element); + pop_result tryPopUntil_(lock_t& lock, + const std::chrono::time_point<Clock, Duration>& until, + ElementT& element); // if we're able to lock immediately, do so and run the passed callable, // which must accept lock_t& and return bool template <typename CALLABLE> @@ -197,7 +198,6 @@ protected: template <typename T> bool push_(lock_t& lock, T&& element); // while lock is locked, really pop the head element, if we can - enum pop_result { EMPTY, WAITING, POPPED }; pop_result pop_(lock_t& lock, ElementT& element); // Is the current head element ready to pop? We say yes; subclass can // override as needed. @@ -398,7 +398,7 @@ LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element) { // If mStorage is empty, there's no head element. if (mStorage.empty()) - return EMPTY; + return mClosed? DONE : EMPTY; // If there's a head element, pass it to canPop() to see if it's ready to pop. if (! canPop(mStorage.front())) @@ -427,16 +427,16 @@ ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void) if (popped == POPPED) return std::move(value); - // Once the queue is empty, mClosed lets us know if there will ever be - // any more coming. If we didn't pop because WAITING, i.e. canPop() - // returned false, then even if the producer end has been closed, - // there's still at least one item to drain: wait for it. - if (popped == EMPTY && mClosed) + // Once the queue is DONE, there will never be any more coming. + if (popped == DONE) { LLTHROW(LLThreadSafeQueueInterrupt()); } - // Storage empty, queue still open. Wait for signal. + // If we didn't pop because WAITING, i.e. canPop() returned false, + // then even if the producer end has been closed, there's still at + // least one item to drain: wait for it. Or we might be EMPTY, with + // the queue still open. Either way, wait for signal. mEmptyCond.wait(lock1); } } @@ -448,8 +448,8 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element) return tryLock( [this, &element](lock_t& lock) { - // no need to check mClosed: tryPop() behavior when the queue is - // closed is implemented by simple inability to push any new + // conflate EMPTY, DONE, WAITING: tryPop() behavior when the queue + // is closed is implemented by simple inability to push any new // elements return pop_(lock, element) == POPPED; }); @@ -478,7 +478,8 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil( until, [this, until, &element](lock_t& lock) { - return tryPopUntil_(lock, until, element); + // conflate EMPTY, DONE, WAITING + return tryPopUntil_(lock, until, element) == POPPED; }); } @@ -486,26 +487,28 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil( // body of tryPopUntil(), called once we have the lock template <typename ElementT, typename QueueT> template <typename Clock, typename Duration> -bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_( +typename LLThreadSafeQueue<ElementT, QueueT>::pop_result +LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_( lock_t& lock, const std::chrono::time_point<Clock, Duration>& until, ElementT& element) { while (true) { - if (pop_(lock, element) == POPPED) - return true; - - if (mClosed) + pop_result popped = pop_(lock, element); + if (popped == POPPED || popped == DONE) { - return false; + // If we succeeded, great! If we've drained the last item, so be + // it. Either way, break the loop and tell caller. + return popped; } - // Storage empty. Wait for signal. + // EMPTY or WAITING: wait for signal. if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) { - // timed out -- formally we might recheck both conditions above - return false; + // timed out -- formally we might recheck + // as it is, break loop + return popped; } // If we didn't time out, we were notified for some reason. Loop back // to check. @@ -546,7 +549,7 @@ template<typename ElementT, typename QueueT> bool LLThreadSafeQueue<ElementT, QueueT>::done() { lock_t lock(mLock); - return mClosed && mStorage.size() == 0; + return mClosed && mStorage.empty(); } #endif diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp index ec0fa0c928..af67b9f492 100644 --- a/indra/llcommon/tests/threadsafeschedule_test.cpp +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -47,6 +47,8 @@ namespace tut // the timestamp for each one -- but since we're passing explicit // timestamps, make the queue reorder them. queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); + // Given the various push() overloads, you have to match the type + // exactly: conversions are ambiguous. queue.push("abc"s); queue.push(Queue::Clock::now() + 10ms, "def"); queue.close(); @@ -56,9 +58,11 @@ namespace tut ensure_equals("failed to pop second", std::get<0>(entry), "def"s); ensure("queue not closed", queue.isClosed()); ensure("queue prematurely done", ! queue.done()); - entry = queue.pop(); - ensure_equals("failed to pop third", std::get<0>(entry), "ghi"s); - bool popped = queue.tryPop(entry); + std::string s; + bool popped = queue.tryPopFor(1s, s); + ensure("failed to pop third", popped); + ensure_equals("third is wrong", s, "ghi"s); + popped = queue.tryPop(s); ensure("queue not empty", ! popped); ensure("queue not done", queue.done()); } diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h index 545c820f53..8ab4311ca1 100644 --- a/indra/llcommon/threadsafeschedule.h +++ b/indra/llcommon/threadsafeschedule.h @@ -73,11 +73,7 @@ namespace LL private: using super = LLThreadSafeQueue<TimeTuple, ThreadSafeSchedulePrivate::TimedQueue<Args...>>; using lock_t = typename super::lock_t; - using super::pop_; - using super::push_; - using super::mClosed; - using super::mEmptyCond; - using super::mCapacityCond; + using pop_result = typename super::pop_result; public: using TimePoint = ThreadSafeSchedulePrivate::TimePoint; @@ -92,6 +88,11 @@ namespace LL using super::push; /// pass DataTuple with implicit now + // This could be ambiguous for Args with a single type. Unfortunately + // we can't enable_if an individual method with a condition based on + // the *class* template arguments, only on that method's template + // arguments. We could specialize this class for the single-Args case; + // we could minimize redundancy by breaking out a common base class... void push(const DataTuple& tuple) { push(tuple_cons(Clock::now(), tuple)); @@ -103,11 +104,11 @@ namespace LL push(TimeTuple(time, std::forward<Args>(args)...)); } - /// individually pass every component except the TimePoint (implies - /// now) -- could be ambiguous if the first specified template - /// parameter type is also TimePoint -- we could try to disambiguate, - /// but a simpler approach would be for the caller to explicitly - /// construct DataTuple and call that overload + /// individually pass every component except the TimePoint (implies now) + // This could be ambiguous if the first specified template parameter + // type is also TimePoint. We could try to disambiguate, but a simpler + // approach would be for the caller to explicitly construct DataTuple + // and call that overload. void push(Args&&... args) { push(Clock::now(), std::forward<Args>(args)...); @@ -199,6 +200,10 @@ namespace LL // current time. /// pop DataTuple by value + // It would be great to notice when sizeof...(Args) == 1 and directly + // return the first (only) value, instead of making pop()'s caller + // call std::get<0>(value). See push(DataTuple) remarks for why we + // haven't yet jumped through those hoops. DataTuple pop() { return tuple_cdr(popWithTime()); @@ -224,16 +229,17 @@ namespace LL { // Pick a point suitably far into the future. TimePoint until = TimePoint::clock::now() + std::chrono::hours(24); - if (tryPopUntil_(lock, until, tt)) + pop_result popped = tryPopUntil_(lock, until, tt); + if (popped == super::POPPED) return std::move(tt); - // empty and closed: throw, just as super::pop() does - if (super::mStorage.empty() && super::mClosed) + // DONE: throw, just as super::pop() does + if (popped == super::DONE) { LLTHROW(LLThreadSafeQueueInterrupt()); } - // If not empty, we've still got items to drain. - // If not closed, it's worth waiting for more items. + // WAITING: we've still got items to drain. + // EMPTY: not closed, so it's worth waiting for more items. // Either way, loop back to wait. } } @@ -252,6 +258,16 @@ namespace LL return true; } + /// for when Args has exactly one type + bool tryPop(typename std::tuple_element<1, TimeTuple>::type& value) + { + TimeTuple tt; + if (! super::tryPop(tt)) + return false; + value = std::get<1>(std::move(tt)); + return true; + } + /// tryPopFor() template <typename Rep, typename Period, typename Tuple> bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, Tuple& tuple) @@ -278,11 +294,12 @@ namespace LL { // Use our time_point_cast to allow for 'until' that's a // time_point type other than TimePoint. - return tryPopUntil_(lock, time_point_cast<TimePoint>(until), tuple); + return super::POPPED == + tryPopUntil_(lock, LL::time_point_cast<TimePoint>(until), tuple); }); } - bool tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) + pop_result tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple) { TimePoint adjusted = until; if (! super::mStorage.empty()) @@ -292,7 +309,14 @@ namespace LL adjusted = min(std::get<0>(super::mStorage.front()), adjusted); } // now delegate to base-class tryPopUntil_() - return super::tryPopUntil_(lock, adjusted, tuple); + pop_result popped; + while ((popped = super::tryPopUntil_(lock, adjusted, tuple)) == super::WAITING) + { + // If super::tryPopUntil_() returns WAITING, it means there's + // a head item, but it's not yet time. But it's worth looping + // back to recheck. + } + return popped; } /// tryPopUntil(DataTuple&) @@ -307,6 +331,18 @@ namespace LL return true; } + /// for when Args has exactly one type + template <typename Clock, typename Duration> + bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until, + typename std::tuple_element<1, TimeTuple>::type& value) + { + TimeTuple tt; + if (! tryPopUntil(until, tt)) + return false; + value = std::get<1>(std::move(tt)); + return true; + } + /*------------------------------ etc. ------------------------------*/ // We can't hide items that aren't yet ready because we can't traverse // the underlying priority_queue: it has no iterators, only top(). So |