summaryrefslogtreecommitdiff
path: root/indra/llcommon/workqueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llcommon/workqueue.h')
-rw-r--r--indra/llcommon/workqueue.h381
1 files changed, 310 insertions, 71 deletions
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
index 5ec790da79..96574a18b9 100644
--- a/indra/llcommon/workqueue.h
+++ b/indra/llcommon/workqueue.h
@@ -12,14 +12,14 @@
#if ! defined(LL_WORKQUEUE_H)
#define LL_WORKQUEUE_H
+#include "llcoros.h"
+#include "llexception.h"
#include "llinstancetracker.h"
#include "threadsafeschedule.h"
#include <chrono>
+#include <exception> // std::current_exception
#include <functional> // std::function
-#include <queue>
#include <string>
-#include <utility> // std::pair
-#include <vector>
namespace LL
{
@@ -45,11 +45,16 @@ namespace LL
using TimedWork = Queue::TimeTuple;
using Closed = Queue::Closed;
+ struct Error: public LLException
+ {
+ Error(const std::string& what): LLException(what) {}
+ };
+
/**
* You may omit the WorkQueue name, in which case a unique name is
* synthesized; for practical purposes that makes it anonymous.
*/
- WorkQueue(const std::string& name = std::string());
+ WorkQueue(const std::string& name = std::string(), size_t capacity=1024);
/**
* Since the point of WorkQueue is to pass work to some other worker
@@ -59,15 +64,36 @@ namespace LL
*/
void close();
+ /**
+ * WorkQueue supports multiple producers and multiple consumers. In
+ * the general case it's misleading to test size(), since any other
+ * thread might change it the nanosecond the lock is released. On that
+ * basis, some might argue against publishing a size() method at all.
+ *
+ * But there are two specific cases in which a test based on size()
+ * might be reasonable:
+ *
+ * * If you're the only producer, noticing that size() == 0 is
+ * meaningful.
+ * * If you're the only consumer, noticing that size() > 0 is
+ * meaningful.
+ */
+ size_t size();
+ /// producer end: are we prevented from pushing any additional items?
+ bool isClosed();
+ /// consumer end: are we done, is the queue entirely drained?
+ bool done();
+
/*---------------------- fire and forget API -----------------------*/
/// fire-and-forget, but at a particular (future?) time
template <typename CALLABLE>
void post(const TimePoint& time, CALLABLE&& callable)
{
- // Defer reifying an arbitrary CALLABLE until we hit this method.
- // All other methods should accept CALLABLEs of arbitrary type to
- // avoid multiple levels of std::function indirection.
+ // Defer reifying an arbitrary CALLABLE until we hit this or
+ // postIfOpen(). All other methods should accept CALLABLEs of
+ // arbitrary type to avoid multiple levels of std::function
+ // indirection.
mQueue.push(TimedWork(time, std::move(callable)));
}
@@ -83,6 +109,47 @@ namespace LL
}
/**
+ * post work for a particular time, unless the queue is closed before
+ * we can post
+ */
+ template <typename CALLABLE>
+ bool postIfOpen(const TimePoint& time, CALLABLE&& callable)
+ {
+ // Defer reifying an arbitrary CALLABLE until we hit this or
+ // post(). All other methods should accept CALLABLEs of arbitrary
+ // type to avoid multiple levels of std::function indirection.
+ return mQueue.pushIfOpen(TimedWork(time, std::move(callable)));
+ }
+
+ /**
+ * post work, unless the queue is closed before we can post
+ */
+ template <typename CALLABLE>
+ bool postIfOpen(CALLABLE&& callable)
+ {
+ return postIfOpen(TimePoint::clock::now(), std::move(callable));
+ }
+
+ /**
+ * Post work to be run at a specified time to another WorkQueue, which
+ * may or may not still exist and be open. Return true if we were able
+ * to post.
+ */
+ template <typename CALLABLE>
+ static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable);
+
+ /**
+ * Post work to another WorkQueue, which may or may not still exist
+ * and be open. Return true if we were able to post.
+ */
+ template <typename CALLABLE>
+ static bool postMaybe(weak_t target, CALLABLE&& callable)
+ {
+ return postMaybe(target, TimePoint::clock::now(),
+ std::forward<CALLABLE>(callable));
+ }
+
+ /**
* Launch a callable returning bool that will trigger repeatedly at
* specified interval, until the callable returns false.
*
@@ -115,63 +182,8 @@ namespace LL
// Studio compile errors that seem utterly unrelated to this source
// code.
template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(WorkQueue::weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
- {
- // We're being asked to post to the WorkQueue at target.
- // target is a weak_ptr: have to lock it to check it.
- auto tptr = target.lock();
- if (! tptr)
- // can't post() if the target WorkQueue has been destroyed
- return false;
-
- // Here we believe target WorkQueue still exists. Post to it a
- // lambda that packages our callable, our callback and a weak_ptr
- // to this originating WorkQueue.
- tptr->post(
- time,
- [reply = super::getWeak(),
- callable = std::move(callable),
- callback = std::move(callback)]
- ()
- {
- // Call the callable in any case -- but to minimize
- // copying the result, immediately bind it into a reply
- // lambda. The reply lambda also binds the original
- // callback, so that when we, the originating WorkQueue,
- // finally receive and process the reply lambda, we'll
- // call the bound callback with the bound result -- on the
- // same thread that originally called postTo().
- auto rlambda =
- [result = callable(),
- callback = std::move(callback)]
- ()
- { callback(std::move(result)); };
- // Check if this originating WorkQueue still exists.
- // Remember, the outer lambda is now running on a thread
- // servicing the target WorkQueue, and real time has
- // elapsed since postTo()'s tptr->post() call.
- // reply is a weak_ptr: have to lock it to check it.
- auto rptr = reply.lock();
- if (rptr)
- {
- // Only post reply lambda if the originating WorkQueue
- // still exists. If not -- who would we tell? Log it?
- try
- {
- rptr->post(std::move(rlambda));
- }
- catch (const Closed&)
- {
- // Originating WorkQueue might still exist, but
- // might be Closed. Same thing: just discard the
- // callback.
- }
- }
- });
- // looks like we were able to post()
- return true;
- }
+ bool postTo(weak_t target,
+ const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);
/**
* Post work to another WorkQueue, requesting a specific callback to
@@ -181,10 +193,36 @@ namespace LL
* inaccessible.
*/
template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(WorkQueue::weak_t target,
- CALLABLE&& callable, FOLLOWUP&& callback)
+ bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback)
{
- return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback));
+ return postTo(target, TimePoint::clock::now(),
+ std::move(callable), std::move(callback));
+ }
+
+ /**
+ * Post work to another WorkQueue to be run at a specified time,
+ * blocking the calling coroutine until then, returning the result to
+ * caller on completion.
+ *
+ * In general, we assume that each thread's default coroutine is busy
+ * servicing its WorkQueue or whatever. To try to prevent mistakes, we
+ * forbid calling waitForResult() from a thread's default coroutine.
+ */
+ template <typename CALLABLE>
+ auto waitForResult(const TimePoint& time, CALLABLE&& callable);
+
+ /**
+ * Post work to another WorkQueue, blocking the calling coroutine
+ * until then, returning the result to caller on completion.
+ *
+ * In general, we assume that each thread's default coroutine is busy
+ * servicing its WorkQueue or whatever. To try to prevent mistakes, we
+ * forbid calling waitForResult() from a thread's default coroutine.
+ */
+ template <typename CALLABLE>
+ auto waitForResult(CALLABLE&& callable)
+ {
+ return waitForResult(TimePoint::clock::now(), std::move(callable));
}
/*--------------------------- worker API ---------------------------*/
@@ -222,6 +260,7 @@ namespace LL
template <typename Rep, typename Period>
bool runFor(const std::chrono::duration<Rep, Period>& timeslice)
{
+ LL_PROFILE_ZONE_SCOPED;
return runUntil(TimePoint::clock::now() + timeslice);
}
@@ -232,6 +271,23 @@ namespace LL
bool runUntil(const TimePoint& until);
private:
+ template <typename CALLABLE, typename FOLLOWUP>
+ static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback);
+ /// general case: arbitrary C++ return type
+ template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
+ struct MakeReplyLambda;
+ /// specialize for CALLABLE returning void
+ template <typename CALLABLE, typename FOLLOWUP>
+ struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>;
+
+ /// general case: arbitrary C++ return type
+ template <typename CALLABLE, typename RETURNTYPE>
+ struct WaitForResult;
+ /// specialize for CALLABLE returning void
+ template <typename CALLABLE>
+ struct WaitForResult<CALLABLE, void>;
+
+ static void checkCoroutine(const std::string& method);
static void error(const std::string& msg);
static std::string makeName(const std::string& name);
void callWork(const Queue::DataTuple& work);
@@ -253,8 +309,8 @@ namespace LL
{
public:
// bind the desired data
- BackJack(WorkQueue::weak_t target,
- const WorkQueue::TimePoint& start,
+ BackJack(weak_t target,
+ const TimePoint& start,
const std::chrono::duration<Rep, Period>& interval,
CALLABLE&& callable):
mTarget(target),
@@ -301,8 +357,8 @@ namespace LL
}
private:
- WorkQueue::weak_t mTarget;
- WorkQueue::TimePoint mStart;
+ weak_t mTarget;
+ TimePoint mStart;
std::chrono::duration<Rep, Period> mInterval;
CALLABLE mCallable;
};
@@ -330,6 +386,189 @@ namespace LL
getWeak(), TimePoint::clock::now(), interval, std::move(callable)));
}
+ /// general case: arbitrary C++ return type
+ template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
+ struct WorkQueue::MakeReplyLambda
+ {
+ auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
+ {
+ // Call the callable in any case -- but to minimize
+ // copying the result, immediately bind it into the reply
+ // lambda. The reply lambda also binds the original
+ // callback, so that when we, the originating WorkQueue,
+ // finally receive and process the reply lambda, we'll
+ // call the bound callback with the bound result -- on the
+ // same thread that originally called postTo().
+ return
+ [result = std::forward<CALLABLE>(callable)(),
+ callback = std::move(callback)]
+ ()
+ { callback(std::move(result)); };
+ }
+ };
+
+ /// specialize for CALLABLE returning void
+ template <typename CALLABLE, typename FOLLOWUP>
+ struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void>
+ {
+ auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
+ {
+ // Call the callable, which produces no result.
+ std::forward<CALLABLE>(callable)();
+ // Our completion callback is simply the caller's callback.
+ return std::move(callback);
+ }
+ };
+
+ template <typename CALLABLE, typename FOLLOWUP>
+ auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback)
+ {
+ return MakeReplyLambda<CALLABLE, FOLLOWUP,
+ decltype(std::forward<CALLABLE>(callable)())>()
+ (std::move(callable), std::move(callback));
+ }
+
+ template <typename CALLABLE, typename FOLLOWUP>
+ bool WorkQueue::postTo(weak_t target,
+ const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
+ {
+ LL_PROFILE_ZONE_SCOPED;
+ // We're being asked to post to the WorkQueue at target.
+ // target is a weak_ptr: have to lock it to check it.
+ auto tptr = target.lock();
+ if (! tptr)
+ // can't post() if the target WorkQueue has been destroyed
+ return false;
+
+ // Here we believe target WorkQueue still exists. Post to it a
+ // lambda that packages our callable, our callback and a weak_ptr
+ // to this originating WorkQueue.
+ tptr->post(
+ time,
+ [reply = super::getWeak(),
+ callable = std::move(callable),
+ callback = std::move(callback)]
+ ()
+ {
+ // Use postMaybe() below in case this originating WorkQueue
+ // has been closed or destroyed. Remember, the outer lambda is
+ // now running on a thread servicing the target WorkQueue, and
+ // real time has elapsed since postTo()'s tptr->post() call.
+ try
+ {
+ // Make a reply lambda to repost to THIS WorkQueue.
+ // Delegate to makeReplyLambda() so we can partially
+ // specialize on void return.
+ postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback)));
+ }
+ catch (...)
+ {
+ // Either variant of makeReplyLambda() is responsible for
+ // calling the caller's callable. If that throws, return
+ // the exception to the originating thread.
+ postMaybe(
+ reply,
+ // Bind the current exception to transport back to the
+ // originating WorkQueue. Once there, rethrow it.
+ [exc = std::current_exception()](){ std::rethrow_exception(exc); });
+ }
+ });
+
+ // looks like we were able to post()
+ return true;
+ }
+
+ template <typename CALLABLE>
+ bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable)
+ {
+ LL_PROFILE_ZONE_SCOPED;
+ // target is a weak_ptr: have to lock it to check it
+ auto tptr = target.lock();
+ if (tptr)
+ {
+ try
+ {
+ tptr->post(time, std::forward<CALLABLE>(callable));
+ // we were able to post()
+ return true;
+ }
+ catch (const Closed&)
+ {
+ // target WorkQueue still exists, but is Closed
+ }
+ }
+ // either target no longer exists, or its WorkQueue is Closed
+ return false;
+ }
+
+ /// general case: arbitrary C++ return type
+ template <typename CALLABLE, typename RETURNTYPE>
+ struct WorkQueue::WaitForResult
+ {
+ auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)
+ {
+ LLCoros::Promise<RETURNTYPE> promise;
+ self->post(
+ time,
+ // We dare to bind a reference to Promise because it's
+ // specifically designed for cross-thread communication.
+ [&promise, callable = std::move(callable)]()
+ {
+ try
+ {
+ // call the caller's callable and trigger promise with result
+ promise.set_value(callable());
+ }
+ catch (...)
+ {
+ promise.set_exception(std::current_exception());
+ }
+ });
+ auto future{ LLCoros::getFuture(promise) };
+ // now, on the calling thread, wait for that result
+ LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()");
+ return future.get();
+ }
+ };
+
+ /// specialize for CALLABLE returning void
+ template <typename CALLABLE>
+ struct WorkQueue::WaitForResult<CALLABLE, void>
+ {
+ void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)
+ {
+ LLCoros::Promise<void> promise;
+ self->post(
+ time,
+ // &promise is designed for cross-thread access
+ [&promise, callable = std::move(callable)]()
+ {
+ try
+ {
+ callable();
+ promise.set_value();
+ }
+ catch (...)
+ {
+ promise.set_exception(std::current_exception());
+ }
+ });
+ auto future{ LLCoros::getFuture(promise) };
+ // block until set_value()
+ LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()");
+ future.get();
+ }
+ };
+
+ template <typename CALLABLE>
+ auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable)
+ {
+ checkCoroutine("waitForResult()");
+ // derive callable's return type so we can specialize for void
+ return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>()
+ (this, time, std::forward<CALLABLE>(callable));
+ }
+
} // namespace LL
#endif /* ! defined(LL_WORKQUEUE_H) */