summaryrefslogtreecommitdiff
path: root/indra/llcommon/workqueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llcommon/workqueue.h')
-rw-r--r--indra/llcommon/workqueue.h410
1 files changed, 244 insertions, 166 deletions
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
index 70fd65bd0c..eea8886a7a 100644
--- a/indra/llcommon/workqueue.h
+++ b/indra/llcommon/workqueue.h
@@ -15,6 +15,7 @@
#include "llcoros.h"
#include "llexception.h"
#include "llinstancetracker.h"
+#include "llinstancetrackersubclass.h"
#include "threadsafeschedule.h"
#include <chrono>
#include <exception> // std::current_exception
@@ -23,27 +24,23 @@
namespace LL
{
+
+/*****************************************************************************
+* WorkQueueBase: API for WorkQueue and WorkSchedule
+*****************************************************************************/
/**
* A typical WorkQueue has a string name that can be used to find it.
*/
- class WorkQueue: public LLInstanceTracker<WorkQueue, std::string>
+ class WorkQueueBase: public LLInstanceTracker<WorkQueueBase, std::string>
{
private:
- using super = LLInstanceTracker<WorkQueue, std::string>;
+ using super = LLInstanceTracker<WorkQueueBase, std::string>;
public:
using Work = std::function<void()>;
-
- private:
- using Queue = ThreadSafeSchedule<Work>;
- // helper for postEvery()
- template <typename Rep, typename Period, typename CALLABLE>
- class BackJack;
-
- public:
- using TimePoint = Queue::TimePoint;
- using TimedWork = Queue::TimeTuple;
- using Closed = Queue::Closed;
+ using Closed = LLThreadSafeQueueInterrupt;
+ // for runFor()
+ using TimePoint = std::chrono::steady_clock::time_point;
struct Error: public LLException
{
@@ -51,18 +48,18 @@ namespace LL
};
/**
- * You may omit the WorkQueue name, in which case a unique name is
+ * You may omit the WorkQueueBase name, in which case a unique name is
* synthesized; for practical purposes that makes it anonymous.
*/
- WorkQueue(const std::string& name = std::string(), size_t capacity=1024);
+ WorkQueueBase(const std::string& name);
/**
* Since the point of WorkQueue is to pass work to some other worker
- * thread(s) asynchronously, it's important that the WorkQueue continue
- * to exist until the worker thread(s) have drained it. To communicate
- * that it's time for them to quit, close() the queue.
+ * thread(s) asynchronously, it's important that it continue to exist
+ * until the worker thread(s) have drained it. To communicate that
+ * it's time for them to quit, close() the queue.
*/
- void close();
+ virtual void close() = 0;
/**
* WorkQueue supports multiple producers and multiple consumers. In
@@ -78,152 +75,60 @@ namespace LL
* * If you're the only consumer, noticing that size() > 0 is
* meaningful.
*/
- size_t size();
+ virtual size_t size() = 0;
/// producer end: are we prevented from pushing any additional items?
- bool isClosed();
+ virtual bool isClosed() = 0;
/// consumer end: are we done, is the queue entirely drained?
- bool done();
+ virtual bool done() = 0;
/*---------------------- fire and forget API -----------------------*/
- /// fire-and-forget, but at a particular (future?) time
- template <typename CALLABLE>
- void post(const TimePoint& time, CALLABLE&& callable)
- {
- // Defer reifying an arbitrary CALLABLE until we hit this or
- // postIfOpen(). All other methods should accept CALLABLEs of
- // arbitrary type to avoid multiple levels of std::function
- // indirection.
- mQueue.push(TimedWork(time, std::move(callable)));
- }
-
/// fire-and-forget
- template <typename CALLABLE>
- void post(CALLABLE&& callable)
- {
- // We use TimePoint::clock::now() instead of TimePoint's
- // representation of the epoch because this WorkQueue may contain
- // a mix of past-due TimedWork items and TimedWork items scheduled
- // for the future. Sift this new item into the correct place.
- post(TimePoint::clock::now(), std::move(callable));
- }
-
- /**
- * post work for a particular time, unless the queue is closed before
- * we can post
- */
- template <typename CALLABLE>
- bool postIfOpen(const TimePoint& time, CALLABLE&& callable)
- {
- // Defer reifying an arbitrary CALLABLE until we hit this or
- // post(). All other methods should accept CALLABLEs of arbitrary
- // type to avoid multiple levels of std::function indirection.
- return mQueue.pushIfOpen(TimedWork(time, std::move(callable)));
- }
+ virtual void post(const Work&) = 0;
/**
* post work, unless the queue is closed before we can post
*/
- template <typename CALLABLE>
- bool postIfOpen(CALLABLE&& callable)
- {
- return postIfOpen(TimePoint::clock::now(), std::move(callable));
- }
+ virtual bool postIfOpen(const Work&) = 0;
/**
- * Post work to be run at a specified time to another WorkQueue, which
- * may or may not still exist and be open. Return true if we were able
- * to post.
+ * post work, unless the queue is full
*/
- template <typename CALLABLE>
- static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable);
+ virtual bool tryPost(const Work&) = 0;
/**
* Post work to another WorkQueue, which may or may not still exist
- * and be open. Return true if we were able to post.
+ * and be open. Support any post() overload. Return true if we were
+ * able to post.
*/
- template <typename CALLABLE>
- static bool postMaybe(weak_t target, CALLABLE&& callable)
- {
- return postMaybe(target, TimePoint::clock::now(),
- std::forward<CALLABLE>(callable));
- }
-
- /**
- * Launch a callable returning bool that will trigger repeatedly at
- * specified interval, until the callable returns false.
- *
- * If you need to signal that callable from outside, DO NOT bind a
- * reference to a simple bool! That's not thread-safe. Instead, bind
- * an LLCond variant, e.g. LLOneShotCond or LLBoolCond.
- */
- template <typename Rep, typename Period, typename CALLABLE>
- void postEvery(const std::chrono::duration<Rep, Period>& interval,
- CALLABLE&& callable);
-
- template <typename CALLABLE>
- bool tryPost(CALLABLE&& callable)
- {
- return mQueue.tryPush(TimedWork(TimePoint::clock::now(), std::move(callable)));
- }
+ template <typename... ARGS>
+ static bool postMaybe(weak_t target, ARGS&&... args);
/*------------------------- handshake API --------------------------*/
/**
- * Post work to another WorkQueue to be run at a specified time,
- * requesting a specific callback to be run on this WorkQueue on
- * completion.
- *
- * Returns true if able to post, false if the other WorkQueue is
- * inaccessible.
- */
- // Apparently some Microsoft header file defines a macro CALLBACK? The
- // natural template argument name CALLBACK produces very weird Visual
- // Studio compile errors that seem utterly unrelated to this source
- // code.
- template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);
-
- /**
* Post work to another WorkQueue, requesting a specific callback to
- * be run on this WorkQueue on completion.
+ * be run on this WorkQueue on completion. Optional final argument is
+ * TimePoint for WorkSchedule.
*
* Returns true if able to post, false if the other WorkQueue is
* inaccessible.
*/
- template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback)
- {
- return postTo(target, TimePoint::clock::now(),
- std::move(callable), std::move(callback));
- }
-
- /**
- * Post work to another WorkQueue to be run at a specified time,
- * blocking the calling coroutine until then, returning the result to
- * caller on completion.
- *
- * In general, we assume that each thread's default coroutine is busy
- * servicing its WorkQueue or whatever. To try to prevent mistakes, we
- * forbid calling waitForResult() from a thread's default coroutine.
- */
- template <typename CALLABLE>
- auto waitForResult(const TimePoint& time, CALLABLE&& callable);
+ template <typename CALLABLE, typename FOLLOWUP, typename... ARGS>
+ bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback,
+ ARGS&&... args);
/**
* Post work to another WorkQueue, blocking the calling coroutine
- * until then, returning the result to caller on completion.
+ * until then, returning the result to caller on completion. Optional
+ * final argument is TimePoint for WorkSchedule.
*
* In general, we assume that each thread's default coroutine is busy
* servicing its WorkQueue or whatever. To try to prevent mistakes, we
* forbid calling waitForResult() from a thread's default coroutine.
*/
- template <typename CALLABLE>
- auto waitForResult(CALLABLE&& callable)
- {
- return waitForResult(TimePoint::clock::now(), std::move(callable));
- }
+ template <typename CALLABLE, typename... ARGS>
+ auto waitForResult(CALLABLE&& callable, ARGS&&... args);
/*--------------------------- worker API ---------------------------*/
@@ -270,7 +175,7 @@ namespace LL
*/
bool runUntil(const TimePoint& until);
- private:
+ protected:
template <typename CALLABLE, typename FOLLOWUP>
static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback);
/// general case: arbitrary C++ return type
@@ -290,13 +195,179 @@ namespace LL
static void checkCoroutine(const std::string& method);
static void error(const std::string& msg);
static std::string makeName(const std::string& name);
- void callWork(const Queue::DataTuple& work);
void callWork(const Work& work);
+
+ private:
+ virtual Work pop_() = 0;
+ virtual bool tryPop_(Work&) = 0;
+ };
+
+/*****************************************************************************
+* WorkQueue: no timestamped task support
+*****************************************************************************/
+ class WorkQueue: public LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase>
+ {
+ private:
+ using super = LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase>;
+
+ public:
+ /**
+ * You may omit the WorkQueue name, in which case a unique name is
+ * synthesized; for practical purposes that makes it anonymous.
+ */
+ WorkQueue(const std::string& name = std::string(), size_t capacity=1024);
+
+ /**
+ * Since the point of WorkQueue is to pass work to some other worker
+ * thread(s) asynchronously, it's important that it continue to exist
+ * until the worker thread(s) have drained it. To communicate that
+ * it's time for them to quit, close() the queue.
+ */
+ void close() override;
+
+ /**
+ * WorkQueue supports multiple producers and multiple consumers. In
+ * the general case it's misleading to test size(), since any other
+ * thread might change it the nanosecond the lock is released. On that
+ * basis, some might argue against publishing a size() method at all.
+ *
+ * But there are two specific cases in which a test based on size()
+ * might be reasonable:
+ *
+ * * If you're the only producer, noticing that size() == 0 is
+ * meaningful.
+ * * If you're the only consumer, noticing that size() > 0 is
+ * meaningful.
+ */
+ size_t size() override;
+ /// producer end: are we prevented from pushing any additional items?
+ bool isClosed() override;
+ /// consumer end: are we done, is the queue entirely drained?
+ bool done() override;
+
+ /*---------------------- fire and forget API -----------------------*/
+
+ /// fire-and-forget
+ void post(const Work&) override;
+
+ /**
+ * post work, unless the queue is closed before we can post
+ */
+ bool postIfOpen(const Work&) override;
+
+ /**
+ * post work, unless the queue is full
+ */
+ bool tryPost(const Work&) override;
+
+ private:
+ using Queue = LLThreadSafeQueue<Work>;
Queue mQueue;
+
+ Work pop_() override;
+ bool tryPop_(Work&) override;
+ };
+
+/*****************************************************************************
+* WorkSchedule: add support for timestamped tasks
+*****************************************************************************/
+ class WorkSchedule: public LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase>
+ {
+ private:
+ using super = LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase>;
+ using Queue = ThreadSafeSchedule<Work>;
+ // helper for postEvery()
+ template <typename Rep, typename Period, typename CALLABLE>
+ class BackJack;
+
+ public:
+ using TimePoint = Queue::TimePoint;
+ using TimedWork = Queue::TimeTuple;
+
+ /**
+ * You may omit the WorkSchedule name, in which case a unique name is
+ * synthesized; for practical purposes that makes it anonymous.
+ */
+ WorkSchedule(const std::string& name = std::string(), size_t capacity=1024);
+
+ /**
+ * Since the point of WorkSchedule is to pass work to some other worker
+ * thread(s) asynchronously, it's important that the WorkSchedule continue
+ * to exist until the worker thread(s) have drained it. To communicate
+ * that it's time for them to quit, close() the queue.
+ */
+ void close() override;
+
+ /**
+ * WorkSchedule supports multiple producers and multiple consumers. In
+ * the general case it's misleading to test size(), since any other
+ * thread might change it the nanosecond the lock is released. On that
+ * basis, some might argue against publishing a size() method at all.
+ *
+ * But there are two specific cases in which a test based on size()
+ * might be reasonable:
+ *
+ * * If you're the only producer, noticing that size() == 0 is
+ * meaningful.
+ * * If you're the only consumer, noticing that size() > 0 is
+ * meaningful.
+ */
+ size_t size() override;
+ /// producer end: are we prevented from pushing any additional items?
+ bool isClosed() override;
+ /// consumer end: are we done, is the queue entirely drained?
+ bool done() override;
+
+ /*---------------------- fire and forget API -----------------------*/
+
+ /// fire-and-forget
+ void post(const Work& callable) override;
+
+ /// fire-and-forget, but at a particular (future?) time
+ void post(const Work& callable, const TimePoint& time);
+
+ /**
+ * post work, unless the queue is closed before we can post
+ */
+ bool postIfOpen(const Work& callable) override;
+
+ /**
+ * post work for a particular time, unless the queue is closed before
+ * we can post
+ */
+ bool postIfOpen(const Work& callable, const TimePoint& time);
+
+ /**
+ * post work, unless the queue is full
+ */
+ bool tryPost(const Work& callable) override;
+
+ /**
+ * post work for a particular time, unless the queue is full
+ */
+ bool tryPost(const Work& callable, const TimePoint& time);
+
+ /**
+ * Launch a callable returning bool that will trigger repeatedly at
+ * specified interval, until the callable returns false.
+ *
+ * If you need to signal that callable from outside, DO NOT bind a
+ * reference to a simple bool! That's not thread-safe. Instead, bind
+ * an LLCond variant, e.g. LLOneShotCond or LLBoolCond.
+ */
+ template <typename Rep, typename Period, typename CALLABLE>
+ void postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable);
+
+ private:
+ Queue mQueue;
+
+ Work pop_() override;
+ bool tryPop_(Work&) override;
};
/**
- * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a
+ * BackJack is, in effect, a hand-rolled lambda, binding a WorkSchedule, a
* CALLABLE that returns bool, a TimePoint and an interval at which to
* relaunch it. As long as the callable continues returning true, BackJack
* keeps resubmitting it to the target WorkQueue.
@@ -305,7 +376,7 @@ namespace LL
// class method gets its own 'this' pointer -- which we need to resubmit
// the whole BackJack callable.
template <typename Rep, typename Period, typename CALLABLE>
- class WorkQueue::BackJack
+ class WorkSchedule::BackJack
{
public:
// bind the desired data
@@ -319,9 +390,10 @@ namespace LL
mCallable(std::move(callable))
{}
- // Call by target WorkQueue -- note that although WE require a
- // callable returning bool, WorkQueue wants a void callable. We
- // consume the bool.
+ // This operator() method, called by target WorkSchedule, is what
+ // makes this object a Work item. Although WE require a callable
+ // returning bool, WorkSchedule wants a void callable. We consume the
+ // bool.
void operator()()
{
// If mCallable() throws an exception, don't catch it here: if it
@@ -337,7 +409,7 @@ namespace LL
// register our intent to fire at exact mIntervals.
mStart += mInterval;
- // We're being called at this moment by the target WorkQueue.
+ // We're being called at this moment by the target WorkSchedule.
// Assume it still exists, rather than checking the result of
// lock().
// Resubmit the whole *this callable: that's why we're a class
@@ -347,7 +419,8 @@ namespace LL
// moved-from.
try
{
- mTarget.lock()->post(mStart, std::move(*this));
+ std::dynamic_pointer_cast<WorkSchedule>(mTarget.lock())->
+ post(std::move(*this), mStart);
}
catch (const Closed&)
{
@@ -364,8 +437,8 @@ namespace LL
};
template <typename Rep, typename Period, typename CALLABLE>
- void WorkQueue::postEvery(const std::chrono::duration<Rep, Period>& interval,
- CALLABLE&& callable)
+ void WorkSchedule::postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable)
{
if (interval.count() <= 0)
{
@@ -388,7 +461,7 @@ namespace LL
/// general case: arbitrary C++ return type
template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
- struct WorkQueue::MakeReplyLambda
+ struct WorkQueueBase::MakeReplyLambda
{
auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
{
@@ -409,7 +482,7 @@ namespace LL
/// specialize for CALLABLE returning void
template <typename CALLABLE, typename FOLLOWUP>
- struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void>
+ struct WorkQueueBase::MakeReplyLambda<CALLABLE, FOLLOWUP, void>
{
auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
{
@@ -421,16 +494,16 @@ namespace LL
};
template <typename CALLABLE, typename FOLLOWUP>
- auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback)
+ auto WorkQueueBase::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback)
{
return MakeReplyLambda<CALLABLE, FOLLOWUP,
decltype(std::forward<CALLABLE>(callable)())>()
(std::move(callable), std::move(callback));
}
- template <typename CALLABLE, typename FOLLOWUP>
- bool WorkQueue::postTo(weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
+ template <typename CALLABLE, typename FOLLOWUP, typename... ARGS>
+ bool WorkQueueBase::postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback,
+ ARGS&&... args)
{
LL_PROFILE_ZONE_SCOPED;
// We're being asked to post to the WorkQueue at target.
@@ -444,12 +517,11 @@ namespace LL
// lambda that packages our callable, our callback and a weak_ptr
// to this originating WorkQueue.
tptr->post(
- time,
[reply = super::getWeak(),
callable = std::move(callable),
callback = std::move(callback)]
- ()
- mutable {
+ () mutable
+ {
// Use postMaybe() below in case this originating WorkQueue
// has been closed or destroyed. Remember, the outer lambda is
// now running on a thread servicing the target WorkQueue, and
@@ -472,14 +544,16 @@ namespace LL
// originating WorkQueue. Once there, rethrow it.
[exc = std::current_exception()](){ std::rethrow_exception(exc); });
}
- });
+ },
+ // if caller passed a TimePoint, pass it along to post()
+ std::forward<ARGS>(args)...);
// looks like we were able to post()
return true;
}
- template <typename CALLABLE>
- bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable)
+ template <typename... ARGS>
+ bool WorkQueueBase::postMaybe(weak_t target, ARGS&&... args)
{
LL_PROFILE_ZONE_SCOPED;
// target is a weak_ptr: have to lock it to check it
@@ -488,7 +562,7 @@ namespace LL
{
try
{
- tptr->post(time, std::forward<CALLABLE>(callable));
+ tptr->post(std::forward<ARGS>(args)...);
// we were able to post()
return true;
}
@@ -503,13 +577,13 @@ namespace LL
/// general case: arbitrary C++ return type
template <typename CALLABLE, typename RETURNTYPE>
- struct WorkQueue::WaitForResult
+ struct WorkQueueBase::WaitForResult
{
- auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)
+ template <typename... ARGS>
+ auto operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args)
{
LLCoros::Promise<RETURNTYPE> promise;
self->post(
- time,
// We dare to bind a reference to Promise because it's
// specifically designed for cross-thread communication.
[&promise, callable = std::move(callable)]()
@@ -523,7 +597,9 @@ namespace LL
{
promise.set_exception(std::current_exception());
}
- });
+ },
+ // if caller passed a TimePoint, pass it to post()
+ std::forward<ARGS>(args)...);
auto future{ LLCoros::getFuture(promise) };
// now, on the calling thread, wait for that result
LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()");
@@ -533,13 +609,13 @@ namespace LL
/// specialize for CALLABLE returning void
template <typename CALLABLE>
- struct WorkQueue::WaitForResult<CALLABLE, void>
+ struct WorkQueueBase::WaitForResult<CALLABLE, void>
{
- void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)
+ template <typename... ARGS>
+ void operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args)
{
LLCoros::Promise<void> promise;
self->post(
- time,
// &promise is designed for cross-thread access
[&promise, callable = std::move(callable)]()
mutable {
@@ -552,7 +628,9 @@ namespace LL
{
promise.set_exception(std::current_exception());
}
- });
+ },
+ // if caller passed a TimePoint, pass it to post()
+ std::forward<ARGS>(args)...);
auto future{ LLCoros::getFuture(promise) };
// block until set_value()
LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()");
@@ -560,13 +638,13 @@ namespace LL
}
};
- template <typename CALLABLE>
- auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable)
+ template <typename CALLABLE, typename... ARGS>
+ auto WorkQueueBase::waitForResult(CALLABLE&& callable, ARGS&&... args)
{
checkCoroutine("waitForResult()");
// derive callable's return type so we can specialize for void
return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>()
- (this, time, std::forward<CALLABLE>(callable));
+ (this, std::forward<CALLABLE>(callable), std::forward<ARGS>(args)...);
}
} // namespace LL