summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--indra/llcommon/llthreadsafequeue.h78
1 files changed, 70 insertions, 8 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 3f49dbccc2..bac536f7ee 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -78,7 +78,7 @@ public:
// Add an element to the front of queue (will block if the queue has
// reached capacity).
//
- // This call will raise an interrupt error if the queue is deleted while
+ // This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
void pushFront(ElementT const & element);
@@ -89,7 +89,7 @@ public:
// Pop the element at the end of the queue (will block if the queue is
// empty).
//
- // This call will raise an interrupt error if the queue is deleted while
+ // This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
ElementT popBack(void);
@@ -100,11 +100,27 @@ public:
// Returns the size of the queue.
size_t size();
+ // closes the queue:
+ // - every subsequent pushFront() call will throw LLThreadSafeQueueInterrupt
+ // - every subsequent tryPushFront() call will return false
+ // - popBack() calls will return normally until the queue is drained, then
+ // every subsequent popBack() will throw LLThreadSafeQueueInterrupt
+ // - tryPopBack() calls will return normally until the queue is drained,
+ // then every subsequent tryPopBack() call will return false
+ void close();
+
+ // detect closed state
+ bool isClosed();
+ // inverse of isClosed()
+ explicit operator bool();
+
private:
std::deque< ElementT > mStorage;
U32 mCapacity;
+ bool mClosed;
boost::fibers::mutex mLock;
+ typedef std::unique_lock<decltype(mLock)> lock_t;
boost::fibers::condition_variable mCapacityCond;
boost::fibers::condition_variable mEmptyCond;
};
@@ -114,7 +130,8 @@ private:
template<typename ElementT>
LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) :
-mCapacity(capacity)
+ mCapacity(capacity),
+ mClosed(false)
{
}
@@ -122,12 +139,18 @@ mCapacity(capacity)
template<typename ElementT>
void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
{
- std::unique_lock<decltype(mLock)> lock1(mLock);
+ lock_t lock1(mLock);
while (true)
{
+ if (mClosed)
+ {
+ LLTHROW(LLThreadSafeQueueInterrupt());
+ }
+
if (mStorage.size() < mCapacity)
{
mStorage.push_front(element);
+ lock1.unlock();
mEmptyCond.notify_one();
return;
}
@@ -141,14 +164,18 @@ void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
{
- std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock);
+ lock_t lock1(mLock, std::defer_lock);
if (!lock1.try_lock())
return false;
+ if (mClosed)
+ return false;
+
if (mStorage.size() >= mCapacity)
return false;
mStorage.push_front(element);
+ lock1.unlock();
mEmptyCond.notify_one();
return true;
}
@@ -157,17 +184,23 @@ bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
template<typename ElementT>
ElementT LLThreadSafeQueue<ElementT>::popBack(void)
{
- std::unique_lock<decltype(mLock)> lock1(mLock);
+ lock_t lock1(mLock);
while (true)
{
if (!mStorage.empty())
{
ElementT value = mStorage.back();
mStorage.pop_back();
+ lock1.unlock();
mCapacityCond.notify_one();
return value;
}
+ if (mClosed)
+ {
+ LLTHROW(LLThreadSafeQueueInterrupt());
+ }
+
// Storage empty. Wait for signal.
mEmptyCond.wait(lock1);
}
@@ -177,15 +210,18 @@ ElementT LLThreadSafeQueue<ElementT>::popBack(void)
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
{
- std::unique_lock<decltype(mLock)> lock1(mLock, std::defer_lock);
+ lock_t lock1(mLock, std::defer_lock);
if (!lock1.try_lock())
return false;
+ // no need to check mClosed: tryPopBack() behavior when the queue is
+ // closed is implemented by simple inability to push any new elements
if (mStorage.empty())
return false;
element = mStorage.back();
mStorage.pop_back();
+ lock1.unlock();
mCapacityCond.notify_one();
return true;
}
@@ -194,8 +230,34 @@ bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
template<typename ElementT>
size_t LLThreadSafeQueue<ElementT>::size(void)
{
- std::lock_guard<decltype(mLock)> lock(mLock);
+ lock_t lock(mLock);
return mStorage.size();
}
+template<typename ElementT>
+void LLThreadSafeQueue<ElementT>::close()
+{
+ lock_t lock(mLock);
+ mClosed = true;
+ lock.unlock();
+ // wake up any blocked popBack() calls
+ mEmptyCond.notify_all();
+ // wake up any blocked pushFront() calls
+ mCapacityCond.notify_all();
+}
+
+template<typename ElementT>
+bool LLThreadSafeQueue<ElementT>::isClosed()
+{
+ lock_t lock(mLock);
+ return mClosed;
+}
+
+template<typename ElementT>
+LLThreadSafeQueue<ElementT>::operator bool()
+{
+ lock_t lock(mLock);
+ return ! mClosed;
+}
+
#endif