-rw-r--r--  indra/llcommon/CMakeLists.txt            |   3
-rw-r--r--  indra/llcommon/tests/workqueue_test.cpp  | 158
-rw-r--r--  indra/llcommon/threadsafeschedule.h      |   1
-rw-r--r--  indra/llcommon/workqueue.cpp             | 114
-rw-r--r--  indra/llcommon/workqueue.h               | 325
5 files changed, 601 insertions, 0 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt
index 5efcfabf24..a3dbb6d9d0 100644
--- a/indra/llcommon/CMakeLists.txt
+++ b/indra/llcommon/CMakeLists.txt
@@ -121,6 +121,7 @@ set(llcommon_SOURCE_FILES
     llworkerthread.cpp
     timing.cpp
     u64.cpp
+    workqueue.cpp
     StackWalker.cpp
     )

@@ -258,6 +259,7 @@ set(llcommon_HEADER_FILES
     timer.h
     tuple.h
     u64.h
+    workqueue.h
     StackWalker.h
     )

@@ -363,6 +365,7 @@ if (LL_TESTS)
   LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}")
   LL_ADD_INTEGRATION_TEST(threadsafeschedule "" "${test_libs}")
   LL_ADD_INTEGRATION_TEST(tuple "" "${test_libs}")
+  LL_ADD_INTEGRATION_TEST(workqueue "" "${test_libs}")

 ## llexception_test.cpp isn't a regression test, and doesn't need to be run
 ## every build. It's to help a developer make implementation choices about
diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp
new file mode 100644
index 0000000000..ab1cae6c14
--- /dev/null
+++ b/indra/llcommon/tests/workqueue_test.cpp
@@ -0,0 +1,158 @@
+/**
+ * @file   workqueue_test.cpp
+ * @author Nat Goodspeed
+ * @date   2021-10-07
+ * @brief  Test for workqueue.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "workqueue.h"
+// STL headers
+// std headers
+#include <chrono>
+#include <deque>
+// external library headers
+// other Linden headers
+#include "../test/lltut.h"
+#include "llcond.h"
+#include "llstring.h"
+#include "stringize.h"
+
+using namespace LL;
+using namespace std::literals::chrono_literals; // ms suffix
+using namespace std::literals::string_literals; // s suffix
+
+/*****************************************************************************
+*   TUT
+*****************************************************************************/
+namespace tut
+{
+    struct workqueue_data
+    {
+        WorkQueue queue{"queue"};
+    };
+    typedef test_group<workqueue_data> workqueue_group;
+    typedef workqueue_group::object object;
+    workqueue_group workqueuegrp("workqueue");
+
+    template<> template<>
+    void object::test<1>()
+    {
+        set_test_name("name");
+        ensure_equals("didn't capture name", queue.getKey(), "queue");
+        ensure("not findable", WorkQueue::getInstance("queue") == queue.getWeak().lock());
+        WorkQueue q2;
+        ensure("has no name", LLStringUtil::startsWith(q2.getKey(), "WorkQueue"));
+    }
+
+    template<> template<>
+    void object::test<2>()
+    {
+        set_test_name("post");
+        bool wasRun{ false };
+        // We only get away with binding a simple bool because we're running
+        // the work on the same thread.
+        queue.post([&wasRun](){ wasRun = true; });
+        queue.close();
+        ensure("ran too soon", ! wasRun);
+        queue.runUntilClose();
+        ensure("didn't run", wasRun);
+    }
+
+    template<> template<>
+    void object::test<3>()
+    {
+        set_test_name("postEvery");
+        // record of runs
+        using Shared = std::deque<WorkQueue::TimePoint>;
+        // This is an example of how to share data between the originator of
+        // postEvery(work) and the work item itself, since usually a WorkQueue
+        // is used to dispatch work to a different thread. Neither of them
+        // should call any of LLCond's wait methods: you don't want to stall
+        // either the worker thread or the originating thread (conventionally
+        // main). Use LLCond or a subclass even if all you want to do is
+        // signal the work item that it can quit; consider LLOneShotCond.
+        LLCond<Shared> data;
+        auto start = WorkQueue::TimePoint::clock::now();
+        auto interval = 100ms;
+        queue.postEvery(
+            interval,
+            [&data, count = 0]
+            () mutable
+            {
+                // record the timestamp at which this instance is running
+                data.update_one(
+                    [](Shared& data)
+                    {
+                        data.push_back(WorkQueue::TimePoint::clock::now());
+                    });
+                // by the 3rd call, return false to stop
+                return (++count < 3);
+            });
+        // no convenient way to close() our queue while we've got a
+        // postEvery() running, so run until we think we should have exhausted
+        // the iterations
+        queue.runFor(10*interval);
+        // Take a copy of the captured deque.
+        Shared result = data.get();
+        ensure_equals("called wrong number of times", result.size(), 3);
+        // postEvery() assumes you want the first call to happen right away.
+        // Inject a fake start time that's (interval) earlier than that, to
+        // make our too early/too late tests uniform for all entries.
+        result.push_front(start - interval);
+        for (size_t i = 1; i < result.size(); ++i)
+        {
+            auto diff = (result[i] - result[i-1]);
+            try
+            {
+                ensure(STRINGIZE("call " << i << " too soon"), diff >= interval);
+                ensure(STRINGIZE("call " << i << " too late"), diff < interval*1.5);
+            }
+            catch (const tut::failure&)
+            {
+                auto interval_ms = interval / 1ms;
+                auto diff_ms = diff / 1ms;
+                std::cerr << "interval " << interval_ms
+                          << "ms; diff " << diff_ms << "ms" << std::endl;
+                throw;
+            }
+        }
+    }
+
+    template<> template<>
+    void object::test<4>()
+    {
+        set_test_name("postTo");
+        WorkQueue main("main");
+        auto qptr = WorkQueue::getInstance("queue");
+        int result = 0;
+        main.postTo(
+            qptr,
+            [](){ return 17; },
+            // Note that a postTo() *callback* can safely bind a reference to
+            // a variable on the invoking thread, because the callback is run
+            // on the invoking thread.
+            [&result](int i){ result = i; });
+        // this should post the callback to main
+        qptr->runOne();
+        // this should run the callback
+        main.runOne();
+        ensure_equals("failed to run int callback", result, 17);
+
+        std::string alpha;
+        // postTo() handles arbitrary return types
+        main.postTo(
+            qptr,
+            [](){ return "abc"s; },
+            [&alpha](const std::string& s){ alpha = s; });
+        qptr->runPending();
+        main.runPending();
+        ensure_equals("failed to run string callback", alpha, "abc");
+    }
+} // namespace tut
diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h
index 0e70d30714..c8ad23532b 100644
--- a/indra/llcommon/threadsafeschedule.h
+++ b/indra/llcommon/threadsafeschedule.h
@@ -78,6 +78,7 @@ namespace LL
         enum pop_result { EMPTY=super::EMPTY, DONE=super::DONE, WAITING=super::WAITING, POPPED=super::POPPED };

     public:
+        using Closed = LLThreadSafeQueueInterrupt;
         using TimePoint = ThreadSafeSchedulePrivate::TimePoint;
         using Clock = TimePoint::clock;

diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp
new file mode 100644
index 0000000000..15e292fb43
--- /dev/null
+++ b/indra/llcommon/workqueue.cpp
@@ -0,0 +1,114 @@
+/**
+ * @file   workqueue.cpp
+ * @author Nat Goodspeed
+ * @date   2021-10-06
+ * @brief  Implementation for WorkQueue.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "workqueue.h"
+// STL headers
+// std headers
+// external library headers
+// other Linden headers
+#include "llerror.h"
+#include "llexception.h"
+#include "stringize.h"
+
+LL::WorkQueue::WorkQueue(const std::string& name):
+    super(makeName(name))
+{
+    // TODO: register for "LLApp" events so we can implicitly close() on
+    // viewer shutdown.
+}
+
+void LL::WorkQueue::close()
+{
+    mQueue.close();
+}
+
+void LL::WorkQueue::runUntilClose()
+{
+    try
+    {
+        for (;;)
+        {
+            callWork(mQueue.pop());
+        }
+    }
+    catch (const Queue::Closed&)
+    {
+    }
+}
+
+bool LL::WorkQueue::runPending()
+{
+    for (Work work; mQueue.tryPop(work); )
+    {
+        callWork(work);
+    }
+    return ! mQueue.done();
+}
+
+bool LL::WorkQueue::runOne()
+{
+    Work work;
+    if (mQueue.tryPop(work))
+    {
+        callWork(work);
+    }
+    return ! mQueue.done();
+}
+
+bool LL::WorkQueue::runUntil(const TimePoint& until)
+{
+    // Should we subtract some slop to allow for typical Work execution time?
+    // How much slop?
+    Work work;
+    while (TimePoint::clock::now() < until && mQueue.tryPopUntil(until, work))
+    {
+        callWork(work);
+    }
+    return ! mQueue.done();
+}
+
+std::string LL::WorkQueue::makeName(const std::string& name)
+{
+    if (! name.empty())
+        return name;
+
+    static thread_local U32 discriminator = 0;
+    return STRINGIZE("WorkQueue" << discriminator++);
+}
+
+void LL::WorkQueue::callWork(const Queue::DataTuple& work)
+{
+    // ThreadSafeSchedule::pop() always delivers a tuple, even when
+    // there's only one data field per item, as for us.
+    callWork(std::get<0>(work));
+}
+
+void LL::WorkQueue::callWork(const Work& work)
+{
+    try
+    {
+        work();
+    }
+    catch (...)
+    {
+        // No matter what goes wrong with any individual work item, the worker
+        // thread must go on! Log our own instance name with the exception.
+        LOG_UNHANDLED_EXCEPTION(getKey());
+    }
+}
+
+void LL::WorkQueue::error(const std::string& msg)
+{
+    LL_ERRS("WorkQueue") << msg << LL_ENDL;
+}
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
new file mode 100644
index 0000000000..a52f7b0e26
--- /dev/null
+++ b/indra/llcommon/workqueue.h
@@ -0,0 +1,325 @@
+/**
+ * @file   workqueue.h
+ * @author Nat Goodspeed
+ * @date   2021-09-30
+ * @brief  Queue used for inter-thread work passing.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_WORKQUEUE_H)
+#define LL_WORKQUEUE_H
+
+#include "llinstancetracker.h"
+#include "threadsafeschedule.h"
+#include <chrono>
+#include <functional>               // std::function
+#include <queue>
+#include <string>
+#include <utility>                  // std::pair
+#include <vector>
+
+namespace LL
+{
+    /**
+     * A typical WorkQueue has a string name that can be used to find it.
+     */
+    class WorkQueue: public LLInstanceTracker<WorkQueue, std::string>
+    {
+    private:
+        using super = LLInstanceTracker<WorkQueue, std::string>;
+
+    public:
+        using Work = std::function<void()>;
+
+    private:
+        using Queue = ThreadSafeSchedule<Work>;
+        // helper for postEvery()
+        template <typename Rep, typename Period, typename CALLABLE>
+        class BackJack;
+
+    public:
+        using TimePoint = Queue::TimePoint;
+        using TimedWork = Queue::TimeTuple;
+        using Closed    = Queue::Closed;
+
+        /**
+         * You may omit the WorkQueue name, in which case a unique name is
+         * synthesized; for practical purposes that makes it anonymous.
+         */
+        WorkQueue(const std::string& name = std::string());
+
+        /**
+         * Since the point of WorkQueue is to pass work to some other worker
+         * thread(s) asynchronously, it's important that the WorkQueue continue
+         * to exist until the worker thread(s) have drained it. To communicate
+         * that it's time for them to quit, close() the queue.
+         */
+        void close();
+
+        /*---------------------- fire and forget API -----------------------*/
+
+        /// fire-and-forget, but at a particular (future?) time
+        template <typename CALLABLE>
+        void post(const TimePoint& time, CALLABLE&& callable)
+        {
+            // Defer reifying an arbitrary CALLABLE until we hit this method.
+            // All other methods should accept CALLABLEs of arbitrary type to
+            // avoid multiple levels of std::function indirection.
+            mQueue.push(TimedWork(time, std::move(callable)));
+        }
+
+        /// fire-and-forget
+        template <typename CALLABLE>
+        void post(CALLABLE&& callable)
+        {
+            // We use TimePoint::clock::now() instead of TimePoint's
+            // representation of the epoch because this WorkQueue may contain
+            // a mix of past-due TimedWork items and TimedWork items scheduled
+            // for the future. Sift this new item into the correct place.
+            post(TimePoint::clock::now(), std::move(callable));
+        }
+
+        /**
+         * Launch a callable returning bool that will trigger repeatedly at
+         * specified interval, until the callable returns false.
+         *
+         * If you need to signal that callable from outside, DO NOT bind a
+         * reference to a simple bool! That's not thread-safe. Instead, bind
+         * an LLCond variant, e.g. LLOneShotCond or LLBoolCond.
+         */
+        template <typename Rep, typename Period, typename CALLABLE>
+        void postEvery(const std::chrono::duration<Rep, Period>& interval,
+                       CALLABLE&& callable);
+
+        /*------------------------- handshake API --------------------------*/
+
+        /**
+         * Post work to another WorkQueue to be run at a specified time,
+         * requesting a specific callback to be run on this WorkQueue on
+         * completion.
+         *
+         * Returns true if able to post, false if the other WorkQueue is
+         * inaccessible.
+         */
+        template <typename CALLABLE, typename CALLBACK>
+        bool postTo(std::weak_ptr<WorkQueue> target,
+                    const TimePoint& time, CALLABLE&& callable, CALLBACK&& callback)
+        {
+            // We're being asked to post to the WorkQueue at target.
+            // target is a weak_ptr: have to lock it to check it.
+            auto tptr = target.lock();
+            if (! tptr)
+                // can't post() if the target WorkQueue has been destroyed
+                return false;
+
+            // Here we believe target WorkQueue still exists. Post to it a
+            // lambda that packages our callable, our callback and a weak_ptr
+            // to this originating WorkQueue.
+            tptr->post(
+                time,
+                [reply = super::getWeak(),
+                 callable = std::move(callable),
+                 callback = std::move(callback)]
+                ()
+                {
+                    // Call the callable in any case -- but to minimize
+                    // copying the result, immediately bind it into a reply
+                    // lambda. The reply lambda also binds the original
+                    // callback, so that when we, the originating WorkQueue,
+                    // finally receive and process the reply lambda, we'll
+                    // call the bound callback with the bound result -- on the
+                    // same thread that originally called postTo().
+                    auto rlambda =
+                        [result = callable(),
+                         callback = std::move(callback)]
+                        ()
+                        { callback(std::move(result)); };
+                    // Check if this originating WorkQueue still exists.
+                    // Remember, the outer lambda is now running on a thread
+                    // servicing the target WorkQueue, and real time has
+                    // elapsed since postTo()'s tptr->post() call.
+                    // reply is a weak_ptr: have to lock it to check it.
+                    auto rptr = reply.lock();
+                    if (rptr)
+                    {
+                        // Only post reply lambda if the originating WorkQueue
+                        // still exists. If not -- who would we tell? Log it?
+                        try
+                        {
+                            rptr->post(std::move(rlambda));
+                        }
+                        catch (const Closed&)
+                        {
+                            // Originating WorkQueue might still exist, but
+                            // might be Closed. Same thing: just discard the
+                            // callback.
+                        }
+                    }
+                });
+            // looks like we were able to post()
+            return true;
+        }
+
+        /**
+         * Post work to another WorkQueue, requesting a specific callback to
+         * be run on this WorkQueue on completion.
+         *
+         * Returns true if able to post, false if the other WorkQueue is
+         * inaccessible.
+         */
+        template <typename CALLABLE, typename CALLBACK>
+        bool postTo(std::weak_ptr<WorkQueue> target,
+                    CALLABLE&& callable, CALLBACK&& callback)
+        {
+            return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback));
+        }
+
+        /*--------------------------- worker API ---------------------------*/
+
+        /**
+         * runUntilClose() pulls TimedWork items off this WorkQueue until the
+         * queue is closed, at which point it returns. This would be the
+         * typical entry point for a simple worker thread.
+         */
+        void runUntilClose();
+
+        /**
+         * runPending() runs all TimedWork items that are ready to run. It
+         * returns true if the queue remains open, false if the queue has been
+         * closed. This could be used by a thread whose primary purpose is to
+         * serve the queue, but also wants to do other things with its idle time.
+         */
+        bool runPending();
+
+        /**
+         * runOne() runs at most one ready TimedWork item -- zero if none are
+         * ready. It returns true if the queue remains open, false if the
+         * queue has been closed.
+         */
+        bool runOne();
+
+        /**
+         * runFor() runs a subset of ready TimedWork items, until the
+         * timeslice has been exceeded. It returns true if the queue remains
+         * open, false if the queue has been closed. This could be used by a
+         * busy main thread to lend a bounded few CPU cycles to this WorkQueue
+         * without risking the WorkQueue blowing out the length of any one
+         * frame.
+         */
+        template <typename Rep, typename Period>
+        bool runFor(const std::chrono::duration<Rep, Period>& timeslice)
+        {
+            return runUntil(TimePoint::clock::now() + timeslice);
+        }
+
+        /**
+         * runUntil() is just like runFor(), only with a specific end time
+         * instead of a timeslice duration.
+         */
+        bool runUntil(const TimePoint& until);
+
+    private:
+        static void error(const std::string& msg);
+        static std::string makeName(const std::string& name);
+        void callWork(const Queue::DataTuple& work);
+        void callWork(const Work& work);
+        Queue mQueue;
+    };
+
+    /**
+     * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a
+     * CALLABLE that returns bool, a TimePoint and an interval at which to
+     * relaunch it. As long as the callable continues returning true, BackJack
+     * keeps resubmitting it to the target WorkQueue.
+     */
+    // Why is BackJack a class and not a lambda? Because, unlike a lambda, a
+    // class method gets its own 'this' pointer -- which we need to resubmit
+    // the whole BackJack callable.
+    template <typename Rep, typename Period, typename CALLABLE>
+    class WorkQueue::BackJack
+    {
+    public:
+        // bind the desired data
+        BackJack(std::weak_ptr<WorkQueue> target,
+                 const WorkQueue::TimePoint& start,
+                 const std::chrono::duration<Rep, Period>& interval,
+                 CALLABLE&& callable):
+            mTarget(target),
+            mStart(start),
+            mInterval(interval),
+            mCallable(std::move(callable))
+        {}
+
+        // Call by target WorkQueue -- note that although WE require a
+        // callable returning bool, WorkQueue wants a void callable. We
+        // consume the bool.
+        void operator()()
+        {
+            // If mCallable() throws an exception, don't catch it here: if it
+            // throws once, it's likely to throw every time, so it's a waste
+            // of time to arrange to call it again.
+            if (mCallable())
+            {
+                // Modify mStart to the new start time we desire. If we simply
+                // added mInterval to now, we'd get actual timings of
+                // (mInterval + slop), where 'slop' is the latency between the
+                // previous mStart and the WorkQueue actually calling us.
+                // Instead, add mInterval to mStart so that at least we
+                // register our intent to fire at exact mIntervals.
+                mStart += mInterval;
+
+                // We're being called at this moment by the target WorkQueue.
+                // Assume it still exists, rather than checking the result of
+                // lock().
+                // Resubmit the whole *this callable: that's why we're a class
+                // rather than a lambda. Allow moving *this so we can carry a
+                // move-only callable; but naturally this statement must be
+                // the last time we reference this instance, which may become
+                // moved-from.
+                try
+                {
+                    mTarget.lock()->post(mStart, std::move(*this));
+                }
+                catch (const Closed&)
+                {
+                    // Once this queue is closed, oh well, just stop
+                }
+            }
+        }
+
+    private:
+        std::weak_ptr<WorkQueue> mTarget;
+        WorkQueue::TimePoint mStart;
+        std::chrono::duration<Rep, Period> mInterval;
+        CALLABLE mCallable;
+    };
+
+    template <typename Rep, typename Period, typename CALLABLE>
+    void WorkQueue::postEvery(const std::chrono::duration<Rep, Period>& interval,
+                              CALLABLE&& callable)
+    {
+        if (interval.count() <= 0)
+        {
+            // It's essential that postEvery() be called with a positive
+            // interval, since each call to BackJack posts another instance of
+            // itself at (start + interval) and we order by target time. A
+            // zero or negative interval would result in that BackJack
+            // instance going to the head of the queue every time, immediately
+            // ready to run. Effectively that would produce an infinite loop,
+            // a denial of service on this WorkQueue.
+            error("postEvery(interval) may not be 0");
+        }
+        // Instantiate and post a suitable BackJack, binding a weak_ptr to
+        // self, the current time, the desired interval and the desired
+        // callable.
+        post(
+            BackJack<Rep, Period, CALLABLE>(
+                 getWeak(), TimePoint::clock::now(), interval, std::move(callable)));
+    }
+
+} // namespace LL
+
+#endif /* ! defined(LL_WORKQUEUE_H) */
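Reader's note: the tests above run everything on one thread, but the worker-API comments in workqueue.h describe a two-thread division of labor. The following is a minimal usage sketch, not part of this commit, assuming only the API added above; the function and variable names (example, worker) are illustrative.

    // Sketch: a dedicated worker drains a named WorkQueue with runUntilClose(),
    // while the originating thread post()s fire-and-forget work and close()s
    // the queue so the worker can exit once the queue is drained.
    #include "workqueue.h"
    #include <iostream>
    #include <thread>

    void example()
    {
        LL::WorkQueue queue("example");      // findable via WorkQueue::getInstance("example")

        // Worker thread: block popping work until the queue is closed.
        std::thread worker([&queue]{ queue.runUntilClose(); });

        // Originating thread: this lambda runs on the worker thread.
        queue.post([]{ std::cout << "hello from the worker\n"; });

        queue.close();                       // signal the worker to quit after draining
        worker.join();
    }

As in test<2>, work posted before close() is still run; close() only tells runUntilClose() to return once the remaining items have been popped.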

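Similarly, a hedged sketch of the handshake API (again not part of the commit): postTo() runs the callable on whichever thread services the target queue, then posts the bound callback back to the originating queue, to be run when the originating thread next services its own queue. The names front/back and the runFor() budget below are arbitrary illustrative choices; a real caller would service its queue from its main loop rather than a one-off runFor().

    // Sketch: cross-thread request/reply using two WorkQueues.
    #include "workqueue.h"
    #include <chrono>
    #include <iostream>
    #include <thread>

    void handshake_example()
    {
        LL::WorkQueue front("front");        // serviced by this thread
        LL::WorkQueue back("back");          // serviced by a worker thread

        std::thread worker([&back]{ back.runUntilClose(); });

        int answer = 0;
        // postTo() takes a weak_ptr to the target queue; getWeak() comes from
        // LLInstanceTracker. The callback may safely bind locals of this
        // thread because it runs back on this thread.
        front.postTo(
            back.getWeak(),
            []{ return 6 * 7; },                         // runs on the worker thread
            [&answer](int result){ answer = result; });  // runs on this thread

        // Lend this thread to "front" briefly so the reply callback can run;
        // the timeslice is arbitrary and timing-dependent in this sketch.
        front.runFor(std::chrono::milliseconds(100));
        std::cout << "answer = " << answer << '\n';

        back.close();
        worker.join();
    }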