diff options
Diffstat (limited to 'indra/llcommon')
| -rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 78 | 
1 files changed, 70 insertions, 8 deletions
| diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 3f49dbccc2..bac536f7ee 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -78,7 +78,7 @@ public:  	// Add an element to the front of queue (will block if the queue has  	// reached capacity).  	// -	// This call will raise an interrupt error if the queue is deleted while +	// This call will raise an interrupt error if the queue is closed while  	// the caller is blocked.  	void pushFront(ElementT const & element); @@ -89,7 +89,7 @@ public:  	// Pop the element at the end of the queue (will block if the queue is  	// empty).  	// -	// This call will raise an interrupt error if the queue is deleted while +	// This call will raise an interrupt error if the queue is closed while  	// the caller is blocked.  	ElementT popBack(void); @@ -100,11 +100,27 @@ public:  	// Returns the size of the queue.  	size_t size(); +	// closes the queue: +	// - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt +	// - every subsequent tryPushFront() call will return false +	// - popBack() calls will return normally until the queue is drained, then +	//   every subsequent popBack() will throw LLThreadSafeQueueInterrupt +	// - tryPopBack() calls will return normally until the queue is drained, +	//   then every subsequent tryPopBack() call will return false +	void close(); + +	// detect closed state +	bool isClosed(); +	// inverse of isClosed() +	explicit operator bool(); +  private:  	std::deque< ElementT > mStorage;  	U32 mCapacity; +	bool mClosed;  	boost::fibers::mutex mLock; +	typedef std::unique_lock<decltype(mLock)> lock_t;  	boost::fibers::condition_variable mCapacityCond;  	boost::fibers::condition_variable mEmptyCond;  }; @@ -114,7 +130,8 @@ private:  template<typename ElementT>  LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) : -mCapacity(capacity) +    mCapacity(capacity), +    mClosed(false)  {  } @@ -122,12 +139,18 @@ mCapacity(capacity)  template<typename ElementT>  void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)  { -    std::unique_lock<decltype(mLock)> lock1(mLock); +    lock_t lock1(mLock);      while (true)      { +        if (mClosed) +        { +            LLTHROW(LLThreadSafeQueueInterrupt()); +        } +          if (mStorage.size() < mCapacity)          {              mStorage.push_front(element); +            lock1.unlock();              mEmptyCond.notify_one();              return;          } @@ -141,14 +164,18 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)  template<typename ElementT>  bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)  { -    std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock); +    lock_t lock1(mLock, std::defer_lock);      if (!lock1.try_lock())          return false; +    if (mClosed) +        return false; +      if (mStorage.size() >= mCapacity)          return false;      mStorage.push_front(element); +    lock1.unlock();      mEmptyCond.notify_one();      return true;  } @@ -157,17 +184,23 @@ bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)  template<typename ElementT>  ElementT LLThreadSafeQueue<ElementT>::popBack(void)  { -    std::unique_lock<decltype(mLock)> lock1(mLock); +    lock_t lock1(mLock);      while (true)      {          if (!mStorage.empty())          {              ElementT value = mStorage.back();              mStorage.pop_back(); +            lock1.unlock();              mCapacityCond.notify_one();              return value;          } +        if (mClosed) +        { +            LLTHROW(LLThreadSafeQueueInterrupt()); +        } +          // Storage empty. Wait for signal.          mEmptyCond.wait(lock1);      } @@ -177,15 +210,18 @@ ElementT LLThreadSafeQueue<ElementT>::popBack(void)  template<typename ElementT>  bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)  { -    std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock); +    lock_t lock1(mLock, std::defer_lock);      if (!lock1.try_lock())          return false; +    // no need to check mClosed: tryPopBack() behavior when the queue is +    // closed is implemented by simple inability to push any new elements      if (mStorage.empty())          return false;      element = mStorage.back();      mStorage.pop_back(); +    lock1.unlock();      mCapacityCond.notify_one();      return true;  } @@ -194,8 +230,34 @@ bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)  template<typename ElementT>  size_t LLThreadSafeQueue<ElementT>::size(void)  { -    std::lock_guard<decltype(mLock)> lock(mLock); +    lock_t lock(mLock);      return mStorage.size();  } +template<typename ElementT> +void LLThreadSafeQueue<ElementT>::close() +{ +    lock_t lock(mLock); +    mClosed = true; +    lock.unlock(); +    // wake up any blocked popBack() calls +    mEmptyCond.notify_all(); +    // wake up any blocked pushFront() calls +    mCapacityCond.notify_all(); +} + +template<typename ElementT> +bool LLThreadSafeQueue<ElementT>::isClosed() +{ +    lock_t lock(mLock); +    return mClosed; +} + +template<typename ElementT> +LLThreadSafeQueue<ElementT>::operator bool() +{ +    lock_t lock(mLock); +    return ! mClosed; +} +  #endif | 
