diff options
Diffstat (limited to 'indra/llcommon/workqueue.h')
-rw-r--r-- | indra/llcommon/workqueue.h | 410 |
1 files changed, 244 insertions, 166 deletions
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 70fd65bd0c..5461ce6c23 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -15,6 +15,7 @@ #include "llcoros.h" #include "llexception.h" #include "llinstancetracker.h" +#include "llinstancetrackersubclass.h" #include "threadsafeschedule.h" #include <chrono> #include <exception> // std::current_exception @@ -23,27 +24,23 @@ namespace LL { + +/***************************************************************************** +* WorkQueueBase: API for WorkQueue and WorkSchedule +*****************************************************************************/ /** * A typical WorkQueue has a string name that can be used to find it. */ - class WorkQueue: public LLInstanceTracker<WorkQueue, std::string> + class WorkQueueBase: public LLInstanceTracker<WorkQueueBase, std::string> { private: - using super = LLInstanceTracker<WorkQueue, std::string>; + using super = LLInstanceTracker<WorkQueueBase, std::string>; public: using Work = std::function<void()>; - - private: - using Queue = ThreadSafeSchedule<Work>; - // helper for postEvery() - template <typename Rep, typename Period, typename CALLABLE> - class BackJack; - - public: - using TimePoint = Queue::TimePoint; - using TimedWork = Queue::TimeTuple; - using Closed = Queue::Closed; + using Closed = LLThreadSafeQueueInterrupt; + // for runFor() + using TimePoint = std::chrono::steady_clock::time_point; struct Error: public LLException { @@ -51,18 +48,18 @@ namespace LL }; /** - * You may omit the WorkQueue name, in which case a unique name is + * You may omit the WorkQueueBase name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ - WorkQueue(const std::string& name = std::string(), size_t capacity=1024); + WorkQueueBase(const std::string& name); /** * Since the point of WorkQueue is to pass work to some other worker - * thread(s) asynchronously, it's important that the WorkQueue continue - * to exist until the worker thread(s) have drained it. To communicate - * that it's time for them to quit, close() the queue. + * thread(s) asynchronously, it's important that it continue to exist + * until the worker thread(s) have drained it. To communicate that + * it's time for them to quit, close() the queue. */ - void close(); + virtual void close() = 0; /** * WorkQueue supports multiple producers and multiple consumers. In @@ -78,152 +75,60 @@ namespace LL * * If you're the only consumer, noticing that size() > 0 is * meaningful. */ - size_t size(); + virtual size_t size() = 0; /// producer end: are we prevented from pushing any additional items? - bool isClosed(); + virtual bool isClosed() = 0; /// consumer end: are we done, is the queue entirely drained? - bool done(); + virtual bool done() = 0; /*---------------------- fire and forget API -----------------------*/ - /// fire-and-forget, but at a particular (future?) time - template <typename CALLABLE> - void post(const TimePoint& time, CALLABLE&& callable) - { - // Defer reifying an arbitrary CALLABLE until we hit this or - // postIfOpen(). All other methods should accept CALLABLEs of - // arbitrary type to avoid multiple levels of std::function - // indirection. - mQueue.push(TimedWork(time, std::move(callable))); - } - /// fire-and-forget - template <typename CALLABLE> - void post(CALLABLE&& callable) - { - // We use TimePoint::clock::now() instead of TimePoint's - // representation of the epoch because this WorkQueue may contain - // a mix of past-due TimedWork items and TimedWork items scheduled - // for the future. Sift this new item into the correct place. - post(TimePoint::clock::now(), std::move(callable)); - } - - /** - * post work for a particular time, unless the queue is closed before - * we can post - */ - template <typename CALLABLE> - bool postIfOpen(const TimePoint& time, CALLABLE&& callable) - { - // Defer reifying an arbitrary CALLABLE until we hit this or - // post(). All other methods should accept CALLABLEs of arbitrary - // type to avoid multiple levels of std::function indirection. - return mQueue.pushIfOpen(TimedWork(time, std::move(callable))); - } + virtual void post(const Work&) = 0; /** * post work, unless the queue is closed before we can post */ - template <typename CALLABLE> - bool postIfOpen(CALLABLE&& callable) - { - return postIfOpen(TimePoint::clock::now(), std::move(callable)); - } + virtual bool postIfOpen(const Work&) = 0; /** - * Post work to be run at a specified time to another WorkQueue, which - * may or may not still exist and be open. Return true if we were able - * to post. + * post work, unless the queue is full */ - template <typename CALLABLE> - static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); + virtual bool tryPost(const Work&) = 0; /** * Post work to another WorkQueue, which may or may not still exist - * and be open. Return true if we were able to post. + * and be open. Support any post() overload. Return true if we were + * able to post. */ - template <typename CALLABLE> - static bool postMaybe(weak_t target, CALLABLE&& callable) - { - return postMaybe(target, TimePoint::clock::now(), - std::forward<CALLABLE>(callable)); - } - - /** - * Launch a callable returning bool that will trigger repeatedly at - * specified interval, until the callable returns false. - * - * If you need to signal that callable from outside, DO NOT bind a - * reference to a simple bool! That's not thread-safe. Instead, bind - * an LLCond variant, e.g. LLOneShotCond or LLBoolCond. - */ - template <typename Rep, typename Period, typename CALLABLE> - void postEvery(const std::chrono::duration<Rep, Period>& interval, - CALLABLE&& callable); - - template <typename CALLABLE> - bool tryPost(CALLABLE&& callable) - { - return mQueue.tryPush(TimedWork(TimePoint::clock::now(), std::move(callable))); - } + template <typename... ARGS> + static bool postMaybe(weak_t target, ARGS&&... args); /*------------------------- handshake API --------------------------*/ /** - * Post work to another WorkQueue to be run at a specified time, - * requesting a specific callback to be run on this WorkQueue on - * completion. - * - * Returns true if able to post, false if the other WorkQueue is - * inaccessible. - */ - // Apparently some Microsoft header file defines a macro CALLBACK? The - // natural template argument name CALLBACK produces very weird Visual - // Studio compile errors that seem utterly unrelated to this source - // code. - template <typename CALLABLE, typename FOLLOWUP> - bool postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); - - /** * Post work to another WorkQueue, requesting a specific callback to - * be run on this WorkQueue on completion. + * be run on this WorkQueue on completion. Optional final argument is + * TimePoint for WorkSchedule. * * Returns true if able to post, false if the other WorkQueue is * inaccessible. */ - template <typename CALLABLE, typename FOLLOWUP> - bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) - { - return postTo(target, TimePoint::clock::now(), - std::move(callable), std::move(callback)); - } - - /** - * Post work to another WorkQueue to be run at a specified time, - * blocking the calling coroutine until then, returning the result to - * caller on completion. - * - * In general, we assume that each thread's default coroutine is busy - * servicing its WorkQueue or whatever. To try to prevent mistakes, we - * forbid calling waitForResult() from a thread's default coroutine. - */ - template <typename CALLABLE> - auto waitForResult(const TimePoint& time, CALLABLE&& callable); + template <typename CALLABLE, typename FOLLOWUP, typename... ARGS> + bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback, + ARGS&&... args); /** * Post work to another WorkQueue, blocking the calling coroutine - * until then, returning the result to caller on completion. + * until then, returning the result to caller on completion. Optional + * final argument is TimePoint for WorkSchedule. * * In general, we assume that each thread's default coroutine is busy * servicing its WorkQueue or whatever. To try to prevent mistakes, we * forbid calling waitForResult() from a thread's default coroutine. */ - template <typename CALLABLE> - auto waitForResult(CALLABLE&& callable) - { - return waitForResult(TimePoint::clock::now(), std::move(callable)); - } + template <typename CALLABLE, typename... ARGS> + auto waitForResult(CALLABLE&& callable, ARGS&&... args); /*--------------------------- worker API ---------------------------*/ @@ -270,7 +175,7 @@ namespace LL */ bool runUntil(const TimePoint& until); - private: + protected: template <typename CALLABLE, typename FOLLOWUP> static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); /// general case: arbitrary C++ return type @@ -290,13 +195,179 @@ namespace LL static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); - void callWork(const Queue::DataTuple& work); void callWork(const Work& work); + + private: + virtual Work pop_() = 0; + virtual bool tryPop_(Work&) = 0; + }; + +/***************************************************************************** +* WorkQueue: no timestamped task support +*****************************************************************************/ + class WorkQueue: public LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase> + { + private: + using super = LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase>; + + public: + /** + * You may omit the WorkQueue name, in which case a unique name is + * synthesized; for practical purposes that makes it anonymous. + */ + WorkQueue(const std::string& name = std::string(), size_t capacity=1024); + + /** + * Since the point of WorkQueue is to pass work to some other worker + * thread(s) asynchronously, it's important that it continue to exist + * until the worker thread(s) have drained it. To communicate that + * it's time for them to quit, close() the queue. + */ + void close() override; + + /** + * WorkQueue supports multiple producers and multiple consumers. In + * the general case it's misleading to test size(), since any other + * thread might change it the nanosecond the lock is released. On that + * basis, some might argue against publishing a size() method at all. + * + * But there are two specific cases in which a test based on size() + * might be reasonable: + * + * * If you're the only producer, noticing that size() == 0 is + * meaningful. + * * If you're the only consumer, noticing that size() > 0 is + * meaningful. + */ + size_t size() override; + /// producer end: are we prevented from pushing any additional items? + bool isClosed() override; + /// consumer end: are we done, is the queue entirely drained? + bool done() override; + + /*---------------------- fire and forget API -----------------------*/ + + /// fire-and-forget + void post(const Work&) override; + + /** + * post work, unless the queue is closed before we can post + */ + bool postIfOpen(const Work&) override; + + /** + * post work, unless the queue is full + */ + bool tryPost(const Work&) override; + + private: + using Queue = LLThreadSafeQueue<Work>; Queue mQueue; + + Work pop_() override; + bool tryPop_(Work&) override; + }; + +/***************************************************************************** +* WorkSchedule: add support for timestamped tasks +*****************************************************************************/ + class WorkSchedule: public LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase> + { + private: + using super = LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase>; + using Queue = ThreadSafeSchedule<Work>; + // helper for postEvery() + template <typename Rep, typename Period, typename CALLABLE> + class BackJack; + + public: + using TimePoint = Queue::TimePoint; + using TimedWork = Queue::TimeTuple; + + /** + * You may omit the WorkSchedule name, in which case a unique name is + * synthesized; for practical purposes that makes it anonymous. + */ + WorkSchedule(const std::string& name = std::string(), size_t capacity=1024); + + /** + * Since the point of WorkSchedule is to pass work to some other worker + * thread(s) asynchronously, it's important that the WorkSchedule continue + * to exist until the worker thread(s) have drained it. To communicate + * that it's time for them to quit, close() the queue. + */ + void close() override; + + /** + * WorkSchedule supports multiple producers and multiple consumers. In + * the general case it's misleading to test size(), since any other + * thread might change it the nanosecond the lock is released. On that + * basis, some might argue against publishing a size() method at all. + * + * But there are two specific cases in which a test based on size() + * might be reasonable: + * + * * If you're the only producer, noticing that size() == 0 is + * meaningful. + * * If you're the only consumer, noticing that size() > 0 is + * meaningful. + */ + size_t size() override; + /// producer end: are we prevented from pushing any additional items? + bool isClosed() override; + /// consumer end: are we done, is the queue entirely drained? + bool done() override; + + /*---------------------- fire and forget API -----------------------*/ + + /// fire-and-forget + void post(const Work& callable) override; + + /// fire-and-forget, but at a particular (future?) time + void post(const Work& callable, const TimePoint& time); + + /** + * post work, unless the queue is closed before we can post + */ + bool postIfOpen(const Work& callable) override; + + /** + * post work for a particular time, unless the queue is closed before + * we can post + */ + bool postIfOpen(const Work& callable, const TimePoint& time); + + /** + * post work, unless the queue is full + */ + bool tryPost(const Work& callable) override; + + /** + * post work for a particular time, unless the queue is full + */ + bool tryPost(const Work& callable, const TimePoint& time); + + /** + * Launch a callable returning bool that will trigger repeatedly at + * specified interval, until the callable returns false. + * + * If you need to signal that callable from outside, DO NOT bind a + * reference to a simple bool! That's not thread-safe. Instead, bind + * an LLCond variant, e.g. LLOneShotCond or LLBoolCond. + */ + template <typename Rep, typename Period, typename CALLABLE> + void postEvery(const std::chrono::duration<Rep, Period>& interval, + CALLABLE&& callable); + + private: + Queue mQueue; + + Work pop_() override; + bool tryPop_(Work&) override; }; /** - * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a + * BackJack is, in effect, a hand-rolled lambda, binding a WorkSchedule, a * CALLABLE that returns bool, a TimePoint and an interval at which to * relaunch it. As long as the callable continues returning true, BackJack * keeps resubmitting it to the target WorkQueue. @@ -305,7 +376,7 @@ namespace LL // class method gets its own 'this' pointer -- which we need to resubmit // the whole BackJack callable. template <typename Rep, typename Period, typename CALLABLE> - class WorkQueue::BackJack + class WorkSchedule::BackJack { public: // bind the desired data @@ -319,9 +390,10 @@ namespace LL mCallable(std::move(callable)) {} - // Call by target WorkQueue -- note that although WE require a - // callable returning bool, WorkQueue wants a void callable. We - // consume the bool. + // This operator() method, called by target WorkSchedule, is what + // makes this object a Work item. Although WE require a callable + // returning bool, WorkSchedule wants a void callable. We consume the + // bool. void operator()() { // If mCallable() throws an exception, don't catch it here: if it @@ -337,7 +409,7 @@ namespace LL // register our intent to fire at exact mIntervals. mStart += mInterval; - // We're being called at this moment by the target WorkQueue. + // We're being called at this moment by the target WorkSchedule. // Assume it still exists, rather than checking the result of // lock(). // Resubmit the whole *this callable: that's why we're a class @@ -347,7 +419,8 @@ namespace LL // moved-from. try { - mTarget.lock()->post(mStart, std::move(*this)); + auto target{ std::dynamic_pointer_cast<WorkSchedule>(mTarget.lock()) }; + target->post(std::move(*this), mStart); } catch (const Closed&) { @@ -364,8 +437,8 @@ namespace LL }; template <typename Rep, typename Period, typename CALLABLE> - void WorkQueue::postEvery(const std::chrono::duration<Rep, Period>& interval, - CALLABLE&& callable) + void WorkSchedule::postEvery(const std::chrono::duration<Rep, Period>& interval, + CALLABLE&& callable) { if (interval.count() <= 0) { @@ -388,7 +461,7 @@ namespace LL /// general case: arbitrary C++ return type template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> - struct WorkQueue::MakeReplyLambda + struct WorkQueueBase::MakeReplyLambda { auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) { @@ -409,7 +482,7 @@ namespace LL /// specialize for CALLABLE returning void template <typename CALLABLE, typename FOLLOWUP> - struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void> + struct WorkQueueBase::MakeReplyLambda<CALLABLE, FOLLOWUP, void> { auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) { @@ -421,16 +494,16 @@ namespace LL }; template <typename CALLABLE, typename FOLLOWUP> - auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) + auto WorkQueueBase::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) { return MakeReplyLambda<CALLABLE, FOLLOWUP, decltype(std::forward<CALLABLE>(callable)())>() (std::move(callable), std::move(callback)); } - template <typename CALLABLE, typename FOLLOWUP> - bool WorkQueue::postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) + template <typename CALLABLE, typename FOLLOWUP, typename... ARGS> + bool WorkQueueBase::postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback, + ARGS&&... args) { LL_PROFILE_ZONE_SCOPED; // We're being asked to post to the WorkQueue at target. @@ -444,12 +517,11 @@ namespace LL // lambda that packages our callable, our callback and a weak_ptr // to this originating WorkQueue. tptr->post( - time, [reply = super::getWeak(), callable = std::move(callable), callback = std::move(callback)] - () - mutable { + () mutable + { // Use postMaybe() below in case this originating WorkQueue // has been closed or destroyed. Remember, the outer lambda is // now running on a thread servicing the target WorkQueue, and @@ -472,14 +544,16 @@ namespace LL // originating WorkQueue. Once there, rethrow it. [exc = std::current_exception()](){ std::rethrow_exception(exc); }); } - }); + }, + // if caller passed a TimePoint, pass it along to post() + std::forward<ARGS>(args)...); // looks like we were able to post() return true; } - template <typename CALLABLE> - bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable) + template <typename... ARGS> + bool WorkQueueBase::postMaybe(weak_t target, ARGS&&... args) { LL_PROFILE_ZONE_SCOPED; // target is a weak_ptr: have to lock it to check it @@ -488,7 +562,7 @@ namespace LL { try { - tptr->post(time, std::forward<CALLABLE>(callable)); + tptr->post(std::forward<ARGS>(args)...); // we were able to post() return true; } @@ -503,13 +577,13 @@ namespace LL /// general case: arbitrary C++ return type template <typename CALLABLE, typename RETURNTYPE> - struct WorkQueue::WaitForResult + struct WorkQueueBase::WaitForResult { - auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) + template <typename... ARGS> + auto operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args) { LLCoros::Promise<RETURNTYPE> promise; self->post( - time, // We dare to bind a reference to Promise because it's // specifically designed for cross-thread communication. [&promise, callable = std::move(callable)]() @@ -523,7 +597,9 @@ namespace LL { promise.set_exception(std::current_exception()); } - }); + }, + // if caller passed a TimePoint, pass it to post() + std::forward<ARGS>(args)...); auto future{ LLCoros::getFuture(promise) }; // now, on the calling thread, wait for that result LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); @@ -533,13 +609,13 @@ namespace LL /// specialize for CALLABLE returning void template <typename CALLABLE> - struct WorkQueue::WaitForResult<CALLABLE, void> + struct WorkQueueBase::WaitForResult<CALLABLE, void> { - void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) + template <typename... ARGS> + void operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args) { LLCoros::Promise<void> promise; self->post( - time, // &promise is designed for cross-thread access [&promise, callable = std::move(callable)]() mutable { @@ -552,7 +628,9 @@ namespace LL { promise.set_exception(std::current_exception()); } - }); + }, + // if caller passed a TimePoint, pass it to post() + std::forward<ARGS>(args)...); auto future{ LLCoros::getFuture(promise) }; // block until set_value() LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); @@ -560,13 +638,13 @@ namespace LL } }; - template <typename CALLABLE> - auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable) + template <typename CALLABLE, typename... ARGS> + auto WorkQueueBase::waitForResult(CALLABLE&& callable, ARGS&&... args) { checkCoroutine("waitForResult()"); // derive callable's return type so we can specialize for void return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() - (this, time, std::forward<CALLABLE>(callable)); + (this, std::forward<CALLABLE>(callable), std::forward<ARGS>(args)...); } } // namespace LL |