summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2021-11-04 16:43:11 -0400
committerNat Goodspeed <nat@lindenlab.com>2021-11-04 16:43:11 -0400
commit89f2169e9d2c03ed92810689563ca110886abf16 (patch)
tree0a67a5036ce3d167c4ff6ec9922d5048dd4b67db
parent8458ad8890cf0a11804996210d7bcfbdaa3eec2e (diff)
SL-16202: Add postIfOpen() methods to WorkQueue, LLThreadSafeQueue.
postIfOpen() provides a no-exception alternative to post(), which blocks if full but throws if closed. postIfOpen() likewise blocks if full, but returns true if able to post and false if the queue was closed.
-rw-r--r--indra/llcommon/llthreadsafequeue.h30
-rw-r--r--indra/llcommon/workqueue.h29
2 files changed, 48 insertions, 11 deletions
diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h
index 06e8d8f609..5c934791fe 100644
--- a/indra/llcommon/llthreadsafequeue.h
+++ b/indra/llcommon/llthreadsafequeue.h
@@ -85,8 +85,8 @@ public:
LLThreadSafeQueue(U32 capacity = 1024);
virtual ~LLThreadSafeQueue() {}
- // Add an element to the queue (will block if the queue has
- // reached capacity).
+ // Add an element to the queue (will block if the queue has reached
+ // capacity).
//
// This call will raise an interrupt error if the queue is closed while
// the caller is blocked.
@@ -95,6 +95,11 @@ public:
// legacy name
void pushFront(ElementT const & element) { return push(element); }
+ // Add an element to the queue (will block if the queue has reached
+ // capacity). Return false if the queue is closed before push is possible.
+ template <typename T>
+ bool pushIfOpen(T&& element);
+
// Try to add an element to the queue without blocking. Returns
// true only if the element was actually added.
template <typename T>
@@ -311,8 +316,8 @@ bool LLThreadSafeQueue<ElementT, QueueT>::push_(lock_t& lock, T&& element)
template <typename ElementT, typename QueueT>
-template<typename T>
-void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element)
+template <typename T>
+bool LLThreadSafeQueue<ElementT, QueueT>::pushIfOpen(T&& element)
{
lock_t lock1(mLock);
while (true)
@@ -321,12 +326,10 @@ void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element)
// drained or not: the moment either end calls close(), further push()
// operations will fail.
if (mClosed)
- {
- LLTHROW(LLThreadSafeQueueInterrupt());
- }
+ return false;
if (push_(lock1, std::forward<T>(element)))
- return;
+ return true;
// Storage Full. Wait for signal.
mCapacityCond.wait(lock1);
@@ -334,6 +337,17 @@ void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element)
}
+template <typename ElementT, typename QueueT>
+template<typename T>
+void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element)
+{
+ if (! pushIfOpen(std::forward<T>(element)))
+ {
+ LLTHROW(LLThreadSafeQueueInterrupt());
+ }
+}
+
+
template<typename ElementT, typename QueueT>
template<typename T>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(T&& element)
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
index 76d31f32a6..d0e3f870fe 100644
--- a/indra/llcommon/workqueue.h
+++ b/indra/llcommon/workqueue.h
@@ -75,9 +75,10 @@ namespace LL
template <typename CALLABLE>
void post(const TimePoint& time, CALLABLE&& callable)
{
- // Defer reifying an arbitrary CALLABLE until we hit this method.
- // All other methods should accept CALLABLEs of arbitrary type to
- // avoid multiple levels of std::function indirection.
+ // Defer reifying an arbitrary CALLABLE until we hit this or
+ // postIfOpen(). All other methods should accept CALLABLEs of
+ // arbitrary type to avoid multiple levels of std::function
+ // indirection.
mQueue.push(TimedWork(time, std::move(callable)));
}
@@ -93,6 +94,28 @@ namespace LL
}
/**
+ * post work for a particular time, unless the queue is closed before
+ * we can post
+ */
+ template <typename CALLABLE>
+ bool postIfOpen(const TimePoint& time, CALLABLE&& callable)
+ {
+ // Defer reifying an arbitrary CALLABLE until we hit this or
+ // post(). All other methods should accept CALLABLEs of arbitrary
+ // type to avoid multiple levels of std::function indirection.
+ return mQueue.pushIfOpen(TimedWork(time, std::move(callable)));
+ }
+
+ /**
+ * post work, unless the queue is closed before we can post
+ */
+ template <typename CALLABLE>
+ bool postIfOpen(CALLABLE&& callable)
+ {
+ return postIfOpen(TimePoint::clock::now(), std::move(callable));
+ }
+
+ /**
* Post work to be run at a specified time to another WorkQueue, which
* may or may not still exist and be open. Return true if we were able
* to post.