diff options
| -rw-r--r-- | indra/llcommon/workqueue.cpp | 7 | ||||
| -rw-r--r-- | indra/llcommon/workqueue.h | 145 | 
2 files changed, 62 insertions, 90 deletions
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index f7ffc8233c..ac3086aac5 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,11 +26,6 @@  using Mutex = LLCoros::Mutex;  using Lock  = LLCoros::LockType; -struct NotOnDftCoro: public LLException -{ -    NotOnDftCoro(const std::string& what): LLException(what) {} -}; -  LL::WorkQueue::WorkQueue(const std::string& name):      super(makeName(name))  { @@ -148,6 +143,6 @@ void LL::WorkQueue::checkCoroutine(const std::string& method)      // string. See also LLCoros::logname().      if (LLCoros::getName().empty())      { -        LLTHROW(NotOnDftCoro("Do not call " + method + " from a thread's default coroutine")); +        LLTHROW(Error("Do not call " + method + " from a thread's default coroutine"));      }  } diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index b17c666172..869f5d9a82 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -13,20 +13,13 @@  #define LL_WORKQUEUE_H  #include "llcoros.h" +#include "llexception.h"  #include "llinstancetracker.h"  #include "threadsafeschedule.h"  #include <chrono> +#include <exception>                // std::current_exception  #include <functional>               // std::function -#if __cplusplus >= 201703 -#include <optional> -namespace stdopt = std; -#else -#include <boost/optional.hpp> -namespace stdopt = boost; -#endif  #include <string> -#include <utility>                  // std::pair -#include <vector>  namespace LL  { @@ -51,8 +44,11 @@ namespace LL          using TimePoint = Queue::TimePoint;          using TimedWork = Queue::TimeTuple;          using Closed    = Queue::Closed; -        template <typename T> -        using optional  = stdopt::optional<T>; + +        struct Error: public LLException +        { +            Error(const std::string& what): LLException(what) {} +        };          /**           * You may omit the WorkQueue name, in which case a unique name is @@ -145,49 +141,25 @@ namespace LL           * blocking the calling coroutine until then, returning the result to           * caller on completion.           * -         * REQUIRED: -         * -         * * The calling thread is the thread servicing 'this' WorkQueue. -         * * The calling coroutine is not the @em coroutine servicing this -         *   WorkQueue. We block the calling coroutine until the result is -         *   available. If this same coroutine is responsible for checking the -         *   local WorkQueue, the result will never be dequeued. In practice, -         *   to try to prevent mistakes, we forbid calling runOn() from a -         *   thread's default coroutine. -         * -         * Returns result if able to post, empty optional if the other -         * WorkQueue is inaccessible. -         * -         * If the passed callable has void return, runOn() returns bool true -         * if able to post, false if the other WorkQueue is inaccessible. +         * In general, we assume that each thread's default coroutine is busy +         * servicing its WorkQueue or whatever. To try to prevent mistakes, we +         * forbid calling waitForResult() from a thread's default coroutine.           */          template <typename CALLABLE> -        auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); +        auto waitForResult(const TimePoint& time, CALLABLE&& callable);          /**           * Post work to another WorkQueue, blocking the calling coroutine           * until then, returning the result to caller on completion.           * -         * REQUIRED: -         * -         * * The calling thread is the thread servicing 'this' WorkQueue. -         * * The calling coroutine is not the @em coroutine servicing this -         *   WorkQueue. We block the calling coroutine until the result is -         *   available. If this same coroutine is responsible for checking the -         *   local WorkQueue, the result will never be dequeued. In practice, -         *   to try to prevent mistakes, we forbid calling runOn() from a -         *   thread's default coroutine. -         * -         * Returns result if able to post, empty optional if the other -         * WorkQueue is inaccessible. -         * -         * If the passed callable has void return, runOn() returns bool true -         * if able to post, false if the other WorkQueue is inaccessible. +         * In general, we assume that each thread's default coroutine is busy +         * servicing its WorkQueue or whatever. To try to prevent mistakes, we +         * forbid calling waitForResult() from a thread's default coroutine.           */          template <typename CALLABLE> -        auto runOn(weak_t target, CALLABLE&& callable) +        auto waitForResult(CALLABLE&& callable)          { -            return runOn(target, TimePoint::clock::now(), std::move(callable)); +            return waitForResult(TimePoint::clock::now(), std::move(callable));          }          /*--------------------------- worker API ---------------------------*/ @@ -246,10 +218,10 @@ namespace LL          /// general case: arbitrary C++ return type          template <typename CALLABLE, typename RETURNTYPE> -        struct RunOn; +        struct WaitForResult;          /// specialize for CALLABLE returning void          template <typename CALLABLE> -        struct RunOn<CALLABLE, void>; +        struct WaitForResult<CALLABLE, void>;          static void checkCoroutine(const std::string& method);          static void error(const std::string& msg); @@ -449,65 +421,70 @@ namespace LL      /// general case: arbitrary C++ return type      template <typename CALLABLE, typename RETURNTYPE> -    struct WorkQueue::RunOn +    struct WorkQueue::WaitForResult      { -        optional<RETURNTYPE> operator()(WorkQueue* self, weak_t target, -                                        const TimePoint& time, CALLABLE&& callable) +        auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)          {              LLCoros::Promise<RETURNTYPE> promise; -            if (! self->postTo( -                    target, -                    time, -                    std::forward<CALLABLE>(callable), -                    // We dare to bind a reference to Promise because it's -                    // specifically intended for cross-thread synchronization. -                    [&promise] -                    (RETURNTYPE&& result) +            self->post( +                time, +                // We dare to bind a reference to Promise because it's +                // specifically designed for cross-thread communication. +                [&promise, callable = std::move(callable)]() +                { +                    try                      { -                        promise.set_value(std::forward<RETURNTYPE>(result)); -                    })) -            { -                // we couldn't even postTo(): return empty optional -                return {}; -            } -            // we were able to post +                        // call the caller's callable and trigger promise with result +                        promise.set_value(callable()); +                    } +                    catch (...) +                    { +                        promise.set_exception(std::current_exception()); +                    } +                });              auto future{ LLCoros::getFuture(promise) }; -            return { future.get(); } +            // now, on the calling thread, wait for that result +            LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); +            return future.get();          }      };      /// specialize for CALLABLE returning void      template <typename CALLABLE> -    struct WorkQueue::RunOn<CALLABLE, void> +    struct WorkQueue::WaitForResult<CALLABLE, void>      { -        bool operator()(WorkQueue* self, weak_t target, -                        const TimePoint& time, CALLABLE&& callable) +        void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)          {              LLCoros::Promise<void> promise; -            if (! self->postTo( -                    target, -                    time, -                    std::forward<CALLABLE>(callable), -                    // &promise is designed for cross-thread access -                    [&promise](){ promise.set_value(); })) -            { -                // we couldn't postTo() -                return false; -            } -            // we were able to post +            self->post( +                time, +                // &promise is designed for cross-thread access +                [&promise, callable = std::move(callable)]() +                { +                    try +                    { +                        callable(); +                        promise.set_value(); +                    } +                    catch (...) +                    { +                        promise.set_exception(std::current_exception()); +                    } +                });              auto future{ LLCoros::getFuture(promise) };              // block until set_value() +            LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()");              future.get(); -            return true;          }      };      template <typename CALLABLE> -    auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) +    auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable)      { -        checkCoroutine("runOn()"); -        return RunOn<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() -            (this, target, time, std::forward<CALLABLE>(callable)); +        checkCoroutine("waitForResult()"); +        // derive callable's return type so we can specialize for void +        return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() +            (this, time, std::forward<CALLABLE>(callable));      }  } // namespace LL  | 
