diff options
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 65 |
1 files changed, 59 insertions, 6 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 8f5e0f3bf3..30dd507f73 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -30,8 +30,11 @@ #include "llexception.h" #include <deque> #include <string> +#include <chrono> #include "mutex.h" #include "llcoros.h" +#include LLCOROS_MUTEX_HEADER +#include <boost/fiber/timed_mutex.hpp> #include LLCOROS_CONDVAR_HEADER // @@ -83,10 +86,20 @@ public: // the caller is blocked. void pushFront(ElementT const & element); - // Try to add an element to the front ofqueue without blocking. Returns + // Try to add an element to the front of queue without blocking. Returns // true only if the element was actually added. bool tryPushFront(ElementT const & element); - + + // Try to add an element to the front of queue, blocking if full but with + // timeout. Returns true if the element was added. + // There are potentially two different timeouts involved: how long to try + // to lock the mutex, versus how long to wait for the queue to stop being + // full. Careful settings for each timeout might be orders of magnitude + // apart. However, this method conflates them. + template <typename Rep, typename Period> + bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout, + ElementT const & element); + // Pop the element at the end of the queue (will block if the queue is // empty). // @@ -120,10 +133,10 @@ private: U32 mCapacity; bool mClosed; - LLCoros::Mutex mLock; - typedef LLCoros::LockType lock_t; - LLCoros::ConditionVariable mCapacityCond; - LLCoros::ConditionVariable mEmptyCond; + boost::fibers::timed_mutex mLock; + typedef std::unique_lock<decltype(mLock)> lock_t; + boost::fibers::condition_variable_any mCapacityCond; + boost::fibers::condition_variable_any mEmptyCond; }; // LLThreadSafeQueue @@ -162,6 +175,46 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element) } +template <typename ElementT> +template <typename Rep, typename Period> +bool LLThreadSafeQueue<ElementT>::tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout, + ElementT const & element) +{ + // Convert duration to time_point: passing the same timeout duration to + // each of multiple calls is wrong. + auto endpoint = std::chrono::steady_clock::now() + timeout; + + lock_t lock1(mLock, std::defer_lock); + if (!lock1.try_lock_until(endpoint)) + return false; + + while (true) + { + if (mClosed) + { + return false; + } + + if (mStorage.size() < mCapacity) + { + mStorage.push_front(element); + lock1.unlock(); + mEmptyCond.notify_one(); + return true; + } + + // Storage Full. Wait for signal. + if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock1, endpoint)) + { + // timed out -- formally we might recheck both conditions above + return false; + } + // If we didn't time out, we were notified for some reason. Loop back + // to check. + } +} + + template<typename ElementT> bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element) { |