diff options
| -rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 358 | 
1 files changed, 235 insertions, 123 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 04f51816d7..c57520c01f 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -37,6 +37,9 @@  #include <queue>  #include <string> +/***************************************************************************** +*   LLThreadSafeQueue +*****************************************************************************/  //  // A general queue exception.  // @@ -77,8 +80,8 @@ class LLThreadSafeQueue  public:  	typedef ElementT value_type; -	// If the pool is set to NULL one will be allocated and managed by this -	// queue. +	// Limiting the number of pending items prevents unbounded growth of the +	// underlying queue.  	LLThreadSafeQueue(U32 capacity = 1024);  	// Add an element to the queue (will block if the queue has @@ -86,13 +89,15 @@ public:  	//  	// This call will raise an interrupt error if the queue is closed while  	// the caller is blocked. -	void push(ElementT const& element); +	template <typename T> +	void push(T&& element);  	// legacy name  	void pushFront(ElementT const & element) { return push(element); }  	// Try to add an element to the queue without blocking. Returns  	// true only if the element was actually added. -	bool tryPush(ElementT const& element); +	template <typename T> +	bool tryPush(T&& element);  	// legacy name  	bool tryPushFront(ElementT const & element) { return tryPush(element); } @@ -102,9 +107,9 @@ public:  	// to lock the mutex, versus how long to wait for the queue to stop being  	// full. Careful settings for each timeout might be orders of magnitude  	// apart. However, this method conflates them. -	template <typename Rep, typename Period> +	template <typename Rep, typename Period, typename T>  	bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout, -					ElementT const & element); +					T&& element);  	// legacy name  	template <typename Rep, typename Period>  	bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout, @@ -112,9 +117,9 @@ public:  	// Try to add an element to the queue, blocking if full but with  	// timeout at specified time_point. Returns true if the element was added. -	template <typename Clock, typename Duration> -	bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& timeout, -					  ElementT const& element); +	template <typename Clock, typename Duration, typename T> +	bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until, +					  T&& element);  	// no legacy name because this is a newer method  	// Pop the element at the head of the queue (will block if the queue is @@ -141,7 +146,7 @@ public:  	// Pop the element at the head of the queue, blocking if empty, with  	// timeout at specified time_point. Returns true if an element was popped.  	template <typename Clock, typename Duration> -	bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& timeout, +	bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,  					 ElementT& element);  	// no legacy name because this is a newer method @@ -172,11 +177,74 @@ protected:  	typedef std::unique_lock<decltype(mLock)> lock_t;  	boost::fibers::condition_variable_any mCapacityCond;  	boost::fibers::condition_variable_any mEmptyCond; -}; -// LLThreadSafeQueue -//----------------------------------------------------------------------------- +	// if we're able to lock immediately, do so and run the passed callable, +	// which must accept lock_t& and return bool +	template <typename CALLABLE> +	bool tryLock(CALLABLE&& callable); +	// if we're able to lock before the passed time_point, do so and run the +	// passed callable, which must accept lock_t& and return bool +	template <typename Clock, typename Duration, typename CALLABLE> +	bool tryLockUntil(const std::chrono::time_point<Clock, Duration>& until, +					  CALLABLE&& callable); +	// while lock is locked, really push the passed element, if we can +	template <typename T> +	bool push_(lock_t& lock, T&& element); +	// while lock is locked, really pop the head element, if we can +	template <typename PRED> +	bool pop_(lock_t& lock, ElementT& element, +			  PRED&& pred=[](const ElementT&){ return true; }); +}; +/***************************************************************************** +*   PriorityQueueAdapter +*****************************************************************************/ +namespace LL +{ +    /** +     * std::priority_queue's API is almost like std::queue, intentionally of +     * course, but you must access the element about to pop() as top() rather +     * than as front(). Make an adapter for use with LLThreadSafeQueue. +     */ +    template <typename T, typename Container=std::vector<T>, +              typename Compare=std::less<typename Container::value_type>> +    class PriorityQueueAdapter +    { +    public: +        // publish all the same types +        typedef std::priority_queue<T, Container, Compare> queue_type; +        typedef typename queue_type::container_type  container_type; +        typedef typename queue_type::value_compare   value_compare; +        typedef typename queue_type::value_type      value_type; +        typedef typename queue_type::size_type       size_type; +        typedef typename queue_type::reference       reference; +        typedef typename queue_type::const_reference const_reference; + +        // Although std::queue defines both const and non-const front() +        // methods, std::priority_queue defines only const top(). +        const_reference front() const { return mQ.top(); } +        // std::priority_queue has no equivalent to back(), so it's good that +        // LLThreadSafeQueue doesn't use it. + +        // All the rest of these merely forward to the corresponding +        // queue_type methods. +        bool empty() const                 { return mQ.empty(); } +        size_type size() const             { return mQ.size(); } +        void push(const value_type& value) { mQ.push(value); } +        void push(value_type&& value)      { mQ.push(std::move(value)); } +        template <typename... Args> +        void emplace(Args&&... args)       { mQ.emplace(std::forward<Args>(args)...); } +        void pop()                         { mQ.pop(); } + +    private: +        queue_type mQ; +    }; +} // namespace LL + + +/***************************************************************************** +*   LLThreadSafeQueue implementation +*****************************************************************************/  template<typename ElementT, typename QueueT>  LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(U32 capacity) :      mCapacity(capacity), @@ -185,24 +253,69 @@ LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(U32 capacity) :  } -template<typename ElementT, typename QueueT> -void LLThreadSafeQueue<ElementT, QueueT>::push(ElementT const & element) +// if we're able to lock immediately, do so and run the passed callable, which +// must accept lock_t& and return bool +template <typename ElementT, typename QueueT> +template <typename CALLABLE> +bool LLThreadSafeQueue<ElementT, QueueT>::tryLock(CALLABLE&& callable) +{ +    lock_t lock1(mLock, std::defer_lock); +    if (!lock1.try_lock()) +        return false; + +    return std::forward<CALLABLE>(callable)(lock1); +} + + +// if we're able to lock before the passed time_point, do so and run the +// passed callable, which must accept lock_t& and return bool +template <typename ElementT, typename QueueT> +template <typename Clock, typename Duration, typename CALLABLE> +bool LLThreadSafeQueue<ElementT, QueueT>::tryLockUntil( +    const std::chrono::time_point<Clock, Duration>& until, +    CALLABLE&& callable) +{ +    lock_t lock1(mLock, std::defer_lock); +    if (!lock1.try_lock_until(until)) +        return false; + +    return std::forward<CALLABLE>(callable)(lock1); +} + + +// while lock is locked, really push the passed element, if we can +template <typename ElementT, typename QueueT> +template <typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::push_(lock_t& lock, T&& element) +{ +    if (mStorage.size() >= mCapacity) +        return false; + +    mStorage.push(std::forward<T>(element)); +    lock.unlock(); +    // now that we've pushed, if somebody's been waiting to pop, signal them +    mEmptyCond.notify_one(); +    return true; +} + + +template <typename ElementT, typename QueueT> +template<typename T> +void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element)  {      lock_t lock1(mLock);      while (true)      { +        // On the producer side, it doesn't matter whether the queue has been +        // drained or not: the moment either end calls close(), further push() +        // operations will fail.          if (mClosed)          {              LLTHROW(LLThreadSafeQueueInterrupt());          } -        if (mStorage.size() < mCapacity) -        { -            mStorage.push(element); -            lock1.unlock(); -            mEmptyCond.notify_one(); +        if (push_(lock1, std::forward<T>(element)))              return; -        }          // Storage Full. Wait for signal.          mCapacityCond.wait(lock1); @@ -210,71 +323,85 @@ void LLThreadSafeQueue<ElementT, QueueT>::push(ElementT const & element)  } +template<typename ElementT, typename QueueT> +template<typename T> +bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(T&& element) +{ +    return tryLock( +        [this, element=std::move(element)](lock_t& lock) +        { +            if (mClosed) +                return false; +            return push_(lock, std::move(element)); +        }); +} + +  template <typename ElementT, typename QueueT> -template <typename Rep, typename Period> +template <typename Rep, typename Period, typename T>  bool LLThreadSafeQueue<ElementT, QueueT>::tryPushFor(      const std::chrono::duration<Rep, Period>& timeout, -    ElementT const & element) +    T&& element)  {      // Convert duration to time_point: passing the same timeout duration to      // each of multiple calls is wrong. -    return tryPushUntil(std::chrono::steady_clock::now() + timeout, element); +    return tryPushUntil(std::chrono::steady_clock::now() + timeout, +                        std::forward<T>(element));  }  template <typename ElementT, typename QueueT> -template <typename Clock, typename Duration> +template <typename Clock, typename Duration, typename T>  bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil( -    const std::chrono::time_point<Clock, Duration>& endpoint, -    ElementT const& element) +    const std::chrono::time_point<Clock, Duration>& until, +    T&& element)  { -    lock_t lock1(mLock, std::defer_lock); -    if (!lock1.try_lock_until(endpoint)) -        return false; - -    while (true) -    { -        if (mClosed) +    return tryLockUntil( +        until, +        [this, until, element=std::move(element)](lock_t& lock)          { -            return false; -        } - -        if (mStorage.size() < mCapacity) -        { -            mStorage.push(element); -            lock1.unlock(); -            mEmptyCond.notify_one(); -            return true; -        } - -        // Storage Full. Wait for signal. -        if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint)) -        { -            // timed out -- formally we might recheck both conditions above -            return false; -        } -        // If we didn't time out, we were notified for some reason. Loop back -        // to check. -    } +            while (true) +            { +                if (mClosed) +                { +                    return false; +                } + +                if (push_(lock, std::move(element))) +                    return true; + +                // Storage Full. Wait for signal. +                if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock, until)) +                { +                    // timed out -- formally we might recheck both conditions above +                    return false; +                } +                // If we didn't time out, we were notified for some reason. Loop back +                // to check. +            } +        });  } -template<typename ElementT, typename QueueT> -bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(ElementT const & element) +// while lock is locked, really pop the head element, if we can +template <typename ElementT, typename QueueT> +template <typename PRED> +bool LLThreadSafeQueue<ElementT, QueueT>::pop_( +    lock_t& lock, ElementT& element, PRED&& pred)  { -    lock_t lock1(mLock, std::defer_lock); -    if (!lock1.try_lock()) -        return false; - -    if (mClosed) -        return false; - -    if (mStorage.size() >= mCapacity) +    // If mStorage is empty, there's no head element. +    // If there's a head element, pass it to the predicate to see if caller +    // considers it ready to pop. +    // Unless both are satisfied, no point in continuing. +    if (mStorage.empty() || ! std::forward<PRED>(pred)(mStorage.front()))          return false; -    mStorage.push(element); -    lock1.unlock(); -    mEmptyCond.notify_one(); +    // std::queue::front() is the element about to pop() +    element = mStorage.front(); +    mStorage.pop(); +    lock.unlock(); +    // now that we've popped, if somebody's been waiting to push, signal them +    mCapacityCond.notify_one();      return true;  } @@ -285,22 +412,20 @@ ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)      lock_t lock1(mLock);      while (true)      { -        if (!mStorage.empty()) -        { -            // std::queue::front() is the element about to pop() -            ElementT value = mStorage.front(); -            mStorage.pop(); -            lock1.unlock(); -            mCapacityCond.notify_one(); -            return value; -        } - +        // On the consumer side, we always try to pop before checking mClosed +        // so we can finish draining the queue. +        ElementT value; +        if (pop_(lock1, value)) +            return std::move(value); + +        // Once the queue is empty, mClosed lets us know if there will ever be +        // any more coming.          if (mClosed)          {              LLTHROW(LLThreadSafeQueueInterrupt());          } -        // Storage empty. Wait for signal. +        // Storage empty, queue still open. Wait for signal.          mEmptyCond.wait(lock1);      }  } @@ -309,21 +434,14 @@ ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)  template<typename ElementT, typename QueueT>  bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)  { -    lock_t lock1(mLock, std::defer_lock); -    if (!lock1.try_lock()) -        return false; - -    // no need to check mClosed: tryPop() behavior when the queue is -    // closed is implemented by simple inability to push any new elements -    if (mStorage.empty()) -        return false; - -    // std::queue::front() is the element about to pop() -    element = mStorage.front(); -    mStorage.pop(); -    lock1.unlock(); -    mCapacityCond.notify_one(); -    return true; +    return tryLock( +        [this, &element](lock_t& lock) +        { +            // no need to check mClosed: tryPop() behavior when the queue is +            // closed is implemented by simple inability to push any new +            // elements +            return pop_(lock, element); +        });  } @@ -342,39 +460,33 @@ bool LLThreadSafeQueue<ElementT, QueueT>::tryPopFor(  template <typename ElementT, typename QueueT>  template <typename Clock, typename Duration>  bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil( -    const std::chrono::time_point<Clock, Duration>& endpoint, +    const std::chrono::time_point<Clock, Duration>& until,      ElementT& element)  { -    lock_t lock1(mLock, std::defer_lock); -    if (!lock1.try_lock_until(endpoint)) -        return false; - -    while (true) -    { -        if (!mStorage.empty()) +    return tryLockUntil( +        until, +        [this, until, &element](lock_t& lock)          { -            // std::queue::front() is the element about to pop() -            element = mStorage.front(); -            mStorage.pop(); -            lock1.unlock(); -            mCapacityCond.notify_one(); -            return true; -        } - -        if (mClosed) -        { -            return false; -        } - -        // Storage empty. Wait for signal. -        if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock1, endpoint)) -        { -            // timed out -- formally we might recheck both conditions above -            return false; -        } -        // If we didn't time out, we were notified for some reason. Loop back -        // to check. -    } +            while (true) +            { +                if (pop_(lock, element)) +                    return true; + +                if (mClosed) +                { +                    return false; +                } + +                // Storage empty. Wait for signal. +                if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until)) +                { +                    // timed out -- formally we might recheck both conditions above +                    return false; +                } +                // If we didn't time out, we were notified for some reason. Loop back +                // to check. +            } +        });  }  | 
