/** * @file workqueue.h * @author Nat Goodspeed * @date 2021-09-30 * @brief Queue used for inter-thread work passing. * * $LicenseInfo:firstyear=2021&license=viewerlgpl$ * Copyright (c) 2021, Linden Research, Inc. * $/LicenseInfo$ */ #if ! defined(LL_WORKQUEUE_H) #define LL_WORKQUEUE_H #include "llcoros.h" #include "llexception.h" #include "llinstancetracker.h" #include "llinstancetrackersubclass.h" #include "threadsafeschedule.h" #include <chrono> #include <exception> // std::current_exception #include <functional> // std::function #include <string> namespace LL { /***************************************************************************** * WorkQueueBase: API for WorkQueue and WorkSchedule *****************************************************************************/ /** * A typical WorkQueue has a string name that can be used to find it. */ class WorkQueueBase: public LLInstanceTracker<WorkQueueBase, std::string> { private: using super = LLInstanceTracker<WorkQueueBase, std::string>; public: using Work = std::function<void()>; using Closed = LLThreadSafeQueueInterrupt; // for runFor() using TimePoint = std::chrono::steady_clock::time_point; struct Error: public LLException { Error(const std::string& what): LLException(what) {} }; /** * You may omit the WorkQueueBase name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ WorkQueueBase(const std::string& name); /** * Since the point of WorkQueue is to pass work to some other worker * thread(s) asynchronously, it's important that it continue to exist * until the worker thread(s) have drained it. To communicate that * it's time for them to quit, close() the queue. */ virtual void close() = 0; /** * WorkQueue supports multiple producers and multiple consumers. In * the general case it's misleading to test size(), since any other * thread might change it the nanosecond the lock is released. On that * basis, some might argue against publishing a size() method at all. * * But there are two specific cases in which a test based on size() * might be reasonable: * * * If you're the only producer, noticing that size() == 0 is * meaningful. * * If you're the only consumer, noticing that size() > 0 is * meaningful. */ virtual size_t size() = 0; /// producer end: are we prevented from pushing any additional items? virtual bool isClosed() = 0; /// consumer end: are we done, is the queue entirely drained? virtual bool done() = 0; /*---------------------- fire and forget API -----------------------*/ /** * post work, unless the queue is closed before we can post */ virtual bool post(const Work&) = 0; /** * post work, unless the queue is full */ virtual bool tryPost(const Work&) = 0; /** * Post work to another WorkQueue, which may or may not still exist * and be open. Support any post() overload. Return true if we were * able to post. */ template <typename... ARGS> static bool postMaybe(weak_t target, ARGS&&... args); /*------------------------- handshake API --------------------------*/ /** * Post work to another WorkQueue, requesting a specific callback to * be run on this WorkQueue on completion. Optional final argument is * TimePoint for WorkSchedule. * * Returns true if able to post, false if the other WorkQueue is * inaccessible. */ template <typename CALLABLE, typename FOLLOWUP, typename... ARGS> bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback, ARGS&&... args); /** * Post work to another WorkQueue, blocking the calling coroutine * until then, returning the result to caller on completion. Optional * final argument is TimePoint for WorkSchedule. * * In general, we assume that each thread's default coroutine is busy * servicing its WorkQueue or whatever. To try to prevent mistakes, we * forbid calling waitForResult() from a thread's default coroutine. */ template <typename CALLABLE, typename... ARGS> auto waitForResult(CALLABLE&& callable, ARGS&&... args); /*--------------------------- worker API ---------------------------*/ /** * runUntilClose() pulls TimedWork items off this WorkQueue until the * queue is closed, at which point it returns. This would be the * typical entry point for a simple worker thread. */ void runUntilClose(); /** * runPending() runs all TimedWork items that are ready to run. It * returns true if the queue remains open, false if the queue has been * closed. This could be used by a thread whose primary purpose is to * serve the queue, but also wants to do other things with its idle time. */ bool runPending(); /** * runOne() runs at most one ready TimedWork item -- zero if none are * ready. It returns true if the queue remains open, false if the * queue has been closed. */ bool runOne(); /** * runFor() runs a subset of ready TimedWork items, until the * timeslice has been exceeded. It returns true if the queue remains * open, false if the queue has been closed. This could be used by a * busy main thread to lend a bounded few CPU cycles to this WorkQueue * without risking the WorkQueue blowing out the length of any one * frame. */ template <typename Rep, typename Period> bool runFor(const std::chrono::duration<Rep, Period>& timeslice) { LL_PROFILE_ZONE_SCOPED; return runUntil(TimePoint::clock::now() + timeslice); } /** * runUntil() is just like runFor(), only with a specific end time * instead of a timeslice duration. */ bool runUntil(const TimePoint& until); protected: template <typename CALLABLE, typename FOLLOWUP> static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); /// general case: arbitrary C++ return type template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> struct MakeReplyLambda; /// specialize for CALLABLE returning void template <typename CALLABLE, typename FOLLOWUP> struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>; /// general case: arbitrary C++ return type template <typename CALLABLE, typename RETURNTYPE> struct WaitForResult; /// specialize for CALLABLE returning void template <typename CALLABLE> struct WaitForResult<CALLABLE, void>; static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Work& work); private: virtual Work pop_() = 0; virtual bool tryPop_(Work&) = 0; }; /***************************************************************************** * WorkQueue: no timestamped task support *****************************************************************************/ class WorkQueue: public LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase> { private: using super = LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase>; public: /** * You may omit the WorkQueue name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ WorkQueue(const std::string& name = std::string(), size_t capacity=1024); /** * Since the point of WorkQueue is to pass work to some other worker * thread(s) asynchronously, it's important that it continue to exist * until the worker thread(s) have drained it. To communicate that * it's time for them to quit, close() the queue. */ void close() override; /** * WorkQueue supports multiple producers and multiple consumers. In * the general case it's misleading to test size(), since any other * thread might change it the nanosecond the lock is released. On that * basis, some might argue against publishing a size() method at all. * * But there are two specific cases in which a test based on size() * might be reasonable: * * * If you're the only producer, noticing that size() == 0 is * meaningful. * * If you're the only consumer, noticing that size() > 0 is * meaningful. */ size_t size() override; /// producer end: are we prevented from pushing any additional items? bool isClosed() override; /// consumer end: are we done, is the queue entirely drained? bool done() override; /*---------------------- fire and forget API -----------------------*/ /** * post work, unless the queue is closed before we can post */ bool post(const Work&) override; /** * post work, unless the queue is full */ bool tryPost(const Work&) override; private: using Queue = LLThreadSafeQueue<Work>; Queue mQueue; Work pop_() override; bool tryPop_(Work&) override; }; /***************************************************************************** * WorkSchedule: add support for timestamped tasks *****************************************************************************/ class WorkSchedule: public LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase> { private: using super = LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase>; using Queue = ThreadSafeSchedule<Work>; // helper for postEvery() template <typename Rep, typename Period, typename CALLABLE> class BackJack; public: using TimePoint = Queue::TimePoint; using TimedWork = Queue::TimeTuple; /** * You may omit the WorkSchedule name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ WorkSchedule(const std::string& name = std::string(), size_t capacity=1024); /** * Since the point of WorkSchedule is to pass work to some other worker * thread(s) asynchronously, it's important that the WorkSchedule continue * to exist until the worker thread(s) have drained it. To communicate * that it's time for them to quit, close() the queue. */ void close() override; /** * WorkSchedule supports multiple producers and multiple consumers. In * the general case it's misleading to test size(), since any other * thread might change it the nanosecond the lock is released. On that * basis, some might argue against publishing a size() method at all. * * But there are two specific cases in which a test based on size() * might be reasonable: * * * If you're the only producer, noticing that size() == 0 is * meaningful. * * If you're the only consumer, noticing that size() > 0 is * meaningful. */ size_t size() override; /// producer end: are we prevented from pushing any additional items? bool isClosed() override; /// consumer end: are we done, is the queue entirely drained? bool done() override; /*---------------------- fire and forget API -----------------------*/ /** * post work, unless the queue is closed before we can post */ bool post(const Work& callable) override; /** * post work for a particular time, unless the queue is closed before * we can post */ bool post(const Work& callable, const TimePoint& time); /** * post work, unless the queue is full */ bool tryPost(const Work& callable) override; /** * post work for a particular time, unless the queue is full */ bool tryPost(const Work& callable, const TimePoint& time); /** * Launch a callable returning bool that will trigger repeatedly at * specified interval, until the callable returns false. * * If you need to signal that callable from outside, DO NOT bind a * reference to a simple bool! That's not thread-safe. Instead, bind * an LLCond variant, e.g. LLOneShotCond or LLBoolCond. */ template <typename Rep, typename Period, typename CALLABLE> bool postEvery(const std::chrono::duration<Rep, Period>& interval, CALLABLE&& callable); private: Queue mQueue; Work pop_() override; bool tryPop_(Work&) override; }; /** * BackJack is, in effect, a hand-rolled lambda, binding a WorkSchedule, a * CALLABLE that returns bool, a TimePoint and an interval at which to * relaunch it. As long as the callable continues returning true, BackJack * keeps resubmitting it to the target WorkQueue. */ // Why is BackJack a class and not a lambda? Because, unlike a lambda, a // class method gets its own 'this' pointer -- which we need to resubmit // the whole BackJack callable. template <typename Rep, typename Period, typename CALLABLE> class WorkSchedule::BackJack { public: // bind the desired data BackJack(weak_t target, const TimePoint& start, const std::chrono::duration<Rep, Period>& interval, CALLABLE&& callable): mTarget(target), mStart(start), mInterval(interval), mCallable(std::move(callable)) {} // This operator() method, called by target WorkSchedule, is what // makes this object a Work item. Although WE require a callable // returning bool, WorkSchedule wants a void callable. We consume the // bool. void operator()() { // If mCallable() throws an exception, don't catch it here: if it // throws once, it's likely to throw every time, so it's a waste // of time to arrange to call it again. if (mCallable()) { // Modify mStart to the new start time we desire. If we simply // added mInterval to now, we'd get actual timings of // (mInterval + slop), where 'slop' is the latency between the // previous mStart and the WorkQueue actually calling us. // Instead, add mInterval to mStart so that at least we // register our intent to fire at exact mIntervals. mStart += mInterval; // We're being called at this moment by the target WorkSchedule. // Assume it still exists, rather than checking the result of // lock(). // Resubmit the whole *this callable: that's why we're a class // rather than a lambda. Allow moving *this so we can carry a // move-only callable; but naturally this statement must be // the last time we reference this instance, which may become // moved-from. auto target{ std::dynamic_pointer_cast<WorkSchedule>(mTarget.lock()) }; // Discard bool return: once this queue is closed, oh well, // just stop target->post(std::move(*this), mStart); } } private: weak_t mTarget; TimePoint mStart; std::chrono::duration<Rep, Period> mInterval; CALLABLE mCallable; }; template <typename Rep, typename Period, typename CALLABLE> bool WorkSchedule::postEvery(const std::chrono::duration<Rep, Period>& interval, CALLABLE&& callable) { if (interval.count() <= 0) { // It's essential that postEvery() be called with a positive // interval, since each call to BackJack posts another instance of // itself at (start + interval) and we order by target time. A // zero or negative interval would result in that BackJack // instance going to the head of the queue every time, immediately // ready to run. Effectively that would produce an infinite loop, // a denial of service on this WorkQueue. error("postEvery(interval) may not be 0"); } // Instantiate and post a suitable BackJack, binding a weak_ptr to // self, the current time, the desired interval and the desired // callable. return post( BackJack<Rep, Period, CALLABLE>( getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } /// general case: arbitrary C++ return type template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> struct WorkQueueBase::MakeReplyLambda { auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) { // Call the callable in any case -- but to minimize // copying the result, immediately bind it into the reply // lambda. The reply lambda also binds the original // callback, so that when we, the originating WorkQueue, // finally receive and process the reply lambda, we'll // call the bound callback with the bound result -- on the // same thread that originally called postTo(). return [result = std::forward<CALLABLE>(callable)(), callback = std::move(callback)] () mutable { callback(std::move(result)); }; } }; /// specialize for CALLABLE returning void template <typename CALLABLE, typename FOLLOWUP> struct WorkQueueBase::MakeReplyLambda<CALLABLE, FOLLOWUP, void> { auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) { // Call the callable, which produces no result. std::forward<CALLABLE>(callable)(); // Our completion callback is simply the caller's callback. return std::move(callback); } }; template <typename CALLABLE, typename FOLLOWUP> auto WorkQueueBase::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) { return MakeReplyLambda<CALLABLE, FOLLOWUP, decltype(std::forward<CALLABLE>(callable)())>() (std::move(callable), std::move(callback)); } template <typename CALLABLE, typename FOLLOWUP, typename... ARGS> bool WorkQueueBase::postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback, ARGS&&... args) { LL_PROFILE_ZONE_SCOPED; // We're being asked to post to the WorkQueue at target. // target is a weak_ptr: have to lock it to check it. auto tptr = target.lock(); if (! tptr) // can't post() if the target WorkQueue has been destroyed return false; // Here we believe target WorkQueue still exists. Post to it a // lambda that packages our callable, our callback and a weak_ptr // to this originating WorkQueue. return tptr->post( [reply = super::getWeak(), callable = std::move(callable), callback = std::move(callback)] () mutable { // Use postMaybe() below in case this originating WorkQueue // has been closed or destroyed. Remember, the outer lambda is // now running on a thread servicing the target WorkQueue, and // real time has elapsed since postTo()'s tptr->post() call. try { // Make a reply lambda to repost to THIS WorkQueue. // Delegate to makeReplyLambda() so we can partially // specialize on void return. postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback))); } catch (...) { // Either variant of makeReplyLambda() is responsible for // calling the caller's callable. If that throws, return // the exception to the originating thread. postMaybe( reply, // Bind the current exception to transport back to the // originating WorkQueue. Once there, rethrow it. [exc = std::current_exception()](){ std::rethrow_exception(exc); }); } }, // if caller passed a TimePoint, pass it along to post() std::forward<ARGS>(args)...); } template <typename... ARGS> bool WorkQueueBase::postMaybe(weak_t target, ARGS&&... args) { LL_PROFILE_ZONE_SCOPED; // target is a weak_ptr: have to lock it to check it auto tptr = target.lock(); if (tptr) { return tptr->post(std::forward<ARGS>(args)...); } // target no longer exists return false; } /// general case: arbitrary C++ return type template <typename CALLABLE, typename RETURNTYPE> struct WorkQueueBase::WaitForResult { template <typename... ARGS> auto operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args) { LLCoros::Promise<RETURNTYPE> promise; bool posted = self->post( // We dare to bind a reference to Promise because it's // specifically designed for cross-thread communication. [&promise, callable = std::move(callable)]() mutable { try { // call the caller's callable and trigger promise with result promise.set_value(callable()); } catch (...) { promise.set_exception(std::current_exception()); } }, // if caller passed a TimePoint, pass it to post() std::forward<ARGS>(args)...); if (! posted) { LLTHROW(WorkQueueBase::Closed()); } auto future{ LLCoros::getFuture(promise) }; // now, on the calling thread, wait for that result LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); return future.get(); } }; /// specialize for CALLABLE returning void template <typename CALLABLE> struct WorkQueueBase::WaitForResult<CALLABLE, void> { template <typename... ARGS> void operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args) { LLCoros::Promise<void> promise; bool posted = self->post( // &promise is designed for cross-thread access [&promise, callable = std::move(callable)]() mutable { try { callable(); promise.set_value(); } catch (...) { promise.set_exception(std::current_exception()); } }, // if caller passed a TimePoint, pass it to post() std::forward<ARGS>(args)...); if (! posted) { LLTHROW(WorkQueueBase::Closed()); } auto future{ LLCoros::getFuture(promise) }; // block until set_value() LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); future.get(); } }; template <typename CALLABLE, typename... ARGS> auto WorkQueueBase::waitForResult(CALLABLE&& callable, ARGS&&... args) { checkCoroutine("waitForResult()"); // derive callable's return type so we can specialize for void return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() (this, std::forward<CALLABLE>(callable), std::forward<ARGS>(args)...); } } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */