diff options
author | Nat Goodspeed <nat@lindenlab.com> | 2021-10-26 11:49:53 -0400 |
---|---|---|
committer | Nat Goodspeed <nat@lindenlab.com> | 2021-10-26 11:49:53 -0400 |
commit | e6eebea8da545350f6684c191c633dd2fbc6f6f1 (patch) | |
tree | ab5885d5a0b65436964b5dbe8ddc9051d1723bac /indra/llcommon | |
parent | 023d39963e850356e1af6eec7f857e2534ce8d38 (diff) |
SL-16220: Change WorkQueue::runOn() to waitForResult().
In addition to the name making the blocking explicit, we changed the
signature: instead of specifying a target WorkQueue on which to run,
waitForResult() runs the passed callable on its own WorkQueue.
Why is that? Because, unlike postTo(), we do not require a handshake between
two different WorkQueues. postTo() allows running arbitrary callback code,
setting variables or whatever, on the originating WorkQueue (presumably on the
originating thread). waitForResult() synchronizes using Promise/Future, which
are explicitly designed for cross-thread communication. We need not call
set_value() on the originating thread, so we don't need a postTo() callback
lambda.
Diffstat (limited to 'indra/llcommon')
-rw-r--r-- | indra/llcommon/workqueue.cpp | 7 | ||||
-rw-r--r-- | indra/llcommon/workqueue.h | 145 |
2 files changed, 62 insertions, 90 deletions
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index f7ffc8233c..ac3086aac5 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,11 +26,6 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; -struct NotOnDftCoro: public LLException -{ - NotOnDftCoro(const std::string& what): LLException(what) {} -}; - LL::WorkQueue::WorkQueue(const std::string& name): super(makeName(name)) { @@ -148,6 +143,6 @@ void LL::WorkQueue::checkCoroutine(const std::string& method) // string. See also LLCoros::logname(). if (LLCoros::getName().empty()) { - LLTHROW(NotOnDftCoro("Do not call " + method + " from a thread's default coroutine")); + LLTHROW(Error("Do not call " + method + " from a thread's default coroutine")); } } diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index b17c666172..869f5d9a82 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -13,20 +13,13 @@ #define LL_WORKQUEUE_H #include "llcoros.h" +#include "llexception.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include <chrono> +#include <exception> // std::current_exception #include <functional> // std::function -#if __cplusplus >= 201703 -#include <optional> -namespace stdopt = std; -#else -#include <boost/optional.hpp> -namespace stdopt = boost; -#endif #include <string> -#include <utility> // std::pair -#include <vector> namespace LL { @@ -51,8 +44,11 @@ namespace LL using TimePoint = Queue::TimePoint; using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; - template <typename T> - using optional = stdopt::optional<T>; + + struct Error: public LLException + { + Error(const std::string& what): LLException(what) {} + }; /** * You may omit the WorkQueue name, in which case a unique name is @@ -145,49 +141,25 @@ namespace LL * blocking the calling coroutine until then, returning the result to * caller on completion. * - * REQUIRED: - * - * * The calling thread is the thread servicing 'this' WorkQueue. - * * The calling coroutine is not the @em coroutine servicing this - * WorkQueue. We block the calling coroutine until the result is - * available. If this same coroutine is responsible for checking the - * local WorkQueue, the result will never be dequeued. In practice, - * to try to prevent mistakes, we forbid calling runOn() from a - * thread's default coroutine. - * - * Returns result if able to post, empty optional if the other - * WorkQueue is inaccessible. - * - * If the passed callable has void return, runOn() returns bool true - * if able to post, false if the other WorkQueue is inaccessible. + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. */ template <typename CALLABLE> - auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); + auto waitForResult(const TimePoint& time, CALLABLE&& callable); /** * Post work to another WorkQueue, blocking the calling coroutine * until then, returning the result to caller on completion. * - * REQUIRED: - * - * * The calling thread is the thread servicing 'this' WorkQueue. - * * The calling coroutine is not the @em coroutine servicing this - * WorkQueue. We block the calling coroutine until the result is - * available. If this same coroutine is responsible for checking the - * local WorkQueue, the result will never be dequeued. In practice, - * to try to prevent mistakes, we forbid calling runOn() from a - * thread's default coroutine. - * - * Returns result if able to post, empty optional if the other - * WorkQueue is inaccessible. - * - * If the passed callable has void return, runOn() returns bool true - * if able to post, false if the other WorkQueue is inaccessible. + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. */ template <typename CALLABLE> - auto runOn(weak_t target, CALLABLE&& callable) + auto waitForResult(CALLABLE&& callable) { - return runOn(target, TimePoint::clock::now(), std::move(callable)); + return waitForResult(TimePoint::clock::now(), std::move(callable)); } /*--------------------------- worker API ---------------------------*/ @@ -246,10 +218,10 @@ namespace LL /// general case: arbitrary C++ return type template <typename CALLABLE, typename RETURNTYPE> - struct RunOn; + struct WaitForResult; /// specialize for CALLABLE returning void template <typename CALLABLE> - struct RunOn<CALLABLE, void>; + struct WaitForResult<CALLABLE, void>; static void checkCoroutine(const std::string& method); static void error(const std::string& msg); @@ -449,65 +421,70 @@ namespace LL /// general case: arbitrary C++ return type template <typename CALLABLE, typename RETURNTYPE> - struct WorkQueue::RunOn + struct WorkQueue::WaitForResult { - optional<RETURNTYPE> operator()(WorkQueue* self, weak_t target, - const TimePoint& time, CALLABLE&& callable) + auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) { LLCoros::Promise<RETURNTYPE> promise; - if (! self->postTo( - target, - time, - std::forward<CALLABLE>(callable), - // We dare to bind a reference to Promise because it's - // specifically intended for cross-thread synchronization. - [&promise] - (RETURNTYPE&& result) + self->post( + time, + // We dare to bind a reference to Promise because it's + // specifically designed for cross-thread communication. + [&promise, callable = std::move(callable)]() + { + try { - promise.set_value(std::forward<RETURNTYPE>(result)); - })) - { - // we couldn't even postTo(): return empty optional - return {}; - } - // we were able to post + // call the caller's callable and trigger promise with result + promise.set_value(callable()); + } + catch (...) + { + promise.set_exception(std::current_exception()); + } + }); auto future{ LLCoros::getFuture(promise) }; - return { future.get(); } + // now, on the calling thread, wait for that result + LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); + return future.get(); } }; /// specialize for CALLABLE returning void template <typename CALLABLE> - struct WorkQueue::RunOn<CALLABLE, void> + struct WorkQueue::WaitForResult<CALLABLE, void> { - bool operator()(WorkQueue* self, weak_t target, - const TimePoint& time, CALLABLE&& callable) + void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) { LLCoros::Promise<void> promise; - if (! self->postTo( - target, - time, - std::forward<CALLABLE>(callable), - // &promise is designed for cross-thread access - [&promise](){ promise.set_value(); })) - { - // we couldn't postTo() - return false; - } - // we were able to post + self->post( + time, + // &promise is designed for cross-thread access + [&promise, callable = std::move(callable)]() + { + try + { + callable(); + promise.set_value(); + } + catch (...) + { + promise.set_exception(std::current_exception()); + } + }); auto future{ LLCoros::getFuture(promise) }; // block until set_value() + LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); future.get(); - return true; } }; template <typename CALLABLE> - auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) + auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable) { - checkCoroutine("runOn()"); - return RunOn<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() - (this, target, time, std::forward<CALLABLE>(callable)); + checkCoroutine("waitForResult()"); + // derive callable's return type so we can specialize for void + return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() + (this, time, std::forward<CALLABLE>(callable)); } } // namespace LL |