From 623ac79120a417ec445ce5c106a907fe46734309 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 7 Oct 2021 15:32:51 -0400 Subject: SL-16024: Add LL::WorkQueue for passing work items between threads. A typical WorkQueue has a string name, which can be used to find it to post work to it. "Work" is a nullary callable. WorkQueue is a multi-producer, multi-consumer thread-safe queue: multiple threads can service the WorkQueue, multiple threads can post work to it. Work can be scheduled in the future by submitting with a timestamp. In addition, a given work item can be scheduled to run on a recurring basis. A requesting thread servicing a WorkQueue of its own, such as the viewer's main thread, can submit work to another WorkQueue along with a callback to be passed the result (of arbitrary type) of the first work item. The callback is posted to the originating WorkQueue, permitting safe data exchange between participating threads. Methods are provided for different kinds of servicing threads. runUntilClose() is useful for a simple worker thread. runFor(duration) devotes no more than a specified time slice to that WorkQueue, e.g. for use by the main thread. --- indra/llcommon/workqueue.h | 325 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100644 indra/llcommon/workqueue.h (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h new file mode 100644 index 0000000000..a52f7b0e26 --- /dev/null +++ b/indra/llcommon/workqueue.h @@ -0,0 +1,325 @@ +/** + * @file workqueue.h + * @author Nat Goodspeed + * @date 2021-09-30 + * @brief Queue used for inter-thread work passing. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_WORKQUEUE_H) +#define LL_WORKQUEUE_H + +#include "llinstancetracker.h" +#include "threadsafeschedule.h" +#include +#include // std::function +#include +#include +#include // std::pair +#include + +namespace LL +{ + /** + * A typical WorkQueue has a string name that can be used to find it. + */ + class WorkQueue: public LLInstanceTracker + { + private: + using super = LLInstanceTracker; + + public: + using Work = std::function; + + private: + using Queue = ThreadSafeSchedule; + // helper for postEvery() + template + class BackJack; + + public: + using TimePoint = Queue::TimePoint; + using TimedWork = Queue::TimeTuple; + using Closed = Queue::Closed; + + /** + * You may omit the WorkQueue name, in which case a unique name is + * synthesized; for practical purposes that makes it anonymous. + */ + WorkQueue(const std::string& name = std::string()); + + /** + * Since the point of WorkQueue is to pass work to some other worker + * thread(s) asynchronously, it's important that the WorkQueue continue + * to exist until the worker thread(s) have drained it. To communicate + * that it's time for them to quit, close() the queue. + */ + void close(); + + /*---------------------- fire and forget API -----------------------*/ + + /// fire-and-forget, but at a particular (future?) time + template + void post(const TimePoint& time, CALLABLE&& callable) + { + // Defer reifying an arbitrary CALLABLE until we hit this method. + // All other methods should accept CALLABLEs of arbitrary type to + // avoid multiple levels of std::function indirection. + mQueue.push(TimedWork(time, std::move(callable))); + } + + /// fire-and-forget + template + void post(CALLABLE&& callable) + { + // We use TimePoint::clock::now() instead of TimePoint's + // representation of the epoch because this WorkQueue may contain + // a mix of past-due TimedWork items and TimedWork items scheduled + // for the future. Sift this new item into the correct place. + post(TimePoint::clock::now(), std::move(callable)); + } + + /** + * Launch a callable returning bool that will trigger repeatedly at + * specified interval, until the callable returns false. + * + * If you need to signal that callable from outside, DO NOT bind a + * reference to a simple bool! That's not thread-safe. Instead, bind + * an LLCond variant, e.g. LLOneShotCond or LLBoolCond. + */ + template + void postEvery(const std::chrono::duration& interval, + CALLABLE&& callable); + + /*------------------------- handshake API --------------------------*/ + + /** + * Post work to another WorkQueue to be run at a specified time, + * requesting a specific callback to be run on this WorkQueue on + * completion. + * + * Returns true if able to post, false if the other WorkQueue is + * inaccessible. + */ + template + bool postTo(std::weak_ptr target, + const TimePoint& time, CALLABLE&& callable, CALLBACK&& callback) + { + // We're being asked to post to the WorkQueue at target. + // target is a weak_ptr: have to lock it to check it. + auto tptr = target.lock(); + if (! tptr) + // can't post() if the target WorkQueue has been destroyed + return false; + + // Here we believe target WorkQueue still exists. Post to it a + // lambda that packages our callable, our callback and a weak_ptr + // to this originating WorkQueue. + tptr->post( + time, + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () + { + // Call the callable in any case -- but to minimize + // copying the result, immediately bind it into a reply + // lambda. The reply lambda also binds the original + // callback, so that when we, the originating WorkQueue, + // finally receive and process the reply lambda, we'll + // call the bound callback with the bound result -- on the + // same thread that originally called postTo(). + auto rlambda = + [result = callable(), + callback = std::move(callback)] + () + { callback(std::move(result)); }; + // Check if this originating WorkQueue still exists. + // Remember, the outer lambda is now running on a thread + // servicing the target WorkQueue, and real time has + // elapsed since postTo()'s tptr->post() call. + // reply is a weak_ptr: have to lock it to check it. + auto rptr = reply.lock(); + if (rptr) + { + // Only post reply lambda if the originating WorkQueue + // still exists. If not -- who would we tell? Log it? + try + { + rptr->post(std::move(rlambda)); + } + catch (const Closed&) + { + // Originating WorkQueue might still exist, but + // might be Closed. Same thing: just discard the + // callback. + } + } + }); + // looks like we were able to post() + return true; + } + + /** + * Post work to another WorkQueue, requesting a specific callback to + * be run on this WorkQueue on completion. + * + * Returns true if able to post, false if the other WorkQueue is + * inaccessible. + */ + template + bool postTo(std::weak_ptr target, + CALLABLE&& callable, CALLBACK&& callback) + { + return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); + } + + /*--------------------------- worker API ---------------------------*/ + + /** + * runUntilClose() pulls TimedWork items off this WorkQueue until the + * queue is closed, at which point it returns. This would be the + * typical entry point for a simple worker thread. + */ + void runUntilClose(); + + /** + * runPending() runs all TimedWork items that are ready to run. It + * returns true if the queue remains open, false if the queue has been + * closed. This could be used by a thread whose primary purpose is to + * serve the queue, but also wants to do other things with its idle time. + */ + bool runPending(); + + /** + * runOne() runs at most one ready TimedWork item -- zero if none are + * ready. It returns true if the queue remains open, false if the + * queue has been closed. + */ + bool runOne(); + + /** + * runFor() runs a subset of ready TimedWork items, until the + * timeslice has been exceeded. It returns true if the queue remains + * open, false if the queue has been closed. This could be used by a + * busy main thread to lend a bounded few CPU cycles to this WorkQueue + * without risking the WorkQueue blowing out the length of any one + * frame. + */ + template + bool runFor(const std::chrono::duration& timeslice) + { + return runUntil(TimePoint::clock::now() + timeslice); + } + + /** + * runUntil() is just like runFor(), only with a specific end time + * instead of a timeslice duration. + */ + bool runUntil(const TimePoint& until); + + private: + static void error(const std::string& msg); + static std::string makeName(const std::string& name); + void callWork(const Queue::DataTuple& work); + void callWork(const Work& work); + Queue mQueue; + }; + + /** + * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a + * CALLABLE that returns bool, a TimePoint and an interval at which to + * relaunch it. As long as the callable continues returning true, BackJack + * keeps resubmitting it to the target WorkQueue. + */ + // Why is BackJack a class and not a lambda? Because, unlike a lambda, a + // class method gets its own 'this' pointer -- which we need to resubmit + // the whole BackJack callable. + template + class WorkQueue::BackJack + { + public: + // bind the desired data + BackJack(std::weak_ptr target, + const WorkQueue::TimePoint& start, + const std::chrono::duration& interval, + CALLABLE&& callable): + mTarget(target), + mStart(start), + mInterval(interval), + mCallable(std::move(callable)) + {} + + // Call by target WorkQueue -- note that although WE require a + // callable returning bool, WorkQueue wants a void callable. We + // consume the bool. + void operator()() + { + // If mCallable() throws an exception, don't catch it here: if it + // throws once, it's likely to throw every time, so it's a waste + // of time to arrange to call it again. + if (mCallable()) + { + // Modify mStart to the new start time we desire. If we simply + // added mInterval to now, we'd get actual timings of + // (mInterval + slop), where 'slop' is the latency between the + // previous mStart and the WorkQueue actually calling us. + // Instead, add mInterval to mStart so that at least we + // register our intent to fire at exact mIntervals. + mStart += mInterval; + + // We're being called at this moment by the target WorkQueue. + // Assume it still exists, rather than checking the result of + // lock(). + // Resubmit the whole *this callable: that's why we're a class + // rather than a lambda. Allow moving *this so we can carry a + // move-only callable; but naturally this statement must be + // the last time we reference this instance, which may become + // moved-from. + try + { + mTarget.lock()->post(mStart, std::move(*this)); + } + catch (const Closed&) + { + // Once this queue is closed, oh well, just stop + } + } + } + + private: + std::weak_ptr mTarget; + WorkQueue::TimePoint mStart; + std::chrono::duration mInterval; + CALLABLE mCallable; + }; + + template + void WorkQueue::postEvery(const std::chrono::duration& interval, + CALLABLE&& callable) + { + if (interval.count() <= 0) + { + // It's essential that postEvery() be called with a positive + // interval, since each call to BackJack posts another instance of + // itself at (start + interval) and we order by target time. A + // zero or negative interval would result in that BackJack + // instance going to the head of the queue every time, immediately + // ready to run. Effectively that would produce an infinite loop, + // a denial of service on this WorkQueue. + error("postEvery(interval) may not be 0"); + } + // Instantiate and post a suitable BackJack, binding a weak_ptr to + // self, the current time, the desired interval and the desired + // callable. + post( + BackJack( + getWeak(), TimePoint::clock::now(), interval, std::move(callable))); + } + +} // namespace LL + +#endif /* ! defined(LL_WORKQUEUE_H) */ -- cgit v1.2.3 From 54d874b1233586844f87e79ae8f211af0a1cb7a6 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 8 Oct 2021 11:52:09 -0400 Subject: SL-16024: Resolve bizarre VS compile error. Thanks Callum! It seems CALLBACK is a macro in some Microsoft header file. Bleah. --- indra/llcommon/workqueue.h | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index a52f7b0e26..b88aef989a 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -104,9 +104,13 @@ namespace LL * Returns true if able to post, false if the other WorkQueue is * inaccessible. */ - template - bool postTo(std::weak_ptr target, - const TimePoint& time, CALLABLE&& callable, CALLBACK&& callback) + // Apparently some Microsoft header file defines a macro CALLBACK? The + // natural template argument name CALLBACK produces very weird Visual + // Studio compile errors that seem utterly unrelated to this source + // code. + template + bool postTo(WorkQueue::weak_t target, + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) { // We're being asked to post to the WorkQueue at target. // target is a weak_ptr: have to lock it to check it. @@ -170,9 +174,9 @@ namespace LL * Returns true if able to post, false if the other WorkQueue is * inaccessible. */ - template - bool postTo(std::weak_ptr target, - CALLABLE&& callable, CALLBACK&& callback) + template + bool postTo(WorkQueue::weak_t target, + CALLABLE&& callable, FOLLOWUP&& callback) { return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); } @@ -243,7 +247,7 @@ namespace LL { public: // bind the desired data - BackJack(std::weak_ptr target, + BackJack(WorkQueue::weak_t target, const WorkQueue::TimePoint& start, const std::chrono::duration& interval, CALLABLE&& callable): @@ -291,7 +295,7 @@ namespace LL } private: - std::weak_ptr mTarget; + WorkQueue::weak_t mTarget; WorkQueue::TimePoint mStart; std::chrono::duration mInterval; CALLABLE mCallable; -- cgit v1.2.3 From 11afa09ea3f56c0e20eb195ae1520a88602ceaca Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 22 Oct 2021 11:36:31 -0400 Subject: SL-16220: Add LL::ThreadPool class and a "General" instance. ThreadPool bundles a WorkQueue with the specified number of worker threads to service it. Each ThreadPool has a name that can be used to locate its WorkQueue. Each worker thread calls WorkQueue::runUntilClose(). ThreadPool listens on the "LLApp" LLEventPump for shutdown notification. On receiving that, it closes its WorkQueue and then join()s each of its worker threads for orderly shutdown. Add a settings.xml entry "ThreadPoolSizes", the first LLSD-valued settings entry to expect a map: pool name->size. The expectation is that usually code instantiating a particular ThreadPool will have a default size in mind, but it should check "ThreadPoolSizes" for a user override. Make idle_startup()'s STATE_SEED_CAP_GRANTED state instantiate a "General" ThreadPool. This is function-static for lazy initialization. Eliminate LLMainLoopRepeater, which is completely unreferenced. Any potential future use cases are better addressed by posting to the main loop's WorkQueue. Eliminate llappviewer.cpp's private LLDeferredTaskList class, which implemented LLAppViewer::addOnIdleCallback(). Make addOnIdleCallback() post work to the main loop's WorkQueue instead. --- indra/llcommon/workqueue.h | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index b88aef989a..cfae2019dc 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -59,6 +59,11 @@ namespace LL */ void close(); + /// producer end: are we prevented from pushing any additional items? + bool isClosed(); + /// consumer end: are we done, is the queue entirely drained? + bool done(); + /*---------------------- fire and forget API -----------------------*/ /// fire-and-forget, but at a particular (future?) time -- cgit v1.2.3 From e7b8c27741201528bf78f95c96ba820833923dab Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 25 Oct 2021 15:55:49 -0400 Subject: SL-16220: Specialize WorkQueue for callable with void return. Add a test exercising this feature. --- indra/llcommon/workqueue.h | 167 +++++++++++++++++++++++++++++---------------- 1 file changed, 110 insertions(+), 57 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index cfae2019dc..deef3c8e84 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -115,62 +115,7 @@ namespace LL // code. template bool postTo(WorkQueue::weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) - { - // We're being asked to post to the WorkQueue at target. - // target is a weak_ptr: have to lock it to check it. - auto tptr = target.lock(); - if (! tptr) - // can't post() if the target WorkQueue has been destroyed - return false; - - // Here we believe target WorkQueue still exists. Post to it a - // lambda that packages our callable, our callback and a weak_ptr - // to this originating WorkQueue. - tptr->post( - time, - [reply = super::getWeak(), - callable = std::move(callable), - callback = std::move(callback)] - () - { - // Call the callable in any case -- but to minimize - // copying the result, immediately bind it into a reply - // lambda. The reply lambda also binds the original - // callback, so that when we, the originating WorkQueue, - // finally receive and process the reply lambda, we'll - // call the bound callback with the bound result -- on the - // same thread that originally called postTo(). - auto rlambda = - [result = callable(), - callback = std::move(callback)] - () - { callback(std::move(result)); }; - // Check if this originating WorkQueue still exists. - // Remember, the outer lambda is now running on a thread - // servicing the target WorkQueue, and real time has - // elapsed since postTo()'s tptr->post() call. - // reply is a weak_ptr: have to lock it to check it. - auto rptr = reply.lock(); - if (rptr) - { - // Only post reply lambda if the originating WorkQueue - // still exists. If not -- who would we tell? Log it? - try - { - rptr->post(std::move(rlambda)); - } - catch (const Closed&) - { - // Originating WorkQueue might still exist, but - // might be Closed. Same thing: just discard the - // callback. - } - } - }); - // looks like we were able to post() - return true; - } + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); /** * Post work to another WorkQueue, requesting a specific callback to @@ -183,7 +128,8 @@ namespace LL bool postTo(WorkQueue::weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) { - return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); + return postTo(target, TimePoint::clock::now(), + std::move(callable), std::move(callback)); } /*--------------------------- worker API ---------------------------*/ @@ -231,6 +177,17 @@ namespace LL bool runUntil(const TimePoint& until); private: + template + static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); + + /// general case: arbitrary C++ return type + template + struct MakeReplyLambda; + + /// specialize for CALLABLE returning void + template + struct MakeReplyLambda; + static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -329,6 +286,102 @@ namespace LL getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } + template + struct WorkQueue::MakeReplyLambda + { + auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) + { + // Call the callable in any case -- but to minimize + // copying the result, immediately bind it into the reply + // lambda. The reply lambda also binds the original + // callback, so that when we, the originating WorkQueue, + // finally receive and process the reply lambda, we'll + // call the bound callback with the bound result -- on the + // same thread that originally called postTo(). + return + [result = std::forward(callable)(), + callback = std::move(callback)] + () + { callback(std::move(result)); }; + } + }; + + /// specialize for CALLABLE returning void + template + struct WorkQueue::MakeReplyLambda + { + auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) + { + // Call the callable, which produces no result. + std::forward(callable)(); + // This reply lambda binds the original callback, so + // that when we, the originating WorkQueue, finally + // receive and process the reply lambda, we'll call + // the bound callback -- on the same thread that + // originally called postTo(). + return [callback = std::move(callback)](){ callback(); }; + } + }; + + template + auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) + { + return MakeReplyLambda(callable)())>() + (std::move(callable), std::move(callback)); + } + + template + bool WorkQueue::postTo(WorkQueue::weak_t target, + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) + { + // We're being asked to post to the WorkQueue at target. + // target is a weak_ptr: have to lock it to check it. + auto tptr = target.lock(); + if (! tptr) + // can't post() if the target WorkQueue has been destroyed + return false; + + // Here we believe target WorkQueue still exists. Post to it a + // lambda that packages our callable, our callback and a weak_ptr + // to this originating WorkQueue. + tptr->post( + time, + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () + { + // Make a reply lambda to repost to THIS WorkQueue. + // Delegate to makeReplyLambda() so we can partially + // specialize on void return. + auto rlambda = makeReplyLambda(std::move(callable), std::move(callback)); + // Check if this originating WorkQueue still exists. + // Remember, the outer lambda is now running on a thread + // servicing the target WorkQueue, and real time has + // elapsed since postTo()'s tptr->post() call. + // reply is a weak_ptr: have to lock it to check it. + auto rptr = reply.lock(); + if (rptr) + { + // Only post reply lambda if the originating WorkQueue + // still exists. If not -- who would we tell? Log it? + try + { + rptr->post(std::move(rlambda)); + } + catch (const Closed&) + { + // Originating WorkQueue might still exist, but + // might be Closed. Same thing: just discard the + // callback. + } + } + }); + // looks like we were able to post() + return true; + } + } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */ -- cgit v1.2.3 From 023d39963e850356e1af6eec7f857e2534ce8d38 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 25 Oct 2021 17:31:27 -0400 Subject: SL-16220: WorkQueue::runOn() methods submit work, wait for result. The idea is that you can call runOn(target, callable) from a (non-default) coroutine and block that coroutine until the result becomes available. As a safety check, we forbid calling runOn() from a thread's default coroutine, assuming that a given thread's default coroutine is the one servicing the relevant WorkQueue. --- indra/llcommon/workqueue.h | 150 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 139 insertions(+), 11 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index deef3c8e84..b17c666172 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,11 +12,18 @@ #if ! defined(LL_WORKQUEUE_H) #define LL_WORKQUEUE_H +#include "llcoros.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include #include // std::function -#include +#if __cplusplus >= 201703 +#include +namespace stdopt = std; +#else +#include +namespace stdopt = boost; +#endif #include #include // std::pair #include @@ -44,6 +51,8 @@ namespace LL using TimePoint = Queue::TimePoint; using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; + template + using optional = stdopt::optional; /** * You may omit the WorkQueue name, in which case a unique name is @@ -114,7 +123,7 @@ namespace LL // Studio compile errors that seem utterly unrelated to this source // code. template - bool postTo(WorkQueue::weak_t target, + bool postTo(weak_t target, const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); /** @@ -125,13 +134,62 @@ namespace LL * inaccessible. */ template - bool postTo(WorkQueue::weak_t target, - CALLABLE&& callable, FOLLOWUP&& callback) + bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) { return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); } + /** + * Post work to another WorkQueue to be run at a specified time, + * blocking the calling coroutine until then, returning the result to + * caller on completion. + * + * REQUIRED: + * + * * The calling thread is the thread servicing 'this' WorkQueue. + * * The calling coroutine is not the @em coroutine servicing this + * WorkQueue. We block the calling coroutine until the result is + * available. If this same coroutine is responsible for checking the + * local WorkQueue, the result will never be dequeued. In practice, + * to try to prevent mistakes, we forbid calling runOn() from a + * thread's default coroutine. + * + * Returns result if able to post, empty optional if the other + * WorkQueue is inaccessible. + * + * If the passed callable has void return, runOn() returns bool true + * if able to post, false if the other WorkQueue is inaccessible. + */ + template + auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, blocking the calling coroutine + * until then, returning the result to caller on completion. + * + * REQUIRED: + * + * * The calling thread is the thread servicing 'this' WorkQueue. + * * The calling coroutine is not the @em coroutine servicing this + * WorkQueue. We block the calling coroutine until the result is + * available. If this same coroutine is responsible for checking the + * local WorkQueue, the result will never be dequeued. In practice, + * to try to prevent mistakes, we forbid calling runOn() from a + * thread's default coroutine. + * + * Returns result if able to post, empty optional if the other + * WorkQueue is inaccessible. + * + * If the passed callable has void return, runOn() returns bool true + * if able to post, false if the other WorkQueue is inaccessible. + */ + template + auto runOn(weak_t target, CALLABLE&& callable) + { + return runOn(target, TimePoint::clock::now(), std::move(callable)); + } + /*--------------------------- worker API ---------------------------*/ /** @@ -179,15 +237,21 @@ namespace LL private: template static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); - /// general case: arbitrary C++ return type template struct MakeReplyLambda; - /// specialize for CALLABLE returning void template struct MakeReplyLambda; + /// general case: arbitrary C++ return type + template + struct RunOn; + /// specialize for CALLABLE returning void + template + struct RunOn; + + static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -209,8 +273,8 @@ namespace LL { public: // bind the desired data - BackJack(WorkQueue::weak_t target, - const WorkQueue::TimePoint& start, + BackJack(weak_t target, + const TimePoint& start, const std::chrono::duration& interval, CALLABLE&& callable): mTarget(target), @@ -257,8 +321,8 @@ namespace LL } private: - WorkQueue::weak_t mTarget; - WorkQueue::TimePoint mStart; + weak_t mTarget; + TimePoint mStart; std::chrono::duration mInterval; CALLABLE mCallable; }; @@ -286,6 +350,7 @@ namespace LL getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } + /// general case: arbitrary C++ return type template struct WorkQueue::MakeReplyLambda { @@ -332,7 +397,7 @@ namespace LL } template - bool WorkQueue::postTo(WorkQueue::weak_t target, + bool WorkQueue::postTo(weak_t target, const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) { // We're being asked to post to the WorkQueue at target. @@ -382,6 +447,69 @@ namespace LL return true; } + /// general case: arbitrary C++ return type + template + struct WorkQueue::RunOn + { + optional operator()(WorkQueue* self, weak_t target, + const TimePoint& time, CALLABLE&& callable) + { + LLCoros::Promise promise; + if (! self->postTo( + target, + time, + std::forward(callable), + // We dare to bind a reference to Promise because it's + // specifically intended for cross-thread synchronization. + [&promise] + (RETURNTYPE&& result) + { + promise.set_value(std::forward(result)); + })) + { + // we couldn't even postTo(): return empty optional + return {}; + } + // we were able to post + auto future{ LLCoros::getFuture(promise) }; + return { future.get(); } + } + }; + + /// specialize for CALLABLE returning void + template + struct WorkQueue::RunOn + { + bool operator()(WorkQueue* self, weak_t target, + const TimePoint& time, CALLABLE&& callable) + { + LLCoros::Promise promise; + if (! self->postTo( + target, + time, + std::forward(callable), + // &promise is designed for cross-thread access + [&promise](){ promise.set_value(); })) + { + // we couldn't postTo() + return false; + } + // we were able to post + auto future{ LLCoros::getFuture(promise) }; + // block until set_value() + future.get(); + return true; + } + }; + + template + auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) + { + checkCoroutine("runOn()"); + return RunOn(callable)())>() + (this, target, time, std::forward(callable)); + } + } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */ -- cgit v1.2.3 From e6eebea8da545350f6684c191c633dd2fbc6f6f1 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 26 Oct 2021 11:49:53 -0400 Subject: SL-16220: Change WorkQueue::runOn() to waitForResult(). In addition to the name making the blocking explicit, we changed the signature: instead of specifying a target WorkQueue on which to run, waitForResult() runs the passed callable on its own WorkQueue. Why is that? Because, unlike postTo(), we do not require a handshake between two different WorkQueues. postTo() allows running arbitrary callback code, setting variables or whatever, on the originating WorkQueue (presumably on the originating thread). waitForResult() synchronizes using Promise/Future, which are explicitly designed for cross-thread communication. We need not call set_value() on the originating thread, so we don't need a postTo() callback lambda. --- indra/llcommon/workqueue.h | 145 +++++++++++++++++++-------------------------- 1 file changed, 61 insertions(+), 84 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index b17c666172..869f5d9a82 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -13,20 +13,13 @@ #define LL_WORKQUEUE_H #include "llcoros.h" +#include "llexception.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include +#include // std::current_exception #include // std::function -#if __cplusplus >= 201703 -#include -namespace stdopt = std; -#else -#include -namespace stdopt = boost; -#endif #include -#include // std::pair -#include namespace LL { @@ -51,8 +44,11 @@ namespace LL using TimePoint = Queue::TimePoint; using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; - template - using optional = stdopt::optional; + + struct Error: public LLException + { + Error(const std::string& what): LLException(what) {} + }; /** * You may omit the WorkQueue name, in which case a unique name is @@ -145,49 +141,25 @@ namespace LL * blocking the calling coroutine until then, returning the result to * caller on completion. * - * REQUIRED: - * - * * The calling thread is the thread servicing 'this' WorkQueue. - * * The calling coroutine is not the @em coroutine servicing this - * WorkQueue. We block the calling coroutine until the result is - * available. If this same coroutine is responsible for checking the - * local WorkQueue, the result will never be dequeued. In practice, - * to try to prevent mistakes, we forbid calling runOn() from a - * thread's default coroutine. - * - * Returns result if able to post, empty optional if the other - * WorkQueue is inaccessible. - * - * If the passed callable has void return, runOn() returns bool true - * if able to post, false if the other WorkQueue is inaccessible. + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. */ template - auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); + auto waitForResult(const TimePoint& time, CALLABLE&& callable); /** * Post work to another WorkQueue, blocking the calling coroutine * until then, returning the result to caller on completion. * - * REQUIRED: - * - * * The calling thread is the thread servicing 'this' WorkQueue. - * * The calling coroutine is not the @em coroutine servicing this - * WorkQueue. We block the calling coroutine until the result is - * available. If this same coroutine is responsible for checking the - * local WorkQueue, the result will never be dequeued. In practice, - * to try to prevent mistakes, we forbid calling runOn() from a - * thread's default coroutine. - * - * Returns result if able to post, empty optional if the other - * WorkQueue is inaccessible. - * - * If the passed callable has void return, runOn() returns bool true - * if able to post, false if the other WorkQueue is inaccessible. + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. */ template - auto runOn(weak_t target, CALLABLE&& callable) + auto waitForResult(CALLABLE&& callable) { - return runOn(target, TimePoint::clock::now(), std::move(callable)); + return waitForResult(TimePoint::clock::now(), std::move(callable)); } /*--------------------------- worker API ---------------------------*/ @@ -246,10 +218,10 @@ namespace LL /// general case: arbitrary C++ return type template - struct RunOn; + struct WaitForResult; /// specialize for CALLABLE returning void template - struct RunOn; + struct WaitForResult; static void checkCoroutine(const std::string& method); static void error(const std::string& msg); @@ -449,65 +421,70 @@ namespace LL /// general case: arbitrary C++ return type template - struct WorkQueue::RunOn + struct WorkQueue::WaitForResult { - optional operator()(WorkQueue* self, weak_t target, - const TimePoint& time, CALLABLE&& callable) + auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) { LLCoros::Promise promise; - if (! self->postTo( - target, - time, - std::forward(callable), - // We dare to bind a reference to Promise because it's - // specifically intended for cross-thread synchronization. - [&promise] - (RETURNTYPE&& result) + self->post( + time, + // We dare to bind a reference to Promise because it's + // specifically designed for cross-thread communication. + [&promise, callable = std::move(callable)]() + { + try { - promise.set_value(std::forward(result)); - })) - { - // we couldn't even postTo(): return empty optional - return {}; - } - // we were able to post + // call the caller's callable and trigger promise with result + promise.set_value(callable()); + } + catch (...) + { + promise.set_exception(std::current_exception()); + } + }); auto future{ LLCoros::getFuture(promise) }; - return { future.get(); } + // now, on the calling thread, wait for that result + LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); + return future.get(); } }; /// specialize for CALLABLE returning void template - struct WorkQueue::RunOn + struct WorkQueue::WaitForResult { - bool operator()(WorkQueue* self, weak_t target, - const TimePoint& time, CALLABLE&& callable) + void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) { LLCoros::Promise promise; - if (! self->postTo( - target, - time, - std::forward(callable), - // &promise is designed for cross-thread access - [&promise](){ promise.set_value(); })) - { - // we couldn't postTo() - return false; - } - // we were able to post + self->post( + time, + // &promise is designed for cross-thread access + [&promise, callable = std::move(callable)]() + { + try + { + callable(); + promise.set_value(); + } + catch (...) + { + promise.set_exception(std::current_exception()); + } + }); auto future{ LLCoros::getFuture(promise) }; // block until set_value() + LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); future.get(); - return true; } }; template - auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) + auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable) { - checkCoroutine("runOn()"); - return RunOn(callable)())>() - (this, target, time, std::forward(callable)); + checkCoroutine("waitForResult()"); + // derive callable's return type so we can specialize for void + return WaitForResult(callable)())>() + (this, time, std::forward(callable)); } } // namespace LL -- cgit v1.2.3 From f06765cba868679492934452354d16f9f3af9ade Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 26 Oct 2021 12:29:49 -0400 Subject: SL-16220: Make WorkQueue::postTo() return exception to caller. postTo() sets up two-way communication: the caller asks to run work on some other WorkQueue, expecting an eventual callback on the originating WorkQueue. That permits us to transport any exception thrown by the work callable back to rethrow on the originating WorkQueue. --- indra/llcommon/workqueue.h | 93 +++++++++++++++++++++++++++++++--------------- 1 file changed, 64 insertions(+), 29 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 869f5d9a82..42f5d78ba3 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -136,6 +136,25 @@ namespace LL std::move(callable), std::move(callback)); } + /** + * Post work to be run at a specified time to another WorkQueue, which + * may or may not still exist and be open. Return true if we were able + * to post. + */ + template + static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, which may or may not still exist + * and be open. Return true if we were able to post. + */ + template + static bool postMaybe(weak_t target, CALLABLE&& callable) + { + return postMaybe(target, TimePoint::clock::now(), + std::forward(callable)); + } + /** * Post work to another WorkQueue to be run at a specified time, * blocking the calling coroutine until then, returning the result to @@ -351,12 +370,8 @@ namespace LL { // Call the callable, which produces no result. std::forward(callable)(); - // This reply lambda binds the original callback, so - // that when we, the originating WorkQueue, finally - // receive and process the reply lambda, we'll call - // the bound callback -- on the same thread that - // originally called postTo(). - return [callback = std::move(callback)](){ callback(); }; + // Our completion callback is simply the caller's callback. + return std::move(callback); } }; @@ -389,36 +404,56 @@ namespace LL callback = std::move(callback)] () { - // Make a reply lambda to repost to THIS WorkQueue. - // Delegate to makeReplyLambda() so we can partially - // specialize on void return. - auto rlambda = makeReplyLambda(std::move(callable), std::move(callback)); - // Check if this originating WorkQueue still exists. - // Remember, the outer lambda is now running on a thread - // servicing the target WorkQueue, and real time has - // elapsed since postTo()'s tptr->post() call. - // reply is a weak_ptr: have to lock it to check it. - auto rptr = reply.lock(); - if (rptr) + // Use postMaybe() below in case this originating WorkQueue + // has been closed or destroyed. Remember, the outer lambda is + // now running on a thread servicing the target WorkQueue, and + // real time has elapsed since postTo()'s tptr->post() call. + try { - // Only post reply lambda if the originating WorkQueue - // still exists. If not -- who would we tell? Log it? - try - { - rptr->post(std::move(rlambda)); - } - catch (const Closed&) - { - // Originating WorkQueue might still exist, but - // might be Closed. Same thing: just discard the - // callback. - } + // Make a reply lambda to repost to THIS WorkQueue. + // Delegate to makeReplyLambda() so we can partially + // specialize on void return. + postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback))); + } + catch (...) + { + // Either variant of makeReplyLambda() is responsible for + // calling the caller's callable. If that throws, return + // the exception to the originating thread. + postMaybe( + reply, + // Bind the current exception to transport back to the + // originating WorkQueue. Once there, rethrow it. + [exc = std::current_exception()](){ std::rethrow_exception(exc); }); } }); + // looks like we were able to post() return true; } + template + bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable) + { + // target is a weak_ptr: have to lock it to check it + auto tptr = target.lock(); + if (tptr) + { + try + { + tptr->post(time, std::forward(callable)); + // we were able to post() + return true; + } + catch (const Closed&) + { + // target WorkQueue still exists, but is Closed + } + } + // either target no longer exists, or its WorkQueue is Closed + return false; + } + /// general case: arbitrary C++ return type template struct WorkQueue::WaitForResult -- cgit v1.2.3 From 8b16ecb9cfb4917fe38e4e5b0e4f40a23dd4ffbf Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 27 Oct 2021 15:31:54 -0400 Subject: SL-16220: Add tests for WorkQueue::waitForResult(), void & non-void. --- indra/llcommon/workqueue.h | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 42f5d78ba3..7dbc735c6d 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -92,6 +92,25 @@ namespace LL post(TimePoint::clock::now(), std::move(callable)); } + /** + * Post work to be run at a specified time to another WorkQueue, which + * may or may not still exist and be open. Return true if we were able + * to post. + */ + template + static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, which may or may not still exist + * and be open. Return true if we were able to post. + */ + template + static bool postMaybe(weak_t target, CALLABLE&& callable) + { + return postMaybe(target, TimePoint::clock::now(), + std::forward(callable)); + } + /** * Launch a callable returning bool that will trigger repeatedly at * specified interval, until the callable returns false. @@ -136,25 +155,6 @@ namespace LL std::move(callable), std::move(callback)); } - /** - * Post work to be run at a specified time to another WorkQueue, which - * may or may not still exist and be open. Return true if we were able - * to post. - */ - template - static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, which may or may not still exist - * and be open. Return true if we were able to post. - */ - template - static bool postMaybe(weak_t target, CALLABLE&& callable) - { - return postMaybe(target, TimePoint::clock::now(), - std::forward(callable)); - } - /** * Post work to another WorkQueue to be run at a specified time, * blocking the calling coroutine until then, returning the result to -- cgit v1.2.3 From 3faba7515c757ca3183522bd017c0f76d9c4581c Mon Sep 17 00:00:00 2001 From: Mnikolenko Productengine Date: Mon, 1 Nov 2021 19:38:55 +0200 Subject: SL-16237 FIXED Viewer hangs on login --- indra/llcommon/workqueue.h | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index b88aef989a..5ec790da79 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -94,6 +94,12 @@ namespace LL void postEvery(const std::chrono::duration& interval, CALLABLE&& callable); + template + bool tryPost(CALLABLE&& callable) + { + return mQueue.tryPush(TimedWork(TimePoint::clock::now(), std::move(callable))); + } + /*------------------------- handshake API --------------------------*/ /** -- cgit v1.2.3 From 89f2169e9d2c03ed92810689563ca110886abf16 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 4 Nov 2021 16:43:11 -0400 Subject: SL-16202: Add postIfOpen() methods to WorkQueue, LLThreadSafeQueue. postIfOpen() provides a no-exception alternative to post(), which blocks if full but throws if closed. postIfOpen() likewise blocks if full, but returns true if able to post and false if the queue was closed. --- indra/llcommon/workqueue.h | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 76d31f32a6..d0e3f870fe 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -75,9 +75,10 @@ namespace LL template void post(const TimePoint& time, CALLABLE&& callable) { - // Defer reifying an arbitrary CALLABLE until we hit this method. - // All other methods should accept CALLABLEs of arbitrary type to - // avoid multiple levels of std::function indirection. + // Defer reifying an arbitrary CALLABLE until we hit this or + // postIfOpen(). All other methods should accept CALLABLEs of + // arbitrary type to avoid multiple levels of std::function + // indirection. mQueue.push(TimedWork(time, std::move(callable))); } @@ -92,6 +93,28 @@ namespace LL post(TimePoint::clock::now(), std::move(callable)); } + /** + * post work for a particular time, unless the queue is closed before + * we can post + */ + template + bool postIfOpen(const TimePoint& time, CALLABLE&& callable) + { + // Defer reifying an arbitrary CALLABLE until we hit this or + // post(). All other methods should accept CALLABLEs of arbitrary + // type to avoid multiple levels of std::function indirection. + return mQueue.pushIfOpen(TimedWork(time, std::move(callable))); + } + + /** + * post work, unless the queue is closed before we can post + */ + template + bool postIfOpen(CALLABLE&& callable) + { + return postIfOpen(TimePoint::clock::now(), std::move(callable)); + } + /** * Post work to be run at a specified time to another WorkQueue, which * may or may not still exist and be open. Return true if we were able -- cgit v1.2.3 From 834e7ca088b5f417235327cd290b42459c733594 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 4 Nov 2021 17:18:57 -0400 Subject: SL-16202: Use large WorkQueue size limits for mainloop and General. Give ThreadPool and WorkQueue the ability to override default ThreadSafeSchedule capacity. Instantiate "mainloop" WorkQueue and "General" ThreadPool with very large capacity because we never want to have to block trying to push to either. --- indra/llcommon/workqueue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index d0e3f870fe..5987883829 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -54,7 +54,7 @@ namespace LL * You may omit the WorkQueue name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ - WorkQueue(const std::string& name = std::string()); + WorkQueue(const std::string& name = std::string(), size_t capacity=1024); /** * Since the point of WorkQueue is to pass work to some other worker -- cgit v1.2.3 From df8e17d8e851c34a83de6c508aba07f6bde12a10 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 10 Nov 2021 10:13:38 -0500 Subject: SL-16094: Add WorkQueue::size() method to support changeset 08336bb. We want to skip calling PostMessage() to bump the window thread out of GetMessage() in any frame with no work functions pending for that thread. That test depends on being able to sense the size() of the queue. Having converted to WorkQueue, we need that queue to support size(). --- indra/llcommon/workqueue.h | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 5987883829..c25d787425 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -64,6 +64,21 @@ namespace LL */ void close(); + /** + * WorkQueue supports multiple producers and multiple consumers. In + * the general case it's misleading to test size(), since any other + * thread might change it the nanosecond the lock is released. On that + * basis, some might argue against publishing a size() method at all. + * + * But there are two specific cases in which a test based on size() + * might be reasonable: + * + * * If you're the only producer, noticing that size() == 0 is + * meaningful. + * * If you're the only consumer, noticing that size() > 0 is + * meaningful. + */ + size_t size(); /// producer end: are we prevented from pushing any additional items? bool isClosed(); /// consumer end: are we done, is the queue entirely drained? -- cgit v1.2.3 From 029b41c0419e975bbb28454538b46dc69ce5d2ba Mon Sep 17 00:00:00 2001 From: Dave Houlton Date: Mon, 15 Nov 2021 09:25:35 -0700 Subject: Revert "SL-16220: Merge branch 'origin/DRTVWR-546' into glthread" This reverts commit 5188a26a8521251dda07ac0140bb129f28417e49, reversing changes made to 819088563e13f1d75e048311fbaf0df4a79b7e19. --- indra/llcommon/workqueue.h | 378 +++++++++------------------------------------ 1 file changed, 71 insertions(+), 307 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index c25d787425..5ec790da79 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,14 +12,14 @@ #if ! defined(LL_WORKQUEUE_H) #define LL_WORKQUEUE_H -#include "llcoros.h" -#include "llexception.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include -#include // std::current_exception #include // std::function +#include #include +#include // std::pair +#include namespace LL { @@ -45,16 +45,11 @@ namespace LL using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; - struct Error: public LLException - { - Error(const std::string& what): LLException(what) {} - }; - /** * You may omit the WorkQueue name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ - WorkQueue(const std::string& name = std::string(), size_t capacity=1024); + WorkQueue(const std::string& name = std::string()); /** * Since the point of WorkQueue is to pass work to some other worker @@ -64,36 +59,15 @@ namespace LL */ void close(); - /** - * WorkQueue supports multiple producers and multiple consumers. In - * the general case it's misleading to test size(), since any other - * thread might change it the nanosecond the lock is released. On that - * basis, some might argue against publishing a size() method at all. - * - * But there are two specific cases in which a test based on size() - * might be reasonable: - * - * * If you're the only producer, noticing that size() == 0 is - * meaningful. - * * If you're the only consumer, noticing that size() > 0 is - * meaningful. - */ - size_t size(); - /// producer end: are we prevented from pushing any additional items? - bool isClosed(); - /// consumer end: are we done, is the queue entirely drained? - bool done(); - /*---------------------- fire and forget API -----------------------*/ /// fire-and-forget, but at a particular (future?) time template void post(const TimePoint& time, CALLABLE&& callable) { - // Defer reifying an arbitrary CALLABLE until we hit this or - // postIfOpen(). All other methods should accept CALLABLEs of - // arbitrary type to avoid multiple levels of std::function - // indirection. + // Defer reifying an arbitrary CALLABLE until we hit this method. + // All other methods should accept CALLABLEs of arbitrary type to + // avoid multiple levels of std::function indirection. mQueue.push(TimedWork(time, std::move(callable))); } @@ -108,47 +82,6 @@ namespace LL post(TimePoint::clock::now(), std::move(callable)); } - /** - * post work for a particular time, unless the queue is closed before - * we can post - */ - template - bool postIfOpen(const TimePoint& time, CALLABLE&& callable) - { - // Defer reifying an arbitrary CALLABLE until we hit this or - // post(). All other methods should accept CALLABLEs of arbitrary - // type to avoid multiple levels of std::function indirection. - return mQueue.pushIfOpen(TimedWork(time, std::move(callable))); - } - - /** - * post work, unless the queue is closed before we can post - */ - template - bool postIfOpen(CALLABLE&& callable) - { - return postIfOpen(TimePoint::clock::now(), std::move(callable)); - } - - /** - * Post work to be run at a specified time to another WorkQueue, which - * may or may not still exist and be open. Return true if we were able - * to post. - */ - template - static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, which may or may not still exist - * and be open. Return true if we were able to post. - */ - template - static bool postMaybe(weak_t target, CALLABLE&& callable) - { - return postMaybe(target, TimePoint::clock::now(), - std::forward(callable)); - } - /** * Launch a callable returning bool that will trigger repeatedly at * specified interval, until the callable returns false. @@ -182,8 +115,63 @@ namespace LL // Studio compile errors that seem utterly unrelated to this source // code. template - bool postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); + bool postTo(WorkQueue::weak_t target, + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) + { + // We're being asked to post to the WorkQueue at target. + // target is a weak_ptr: have to lock it to check it. + auto tptr = target.lock(); + if (! tptr) + // can't post() if the target WorkQueue has been destroyed + return false; + + // Here we believe target WorkQueue still exists. Post to it a + // lambda that packages our callable, our callback and a weak_ptr + // to this originating WorkQueue. + tptr->post( + time, + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () + { + // Call the callable in any case -- but to minimize + // copying the result, immediately bind it into a reply + // lambda. The reply lambda also binds the original + // callback, so that when we, the originating WorkQueue, + // finally receive and process the reply lambda, we'll + // call the bound callback with the bound result -- on the + // same thread that originally called postTo(). + auto rlambda = + [result = callable(), + callback = std::move(callback)] + () + { callback(std::move(result)); }; + // Check if this originating WorkQueue still exists. + // Remember, the outer lambda is now running on a thread + // servicing the target WorkQueue, and real time has + // elapsed since postTo()'s tptr->post() call. + // reply is a weak_ptr: have to lock it to check it. + auto rptr = reply.lock(); + if (rptr) + { + // Only post reply lambda if the originating WorkQueue + // still exists. If not -- who would we tell? Log it? + try + { + rptr->post(std::move(rlambda)); + } + catch (const Closed&) + { + // Originating WorkQueue might still exist, but + // might be Closed. Same thing: just discard the + // callback. + } + } + }); + // looks like we were able to post() + return true; + } /** * Post work to another WorkQueue, requesting a specific callback to @@ -193,36 +181,10 @@ namespace LL * inaccessible. */ template - bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) + bool postTo(WorkQueue::weak_t target, + CALLABLE&& callable, FOLLOWUP&& callback) { - return postTo(target, TimePoint::clock::now(), - std::move(callable), std::move(callback)); - } - - /** - * Post work to another WorkQueue to be run at a specified time, - * blocking the calling coroutine until then, returning the result to - * caller on completion. - * - * In general, we assume that each thread's default coroutine is busy - * servicing its WorkQueue or whatever. To try to prevent mistakes, we - * forbid calling waitForResult() from a thread's default coroutine. - */ - template - auto waitForResult(const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, blocking the calling coroutine - * until then, returning the result to caller on completion. - * - * In general, we assume that each thread's default coroutine is busy - * servicing its WorkQueue or whatever. To try to prevent mistakes, we - * forbid calling waitForResult() from a thread's default coroutine. - */ - template - auto waitForResult(CALLABLE&& callable) - { - return waitForResult(TimePoint::clock::now(), std::move(callable)); + return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); } /*--------------------------- worker API ---------------------------*/ @@ -270,23 +232,6 @@ namespace LL bool runUntil(const TimePoint& until); private: - template - static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); - /// general case: arbitrary C++ return type - template - struct MakeReplyLambda; - /// specialize for CALLABLE returning void - template - struct MakeReplyLambda; - - /// general case: arbitrary C++ return type - template - struct WaitForResult; - /// specialize for CALLABLE returning void - template - struct WaitForResult; - - static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -308,8 +253,8 @@ namespace LL { public: // bind the desired data - BackJack(weak_t target, - const TimePoint& start, + BackJack(WorkQueue::weak_t target, + const WorkQueue::TimePoint& start, const std::chrono::duration& interval, CALLABLE&& callable): mTarget(target), @@ -356,8 +301,8 @@ namespace LL } private: - weak_t mTarget; - TimePoint mStart; + WorkQueue::weak_t mTarget; + WorkQueue::TimePoint mStart; std::chrono::duration mInterval; CALLABLE mCallable; }; @@ -385,187 +330,6 @@ namespace LL getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } - /// general case: arbitrary C++ return type - template - struct WorkQueue::MakeReplyLambda - { - auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) - { - // Call the callable in any case -- but to minimize - // copying the result, immediately bind it into the reply - // lambda. The reply lambda also binds the original - // callback, so that when we, the originating WorkQueue, - // finally receive and process the reply lambda, we'll - // call the bound callback with the bound result -- on the - // same thread that originally called postTo(). - return - [result = std::forward(callable)(), - callback = std::move(callback)] - () - { callback(std::move(result)); }; - } - }; - - /// specialize for CALLABLE returning void - template - struct WorkQueue::MakeReplyLambda - { - auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) - { - // Call the callable, which produces no result. - std::forward(callable)(); - // Our completion callback is simply the caller's callback. - return std::move(callback); - } - }; - - template - auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) - { - return MakeReplyLambda(callable)())>() - (std::move(callable), std::move(callback)); - } - - template - bool WorkQueue::postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) - { - // We're being asked to post to the WorkQueue at target. - // target is a weak_ptr: have to lock it to check it. - auto tptr = target.lock(); - if (! tptr) - // can't post() if the target WorkQueue has been destroyed - return false; - - // Here we believe target WorkQueue still exists. Post to it a - // lambda that packages our callable, our callback and a weak_ptr - // to this originating WorkQueue. - tptr->post( - time, - [reply = super::getWeak(), - callable = std::move(callable), - callback = std::move(callback)] - () - { - // Use postMaybe() below in case this originating WorkQueue - // has been closed or destroyed. Remember, the outer lambda is - // now running on a thread servicing the target WorkQueue, and - // real time has elapsed since postTo()'s tptr->post() call. - try - { - // Make a reply lambda to repost to THIS WorkQueue. - // Delegate to makeReplyLambda() so we can partially - // specialize on void return. - postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback))); - } - catch (...) - { - // Either variant of makeReplyLambda() is responsible for - // calling the caller's callable. If that throws, return - // the exception to the originating thread. - postMaybe( - reply, - // Bind the current exception to transport back to the - // originating WorkQueue. Once there, rethrow it. - [exc = std::current_exception()](){ std::rethrow_exception(exc); }); - } - }); - - // looks like we were able to post() - return true; - } - - template - bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable) - { - // target is a weak_ptr: have to lock it to check it - auto tptr = target.lock(); - if (tptr) - { - try - { - tptr->post(time, std::forward(callable)); - // we were able to post() - return true; - } - catch (const Closed&) - { - // target WorkQueue still exists, but is Closed - } - } - // either target no longer exists, or its WorkQueue is Closed - return false; - } - - /// general case: arbitrary C++ return type - template - struct WorkQueue::WaitForResult - { - auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) - { - LLCoros::Promise promise; - self->post( - time, - // We dare to bind a reference to Promise because it's - // specifically designed for cross-thread communication. - [&promise, callable = std::move(callable)]() - { - try - { - // call the caller's callable and trigger promise with result - promise.set_value(callable()); - } - catch (...) - { - promise.set_exception(std::current_exception()); - } - }); - auto future{ LLCoros::getFuture(promise) }; - // now, on the calling thread, wait for that result - LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); - return future.get(); - } - }; - - /// specialize for CALLABLE returning void - template - struct WorkQueue::WaitForResult - { - void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) - { - LLCoros::Promise promise; - self->post( - time, - // &promise is designed for cross-thread access - [&promise, callable = std::move(callable)]() - { - try - { - callable(); - promise.set_value(); - } - catch (...) - { - promise.set_exception(std::current_exception()); - } - }); - auto future{ LLCoros::getFuture(promise) }; - // block until set_value() - LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); - future.get(); - } - }; - - template - auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable) - { - checkCoroutine("waitForResult()"); - // derive callable's return type so we can specialize for void - return WaitForResult(callable)())>() - (this, time, std::forward(callable)); - } - } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */ -- cgit v1.2.3 From cc34e26ef7e74845e4af9e5c5d450c0b12a268e0 Mon Sep 17 00:00:00 2001 From: Runitai Linden Date: Mon, 22 Nov 2021 11:51:03 -0600 Subject: SL-16094 Add WorkQueue profile hooks --- indra/llcommon/workqueue.h | 3 +++ 1 file changed, 3 insertions(+) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index c25d787425..96574a18b9 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -260,6 +260,7 @@ namespace LL template bool runFor(const std::chrono::duration& timeslice) { + LL_PROFILE_ZONE_SCOPED; return runUntil(TimePoint::clock::now() + timeslice); } @@ -431,6 +432,7 @@ namespace LL bool WorkQueue::postTo(weak_t target, const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) { + LL_PROFILE_ZONE_SCOPED; // We're being asked to post to the WorkQueue at target. // target is a weak_ptr: have to lock it to check it. auto tptr = target.lock(); @@ -479,6 +481,7 @@ namespace LL template bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable) { + LL_PROFILE_ZONE_SCOPED; // target is a weak_ptr: have to lock it to check it auto tptr = target.lock(); if (tptr) -- cgit v1.2.3 From 0b066539fe68dc5750900c3452189645c40adb45 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 24 Nov 2021 10:47:54 -0500 Subject: DRTVWR-546, SL-16220, SL-16094: Undo previous glthread branch revert. Reverting a merge is sticky: it tells git you never want to see that branch again. Merging the DRTVWR-546 branch, which contained the revert, into the glthread branch undid much of the development work on that branch. To restore it we must revert the revert. This reverts commit 029b41c0419e975bbb28454538b46dc69ce5d2ba. --- indra/llcommon/workqueue.h | 197 +++++++++++++++++++++++++++++---------------- 1 file changed, 126 insertions(+), 71 deletions(-) (limited to 'indra/llcommon/workqueue.h') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 8e4b38c2f3..96574a18b9 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,14 +12,14 @@ #if ! defined(LL_WORKQUEUE_H) #define LL_WORKQUEUE_H +#include "llcoros.h" +#include "llexception.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include +#include // std::current_exception #include // std::function -#include #include -#include // std::pair -#include namespace LL { @@ -45,11 +45,16 @@ namespace LL using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; + struct Error: public LLException + { + Error(const std::string& what): LLException(what) {} + }; + /** * You may omit the WorkQueue name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ - WorkQueue(const std::string& name = std::string()); + WorkQueue(const std::string& name = std::string(), size_t capacity=1024); /** * Since the point of WorkQueue is to pass work to some other worker @@ -59,15 +64,36 @@ namespace LL */ void close(); + /** + * WorkQueue supports multiple producers and multiple consumers. In + * the general case it's misleading to test size(), since any other + * thread might change it the nanosecond the lock is released. On that + * basis, some might argue against publishing a size() method at all. + * + * But there are two specific cases in which a test based on size() + * might be reasonable: + * + * * If you're the only producer, noticing that size() == 0 is + * meaningful. + * * If you're the only consumer, noticing that size() > 0 is + * meaningful. + */ + size_t size(); + /// producer end: are we prevented from pushing any additional items? + bool isClosed(); + /// consumer end: are we done, is the queue entirely drained? + bool done(); + /*---------------------- fire and forget API -----------------------*/ /// fire-and-forget, but at a particular (future?) time template void post(const TimePoint& time, CALLABLE&& callable) { - // Defer reifying an arbitrary CALLABLE until we hit this method. - // All other methods should accept CALLABLEs of arbitrary type to - // avoid multiple levels of std::function indirection. + // Defer reifying an arbitrary CALLABLE until we hit this or + // postIfOpen(). All other methods should accept CALLABLEs of + // arbitrary type to avoid multiple levels of std::function + // indirection. mQueue.push(TimedWork(time, std::move(callable))); } @@ -82,6 +108,47 @@ namespace LL post(TimePoint::clock::now(), std::move(callable)); } + /** + * post work for a particular time, unless the queue is closed before + * we can post + */ + template + bool postIfOpen(const TimePoint& time, CALLABLE&& callable) + { + // Defer reifying an arbitrary CALLABLE until we hit this or + // post(). All other methods should accept CALLABLEs of arbitrary + // type to avoid multiple levels of std::function indirection. + return mQueue.pushIfOpen(TimedWork(time, std::move(callable))); + } + + /** + * post work, unless the queue is closed before we can post + */ + template + bool postIfOpen(CALLABLE&& callable) + { + return postIfOpen(TimePoint::clock::now(), std::move(callable)); + } + + /** + * Post work to be run at a specified time to another WorkQueue, which + * may or may not still exist and be open. Return true if we were able + * to post. + */ + template + static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, which may or may not still exist + * and be open. Return true if we were able to post. + */ + template + static bool postMaybe(weak_t target, CALLABLE&& callable) + { + return postMaybe(target, TimePoint::clock::now(), + std::forward(callable)); + } + /** * Launch a callable returning bool that will trigger repeatedly at * specified interval, until the callable returns false. @@ -115,63 +182,8 @@ namespace LL // Studio compile errors that seem utterly unrelated to this source // code. template - bool postTo(WorkQueue::weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) - { - // We're being asked to post to the WorkQueue at target. - // target is a weak_ptr: have to lock it to check it. - auto tptr = target.lock(); - if (! tptr) - // can't post() if the target WorkQueue has been destroyed - return false; - - // Here we believe target WorkQueue still exists. Post to it a - // lambda that packages our callable, our callback and a weak_ptr - // to this originating WorkQueue. - tptr->post( - time, - [reply = super::getWeak(), - callable = std::move(callable), - callback = std::move(callback)] - () - { - // Call the callable in any case -- but to minimize - // copying the result, immediately bind it into a reply - // lambda. The reply lambda also binds the original - // callback, so that when we, the originating WorkQueue, - // finally receive and process the reply lambda, we'll - // call the bound callback with the bound result -- on the - // same thread that originally called postTo(). - auto rlambda = - [result = callable(), - callback = std::move(callback)] - () - { callback(std::move(result)); }; - // Check if this originating WorkQueue still exists. - // Remember, the outer lambda is now running on a thread - // servicing the target WorkQueue, and real time has - // elapsed since postTo()'s tptr->post() call. - // reply is a weak_ptr: have to lock it to check it. - auto rptr = reply.lock(); - if (rptr) - { - // Only post reply lambda if the originating WorkQueue - // still exists. If not -- who would we tell? Log it? - try - { - rptr->post(std::move(rlambda)); - } - catch (const Closed&) - { - // Originating WorkQueue might still exist, but - // might be Closed. Same thing: just discard the - // callback. - } - } - }); - // looks like we were able to post() - return true; - } + bool postTo(weak_t target, + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); /** * Post work to another WorkQueue, requesting a specific callback to @@ -181,10 +193,36 @@ namespace LL * inaccessible. */ template - bool postTo(WorkQueue::weak_t target, - CALLABLE&& callable, FOLLOWUP&& callback) + bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) + { + return postTo(target, TimePoint::clock::now(), + std::move(callable), std::move(callback)); + } + + /** + * Post work to another WorkQueue to be run at a specified time, + * blocking the calling coroutine until then, returning the result to + * caller on completion. + * + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. + */ + template + auto waitForResult(const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, blocking the calling coroutine + * until then, returning the result to caller on completion. + * + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. + */ + template + auto waitForResult(CALLABLE&& callable) { - return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); + return waitForResult(TimePoint::clock::now(), std::move(callable)); } /*--------------------------- worker API ---------------------------*/ @@ -233,6 +271,23 @@ namespace LL bool runUntil(const TimePoint& until); private: + template + static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); + /// general case: arbitrary C++ return type + template + struct MakeReplyLambda; + /// specialize for CALLABLE returning void + template + struct MakeReplyLambda; + + /// general case: arbitrary C++ return type + template + struct WaitForResult; + /// specialize for CALLABLE returning void + template + struct WaitForResult; + + static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -254,8 +309,8 @@ namespace LL { public: // bind the desired data - BackJack(WorkQueue::weak_t target, - const WorkQueue::TimePoint& start, + BackJack(weak_t target, + const TimePoint& start, const std::chrono::duration& interval, CALLABLE&& callable): mTarget(target), @@ -302,8 +357,8 @@ namespace LL } private: - WorkQueue::weak_t mTarget; - WorkQueue::TimePoint mStart; + weak_t mTarget; + TimePoint mStart; std::chrono::duration mInterval; CALLABLE mCallable; }; -- cgit v1.2.3