From 11afa09ea3f56c0e20eb195ae1520a88602ceaca Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 22 Oct 2021 11:36:31 -0400 Subject: SL-16220: Add LL::ThreadPool class and a "General" instance. ThreadPool bundles a WorkQueue with the specified number of worker threads to service it. Each ThreadPool has a name that can be used to locate its WorkQueue. Each worker thread calls WorkQueue::runUntilClose(). ThreadPool listens on the "LLApp" LLEventPump for shutdown notification. On receiving that, it closes its WorkQueue and then join()s each of its worker threads for orderly shutdown. Add a settings.xml entry "ThreadPoolSizes", the first LLSD-valued settings entry to expect a map: pool name->size. The expectation is that usually code instantiating a particular ThreadPool will have a default size in mind, but it should check "ThreadPoolSizes" for a user override. Make idle_startup()'s STATE_SEED_CAP_GRANTED state instantiate a "General" ThreadPool. This is function-static for lazy initialization. Eliminate LLMainLoopRepeater, which is completely unreferenced. Any potential future use cases are better addressed by posting to the main loop's WorkQueue. Eliminate llappviewer.cpp's private LLDeferredTaskList class, which implemented LLAppViewer::addOnIdleCallback(). Make addOnIdleCallback() post work to the main loop's WorkQueue instead. --- indra/llcommon/workqueue.h | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index b88aef989a..cfae2019dc 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -59,6 +59,11 @@ namespace LL */ void close(); + /// producer end: are we prevented from pushing any additional items? + bool isClosed(); + /// consumer end: are we done, is the queue entirely drained? + bool done(); + /*---------------------- fire and forget API -----------------------*/ /// fire-and-forget, but at a particular (future?) time -- cgit v1.2.3 From e7b8c27741201528bf78f95c96ba820833923dab Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 25 Oct 2021 15:55:49 -0400 Subject: SL-16220: Specialize WorkQueue for callable with void return. Add a test exercising this feature. --- indra/llcommon/workqueue.h | 167 +++++++++++++++++++++++++++++---------------- 1 file changed, 110 insertions(+), 57 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index cfae2019dc..deef3c8e84 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -115,62 +115,7 @@ namespace LL // code. template bool postTo(WorkQueue::weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) - { - // We're being asked to post to the WorkQueue at target. - // target is a weak_ptr: have to lock it to check it. - auto tptr = target.lock(); - if (! tptr) - // can't post() if the target WorkQueue has been destroyed - return false; - - // Here we believe target WorkQueue still exists. Post to it a - // lambda that packages our callable, our callback and a weak_ptr - // to this originating WorkQueue. - tptr->post( - time, - [reply = super::getWeak(), - callable = std::move(callable), - callback = std::move(callback)] - () - { - // Call the callable in any case -- but to minimize - // copying the result, immediately bind it into a reply - // lambda. The reply lambda also binds the original - // callback, so that when we, the originating WorkQueue, - // finally receive and process the reply lambda, we'll - // call the bound callback with the bound result -- on the - // same thread that originally called postTo(). - auto rlambda = - [result = callable(), - callback = std::move(callback)] - () - { callback(std::move(result)); }; - // Check if this originating WorkQueue still exists. - // Remember, the outer lambda is now running on a thread - // servicing the target WorkQueue, and real time has - // elapsed since postTo()'s tptr->post() call. - // reply is a weak_ptr: have to lock it to check it. - auto rptr = reply.lock(); - if (rptr) - { - // Only post reply lambda if the originating WorkQueue - // still exists. If not -- who would we tell? Log it? - try - { - rptr->post(std::move(rlambda)); - } - catch (const Closed&) - { - // Originating WorkQueue might still exist, but - // might be Closed. Same thing: just discard the - // callback. - } - } - }); - // looks like we were able to post() - return true; - } + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); /** * Post work to another WorkQueue, requesting a specific callback to @@ -183,7 +128,8 @@ namespace LL bool postTo(WorkQueue::weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) { - return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); + return postTo(target, TimePoint::clock::now(), + std::move(callable), std::move(callback)); } /*--------------------------- worker API ---------------------------*/ @@ -231,6 +177,17 @@ namespace LL bool runUntil(const TimePoint& until); private: + template + static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); + + /// general case: arbitrary C++ return type + template + struct MakeReplyLambda; + + /// specialize for CALLABLE returning void + template + struct MakeReplyLambda; + static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -329,6 +286,102 @@ namespace LL getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } + template + struct WorkQueue::MakeReplyLambda + { + auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) + { + // Call the callable in any case -- but to minimize + // copying the result, immediately bind it into the reply + // lambda. The reply lambda also binds the original + // callback, so that when we, the originating WorkQueue, + // finally receive and process the reply lambda, we'll + // call the bound callback with the bound result -- on the + // same thread that originally called postTo(). + return + [result = std::forward(callable)(), + callback = std::move(callback)] + () + { callback(std::move(result)); }; + } + }; + + /// specialize for CALLABLE returning void + template + struct WorkQueue::MakeReplyLambda + { + auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) + { + // Call the callable, which produces no result. + std::forward(callable)(); + // This reply lambda binds the original callback, so + // that when we, the originating WorkQueue, finally + // receive and process the reply lambda, we'll call + // the bound callback -- on the same thread that + // originally called postTo(). + return [callback = std::move(callback)](){ callback(); }; + } + }; + + template + auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) + { + return MakeReplyLambda(callable)())>() + (std::move(callable), std::move(callback)); + } + + template + bool WorkQueue::postTo(WorkQueue::weak_t target, + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) + { + // We're being asked to post to the WorkQueue at target. + // target is a weak_ptr: have to lock it to check it. + auto tptr = target.lock(); + if (! tptr) + // can't post() if the target WorkQueue has been destroyed + return false; + + // Here we believe target WorkQueue still exists. Post to it a + // lambda that packages our callable, our callback and a weak_ptr + // to this originating WorkQueue. + tptr->post( + time, + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () + { + // Make a reply lambda to repost to THIS WorkQueue. + // Delegate to makeReplyLambda() so we can partially + // specialize on void return. + auto rlambda = makeReplyLambda(std::move(callable), std::move(callback)); + // Check if this originating WorkQueue still exists. + // Remember, the outer lambda is now running on a thread + // servicing the target WorkQueue, and real time has + // elapsed since postTo()'s tptr->post() call. + // reply is a weak_ptr: have to lock it to check it. + auto rptr = reply.lock(); + if (rptr) + { + // Only post reply lambda if the originating WorkQueue + // still exists. If not -- who would we tell? Log it? + try + { + rptr->post(std::move(rlambda)); + } + catch (const Closed&) + { + // Originating WorkQueue might still exist, but + // might be Closed. Same thing: just discard the + // callback. + } + } + }); + // looks like we were able to post() + return true; + } + } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */ -- cgit v1.2.3 From 023d39963e850356e1af6eec7f857e2534ce8d38 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 25 Oct 2021 17:31:27 -0400 Subject: SL-16220: WorkQueue::runOn() methods submit work, wait for result. The idea is that you can call runOn(target, callable) from a (non-default) coroutine and block that coroutine until the result becomes available. As a safety check, we forbid calling runOn() from a thread's default coroutine, assuming that a given thread's default coroutine is the one servicing the relevant WorkQueue. --- indra/llcommon/workqueue.h | 150 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 139 insertions(+), 11 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index deef3c8e84..b17c666172 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,11 +12,18 @@ #if ! defined(LL_WORKQUEUE_H) #define LL_WORKQUEUE_H +#include "llcoros.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include #include // std::function -#include +#if __cplusplus >= 201703 +#include +namespace stdopt = std; +#else +#include +namespace stdopt = boost; +#endif #include #include // std::pair #include @@ -44,6 +51,8 @@ namespace LL using TimePoint = Queue::TimePoint; using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; + template + using optional = stdopt::optional; /** * You may omit the WorkQueue name, in which case a unique name is @@ -114,7 +123,7 @@ namespace LL // Studio compile errors that seem utterly unrelated to this source // code. template - bool postTo(WorkQueue::weak_t target, + bool postTo(weak_t target, const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); /** @@ -125,13 +134,62 @@ namespace LL * inaccessible. */ template - bool postTo(WorkQueue::weak_t target, - CALLABLE&& callable, FOLLOWUP&& callback) + bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) { return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); } + /** + * Post work to another WorkQueue to be run at a specified time, + * blocking the calling coroutine until then, returning the result to + * caller on completion. + * + * REQUIRED: + * + * * The calling thread is the thread servicing 'this' WorkQueue. + * * The calling coroutine is not the @em coroutine servicing this + * WorkQueue. We block the calling coroutine until the result is + * available. If this same coroutine is responsible for checking the + * local WorkQueue, the result will never be dequeued. In practice, + * to try to prevent mistakes, we forbid calling runOn() from a + * thread's default coroutine. + * + * Returns result if able to post, empty optional if the other + * WorkQueue is inaccessible. + * + * If the passed callable has void return, runOn() returns bool true + * if able to post, false if the other WorkQueue is inaccessible. + */ + template + auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, blocking the calling coroutine + * until then, returning the result to caller on completion. + * + * REQUIRED: + * + * * The calling thread is the thread servicing 'this' WorkQueue. + * * The calling coroutine is not the @em coroutine servicing this + * WorkQueue. We block the calling coroutine until the result is + * available. If this same coroutine is responsible for checking the + * local WorkQueue, the result will never be dequeued. In practice, + * to try to prevent mistakes, we forbid calling runOn() from a + * thread's default coroutine. + * + * Returns result if able to post, empty optional if the other + * WorkQueue is inaccessible. + * + * If the passed callable has void return, runOn() returns bool true + * if able to post, false if the other WorkQueue is inaccessible. + */ + template + auto runOn(weak_t target, CALLABLE&& callable) + { + return runOn(target, TimePoint::clock::now(), std::move(callable)); + } + /*--------------------------- worker API ---------------------------*/ /** @@ -179,15 +237,21 @@ namespace LL private: template static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); - /// general case: arbitrary C++ return type template struct MakeReplyLambda; - /// specialize for CALLABLE returning void template struct MakeReplyLambda; + /// general case: arbitrary C++ return type + template + struct RunOn; + /// specialize for CALLABLE returning void + template + struct RunOn; + + static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -209,8 +273,8 @@ namespace LL { public: // bind the desired data - BackJack(WorkQueue::weak_t target, - const WorkQueue::TimePoint& start, + BackJack(weak_t target, + const TimePoint& start, const std::chrono::duration& interval, CALLABLE&& callable): mTarget(target), @@ -257,8 +321,8 @@ namespace LL } private: - WorkQueue::weak_t mTarget; - WorkQueue::TimePoint mStart; + weak_t mTarget; + TimePoint mStart; std::chrono::duration mInterval; CALLABLE mCallable; }; @@ -286,6 +350,7 @@ namespace LL getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } + /// general case: arbitrary C++ return type template struct WorkQueue::MakeReplyLambda { @@ -332,7 +397,7 @@ namespace LL } template - bool WorkQueue::postTo(WorkQueue::weak_t target, + bool WorkQueue::postTo(weak_t target, const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) { // We're being asked to post to the WorkQueue at target. @@ -382,6 +447,69 @@ namespace LL return true; } + /// general case: arbitrary C++ return type + template + struct WorkQueue::RunOn + { + optional operator()(WorkQueue* self, weak_t target, + const TimePoint& time, CALLABLE&& callable) + { + LLCoros::Promise promise; + if (! self->postTo( + target, + time, + std::forward(callable), + // We dare to bind a reference to Promise because it's + // specifically intended for cross-thread synchronization. + [&promise] + (RETURNTYPE&& result) + { + promise.set_value(std::forward(result)); + })) + { + // we couldn't even postTo(): return empty optional + return {}; + } + // we were able to post + auto future{ LLCoros::getFuture(promise) }; + return { future.get(); } + } + }; + + /// specialize for CALLABLE returning void + template + struct WorkQueue::RunOn + { + bool operator()(WorkQueue* self, weak_t target, + const TimePoint& time, CALLABLE&& callable) + { + LLCoros::Promise promise; + if (! self->postTo( + target, + time, + std::forward(callable), + // &promise is designed for cross-thread access + [&promise](){ promise.set_value(); })) + { + // we couldn't postTo() + return false; + } + // we were able to post + auto future{ LLCoros::getFuture(promise) }; + // block until set_value() + future.get(); + return true; + } + }; + + template + auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) + { + checkCoroutine("runOn()"); + return RunOn(callable)())>() + (this, target, time, std::forward(callable)); + } + } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */ -- cgit v1.2.3 From e6eebea8da545350f6684c191c633dd2fbc6f6f1 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 26 Oct 2021 11:49:53 -0400 Subject: SL-16220: Change WorkQueue::runOn() to waitForResult(). In addition to the name making the blocking explicit, we changed the signature: instead of specifying a target WorkQueue on which to run, waitForResult() runs the passed callable on its own WorkQueue. Why is that? Because, unlike postTo(), we do not require a handshake between two different WorkQueues. postTo() allows running arbitrary callback code, setting variables or whatever, on the originating WorkQueue (presumably on the originating thread). waitForResult() synchronizes using Promise/Future, which are explicitly designed for cross-thread communication. We need not call set_value() on the originating thread, so we don't need a postTo() callback lambda. --- indra/llcommon/workqueue.h | 145 +++++++++++++++++++-------------------------- 1 file changed, 61 insertions(+), 84 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index b17c666172..869f5d9a82 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -13,20 +13,13 @@ #define LL_WORKQUEUE_H #include "llcoros.h" +#include "llexception.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include +#include // std::current_exception #include // std::function -#if __cplusplus >= 201703 -#include -namespace stdopt = std; -#else -#include -namespace stdopt = boost; -#endif #include -#include // std::pair -#include namespace LL { @@ -51,8 +44,11 @@ namespace LL using TimePoint = Queue::TimePoint; using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; - template - using optional = stdopt::optional; + + struct Error: public LLException + { + Error(const std::string& what): LLException(what) {} + }; /** * You may omit the WorkQueue name, in which case a unique name is @@ -145,49 +141,25 @@ namespace LL * blocking the calling coroutine until then, returning the result to * caller on completion. * - * REQUIRED: - * - * * The calling thread is the thread servicing 'this' WorkQueue. - * * The calling coroutine is not the @em coroutine servicing this - * WorkQueue. We block the calling coroutine until the result is - * available. If this same coroutine is responsible for checking the - * local WorkQueue, the result will never be dequeued. In practice, - * to try to prevent mistakes, we forbid calling runOn() from a - * thread's default coroutine. - * - * Returns result if able to post, empty optional if the other - * WorkQueue is inaccessible. - * - * If the passed callable has void return, runOn() returns bool true - * if able to post, false if the other WorkQueue is inaccessible. + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. */ template - auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); + auto waitForResult(const TimePoint& time, CALLABLE&& callable); /** * Post work to another WorkQueue, blocking the calling coroutine * until then, returning the result to caller on completion. * - * REQUIRED: - * - * * The calling thread is the thread servicing 'this' WorkQueue. - * * The calling coroutine is not the @em coroutine servicing this - * WorkQueue. We block the calling coroutine until the result is - * available. If this same coroutine is responsible for checking the - * local WorkQueue, the result will never be dequeued. In practice, - * to try to prevent mistakes, we forbid calling runOn() from a - * thread's default coroutine. - * - * Returns result if able to post, empty optional if the other - * WorkQueue is inaccessible. - * - * If the passed callable has void return, runOn() returns bool true - * if able to post, false if the other WorkQueue is inaccessible. + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. */ template - auto runOn(weak_t target, CALLABLE&& callable) + auto waitForResult(CALLABLE&& callable) { - return runOn(target, TimePoint::clock::now(), std::move(callable)); + return waitForResult(TimePoint::clock::now(), std::move(callable)); } /*--------------------------- worker API ---------------------------*/ @@ -246,10 +218,10 @@ namespace LL /// general case: arbitrary C++ return type template - struct RunOn; + struct WaitForResult; /// specialize for CALLABLE returning void template - struct RunOn; + struct WaitForResult; static void checkCoroutine(const std::string& method); static void error(const std::string& msg); @@ -449,65 +421,70 @@ namespace LL /// general case: arbitrary C++ return type template - struct WorkQueue::RunOn + struct WorkQueue::WaitForResult { - optional operator()(WorkQueue* self, weak_t target, - const TimePoint& time, CALLABLE&& callable) + auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) { LLCoros::Promise promise; - if (! self->postTo( - target, - time, - std::forward(callable), - // We dare to bind a reference to Promise because it's - // specifically intended for cross-thread synchronization. - [&promise] - (RETURNTYPE&& result) + self->post( + time, + // We dare to bind a reference to Promise because it's + // specifically designed for cross-thread communication. + [&promise, callable = std::move(callable)]() + { + try { - promise.set_value(std::forward(result)); - })) - { - // we couldn't even postTo(): return empty optional - return {}; - } - // we were able to post + // call the caller's callable and trigger promise with result + promise.set_value(callable()); + } + catch (...) + { + promise.set_exception(std::current_exception()); + } + }); auto future{ LLCoros::getFuture(promise) }; - return { future.get(); } + // now, on the calling thread, wait for that result + LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); + return future.get(); } }; /// specialize for CALLABLE returning void template - struct WorkQueue::RunOn + struct WorkQueue::WaitForResult { - bool operator()(WorkQueue* self, weak_t target, - const TimePoint& time, CALLABLE&& callable) + void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) { LLCoros::Promise promise; - if (! self->postTo( - target, - time, - std::forward(callable), - // &promise is designed for cross-thread access - [&promise](){ promise.set_value(); })) - { - // we couldn't postTo() - return false; - } - // we were able to post + self->post( + time, + // &promise is designed for cross-thread access + [&promise, callable = std::move(callable)]() + { + try + { + callable(); + promise.set_value(); + } + catch (...) + { + promise.set_exception(std::current_exception()); + } + }); auto future{ LLCoros::getFuture(promise) }; // block until set_value() + LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); future.get(); - return true; } }; template - auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) + auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable) { - checkCoroutine("runOn()"); - return RunOn(callable)())>() - (this, target, time, std::forward(callable)); + checkCoroutine("waitForResult()"); + // derive callable's return type so we can specialize for void + return WaitForResult(callable)())>() + (this, time, std::forward(callable)); } } // namespace LL -- cgit v1.2.3 From f06765cba868679492934452354d16f9f3af9ade Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 26 Oct 2021 12:29:49 -0400 Subject: SL-16220: Make WorkQueue::postTo() return exception to caller. postTo() sets up two-way communication: the caller asks to run work on some other WorkQueue, expecting an eventual callback on the originating WorkQueue. That permits us to transport any exception thrown by the work callable back to rethrow on the originating WorkQueue. --- indra/llcommon/workqueue.h | 93 +++++++++++++++++++++++++++++++--------------- 1 file changed, 64 insertions(+), 29 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 869f5d9a82..42f5d78ba3 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -136,6 +136,25 @@ namespace LL std::move(callable), std::move(callback)); } + /** + * Post work to be run at a specified time to another WorkQueue, which + * may or may not still exist and be open. Return true if we were able + * to post. + */ + template + static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, which may or may not still exist + * and be open. Return true if we were able to post. + */ + template + static bool postMaybe(weak_t target, CALLABLE&& callable) + { + return postMaybe(target, TimePoint::clock::now(), + std::forward(callable)); + } + /** * Post work to another WorkQueue to be run at a specified time, * blocking the calling coroutine until then, returning the result to @@ -351,12 +370,8 @@ namespace LL { // Call the callable, which produces no result. std::forward(callable)(); - // This reply lambda binds the original callback, so - // that when we, the originating WorkQueue, finally - // receive and process the reply lambda, we'll call - // the bound callback -- on the same thread that - // originally called postTo(). - return [callback = std::move(callback)](){ callback(); }; + // Our completion callback is simply the caller's callback. + return std::move(callback); } }; @@ -389,36 +404,56 @@ namespace LL callback = std::move(callback)] () { - // Make a reply lambda to repost to THIS WorkQueue. - // Delegate to makeReplyLambda() so we can partially - // specialize on void return. - auto rlambda = makeReplyLambda(std::move(callable), std::move(callback)); - // Check if this originating WorkQueue still exists. - // Remember, the outer lambda is now running on a thread - // servicing the target WorkQueue, and real time has - // elapsed since postTo()'s tptr->post() call. - // reply is a weak_ptr: have to lock it to check it. - auto rptr = reply.lock(); - if (rptr) + // Use postMaybe() below in case this originating WorkQueue + // has been closed or destroyed. Remember, the outer lambda is + // now running on a thread servicing the target WorkQueue, and + // real time has elapsed since postTo()'s tptr->post() call. + try { - // Only post reply lambda if the originating WorkQueue - // still exists. If not -- who would we tell? Log it? - try - { - rptr->post(std::move(rlambda)); - } - catch (const Closed&) - { - // Originating WorkQueue might still exist, but - // might be Closed. Same thing: just discard the - // callback. - } + // Make a reply lambda to repost to THIS WorkQueue. + // Delegate to makeReplyLambda() so we can partially + // specialize on void return. + postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback))); + } + catch (...) + { + // Either variant of makeReplyLambda() is responsible for + // calling the caller's callable. If that throws, return + // the exception to the originating thread. + postMaybe( + reply, + // Bind the current exception to transport back to the + // originating WorkQueue. Once there, rethrow it. + [exc = std::current_exception()](){ std::rethrow_exception(exc); }); } }); + // looks like we were able to post() return true; } + template + bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable) + { + // target is a weak_ptr: have to lock it to check it + auto tptr = target.lock(); + if (tptr) + { + try + { + tptr->post(time, std::forward(callable)); + // we were able to post() + return true; + } + catch (const Closed&) + { + // target WorkQueue still exists, but is Closed + } + } + // either target no longer exists, or its WorkQueue is Closed + return false; + } + /// general case: arbitrary C++ return type template struct WorkQueue::WaitForResult -- cgit v1.2.3 From 8b16ecb9cfb4917fe38e4e5b0e4f40a23dd4ffbf Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 27 Oct 2021 15:31:54 -0400 Subject: SL-16220: Add tests for WorkQueue::waitForResult(), void & non-void. --- indra/llcommon/workqueue.h | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 42f5d78ba3..7dbc735c6d 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -92,6 +92,25 @@ namespace LL post(TimePoint::clock::now(), std::move(callable)); } + /** + * Post work to be run at a specified time to another WorkQueue, which + * may or may not still exist and be open. Return true if we were able + * to post. + */ + template + static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, which may or may not still exist + * and be open. Return true if we were able to post. + */ + template + static bool postMaybe(weak_t target, CALLABLE&& callable) + { + return postMaybe(target, TimePoint::clock::now(), + std::forward(callable)); + } + /** * Launch a callable returning bool that will trigger repeatedly at * specified interval, until the callable returns false. @@ -136,25 +155,6 @@ namespace LL std::move(callable), std::move(callback)); } - /** - * Post work to be run at a specified time to another WorkQueue, which - * may or may not still exist and be open. Return true if we were able - * to post. - */ - template - static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, which may or may not still exist - * and be open. Return true if we were able to post. - */ - template - static bool postMaybe(weak_t target, CALLABLE&& callable) - { - return postMaybe(target, TimePoint::clock::now(), - std::forward(callable)); - } - /** * Post work to another WorkQueue to be run at a specified time, * blocking the calling coroutine until then, returning the result to -- cgit v1.2.3