summaryrefslogtreecommitdiff
path: root/indra/llcommon/workqueue.h
diff options
context:
space:
mode:
authorDave Houlton <euclid@lindenlab.com>2021-11-15 09:25:35 -0700
committerDave Houlton <euclid@lindenlab.com>2021-11-15 09:25:35 -0700
commit029b41c0419e975bbb28454538b46dc69ce5d2ba (patch)
tree4f9a28bb36ee07fe9a7b45a434384afd1f24bb85 /indra/llcommon/workqueue.h
parentaeed774ff9cc55c0c1dd2784e23b2366ff367fbe (diff)
Revert "SL-16220: Merge branch 'origin/DRTVWR-546' into glthread"
This reverts commit 5188a26a8521251dda07ac0140bb129f28417e49, reversing changes made to 819088563e13f1d75e048311fbaf0df4a79b7e19.
Diffstat (limited to 'indra/llcommon/workqueue.h')
-rw-r--r--indra/llcommon/workqueue.h378
1 files changed, 71 insertions, 307 deletions
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
index c25d787425..5ec790da79 100644
--- a/indra/llcommon/workqueue.h
+++ b/indra/llcommon/workqueue.h
@@ -12,14 +12,14 @@
#if ! defined(LL_WORKQUEUE_H)
#define LL_WORKQUEUE_H
-#include "llcoros.h"
-#include "llexception.h"
#include "llinstancetracker.h"
#include "threadsafeschedule.h"
#include <chrono>
-#include <exception> // std::current_exception
#include <functional> // std::function
+#include <queue>
#include <string>
+#include <utility> // std::pair
+#include <vector>
namespace LL
{
@@ -45,16 +45,11 @@ namespace LL
using TimedWork = Queue::TimeTuple;
using Closed = Queue::Closed;
- struct Error: public LLException
- {
- Error(const std::string& what): LLException(what) {}
- };
-
/**
* You may omit the WorkQueue name, in which case a unique name is
* synthesized; for practical purposes that makes it anonymous.
*/
- WorkQueue(const std::string& name = std::string(), size_t capacity=1024);
+ WorkQueue(const std::string& name = std::string());
/**
* Since the point of WorkQueue is to pass work to some other worker
@@ -64,36 +59,15 @@ namespace LL
*/
void close();
- /**
- * WorkQueue supports multiple producers and multiple consumers. In
- * the general case it's misleading to test size(), since any other
- * thread might change it the nanosecond the lock is released. On that
- * basis, some might argue against publishing a size() method at all.
- *
- * But there are two specific cases in which a test based on size()
- * might be reasonable:
- *
- * * If you're the only producer, noticing that size() == 0 is
- * meaningful.
- * * If you're the only consumer, noticing that size() > 0 is
- * meaningful.
- */
- size_t size();
- /// producer end: are we prevented from pushing any additional items?
- bool isClosed();
- /// consumer end: are we done, is the queue entirely drained?
- bool done();
-
/*---------------------- fire and forget API -----------------------*/
/// fire-and-forget, but at a particular (future?) time
template <typename CALLABLE>
void post(const TimePoint& time, CALLABLE&& callable)
{
- // Defer reifying an arbitrary CALLABLE until we hit this or
- // postIfOpen(). All other methods should accept CALLABLEs of
- // arbitrary type to avoid multiple levels of std::function
- // indirection.
+ // Defer reifying an arbitrary CALLABLE until we hit this method.
+ // All other methods should accept CALLABLEs of arbitrary type to
+ // avoid multiple levels of std::function indirection.
mQueue.push(TimedWork(time, std::move(callable)));
}
@@ -109,47 +83,6 @@ namespace LL
}
/**
- * post work for a particular time, unless the queue is closed before
- * we can post
- */
- template <typename CALLABLE>
- bool postIfOpen(const TimePoint& time, CALLABLE&& callable)
- {
- // Defer reifying an arbitrary CALLABLE until we hit this or
- // post(). All other methods should accept CALLABLEs of arbitrary
- // type to avoid multiple levels of std::function indirection.
- return mQueue.pushIfOpen(TimedWork(time, std::move(callable)));
- }
-
- /**
- * post work, unless the queue is closed before we can post
- */
- template <typename CALLABLE>
- bool postIfOpen(CALLABLE&& callable)
- {
- return postIfOpen(TimePoint::clock::now(), std::move(callable));
- }
-
- /**
- * Post work to be run at a specified time to another WorkQueue, which
- * may or may not still exist and be open. Return true if we were able
- * to post.
- */
- template <typename CALLABLE>
- static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable);
-
- /**
- * Post work to another WorkQueue, which may or may not still exist
- * and be open. Return true if we were able to post.
- */
- template <typename CALLABLE>
- static bool postMaybe(weak_t target, CALLABLE&& callable)
- {
- return postMaybe(target, TimePoint::clock::now(),
- std::forward<CALLABLE>(callable));
- }
-
- /**
* Launch a callable returning bool that will trigger repeatedly at
* specified interval, until the callable returns false.
*
@@ -182,8 +115,63 @@ namespace LL
// Studio compile errors that seem utterly unrelated to this source
// code.
template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);
+ bool postTo(WorkQueue::weak_t target,
+ const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
+ {
+ // We're being asked to post to the WorkQueue at target.
+ // target is a weak_ptr: have to lock it to check it.
+ auto tptr = target.lock();
+ if (! tptr)
+ // can't post() if the target WorkQueue has been destroyed
+ return false;
+
+ // Here we believe target WorkQueue still exists. Post to it a
+ // lambda that packages our callable, our callback and a weak_ptr
+ // to this originating WorkQueue.
+ tptr->post(
+ time,
+ [reply = super::getWeak(),
+ callable = std::move(callable),
+ callback = std::move(callback)]
+ ()
+ {
+ // Call the callable in any case -- but to minimize
+ // copying the result, immediately bind it into a reply
+ // lambda. The reply lambda also binds the original
+ // callback, so that when we, the originating WorkQueue,
+ // finally receive and process the reply lambda, we'll
+ // call the bound callback with the bound result -- on the
+ // same thread that originally called postTo().
+ auto rlambda =
+ [result = callable(),
+ callback = std::move(callback)]
+ ()
+ { callback(std::move(result)); };
+ // Check if this originating WorkQueue still exists.
+ // Remember, the outer lambda is now running on a thread
+ // servicing the target WorkQueue, and real time has
+ // elapsed since postTo()'s tptr->post() call.
+ // reply is a weak_ptr: have to lock it to check it.
+ auto rptr = reply.lock();
+ if (rptr)
+ {
+ // Only post reply lambda if the originating WorkQueue
+ // still exists. If not -- who would we tell? Log it?
+ try
+ {
+ rptr->post(std::move(rlambda));
+ }
+ catch (const Closed&)
+ {
+ // Originating WorkQueue might still exist, but
+ // might be Closed. Same thing: just discard the
+ // callback.
+ }
+ }
+ });
+ // looks like we were able to post()
+ return true;
+ }
/**
* Post work to another WorkQueue, requesting a specific callback to
@@ -193,36 +181,10 @@ namespace LL
* inaccessible.
*/
template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback)
+ bool postTo(WorkQueue::weak_t target,
+ CALLABLE&& callable, FOLLOWUP&& callback)
{
- return postTo(target, TimePoint::clock::now(),
- std::move(callable), std::move(callback));
- }
-
- /**
- * Post work to another WorkQueue to be run at a specified time,
- * blocking the calling coroutine until then, returning the result to
- * caller on completion.
- *
- * In general, we assume that each thread's default coroutine is busy
- * servicing its WorkQueue or whatever. To try to prevent mistakes, we
- * forbid calling waitForResult() from a thread's default coroutine.
- */
- template <typename CALLABLE>
- auto waitForResult(const TimePoint& time, CALLABLE&& callable);
-
- /**
- * Post work to another WorkQueue, blocking the calling coroutine
- * until then, returning the result to caller on completion.
- *
- * In general, we assume that each thread's default coroutine is busy
- * servicing its WorkQueue or whatever. To try to prevent mistakes, we
- * forbid calling waitForResult() from a thread's default coroutine.
- */
- template <typename CALLABLE>
- auto waitForResult(CALLABLE&& callable)
- {
- return waitForResult(TimePoint::clock::now(), std::move(callable));
+ return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback));
}
/*--------------------------- worker API ---------------------------*/
@@ -270,23 +232,6 @@ namespace LL
bool runUntil(const TimePoint& until);
private:
- template <typename CALLABLE, typename FOLLOWUP>
- static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback);
- /// general case: arbitrary C++ return type
- template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
- struct MakeReplyLambda;
- /// specialize for CALLABLE returning void
- template <typename CALLABLE, typename FOLLOWUP>
- struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>;
-
- /// general case: arbitrary C++ return type
- template <typename CALLABLE, typename RETURNTYPE>
- struct WaitForResult;
- /// specialize for CALLABLE returning void
- template <typename CALLABLE>
- struct WaitForResult<CALLABLE, void>;
-
- static void checkCoroutine(const std::string& method);
static void error(const std::string& msg);
static std::string makeName(const std::string& name);
void callWork(const Queue::DataTuple& work);
@@ -308,8 +253,8 @@ namespace LL
{
public:
// bind the desired data
- BackJack(weak_t target,
- const TimePoint& start,
+ BackJack(WorkQueue::weak_t target,
+ const WorkQueue::TimePoint& start,
const std::chrono::duration<Rep, Period>& interval,
CALLABLE&& callable):
mTarget(target),
@@ -356,8 +301,8 @@ namespace LL
}
private:
- weak_t mTarget;
- TimePoint mStart;
+ WorkQueue::weak_t mTarget;
+ WorkQueue::TimePoint mStart;
std::chrono::duration<Rep, Period> mInterval;
CALLABLE mCallable;
};
@@ -385,187 +330,6 @@ namespace LL
getWeak(), TimePoint::clock::now(), interval, std::move(callable)));
}
- /// general case: arbitrary C++ return type
- template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
- struct WorkQueue::MakeReplyLambda
- {
- auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
- {
- // Call the callable in any case -- but to minimize
- // copying the result, immediately bind it into the reply
- // lambda. The reply lambda also binds the original
- // callback, so that when we, the originating WorkQueue,
- // finally receive and process the reply lambda, we'll
- // call the bound callback with the bound result -- on the
- // same thread that originally called postTo().
- return
- [result = std::forward<CALLABLE>(callable)(),
- callback = std::move(callback)]
- ()
- { callback(std::move(result)); };
- }
- };
-
- /// specialize for CALLABLE returning void
- template <typename CALLABLE, typename FOLLOWUP>
- struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void>
- {
- auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
- {
- // Call the callable, which produces no result.
- std::forward<CALLABLE>(callable)();
- // Our completion callback is simply the caller's callback.
- return std::move(callback);
- }
- };
-
- template <typename CALLABLE, typename FOLLOWUP>
- auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback)
- {
- return MakeReplyLambda<CALLABLE, FOLLOWUP,
- decltype(std::forward<CALLABLE>(callable)())>()
- (std::move(callable), std::move(callback));
- }
-
- template <typename CALLABLE, typename FOLLOWUP>
- bool WorkQueue::postTo(weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
- {
- // We're being asked to post to the WorkQueue at target.
- // target is a weak_ptr: have to lock it to check it.
- auto tptr = target.lock();
- if (! tptr)
- // can't post() if the target WorkQueue has been destroyed
- return false;
-
- // Here we believe target WorkQueue still exists. Post to it a
- // lambda that packages our callable, our callback and a weak_ptr
- // to this originating WorkQueue.
- tptr->post(
- time,
- [reply = super::getWeak(),
- callable = std::move(callable),
- callback = std::move(callback)]
- ()
- {
- // Use postMaybe() below in case this originating WorkQueue
- // has been closed or destroyed. Remember, the outer lambda is
- // now running on a thread servicing the target WorkQueue, and
- // real time has elapsed since postTo()'s tptr->post() call.
- try
- {
- // Make a reply lambda to repost to THIS WorkQueue.
- // Delegate to makeReplyLambda() so we can partially
- // specialize on void return.
- postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback)));
- }
- catch (...)
- {
- // Either variant of makeReplyLambda() is responsible for
- // calling the caller's callable. If that throws, return
- // the exception to the originating thread.
- postMaybe(
- reply,
- // Bind the current exception to transport back to the
- // originating WorkQueue. Once there, rethrow it.
- [exc = std::current_exception()](){ std::rethrow_exception(exc); });
- }
- });
-
- // looks like we were able to post()
- return true;
- }
-
- template <typename CALLABLE>
- bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable)
- {
- // target is a weak_ptr: have to lock it to check it
- auto tptr = target.lock();
- if (tptr)
- {
- try
- {
- tptr->post(time, std::forward<CALLABLE>(callable));
- // we were able to post()
- return true;
- }
- catch (const Closed&)
- {
- // target WorkQueue still exists, but is Closed
- }
- }
- // either target no longer exists, or its WorkQueue is Closed
- return false;
- }
-
- /// general case: arbitrary C++ return type
- template <typename CALLABLE, typename RETURNTYPE>
- struct WorkQueue::WaitForResult
- {
- auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)
- {
- LLCoros::Promise<RETURNTYPE> promise;
- self->post(
- time,
- // We dare to bind a reference to Promise because it's
- // specifically designed for cross-thread communication.
- [&promise, callable = std::move(callable)]()
- {
- try
- {
- // call the caller's callable and trigger promise with result
- promise.set_value(callable());
- }
- catch (...)
- {
- promise.set_exception(std::current_exception());
- }
- });
- auto future{ LLCoros::getFuture(promise) };
- // now, on the calling thread, wait for that result
- LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()");
- return future.get();
- }
- };
-
- /// specialize for CALLABLE returning void
- template <typename CALLABLE>
- struct WorkQueue::WaitForResult<CALLABLE, void>
- {
- void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)
- {
- LLCoros::Promise<void> promise;
- self->post(
- time,
- // &promise is designed for cross-thread access
- [&promise, callable = std::move(callable)]()
- {
- try
- {
- callable();
- promise.set_value();
- }
- catch (...)
- {
- promise.set_exception(std::current_exception());
- }
- });
- auto future{ LLCoros::getFuture(promise) };
- // block until set_value()
- LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()");
- future.get();
- }
- };
-
- template <typename CALLABLE>
- auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable)
- {
- checkCoroutine("waitForResult()");
- // derive callable's return type so we can specialize for void
- return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>()
- (this, time, std::forward<CALLABLE>(callable));
- }
-
} // namespace LL
#endif /* ! defined(LL_WORKQUEUE_H) */