From e6eebea8da545350f6684c191c633dd2fbc6f6f1 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 26 Oct 2021 11:49:53 -0400 Subject: SL-16220: Change WorkQueue::runOn() to waitForResult(). In addition to the name making the blocking explicit, we changed the signature: instead of specifying a target WorkQueue on which to run, waitForResult() runs the passed callable on its own WorkQueue. Why is that? Because, unlike postTo(), we do not require a handshake between two different WorkQueues. postTo() allows running arbitrary callback code, setting variables or whatever, on the originating WorkQueue (presumably on the originating thread). waitForResult() synchronizes using Promise/Future, which are explicitly designed for cross-thread communication. We need not call set_value() on the originating thread, so we don't need a postTo() callback lambda. --- indra/llcommon/workqueue.h | 145 +++++++++++++++++++-------------------------- 1 file changed, 61 insertions(+), 84 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index b17c666172..869f5d9a82 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -13,20 +13,13 @@ #define LL_WORKQUEUE_H #include "llcoros.h" +#include "llexception.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include +#include // std::current_exception #include // std::function -#if __cplusplus >= 201703 -#include -namespace stdopt = std; -#else -#include -namespace stdopt = boost; -#endif #include -#include // std::pair -#include namespace LL { @@ -51,8 +44,11 @@ namespace LL using TimePoint = Queue::TimePoint; using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; - template - using optional = stdopt::optional; + + struct Error: public LLException + { + Error(const std::string& what): LLException(what) {} + }; /** * You may omit the WorkQueue name, in which case a unique name is @@ -145,49 +141,25 @@ namespace LL * blocking the calling coroutine until then, returning the result to * caller on completion. * - * REQUIRED: - * - * * The calling thread is the thread servicing 'this' WorkQueue. - * * The calling coroutine is not the @em coroutine servicing this - * WorkQueue. We block the calling coroutine until the result is - * available. If this same coroutine is responsible for checking the - * local WorkQueue, the result will never be dequeued. In practice, - * to try to prevent mistakes, we forbid calling runOn() from a - * thread's default coroutine. - * - * Returns result if able to post, empty optional if the other - * WorkQueue is inaccessible. - * - * If the passed callable has void return, runOn() returns bool true - * if able to post, false if the other WorkQueue is inaccessible. + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. */ template - auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); + auto waitForResult(const TimePoint& time, CALLABLE&& callable); /** * Post work to another WorkQueue, blocking the calling coroutine * until then, returning the result to caller on completion. * - * REQUIRED: - * - * * The calling thread is the thread servicing 'this' WorkQueue. - * * The calling coroutine is not the @em coroutine servicing this - * WorkQueue. We block the calling coroutine until the result is - * available. If this same coroutine is responsible for checking the - * local WorkQueue, the result will never be dequeued. In practice, - * to try to prevent mistakes, we forbid calling runOn() from a - * thread's default coroutine. - * - * Returns result if able to post, empty optional if the other - * WorkQueue is inaccessible. - * - * If the passed callable has void return, runOn() returns bool true - * if able to post, false if the other WorkQueue is inaccessible. + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. */ template - auto runOn(weak_t target, CALLABLE&& callable) + auto waitForResult(CALLABLE&& callable) { - return runOn(target, TimePoint::clock::now(), std::move(callable)); + return waitForResult(TimePoint::clock::now(), std::move(callable)); } /*--------------------------- worker API ---------------------------*/ @@ -246,10 +218,10 @@ namespace LL /// general case: arbitrary C++ return type template - struct RunOn; + struct WaitForResult; /// specialize for CALLABLE returning void template - struct RunOn; + struct WaitForResult; static void checkCoroutine(const std::string& method); static void error(const std::string& msg); @@ -449,65 +421,70 @@ namespace LL /// general case: arbitrary C++ return type template - struct WorkQueue::RunOn + struct WorkQueue::WaitForResult { - optional operator()(WorkQueue* self, weak_t target, - const TimePoint& time, CALLABLE&& callable) + auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) { LLCoros::Promise promise; - if (! self->postTo( - target, - time, - std::forward(callable), - // We dare to bind a reference to Promise because it's - // specifically intended for cross-thread synchronization. - [&promise] - (RETURNTYPE&& result) + self->post( + time, + // We dare to bind a reference to Promise because it's + // specifically designed for cross-thread communication. + [&promise, callable = std::move(callable)]() + { + try { - promise.set_value(std::forward(result)); - })) - { - // we couldn't even postTo(): return empty optional - return {}; - } - // we were able to post + // call the caller's callable and trigger promise with result + promise.set_value(callable()); + } + catch (...) + { + promise.set_exception(std::current_exception()); + } + }); auto future{ LLCoros::getFuture(promise) }; - return { future.get(); } + // now, on the calling thread, wait for that result + LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); + return future.get(); } }; /// specialize for CALLABLE returning void template - struct WorkQueue::RunOn + struct WorkQueue::WaitForResult { - bool operator()(WorkQueue* self, weak_t target, - const TimePoint& time, CALLABLE&& callable) + void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) { LLCoros::Promise promise; - if (! self->postTo( - target, - time, - std::forward(callable), - // &promise is designed for cross-thread access - [&promise](){ promise.set_value(); })) - { - // we couldn't postTo() - return false; - } - // we were able to post + self->post( + time, + // &promise is designed for cross-thread access + [&promise, callable = std::move(callable)]() + { + try + { + callable(); + promise.set_value(); + } + catch (...) + { + promise.set_exception(std::current_exception()); + } + }); auto future{ LLCoros::getFuture(promise) }; // block until set_value() + LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); future.get(); - return true; } }; template - auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) + auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable) { - checkCoroutine("runOn()"); - return RunOn(callable)())>() - (this, target, time, std::forward(callable)); + checkCoroutine("waitForResult()"); + // derive callable's return type so we can specialize for void + return WaitForResult(callable)())>() + (this, time, std::forward(callable)); } } // namespace LL -- cgit v1.2.3