diff options
| -rw-r--r-- | indra/llcommon/workqueue.cpp | 15 | ||||
| -rw-r--r-- | indra/llcommon/workqueue.h | 150 | 
2 files changed, 154 insertions, 11 deletions
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 114aeea1f3..f7ffc8233c 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,6 +26,11 @@  using Mutex = LLCoros::Mutex;  using Lock  = LLCoros::LockType; +struct NotOnDftCoro: public LLException +{ +    NotOnDftCoro(const std::string& what): LLException(what) {} +}; +  LL::WorkQueue::WorkQueue(const std::string& name):      super(makeName(name))  { @@ -136,3 +141,13 @@ void LL::WorkQueue::error(const std::string& msg)  {      LL_ERRS("WorkQueue") << msg << LL_ENDL;  } + +void LL::WorkQueue::checkCoroutine(const std::string& method) +{ +    // By convention, the default coroutine on each thread has an empty name +    // string. See also LLCoros::logname(). +    if (LLCoros::getName().empty()) +    { +        LLTHROW(NotOnDftCoro("Do not call " + method + " from a thread's default coroutine")); +    } +} diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index deef3c8e84..b17c666172 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,11 +12,18 @@  #if ! defined(LL_WORKQUEUE_H)  #define LL_WORKQUEUE_H +#include "llcoros.h"  #include "llinstancetracker.h"  #include "threadsafeschedule.h"  #include <chrono>  #include <functional>               // std::function -#include <queue> +#if __cplusplus >= 201703 +#include <optional> +namespace stdopt = std; +#else +#include <boost/optional.hpp> +namespace stdopt = boost; +#endif  #include <string>  #include <utility>                  // std::pair  #include <vector> @@ -44,6 +51,8 @@ namespace LL          using TimePoint = Queue::TimePoint;          using TimedWork = Queue::TimeTuple;          using Closed    = Queue::Closed; +        template <typename T> +        using optional  = stdopt::optional<T>;          /**           * You may omit the WorkQueue name, in which case a unique name is @@ -114,7 +123,7 @@ namespace LL          // Studio compile errors that seem utterly unrelated to this source          // code.          template <typename CALLABLE, typename FOLLOWUP> -        bool postTo(WorkQueue::weak_t target, +        bool postTo(weak_t target,                      const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);          /** @@ -125,13 +134,62 @@ namespace LL           * inaccessible.           */          template <typename CALLABLE, typename FOLLOWUP> -        bool postTo(WorkQueue::weak_t target, -                    CALLABLE&& callable, FOLLOWUP&& callback) +        bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback)          {              return postTo(target, TimePoint::clock::now(),                            std::move(callable), std::move(callback));          } +        /** +         * Post work to another WorkQueue to be run at a specified time, +         * blocking the calling coroutine until then, returning the result to +         * caller on completion. +         * +         * REQUIRED: +         * +         * * The calling thread is the thread servicing 'this' WorkQueue. +         * * The calling coroutine is not the @em coroutine servicing this +         *   WorkQueue. We block the calling coroutine until the result is +         *   available. If this same coroutine is responsible for checking the +         *   local WorkQueue, the result will never be dequeued. In practice, +         *   to try to prevent mistakes, we forbid calling runOn() from a +         *   thread's default coroutine. +         * +         * Returns result if able to post, empty optional if the other +         * WorkQueue is inaccessible. +         * +         * If the passed callable has void return, runOn() returns bool true +         * if able to post, false if the other WorkQueue is inaccessible. +         */ +        template <typename CALLABLE> +        auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); + +        /** +         * Post work to another WorkQueue, blocking the calling coroutine +         * until then, returning the result to caller on completion. +         * +         * REQUIRED: +         * +         * * The calling thread is the thread servicing 'this' WorkQueue. +         * * The calling coroutine is not the @em coroutine servicing this +         *   WorkQueue. We block the calling coroutine until the result is +         *   available. If this same coroutine is responsible for checking the +         *   local WorkQueue, the result will never be dequeued. In practice, +         *   to try to prevent mistakes, we forbid calling runOn() from a +         *   thread's default coroutine. +         * +         * Returns result if able to post, empty optional if the other +         * WorkQueue is inaccessible. +         * +         * If the passed callable has void return, runOn() returns bool true +         * if able to post, false if the other WorkQueue is inaccessible. +         */ +        template <typename CALLABLE> +        auto runOn(weak_t target, CALLABLE&& callable) +        { +            return runOn(target, TimePoint::clock::now(), std::move(callable)); +        } +          /*--------------------------- worker API ---------------------------*/          /** @@ -179,15 +237,21 @@ namespace LL      private:          template <typename CALLABLE, typename FOLLOWUP>          static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); -          /// general case: arbitrary C++ return type          template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>          struct MakeReplyLambda; -          /// specialize for CALLABLE returning void          template <typename CALLABLE, typename FOLLOWUP>          struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>; +        /// general case: arbitrary C++ return type +        template <typename CALLABLE, typename RETURNTYPE> +        struct RunOn; +        /// specialize for CALLABLE returning void +        template <typename CALLABLE> +        struct RunOn<CALLABLE, void>; + +        static void checkCoroutine(const std::string& method);          static void error(const std::string& msg);          static std::string makeName(const std::string& name);          void callWork(const Queue::DataTuple& work); @@ -209,8 +273,8 @@ namespace LL      {      public:          // bind the desired data -        BackJack(WorkQueue::weak_t target, -                 const WorkQueue::TimePoint& start, +        BackJack(weak_t target, +                 const TimePoint& start,                   const std::chrono::duration<Rep, Period>& interval,                   CALLABLE&& callable):              mTarget(target), @@ -257,8 +321,8 @@ namespace LL          }      private: -        WorkQueue::weak_t mTarget; -        WorkQueue::TimePoint mStart; +        weak_t mTarget; +        TimePoint mStart;          std::chrono::duration<Rep, Period> mInterval;          CALLABLE mCallable;      }; @@ -286,6 +350,7 @@ namespace LL                   getWeak(), TimePoint::clock::now(), interval, std::move(callable)));      } +    /// general case: arbitrary C++ return type      template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>      struct WorkQueue::MakeReplyLambda      { @@ -332,7 +397,7 @@ namespace LL      }      template <typename CALLABLE, typename FOLLOWUP> -    bool WorkQueue::postTo(WorkQueue::weak_t target, +    bool WorkQueue::postTo(weak_t target,                             const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)      {          // We're being asked to post to the WorkQueue at target. @@ -382,6 +447,69 @@ namespace LL          return true;      } +    /// general case: arbitrary C++ return type +    template <typename CALLABLE, typename RETURNTYPE> +    struct WorkQueue::RunOn +    { +        optional<RETURNTYPE> operator()(WorkQueue* self, weak_t target, +                                        const TimePoint& time, CALLABLE&& callable) +        { +            LLCoros::Promise<RETURNTYPE> promise; +            if (! self->postTo( +                    target, +                    time, +                    std::forward<CALLABLE>(callable), +                    // We dare to bind a reference to Promise because it's +                    // specifically intended for cross-thread synchronization. +                    [&promise] +                    (RETURNTYPE&& result) +                    { +                        promise.set_value(std::forward<RETURNTYPE>(result)); +                    })) +            { +                // we couldn't even postTo(): return empty optional +                return {}; +            } +            // we were able to post +            auto future{ LLCoros::getFuture(promise) }; +            return { future.get(); } +        } +    }; + +    /// specialize for CALLABLE returning void +    template <typename CALLABLE> +    struct WorkQueue::RunOn<CALLABLE, void> +    { +        bool operator()(WorkQueue* self, weak_t target, +                        const TimePoint& time, CALLABLE&& callable) +        { +            LLCoros::Promise<void> promise; +            if (! self->postTo( +                    target, +                    time, +                    std::forward<CALLABLE>(callable), +                    // &promise is designed for cross-thread access +                    [&promise](){ promise.set_value(); })) +            { +                // we couldn't postTo() +                return false; +            } +            // we were able to post +            auto future{ LLCoros::getFuture(promise) }; +            // block until set_value() +            future.get(); +            return true; +        } +    }; + +    template <typename CALLABLE> +    auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) +    { +        checkCoroutine("runOn()"); +        return RunOn<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() +            (this, target, time, std::forward<CALLABLE>(callable)); +    } +  } // namespace LL  #endif /* ! defined(LL_WORKQUEUE_H) */  | 
