diff options
author | Dave Houlton <euclid@lindenlab.com> | 2021-11-15 09:25:35 -0700 |
---|---|---|
committer | Dave Houlton <euclid@lindenlab.com> | 2021-11-15 09:25:35 -0700 |
commit | 029b41c0419e975bbb28454538b46dc69ce5d2ba (patch) | |
tree | 4f9a28bb36ee07fe9a7b45a434384afd1f24bb85 /indra/llcommon/workqueue.h | |
parent | aeed774ff9cc55c0c1dd2784e23b2366ff367fbe (diff) |
Revert "SL-16220: Merge branch 'origin/DRTVWR-546' into glthread"
This reverts commit 5188a26a8521251dda07ac0140bb129f28417e49, reversing
changes made to 819088563e13f1d75e048311fbaf0df4a79b7e19.
Diffstat (limited to 'indra/llcommon/workqueue.h')
-rw-r--r-- | indra/llcommon/workqueue.h | 378 |
1 files changed, 71 insertions, 307 deletions
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index c25d787425..5ec790da79 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,14 +12,14 @@ #if ! defined(LL_WORKQUEUE_H) #define LL_WORKQUEUE_H -#include "llcoros.h" -#include "llexception.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include <chrono> -#include <exception> // std::current_exception #include <functional> // std::function +#include <queue> #include <string> +#include <utility> // std::pair +#include <vector> namespace LL { @@ -45,16 +45,11 @@ namespace LL using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; - struct Error: public LLException - { - Error(const std::string& what): LLException(what) {} - }; - /** * You may omit the WorkQueue name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ - WorkQueue(const std::string& name = std::string(), size_t capacity=1024); + WorkQueue(const std::string& name = std::string()); /** * Since the point of WorkQueue is to pass work to some other worker @@ -64,36 +59,15 @@ namespace LL */ void close(); - /** - * WorkQueue supports multiple producers and multiple consumers. In - * the general case it's misleading to test size(), since any other - * thread might change it the nanosecond the lock is released. On that - * basis, some might argue against publishing a size() method at all. - * - * But there are two specific cases in which a test based on size() - * might be reasonable: - * - * * If you're the only producer, noticing that size() == 0 is - * meaningful. - * * If you're the only consumer, noticing that size() > 0 is - * meaningful. - */ - size_t size(); - /// producer end: are we prevented from pushing any additional items? - bool isClosed(); - /// consumer end: are we done, is the queue entirely drained? - bool done(); - /*---------------------- fire and forget API -----------------------*/ /// fire-and-forget, but at a particular (future?) time template <typename CALLABLE> void post(const TimePoint& time, CALLABLE&& callable) { - // Defer reifying an arbitrary CALLABLE until we hit this or - // postIfOpen(). All other methods should accept CALLABLEs of - // arbitrary type to avoid multiple levels of std::function - // indirection. + // Defer reifying an arbitrary CALLABLE until we hit this method. + // All other methods should accept CALLABLEs of arbitrary type to + // avoid multiple levels of std::function indirection. mQueue.push(TimedWork(time, std::move(callable))); } @@ -109,47 +83,6 @@ namespace LL } /** - * post work for a particular time, unless the queue is closed before - * we can post - */ - template <typename CALLABLE> - bool postIfOpen(const TimePoint& time, CALLABLE&& callable) - { - // Defer reifying an arbitrary CALLABLE until we hit this or - // post(). All other methods should accept CALLABLEs of arbitrary - // type to avoid multiple levels of std::function indirection. - return mQueue.pushIfOpen(TimedWork(time, std::move(callable))); - } - - /** - * post work, unless the queue is closed before we can post - */ - template <typename CALLABLE> - bool postIfOpen(CALLABLE&& callable) - { - return postIfOpen(TimePoint::clock::now(), std::move(callable)); - } - - /** - * Post work to be run at a specified time to another WorkQueue, which - * may or may not still exist and be open. Return true if we were able - * to post. - */ - template <typename CALLABLE> - static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, which may or may not still exist - * and be open. Return true if we were able to post. - */ - template <typename CALLABLE> - static bool postMaybe(weak_t target, CALLABLE&& callable) - { - return postMaybe(target, TimePoint::clock::now(), - std::forward<CALLABLE>(callable)); - } - - /** * Launch a callable returning bool that will trigger repeatedly at * specified interval, until the callable returns false. * @@ -182,8 +115,63 @@ namespace LL // Studio compile errors that seem utterly unrelated to this source // code. template <typename CALLABLE, typename FOLLOWUP> - bool postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); + bool postTo(WorkQueue::weak_t target, + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) + { + // We're being asked to post to the WorkQueue at target. + // target is a weak_ptr: have to lock it to check it. + auto tptr = target.lock(); + if (! tptr) + // can't post() if the target WorkQueue has been destroyed + return false; + + // Here we believe target WorkQueue still exists. Post to it a + // lambda that packages our callable, our callback and a weak_ptr + // to this originating WorkQueue. + tptr->post( + time, + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () + { + // Call the callable in any case -- but to minimize + // copying the result, immediately bind it into a reply + // lambda. The reply lambda also binds the original + // callback, so that when we, the originating WorkQueue, + // finally receive and process the reply lambda, we'll + // call the bound callback with the bound result -- on the + // same thread that originally called postTo(). + auto rlambda = + [result = callable(), + callback = std::move(callback)] + () + { callback(std::move(result)); }; + // Check if this originating WorkQueue still exists. + // Remember, the outer lambda is now running on a thread + // servicing the target WorkQueue, and real time has + // elapsed since postTo()'s tptr->post() call. + // reply is a weak_ptr: have to lock it to check it. + auto rptr = reply.lock(); + if (rptr) + { + // Only post reply lambda if the originating WorkQueue + // still exists. If not -- who would we tell? Log it? + try + { + rptr->post(std::move(rlambda)); + } + catch (const Closed&) + { + // Originating WorkQueue might still exist, but + // might be Closed. Same thing: just discard the + // callback. + } + } + }); + // looks like we were able to post() + return true; + } /** * Post work to another WorkQueue, requesting a specific callback to @@ -193,36 +181,10 @@ namespace LL * inaccessible. */ template <typename CALLABLE, typename FOLLOWUP> - bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) + bool postTo(WorkQueue::weak_t target, + CALLABLE&& callable, FOLLOWUP&& callback) { - return postTo(target, TimePoint::clock::now(), - std::move(callable), std::move(callback)); - } - - /** - * Post work to another WorkQueue to be run at a specified time, - * blocking the calling coroutine until then, returning the result to - * caller on completion. - * - * In general, we assume that each thread's default coroutine is busy - * servicing its WorkQueue or whatever. To try to prevent mistakes, we - * forbid calling waitForResult() from a thread's default coroutine. - */ - template <typename CALLABLE> - auto waitForResult(const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, blocking the calling coroutine - * until then, returning the result to caller on completion. - * - * In general, we assume that each thread's default coroutine is busy - * servicing its WorkQueue or whatever. To try to prevent mistakes, we - * forbid calling waitForResult() from a thread's default coroutine. - */ - template <typename CALLABLE> - auto waitForResult(CALLABLE&& callable) - { - return waitForResult(TimePoint::clock::now(), std::move(callable)); + return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); } /*--------------------------- worker API ---------------------------*/ @@ -270,23 +232,6 @@ namespace LL bool runUntil(const TimePoint& until); private: - template <typename CALLABLE, typename FOLLOWUP> - static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); - /// general case: arbitrary C++ return type - template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> - struct MakeReplyLambda; - /// specialize for CALLABLE returning void - template <typename CALLABLE, typename FOLLOWUP> - struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>; - - /// general case: arbitrary C++ return type - template <typename CALLABLE, typename RETURNTYPE> - struct WaitForResult; - /// specialize for CALLABLE returning void - template <typename CALLABLE> - struct WaitForResult<CALLABLE, void>; - - static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -308,8 +253,8 @@ namespace LL { public: // bind the desired data - BackJack(weak_t target, - const TimePoint& start, + BackJack(WorkQueue::weak_t target, + const WorkQueue::TimePoint& start, const std::chrono::duration<Rep, Period>& interval, CALLABLE&& callable): mTarget(target), @@ -356,8 +301,8 @@ namespace LL } private: - weak_t mTarget; - TimePoint mStart; + WorkQueue::weak_t mTarget; + WorkQueue::TimePoint mStart; std::chrono::duration<Rep, Period> mInterval; CALLABLE mCallable; }; @@ -385,187 +330,6 @@ namespace LL getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } - /// general case: arbitrary C++ return type - template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> - struct WorkQueue::MakeReplyLambda - { - auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) - { - // Call the callable in any case -- but to minimize - // copying the result, immediately bind it into the reply - // lambda. The reply lambda also binds the original - // callback, so that when we, the originating WorkQueue, - // finally receive and process the reply lambda, we'll - // call the bound callback with the bound result -- on the - // same thread that originally called postTo(). - return - [result = std::forward<CALLABLE>(callable)(), - callback = std::move(callback)] - () - { callback(std::move(result)); }; - } - }; - - /// specialize for CALLABLE returning void - template <typename CALLABLE, typename FOLLOWUP> - struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void> - { - auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) - { - // Call the callable, which produces no result. - std::forward<CALLABLE>(callable)(); - // Our completion callback is simply the caller's callback. - return std::move(callback); - } - }; - - template <typename CALLABLE, typename FOLLOWUP> - auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) - { - return MakeReplyLambda<CALLABLE, FOLLOWUP, - decltype(std::forward<CALLABLE>(callable)())>() - (std::move(callable), std::move(callback)); - } - - template <typename CALLABLE, typename FOLLOWUP> - bool WorkQueue::postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) - { - // We're being asked to post to the WorkQueue at target. - // target is a weak_ptr: have to lock it to check it. - auto tptr = target.lock(); - if (! tptr) - // can't post() if the target WorkQueue has been destroyed - return false; - - // Here we believe target WorkQueue still exists. Post to it a - // lambda that packages our callable, our callback and a weak_ptr - // to this originating WorkQueue. - tptr->post( - time, - [reply = super::getWeak(), - callable = std::move(callable), - callback = std::move(callback)] - () - { - // Use postMaybe() below in case this originating WorkQueue - // has been closed or destroyed. Remember, the outer lambda is - // now running on a thread servicing the target WorkQueue, and - // real time has elapsed since postTo()'s tptr->post() call. - try - { - // Make a reply lambda to repost to THIS WorkQueue. - // Delegate to makeReplyLambda() so we can partially - // specialize on void return. - postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback))); - } - catch (...) - { - // Either variant of makeReplyLambda() is responsible for - // calling the caller's callable. If that throws, return - // the exception to the originating thread. - postMaybe( - reply, - // Bind the current exception to transport back to the - // originating WorkQueue. Once there, rethrow it. - [exc = std::current_exception()](){ std::rethrow_exception(exc); }); - } - }); - - // looks like we were able to post() - return true; - } - - template <typename CALLABLE> - bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable) - { - // target is a weak_ptr: have to lock it to check it - auto tptr = target.lock(); - if (tptr) - { - try - { - tptr->post(time, std::forward<CALLABLE>(callable)); - // we were able to post() - return true; - } - catch (const Closed&) - { - // target WorkQueue still exists, but is Closed - } - } - // either target no longer exists, or its WorkQueue is Closed - return false; - } - - /// general case: arbitrary C++ return type - template <typename CALLABLE, typename RETURNTYPE> - struct WorkQueue::WaitForResult - { - auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) - { - LLCoros::Promise<RETURNTYPE> promise; - self->post( - time, - // We dare to bind a reference to Promise because it's - // specifically designed for cross-thread communication. - [&promise, callable = std::move(callable)]() - { - try - { - // call the caller's callable and trigger promise with result - promise.set_value(callable()); - } - catch (...) - { - promise.set_exception(std::current_exception()); - } - }); - auto future{ LLCoros::getFuture(promise) }; - // now, on the calling thread, wait for that result - LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); - return future.get(); - } - }; - - /// specialize for CALLABLE returning void - template <typename CALLABLE> - struct WorkQueue::WaitForResult<CALLABLE, void> - { - void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) - { - LLCoros::Promise<void> promise; - self->post( - time, - // &promise is designed for cross-thread access - [&promise, callable = std::move(callable)]() - { - try - { - callable(); - promise.set_value(); - } - catch (...) - { - promise.set_exception(std::current_exception()); - } - }); - auto future{ LLCoros::getFuture(promise) }; - // block until set_value() - LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); - future.get(); - } - }; - - template <typename CALLABLE> - auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable) - { - checkCoroutine("waitForResult()"); - // derive callable's return type so we can specialize for void - return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() - (this, time, std::forward<CALLABLE>(callable)); - } - } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */ |