summaryrefslogtreecommitdiff
path: root/indra
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2021-10-25 17:31:27 -0400
committerNat Goodspeed <nat@lindenlab.com>2021-10-25 17:31:27 -0400
commit023d39963e850356e1af6eec7f857e2534ce8d38 (patch)
tree3e813e18493e5d74127e4699bc5cf23746965932 /indra
parente7b8c27741201528bf78f95c96ba820833923dab (diff)
SL-16220: WorkQueue::runOn() methods submit work, wait for result.
The idea is that you can call runOn(target, callable) from a (non-default) coroutine and block that coroutine until the result becomes available. As a safety check, we forbid calling runOn() from a thread's default coroutine, assuming that a given thread's default coroutine is the one servicing the relevant WorkQueue.
Diffstat (limited to 'indra')
-rw-r--r--indra/llcommon/workqueue.cpp15
-rw-r--r--indra/llcommon/workqueue.h150
2 files changed, 154 insertions, 11 deletions
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp
index 114aeea1f3..f7ffc8233c 100644
--- a/indra/llcommon/workqueue.cpp
+++ b/indra/llcommon/workqueue.cpp
@@ -26,6 +26,11 @@
using Mutex = LLCoros::Mutex;
using Lock = LLCoros::LockType;
+struct NotOnDftCoro: public LLException
+{
+ NotOnDftCoro(const std::string& what): LLException(what) {}
+};
+
LL::WorkQueue::WorkQueue(const std::string& name):
super(makeName(name))
{
@@ -136,3 +141,13 @@ void LL::WorkQueue::error(const std::string& msg)
{
LL_ERRS("WorkQueue") << msg << LL_ENDL;
}
+
+void LL::WorkQueue::checkCoroutine(const std::string& method)
+{
+ // By convention, the default coroutine on each thread has an empty name
+ // string. See also LLCoros::logname().
+ if (LLCoros::getName().empty())
+ {
+ LLTHROW(NotOnDftCoro("Do not call " + method + " from a thread's default coroutine"));
+ }
+}
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
index deef3c8e84..b17c666172 100644
--- a/indra/llcommon/workqueue.h
+++ b/indra/llcommon/workqueue.h
@@ -12,11 +12,18 @@
#if ! defined(LL_WORKQUEUE_H)
#define LL_WORKQUEUE_H
+#include "llcoros.h"
#include "llinstancetracker.h"
#include "threadsafeschedule.h"
#include <chrono>
#include <functional> // std::function
-#include <queue>
+#if __cplusplus >= 201703
+#include <optional>
+namespace stdopt = std;
+#else
+#include <boost/optional.hpp>
+namespace stdopt = boost;
+#endif
#include <string>
#include <utility> // std::pair
#include <vector>
@@ -44,6 +51,8 @@ namespace LL
using TimePoint = Queue::TimePoint;
using TimedWork = Queue::TimeTuple;
using Closed = Queue::Closed;
+ template <typename T>
+ using optional = stdopt::optional<T>;
/**
* You may omit the WorkQueue name, in which case a unique name is
@@ -114,7 +123,7 @@ namespace LL
// Studio compile errors that seem utterly unrelated to this source
// code.
template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(WorkQueue::weak_t target,
+ bool postTo(weak_t target,
const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);
/**
@@ -125,13 +134,62 @@ namespace LL
* inaccessible.
*/
template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(WorkQueue::weak_t target,
- CALLABLE&& callable, FOLLOWUP&& callback)
+ bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback)
{
return postTo(target, TimePoint::clock::now(),
std::move(callable), std::move(callback));
}
+ /**
+ * Post work to another WorkQueue to be run at a specified time,
+ * blocking the calling coroutine until then, returning the result to
+ * caller on completion.
+ *
+ * REQUIRED:
+ *
+ * * The calling thread is the thread servicing 'this' WorkQueue.
+ * * The calling coroutine is not the @em coroutine servicing this
+ * WorkQueue. We block the calling coroutine until the result is
+ * available. If this same coroutine is responsible for checking the
+ * local WorkQueue, the result will never be dequeued. In practice,
+ * to try to prevent mistakes, we forbid calling runOn() from a
+ * thread's default coroutine.
+ *
+ * Returns result if able to post, empty optional if the other
+ * WorkQueue is inaccessible.
+ *
+ * If the passed callable has void return, runOn() returns bool true
+ * if able to post, false if the other WorkQueue is inaccessible.
+ */
+ template <typename CALLABLE>
+ auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable);
+
+ /**
+ * Post work to another WorkQueue, blocking the calling coroutine
+ * until then, returning the result to caller on completion.
+ *
+ * REQUIRED:
+ *
+ * * The calling thread is the thread servicing 'this' WorkQueue.
+ * * The calling coroutine is not the @em coroutine servicing this
+ * WorkQueue. We block the calling coroutine until the result is
+ * available. If this same coroutine is responsible for checking the
+ * local WorkQueue, the result will never be dequeued. In practice,
+ * to try to prevent mistakes, we forbid calling runOn() from a
+ * thread's default coroutine.
+ *
+ * Returns result if able to post, empty optional if the other
+ * WorkQueue is inaccessible.
+ *
+ * If the passed callable has void return, runOn() returns bool true
+ * if able to post, false if the other WorkQueue is inaccessible.
+ */
+ template <typename CALLABLE>
+ auto runOn(weak_t target, CALLABLE&& callable)
+ {
+ return runOn(target, TimePoint::clock::now(), std::move(callable));
+ }
+
/*--------------------------- worker API ---------------------------*/
/**
@@ -179,15 +237,21 @@ namespace LL
private:
template <typename CALLABLE, typename FOLLOWUP>
static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback);
-
/// general case: arbitrary C++ return type
template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
struct MakeReplyLambda;
-
/// specialize for CALLABLE returning void
template <typename CALLABLE, typename FOLLOWUP>
struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>;
+ /// general case: arbitrary C++ return type
+ template <typename CALLABLE, typename RETURNTYPE>
+ struct RunOn;
+ /// specialize for CALLABLE returning void
+ template <typename CALLABLE>
+ struct RunOn<CALLABLE, void>;
+
+ static void checkCoroutine(const std::string& method);
static void error(const std::string& msg);
static std::string makeName(const std::string& name);
void callWork(const Queue::DataTuple& work);
@@ -209,8 +273,8 @@ namespace LL
{
public:
// bind the desired data
- BackJack(WorkQueue::weak_t target,
- const WorkQueue::TimePoint& start,
+ BackJack(weak_t target,
+ const TimePoint& start,
const std::chrono::duration<Rep, Period>& interval,
CALLABLE&& callable):
mTarget(target),
@@ -257,8 +321,8 @@ namespace LL
}
private:
- WorkQueue::weak_t mTarget;
- WorkQueue::TimePoint mStart;
+ weak_t mTarget;
+ TimePoint mStart;
std::chrono::duration<Rep, Period> mInterval;
CALLABLE mCallable;
};
@@ -286,6 +350,7 @@ namespace LL
getWeak(), TimePoint::clock::now(), interval, std::move(callable)));
}
+ /// general case: arbitrary C++ return type
template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
struct WorkQueue::MakeReplyLambda
{
@@ -332,7 +397,7 @@ namespace LL
}
template <typename CALLABLE, typename FOLLOWUP>
- bool WorkQueue::postTo(WorkQueue::weak_t target,
+ bool WorkQueue::postTo(weak_t target,
const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
{
// We're being asked to post to the WorkQueue at target.
@@ -382,6 +447,69 @@ namespace LL
return true;
}
+ /// general case: arbitrary C++ return type
+ template <typename CALLABLE, typename RETURNTYPE>
+ struct WorkQueue::RunOn
+ {
+ optional<RETURNTYPE> operator()(WorkQueue* self, weak_t target,
+ const TimePoint& time, CALLABLE&& callable)
+ {
+ LLCoros::Promise<RETURNTYPE> promise;
+ if (! self->postTo(
+ target,
+ time,
+ std::forward<CALLABLE>(callable),
+ // We dare to bind a reference to Promise because it's
+ // specifically intended for cross-thread synchronization.
+ [&promise]
+ (RETURNTYPE&& result)
+ {
+ promise.set_value(std::forward<RETURNTYPE>(result));
+ }))
+ {
+ // we couldn't even postTo(): return empty optional
+ return {};
+ }
+ // we were able to post
+ auto future{ LLCoros::getFuture(promise) };
+ return { future.get(); }
+ }
+ };
+
+ /// specialize for CALLABLE returning void
+ template <typename CALLABLE>
+ struct WorkQueue::RunOn<CALLABLE, void>
+ {
+ bool operator()(WorkQueue* self, weak_t target,
+ const TimePoint& time, CALLABLE&& callable)
+ {
+ LLCoros::Promise<void> promise;
+ if (! self->postTo(
+ target,
+ time,
+ std::forward<CALLABLE>(callable),
+ // &promise is designed for cross-thread access
+ [&promise](){ promise.set_value(); }))
+ {
+ // we couldn't postTo()
+ return false;
+ }
+ // we were able to post
+ auto future{ LLCoros::getFuture(promise) };
+ // block until set_value()
+ future.get();
+ return true;
+ }
+ };
+
+ template <typename CALLABLE>
+ auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable)
+ {
+ checkCoroutine("runOn()");
+ return RunOn<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>()
+ (this, target, time, std::forward<CALLABLE>(callable));
+ }
+
} // namespace LL
#endif /* ! defined(LL_WORKQUEUE_H) */