diff options
Diffstat (limited to 'indra/llcommon/workqueue.h')
-rw-r--r-- | indra/llcommon/workqueue.h | 197 |
1 files changed, 71 insertions, 126 deletions
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 96574a18b9..8e4b38c2f3 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,14 +12,14 @@ #if ! defined(LL_WORKQUEUE_H) #define LL_WORKQUEUE_H -#include "llcoros.h" -#include "llexception.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include <chrono> -#include <exception> // std::current_exception #include <functional> // std::function +#include <queue> #include <string> +#include <utility> // std::pair +#include <vector> namespace LL { @@ -45,16 +45,11 @@ namespace LL using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; - struct Error: public LLException - { - Error(const std::string& what): LLException(what) {} - }; - /** * You may omit the WorkQueue name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ - WorkQueue(const std::string& name = std::string(), size_t capacity=1024); + WorkQueue(const std::string& name = std::string()); /** * Since the point of WorkQueue is to pass work to some other worker @@ -64,36 +59,15 @@ namespace LL */ void close(); - /** - * WorkQueue supports multiple producers and multiple consumers. In - * the general case it's misleading to test size(), since any other - * thread might change it the nanosecond the lock is released. On that - * basis, some might argue against publishing a size() method at all. - * - * But there are two specific cases in which a test based on size() - * might be reasonable: - * - * * If you're the only producer, noticing that size() == 0 is - * meaningful. - * * If you're the only consumer, noticing that size() > 0 is - * meaningful. - */ - size_t size(); - /// producer end: are we prevented from pushing any additional items? - bool isClosed(); - /// consumer end: are we done, is the queue entirely drained? - bool done(); - /*---------------------- fire and forget API -----------------------*/ /// fire-and-forget, but at a particular (future?) time template <typename CALLABLE> void post(const TimePoint& time, CALLABLE&& callable) { - // Defer reifying an arbitrary CALLABLE until we hit this or - // postIfOpen(). All other methods should accept CALLABLEs of - // arbitrary type to avoid multiple levels of std::function - // indirection. + // Defer reifying an arbitrary CALLABLE until we hit this method. + // All other methods should accept CALLABLEs of arbitrary type to + // avoid multiple levels of std::function indirection. mQueue.push(TimedWork(time, std::move(callable))); } @@ -109,47 +83,6 @@ namespace LL } /** - * post work for a particular time, unless the queue is closed before - * we can post - */ - template <typename CALLABLE> - bool postIfOpen(const TimePoint& time, CALLABLE&& callable) - { - // Defer reifying an arbitrary CALLABLE until we hit this or - // post(). All other methods should accept CALLABLEs of arbitrary - // type to avoid multiple levels of std::function indirection. - return mQueue.pushIfOpen(TimedWork(time, std::move(callable))); - } - - /** - * post work, unless the queue is closed before we can post - */ - template <typename CALLABLE> - bool postIfOpen(CALLABLE&& callable) - { - return postIfOpen(TimePoint::clock::now(), std::move(callable)); - } - - /** - * Post work to be run at a specified time to another WorkQueue, which - * may or may not still exist and be open. Return true if we were able - * to post. - */ - template <typename CALLABLE> - static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, which may or may not still exist - * and be open. Return true if we were able to post. - */ - template <typename CALLABLE> - static bool postMaybe(weak_t target, CALLABLE&& callable) - { - return postMaybe(target, TimePoint::clock::now(), - std::forward<CALLABLE>(callable)); - } - - /** * Launch a callable returning bool that will trigger repeatedly at * specified interval, until the callable returns false. * @@ -182,8 +115,63 @@ namespace LL // Studio compile errors that seem utterly unrelated to this source // code. template <typename CALLABLE, typename FOLLOWUP> - bool postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); + bool postTo(WorkQueue::weak_t target, + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) + { + // We're being asked to post to the WorkQueue at target. + // target is a weak_ptr: have to lock it to check it. + auto tptr = target.lock(); + if (! tptr) + // can't post() if the target WorkQueue has been destroyed + return false; + + // Here we believe target WorkQueue still exists. Post to it a + // lambda that packages our callable, our callback and a weak_ptr + // to this originating WorkQueue. + tptr->post( + time, + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () + { + // Call the callable in any case -- but to minimize + // copying the result, immediately bind it into a reply + // lambda. The reply lambda also binds the original + // callback, so that when we, the originating WorkQueue, + // finally receive and process the reply lambda, we'll + // call the bound callback with the bound result -- on the + // same thread that originally called postTo(). + auto rlambda = + [result = callable(), + callback = std::move(callback)] + () + { callback(std::move(result)); }; + // Check if this originating WorkQueue still exists. + // Remember, the outer lambda is now running on a thread + // servicing the target WorkQueue, and real time has + // elapsed since postTo()'s tptr->post() call. + // reply is a weak_ptr: have to lock it to check it. + auto rptr = reply.lock(); + if (rptr) + { + // Only post reply lambda if the originating WorkQueue + // still exists. If not -- who would we tell? Log it? + try + { + rptr->post(std::move(rlambda)); + } + catch (const Closed&) + { + // Originating WorkQueue might still exist, but + // might be Closed. Same thing: just discard the + // callback. + } + } + }); + // looks like we were able to post() + return true; + } /** * Post work to another WorkQueue, requesting a specific callback to @@ -193,36 +181,10 @@ namespace LL * inaccessible. */ template <typename CALLABLE, typename FOLLOWUP> - bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) - { - return postTo(target, TimePoint::clock::now(), - std::move(callable), std::move(callback)); - } - - /** - * Post work to another WorkQueue to be run at a specified time, - * blocking the calling coroutine until then, returning the result to - * caller on completion. - * - * In general, we assume that each thread's default coroutine is busy - * servicing its WorkQueue or whatever. To try to prevent mistakes, we - * forbid calling waitForResult() from a thread's default coroutine. - */ - template <typename CALLABLE> - auto waitForResult(const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, blocking the calling coroutine - * until then, returning the result to caller on completion. - * - * In general, we assume that each thread's default coroutine is busy - * servicing its WorkQueue or whatever. To try to prevent mistakes, we - * forbid calling waitForResult() from a thread's default coroutine. - */ - template <typename CALLABLE> - auto waitForResult(CALLABLE&& callable) + bool postTo(WorkQueue::weak_t target, + CALLABLE&& callable, FOLLOWUP&& callback) { - return waitForResult(TimePoint::clock::now(), std::move(callable)); + return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); } /*--------------------------- worker API ---------------------------*/ @@ -271,23 +233,6 @@ namespace LL bool runUntil(const TimePoint& until); private: - template <typename CALLABLE, typename FOLLOWUP> - static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); - /// general case: arbitrary C++ return type - template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> - struct MakeReplyLambda; - /// specialize for CALLABLE returning void - template <typename CALLABLE, typename FOLLOWUP> - struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>; - - /// general case: arbitrary C++ return type - template <typename CALLABLE, typename RETURNTYPE> - struct WaitForResult; - /// specialize for CALLABLE returning void - template <typename CALLABLE> - struct WaitForResult<CALLABLE, void>; - - static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -309,8 +254,8 @@ namespace LL { public: // bind the desired data - BackJack(weak_t target, - const TimePoint& start, + BackJack(WorkQueue::weak_t target, + const WorkQueue::TimePoint& start, const std::chrono::duration<Rep, Period>& interval, CALLABLE&& callable): mTarget(target), @@ -357,8 +302,8 @@ namespace LL } private: - weak_t mTarget; - TimePoint mStart; + WorkQueue::weak_t mTarget; + WorkQueue::TimePoint mStart; std::chrono::duration<Rep, Period> mInterval; CALLABLE mCallable; }; |