diff options
-rw-r--r-- | indra/llcommon/workqueue.cpp | 15 | ||||
-rw-r--r-- | indra/llcommon/workqueue.h | 150 |
2 files changed, 154 insertions, 11 deletions
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 114aeea1f3..f7ffc8233c 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,6 +26,11 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; +struct NotOnDftCoro: public LLException +{ + NotOnDftCoro(const std::string& what): LLException(what) {} +}; + LL::WorkQueue::WorkQueue(const std::string& name): super(makeName(name)) { @@ -136,3 +141,13 @@ void LL::WorkQueue::error(const std::string& msg) { LL_ERRS("WorkQueue") << msg << LL_ENDL; } + +void LL::WorkQueue::checkCoroutine(const std::string& method) +{ + // By convention, the default coroutine on each thread has an empty name + // string. See also LLCoros::logname(). + if (LLCoros::getName().empty()) + { + LLTHROW(NotOnDftCoro("Do not call " + method + " from a thread's default coroutine")); + } +} diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index deef3c8e84..b17c666172 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,11 +12,18 @@ #if ! defined(LL_WORKQUEUE_H) #define LL_WORKQUEUE_H +#include "llcoros.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include <chrono> #include <functional> // std::function -#include <queue> +#if __cplusplus >= 201703 +#include <optional> +namespace stdopt = std; +#else +#include <boost/optional.hpp> +namespace stdopt = boost; +#endif #include <string> #include <utility> // std::pair #include <vector> @@ -44,6 +51,8 @@ namespace LL using TimePoint = Queue::TimePoint; using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; + template <typename T> + using optional = stdopt::optional<T>; /** * You may omit the WorkQueue name, in which case a unique name is @@ -114,7 +123,7 @@ namespace LL // Studio compile errors that seem utterly unrelated to this source // code. template <typename CALLABLE, typename FOLLOWUP> - bool postTo(WorkQueue::weak_t target, + bool postTo(weak_t target, const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); /** @@ -125,13 +134,62 @@ namespace LL * inaccessible. */ template <typename CALLABLE, typename FOLLOWUP> - bool postTo(WorkQueue::weak_t target, - CALLABLE&& callable, FOLLOWUP&& callback) + bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) { return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); } + /** + * Post work to another WorkQueue to be run at a specified time, + * blocking the calling coroutine until then, returning the result to + * caller on completion. + * + * REQUIRED: + * + * * The calling thread is the thread servicing 'this' WorkQueue. + * * The calling coroutine is not the @em coroutine servicing this + * WorkQueue. We block the calling coroutine until the result is + * available. If this same coroutine is responsible for checking the + * local WorkQueue, the result will never be dequeued. In practice, + * to try to prevent mistakes, we forbid calling runOn() from a + * thread's default coroutine. + * + * Returns result if able to post, empty optional if the other + * WorkQueue is inaccessible. + * + * If the passed callable has void return, runOn() returns bool true + * if able to post, false if the other WorkQueue is inaccessible. + */ + template <typename CALLABLE> + auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, blocking the calling coroutine + * until then, returning the result to caller on completion. + * + * REQUIRED: + * + * * The calling thread is the thread servicing 'this' WorkQueue. + * * The calling coroutine is not the @em coroutine servicing this + * WorkQueue. We block the calling coroutine until the result is + * available. If this same coroutine is responsible for checking the + * local WorkQueue, the result will never be dequeued. In practice, + * to try to prevent mistakes, we forbid calling runOn() from a + * thread's default coroutine. + * + * Returns result if able to post, empty optional if the other + * WorkQueue is inaccessible. + * + * If the passed callable has void return, runOn() returns bool true + * if able to post, false if the other WorkQueue is inaccessible. + */ + template <typename CALLABLE> + auto runOn(weak_t target, CALLABLE&& callable) + { + return runOn(target, TimePoint::clock::now(), std::move(callable)); + } + /*--------------------------- worker API ---------------------------*/ /** @@ -179,15 +237,21 @@ namespace LL private: template <typename CALLABLE, typename FOLLOWUP> static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); - /// general case: arbitrary C++ return type template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> struct MakeReplyLambda; - /// specialize for CALLABLE returning void template <typename CALLABLE, typename FOLLOWUP> struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>; + /// general case: arbitrary C++ return type + template <typename CALLABLE, typename RETURNTYPE> + struct RunOn; + /// specialize for CALLABLE returning void + template <typename CALLABLE> + struct RunOn<CALLABLE, void>; + + static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -209,8 +273,8 @@ namespace LL { public: // bind the desired data - BackJack(WorkQueue::weak_t target, - const WorkQueue::TimePoint& start, + BackJack(weak_t target, + const TimePoint& start, const std::chrono::duration<Rep, Period>& interval, CALLABLE&& callable): mTarget(target), @@ -257,8 +321,8 @@ namespace LL } private: - WorkQueue::weak_t mTarget; - WorkQueue::TimePoint mStart; + weak_t mTarget; + TimePoint mStart; std::chrono::duration<Rep, Period> mInterval; CALLABLE mCallable; }; @@ -286,6 +350,7 @@ namespace LL getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } + /// general case: arbitrary C++ return type template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> struct WorkQueue::MakeReplyLambda { @@ -332,7 +397,7 @@ namespace LL } template <typename CALLABLE, typename FOLLOWUP> - bool WorkQueue::postTo(WorkQueue::weak_t target, + bool WorkQueue::postTo(weak_t target, const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) { // We're being asked to post to the WorkQueue at target. @@ -382,6 +447,69 @@ namespace LL return true; } + /// general case: arbitrary C++ return type + template <typename CALLABLE, typename RETURNTYPE> + struct WorkQueue::RunOn + { + optional<RETURNTYPE> operator()(WorkQueue* self, weak_t target, + const TimePoint& time, CALLABLE&& callable) + { + LLCoros::Promise<RETURNTYPE> promise; + if (! self->postTo( + target, + time, + std::forward<CALLABLE>(callable), + // We dare to bind a reference to Promise because it's + // specifically intended for cross-thread synchronization. + [&promise] + (RETURNTYPE&& result) + { + promise.set_value(std::forward<RETURNTYPE>(result)); + })) + { + // we couldn't even postTo(): return empty optional + return {}; + } + // we were able to post + auto future{ LLCoros::getFuture(promise) }; + return { future.get(); } + } + }; + + /// specialize for CALLABLE returning void + template <typename CALLABLE> + struct WorkQueue::RunOn<CALLABLE, void> + { + bool operator()(WorkQueue* self, weak_t target, + const TimePoint& time, CALLABLE&& callable) + { + LLCoros::Promise<void> promise; + if (! self->postTo( + target, + time, + std::forward<CALLABLE>(callable), + // &promise is designed for cross-thread access + [&promise](){ promise.set_value(); })) + { + // we couldn't postTo() + return false; + } + // we were able to post + auto future{ LLCoros::getFuture(promise) }; + // block until set_value() + future.get(); + return true; + } + }; + + template <typename CALLABLE> + auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) + { + checkCoroutine("runOn()"); + return RunOn<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() + (this, target, time, std::forward<CALLABLE>(callable)); + } + } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */ |