diff options
Diffstat (limited to 'indra/llcommon')
| -rw-r--r-- | indra/llcommon/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | indra/llcommon/llsingleton.h | 14 | ||||
| -rw-r--r-- | indra/llcommon/tests/threadsafeschedule_test.cpp | 4 | ||||
| -rw-r--r-- | indra/llcommon/tests/workqueue_test.cpp | 72 | ||||
| -rw-r--r-- | indra/llcommon/timing.cpp | 25 | ||||
| -rw-r--r-- | indra/llcommon/workqueue.cpp | 30 | ||||
| -rw-r--r-- | indra/llcommon/workqueue.h | 197 | 
7 files changed, 237 insertions, 108 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt index 9defa6b6c1..782f656406 100644 --- a/indra/llcommon/CMakeLists.txt +++ b/indra/llcommon/CMakeLists.txt @@ -119,8 +119,8 @@ set(llcommon_SOURCE_FILES      lluriparser.cpp      lluuid.cpp      llworkerthread.cpp -    timing.cpp      u64.cpp +    threadpool.cpp      workqueue.cpp      StackWalker.cpp      ) @@ -256,6 +256,7 @@ set(llcommon_HEADER_FILES      lockstatic.h      stdtypes.h      stringize.h +    threadpool.h      threadsafeschedule.h      timer.h      tuple.h diff --git a/indra/llcommon/llsingleton.h b/indra/llcommon/llsingleton.h index f85f961287..6042c0906c 100644 --- a/indra/llcommon/llsingleton.h +++ b/indra/llcommon/llsingleton.h @@ -847,14 +847,13 @@ template<class T>  class LLSimpleton  {  public: -    static T* sInstance; -     -    static void createInstance()  -    {  +    template <typename... ARGS> +    static void createInstance(ARGS&&... args) +    {          llassert(sInstance == nullptr); -        sInstance = new T();  +        sInstance = new T(std::forward<ARGS>(args)...);      } -     +      static inline T* getInstance() { return sInstance; }      static inline T& instance() { return *getInstance(); }      static inline bool instanceExists() { return sInstance != nullptr; } @@ -864,6 +863,9 @@ public:          delete sInstance;          sInstance = nullptr;      } + +private: +    static T* sInstance;  };  template <class T> diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp index af67b9f492..c421cc7b1c 100644 --- a/indra/llcommon/tests/threadsafeschedule_test.cpp +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -46,11 +46,11 @@ namespace tut          // the real time required for each push() call. Explicitly increment          // the timestamp for each one -- but since we're passing explicit          // timestamps, make the queue reorder them. -        queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); +        queue.push(Queue::TimeTuple(Queue::Clock::now() + 200ms, "ghi"));          // Given the various push() overloads, you have to match the type          // exactly: conversions are ambiguous.          queue.push("abc"s); -        queue.push(Queue::Clock::now() + 10ms, "def"); +        queue.push(Queue::Clock::now() + 100ms, "def");          queue.close();          auto entry = queue.pop();          ensure_equals("failed to pop first", std::get<0>(entry), "abc"s); diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp index d5405400fd..bea3ad911b 100644 --- a/indra/llcommon/tests/workqueue_test.cpp +++ b/indra/llcommon/tests/workqueue_test.cpp @@ -20,7 +20,10 @@  // external library headers  // other Linden headers  #include "../test/lltut.h" +#include "../test/catch_and_store_what_in.h"  #include "llcond.h" +#include "llcoros.h" +#include "lleventcoro.h"  #include "llstring.h"  #include "stringize.h" @@ -138,7 +141,8 @@ namespace tut              [](){ return 17; },              // Note that a postTo() *callback* can safely bind a reference to              // a variable on the invoking thread, because the callback is run -            // on the invoking thread. +            // on the invoking thread. (Of course the bound variable must +            // survive until the callback is called.)              [&result](int i){ result = i; });          // this should post the callback to main          qptr->runOne(); @@ -156,4 +160,70 @@ namespace tut          main.runPending();          ensure_equals("failed to run string callback", alpha, "abc");      } + +    template<> template<> +    void object::test<5>() +    { +        set_test_name("postTo with void return"); +        WorkQueue main("main"); +        auto qptr = WorkQueue::getInstance("queue"); +        std::string observe; +        main.postTo( +            qptr, +            // The ONLY reason we can get away with binding a reference to +            // 'observe' in our work callable is because we're directly +            // calling qptr->runOne() on this same thread. It would be a +            // mistake to do that if some other thread were servicing 'queue'. +            [&observe](){ observe = "queue"; }, +            [&observe](){ observe.append(";main"); }); +        qptr->runOne(); +        main.runOne(); +        ensure_equals("failed to run both lambdas", observe, "queue;main"); +    } + +    template<> template<> +    void object::test<6>() +    { +        set_test_name("waitForResult"); +        std::string stored; +        // Try to call waitForResult() on this thread's main coroutine. It +        // should throw because the main coroutine must service the queue. +        auto what{ catch_what<WorkQueue::Error>( +                [this, &stored](){ stored = queue.waitForResult( +                        [](){ return "should throw"; }); }) }; +        ensure("lambda should not have run", stored.empty()); +        ensure_not("waitForResult() should have thrown", what.empty()); +        ensure(STRINGIZE("should mention waitForResult: " << what), +               what.find("waitForResult") != std::string::npos); + +        // Call waitForResult() on a coroutine, with a string result. +        LLCoros::instance().launch( +            "waitForResult string", +            [this, &stored]() +            { stored = queue.waitForResult( +                    [](){ return "string result"; }); }); +        llcoro::suspend(); +        // Nothing will have happened yet because, even if the coroutine did +        // run immediately, all it did was to queue the inner lambda on +        // 'queue'. Service it. +        queue.runOne(); +        llcoro::suspend(); +        ensure_equals("bad waitForResult return", stored, "string result"); + +        // Call waitForResult() on a coroutine, with a void callable. +        stored.clear(); +        bool done = false; +        LLCoros::instance().launch( +            "waitForResult void", +            [this, &stored, &done]() +            { +                queue.waitForResult([&stored](){ stored = "ran"; }); +                done = true; +            }); +        llcoro::suspend(); +        queue.runOne(); +        llcoro::suspend(); +        ensure_equals("didn't run coroutine", stored, "ran"); +        ensure("void waitForResult() didn't return", done); +    }  } // namespace tut diff --git a/indra/llcommon/timing.cpp b/indra/llcommon/timing.cpp deleted file mode 100644 index c2dc695ef3..0000000000 --- a/indra/llcommon/timing.cpp +++ /dev/null @@ -1,25 +0,0 @@ -/**  - * @file timing.cpp - * @brief This file will be deprecated in the future. - * - * $LicenseInfo:firstyear=2000&license=viewerlgpl$ - * Second Life Viewer Source Code - * Copyright (C) 2010, Linden Research, Inc. - *  - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; - * version 2.1 of the License only. - *  - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU - * Lesser General Public License for more details. - *  - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA - *  - * Linden Research, Inc., 945 Battery Street, San Francisco, CA  94111  USA - * $/LicenseInfo$ - */ diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index e7d40354aa..c74dada2e4 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,8 +26,9 @@  using Mutex = LLCoros::Mutex;  using Lock  = LLCoros::LockType; -LL::WorkQueue::WorkQueue(const std::string& name): -    super(makeName(name)) +LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity): +    super(makeName(name)), +    mQueue(capacity)  {      // TODO: register for "LLApp" events so we can implicitly close() on      // viewer shutdown. @@ -38,6 +39,21 @@ void LL::WorkQueue::close()      mQueue.close();  } +size_t LL::WorkQueue::size() +{ +    return mQueue.size(); +} + +bool LL::WorkQueue::isClosed() +{ +    return mQueue.isClosed(); +} + +bool LL::WorkQueue::done() +{ +    return mQueue.done(); +} +  void LL::WorkQueue::runUntilClose()  {      try @@ -130,3 +146,13 @@ void LL::WorkQueue::error(const std::string& msg)  {      LL_ERRS("WorkQueue") << msg << LL_ENDL;  } + +void LL::WorkQueue::checkCoroutine(const std::string& method) +{ +    // By convention, the default coroutine on each thread has an empty name +    // string. See also LLCoros::logname(). +    if (LLCoros::getName().empty()) +    { +        LLTHROW(Error("Do not call " + method + " from a thread's default coroutine")); +    } +} diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 8e4b38c2f3..96574a18b9 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,14 +12,14 @@  #if ! defined(LL_WORKQUEUE_H)  #define LL_WORKQUEUE_H +#include "llcoros.h" +#include "llexception.h"  #include "llinstancetracker.h"  #include "threadsafeschedule.h"  #include <chrono> +#include <exception>                // std::current_exception  #include <functional>               // std::function -#include <queue>  #include <string> -#include <utility>                  // std::pair -#include <vector>  namespace LL  { @@ -45,11 +45,16 @@ namespace LL          using TimedWork = Queue::TimeTuple;          using Closed    = Queue::Closed; +        struct Error: public LLException +        { +            Error(const std::string& what): LLException(what) {} +        }; +          /**           * You may omit the WorkQueue name, in which case a unique name is           * synthesized; for practical purposes that makes it anonymous.           */ -        WorkQueue(const std::string& name = std::string()); +        WorkQueue(const std::string& name = std::string(), size_t capacity=1024);          /**           * Since the point of WorkQueue is to pass work to some other worker @@ -59,15 +64,36 @@ namespace LL           */          void close(); +        /** +         * WorkQueue supports multiple producers and multiple consumers. In +         * the general case it's misleading to test size(), since any other +         * thread might change it the nanosecond the lock is released. On that +         * basis, some might argue against publishing a size() method at all. +         * +         * But there are two specific cases in which a test based on size() +         * might be reasonable: +         * +         * * If you're the only producer, noticing that size() == 0 is +         *   meaningful. +         * * If you're the only consumer, noticing that size() > 0 is +         *   meaningful. +         */ +        size_t size(); +        /// producer end: are we prevented from pushing any additional items? +        bool isClosed(); +        /// consumer end: are we done, is the queue entirely drained? +        bool done(); +          /*---------------------- fire and forget API -----------------------*/          /// fire-and-forget, but at a particular (future?) time          template <typename CALLABLE>          void post(const TimePoint& time, CALLABLE&& callable)          { -            // Defer reifying an arbitrary CALLABLE until we hit this method. -            // All other methods should accept CALLABLEs of arbitrary type to -            // avoid multiple levels of std::function indirection. +            // Defer reifying an arbitrary CALLABLE until we hit this or +            // postIfOpen(). All other methods should accept CALLABLEs of +            // arbitrary type to avoid multiple levels of std::function +            // indirection.              mQueue.push(TimedWork(time, std::move(callable)));          } @@ -83,6 +109,47 @@ namespace LL          }          /** +         * post work for a particular time, unless the queue is closed before +         * we can post +         */ +        template <typename CALLABLE> +        bool postIfOpen(const TimePoint& time, CALLABLE&& callable) +        { +            // Defer reifying an arbitrary CALLABLE until we hit this or +            // post(). All other methods should accept CALLABLEs of arbitrary +            // type to avoid multiple levels of std::function indirection. +            return mQueue.pushIfOpen(TimedWork(time, std::move(callable))); +        } + +        /** +         * post work, unless the queue is closed before we can post +         */ +        template <typename CALLABLE> +        bool postIfOpen(CALLABLE&& callable) +        { +            return postIfOpen(TimePoint::clock::now(), std::move(callable)); +        } + +        /** +         * Post work to be run at a specified time to another WorkQueue, which +         * may or may not still exist and be open. Return true if we were able +         * to post. +         */ +        template <typename CALLABLE> +        static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); + +        /** +         * Post work to another WorkQueue, which may or may not still exist +         * and be open. Return true if we were able to post. +         */ +        template <typename CALLABLE> +        static bool postMaybe(weak_t target, CALLABLE&& callable) +        { +            return postMaybe(target, TimePoint::clock::now(), +                             std::forward<CALLABLE>(callable)); +        } + +        /**           * Launch a callable returning bool that will trigger repeatedly at           * specified interval, until the callable returns false.           * @@ -115,63 +182,8 @@ namespace LL          // Studio compile errors that seem utterly unrelated to this source          // code.          template <typename CALLABLE, typename FOLLOWUP> -        bool postTo(WorkQueue::weak_t target, -                    const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) -        { -            // We're being asked to post to the WorkQueue at target. -            // target is a weak_ptr: have to lock it to check it. -            auto tptr = target.lock(); -            if (! tptr) -                // can't post() if the target WorkQueue has been destroyed -                return false; - -            // Here we believe target WorkQueue still exists. Post to it a -            // lambda that packages our callable, our callback and a weak_ptr -            // to this originating WorkQueue. -            tptr->post( -                time, -                [reply = super::getWeak(), -                 callable = std::move(callable), -                 callback = std::move(callback)] -                () -                { -                    // Call the callable in any case -- but to minimize -                    // copying the result, immediately bind it into a reply -                    // lambda. The reply lambda also binds the original -                    // callback, so that when we, the originating WorkQueue, -                    // finally receive and process the reply lambda, we'll -                    // call the bound callback with the bound result -- on the -                    // same thread that originally called postTo(). -                    auto rlambda = -                        [result = callable(), -                         callback = std::move(callback)] -                        () -                        { callback(std::move(result)); }; -                    // Check if this originating WorkQueue still exists. -                    // Remember, the outer lambda is now running on a thread -                    // servicing the target WorkQueue, and real time has -                    // elapsed since postTo()'s tptr->post() call. -                    // reply is a weak_ptr: have to lock it to check it. -                    auto rptr = reply.lock(); -                    if (rptr) -                    { -                        // Only post reply lambda if the originating WorkQueue -                        // still exists. If not -- who would we tell? Log it? -                        try -                        { -                            rptr->post(std::move(rlambda)); -                        } -                        catch (const Closed&) -                        { -                            // Originating WorkQueue might still exist, but -                            // might be Closed. Same thing: just discard the -                            // callback. -                        } -                    } -                }); -            // looks like we were able to post() -            return true; -        } +        bool postTo(weak_t target, +                    const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);          /**           * Post work to another WorkQueue, requesting a specific callback to @@ -181,10 +193,36 @@ namespace LL           * inaccessible.           */          template <typename CALLABLE, typename FOLLOWUP> -        bool postTo(WorkQueue::weak_t target, -                    CALLABLE&& callable, FOLLOWUP&& callback) +        bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) +        { +            return postTo(target, TimePoint::clock::now(), +                          std::move(callable), std::move(callback)); +        } + +        /** +         * Post work to another WorkQueue to be run at a specified time, +         * blocking the calling coroutine until then, returning the result to +         * caller on completion. +         * +         * In general, we assume that each thread's default coroutine is busy +         * servicing its WorkQueue or whatever. To try to prevent mistakes, we +         * forbid calling waitForResult() from a thread's default coroutine. +         */ +        template <typename CALLABLE> +        auto waitForResult(const TimePoint& time, CALLABLE&& callable); + +        /** +         * Post work to another WorkQueue, blocking the calling coroutine +         * until then, returning the result to caller on completion. +         * +         * In general, we assume that each thread's default coroutine is busy +         * servicing its WorkQueue or whatever. To try to prevent mistakes, we +         * forbid calling waitForResult() from a thread's default coroutine. +         */ +        template <typename CALLABLE> +        auto waitForResult(CALLABLE&& callable)          { -            return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); +            return waitForResult(TimePoint::clock::now(), std::move(callable));          }          /*--------------------------- worker API ---------------------------*/ @@ -233,6 +271,23 @@ namespace LL          bool runUntil(const TimePoint& until);      private: +        template <typename CALLABLE, typename FOLLOWUP> +        static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); +        /// general case: arbitrary C++ return type +        template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> +        struct MakeReplyLambda; +        /// specialize for CALLABLE returning void +        template <typename CALLABLE, typename FOLLOWUP> +        struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>; + +        /// general case: arbitrary C++ return type +        template <typename CALLABLE, typename RETURNTYPE> +        struct WaitForResult; +        /// specialize for CALLABLE returning void +        template <typename CALLABLE> +        struct WaitForResult<CALLABLE, void>; + +        static void checkCoroutine(const std::string& method);          static void error(const std::string& msg);          static std::string makeName(const std::string& name);          void callWork(const Queue::DataTuple& work); @@ -254,8 +309,8 @@ namespace LL      {      public:          // bind the desired data -        BackJack(WorkQueue::weak_t target, -                 const WorkQueue::TimePoint& start, +        BackJack(weak_t target, +                 const TimePoint& start,                   const std::chrono::duration<Rep, Period>& interval,                   CALLABLE&& callable):              mTarget(target), @@ -302,8 +357,8 @@ namespace LL          }      private: -        WorkQueue::weak_t mTarget; -        WorkQueue::TimePoint mStart; +        weak_t mTarget; +        TimePoint mStart;          std::chrono::duration<Rep, Period> mInterval;          CALLABLE mCallable;      };  | 
