summaryrefslogtreecommitdiff
path: root/indra/llcommon/llthreadsafequeue.h
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llcommon/llthreadsafequeue.h')
-rw-r--r--indra/llcommon/llthreadsafequeue.h135
1 files changed, 72 insertions, 63 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 45289ef0b4..b0bddac8e5 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -28,12 +28,20 @@
#define LL_LLTHREADSAFEQUEUE_H
#include "llexception.h"
+#include <deque>
#include <string>
+#if LL_WINDOWS
+#pragma warning (push)
+#pragma warning (disable:4265)
+#endif
+// 'std::_Pad' : class has virtual functions, but destructor is not virtual
+#include <mutex>
+#include <condition_variable>
-struct apr_pool_t; // From apr_pools.h
-class LLThreadSafeQueueImplementation; // See below.
-
+#if LL_WINDOWS
+#pragma warning (pop)
+#endif
//
// A general queue exception.
@@ -64,31 +72,6 @@ public:
}
};
-
-struct apr_queue_t; // From apr_queue.h
-
-
-//
-// Implementation details.
-//
-class LL_COMMON_API LLThreadSafeQueueImplementation
-{
-public:
- LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity);
- ~LLThreadSafeQueueImplementation();
- void pushFront(void * element);
- bool tryPushFront(void * element);
- void * popBack(void);
- bool tryPopBack(void *& element);
- size_t size();
-
-private:
- bool mOwnsPool;
- apr_pool_t * mPool;
- apr_queue_t * mQueue;
-};
-
-
//
// Implements a thread safe FIFO.
//
@@ -100,7 +83,7 @@ public:
// If the pool is set to NULL one will be allocated and managed by this
// queue.
- LLThreadSafeQueue(apr_pool_t * pool = 0, unsigned int capacity = 1024);
+ LLThreadSafeQueue(U32 capacity = 1024);
// Add an element to the front of queue (will block if the queue has
// reached capacity).
@@ -128,77 +111,103 @@ public:
size_t size();
private:
- LLThreadSafeQueueImplementation mImplementation;
-};
-
+ std::deque< ElementT > mStorage;
+ U32 mCapacity;
+ std::mutex mLock;
+ std::condition_variable mCapacityCond;
+ std::condition_variable mEmptyCond;
+};
// LLThreadSafeQueue
//-----------------------------------------------------------------------------
-
template<typename ElementT>
-LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(apr_pool_t * pool, unsigned int capacity):
- mImplementation(pool, capacity)
+LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) :
+mCapacity(capacity)
{
- ; // No op.
}
template<typename ElementT>
void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
{
- ElementT * elementCopy = new ElementT(element);
- try {
- mImplementation.pushFront(elementCopy);
- } catch (LLThreadSafeQueueInterrupt) {
- delete elementCopy;
- throw;
- }
+ while (true)
+ {
+ std::unique_lock<std::mutex> lock1(mLock);
+
+ if (mStorage.size() < mCapacity)
+ {
+ mStorage.push_front(element);
+ mEmptyCond.notify_one();
+ return;
+ }
+
+ // Storage Full. Wait for signal.
+ mCapacityCond.wait(lock1);
+ }
}
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
{
- ElementT * elementCopy = new ElementT(element);
- bool result = mImplementation.tryPushFront(elementCopy);
- if(!result) delete elementCopy;
- return result;
+ std::unique_lock<std::mutex> lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock())
+ return false;
+
+ if (mStorage.size() >= mCapacity)
+ return false;
+
+ mStorage.push_front(element);
+ mEmptyCond.notify_one();
+ return true;
}
template<typename ElementT>
ElementT LLThreadSafeQueue<ElementT>::popBack(void)
{
- ElementT * element = reinterpret_cast<ElementT *> (mImplementation.popBack());
- ElementT result(*element);
- delete element;
- return result;
+ while (true)
+ {
+ std::unique_lock<std::mutex> lock1(mLock);
+
+ if (!mStorage.empty())
+ {
+ ElementT value = mStorage.back();
+ mStorage.pop_back();
+ mCapacityCond.notify_one();
+ return value;
+ }
+
+ // Storage empty. Wait for signal.
+ mEmptyCond.wait(lock1);
+ }
}
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
{
- void * storedElement;
- bool result = mImplementation.tryPopBack(storedElement);
- if(result) {
- ElementT * elementPtr = reinterpret_cast<ElementT *>(storedElement);
- element = *elementPtr;
- delete elementPtr;
- } else {
- ; // No op.
- }
- return result;
+ std::unique_lock<std::mutex> lock1(mLock, std::defer_lock);
+ if (!lock1.try_lock())
+ return false;
+
+ if (mStorage.empty())
+ return false;
+
+ element = mStorage.back();
+ mStorage.pop_back();
+ mCapacityCond.notify_one();
+ return true;
}
template<typename ElementT>
size_t LLThreadSafeQueue<ElementT>::size(void)
{
- return mImplementation.size();
+ std::lock_guard<std::mutex> lock(mLock);
+ return mStorage.size();
}
-
#endif