diff options
| -rw-r--r-- | indra/llcommon/tests/threadsafeschedule_test.cpp | 4 | ||||
| -rw-r--r-- | indra/llcommon/tests/workqueue_test.cpp | 23 | ||||
| -rw-r--r-- | indra/llcommon/workqueue.h | 167 | 
3 files changed, 134 insertions, 60 deletions
diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp index af67b9f492..c421cc7b1c 100644 --- a/indra/llcommon/tests/threadsafeschedule_test.cpp +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -46,11 +46,11 @@ namespace tut          // the real time required for each push() call. Explicitly increment          // the timestamp for each one -- but since we're passing explicit          // timestamps, make the queue reorder them. -        queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); +        queue.push(Queue::TimeTuple(Queue::Clock::now() + 200ms, "ghi"));          // Given the various push() overloads, you have to match the type          // exactly: conversions are ambiguous.          queue.push("abc"s); -        queue.push(Queue::Clock::now() + 10ms, "def"); +        queue.push(Queue::Clock::now() + 100ms, "def");          queue.close();          auto entry = queue.pop();          ensure_equals("failed to pop first", std::get<0>(entry), "abc"s); diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp index d5405400fd..b69df49d33 100644 --- a/indra/llcommon/tests/workqueue_test.cpp +++ b/indra/llcommon/tests/workqueue_test.cpp @@ -138,7 +138,8 @@ namespace tut              [](){ return 17; },              // Note that a postTo() *callback* can safely bind a reference to              // a variable on the invoking thread, because the callback is run -            // on the invoking thread. +            // on the invoking thread. (Of course the bound variable must +            // survive until the callback is called.)              [&result](int i){ result = i; });          // this should post the callback to main          qptr->runOne(); @@ -156,4 +157,24 @@ namespace tut          main.runPending();          ensure_equals("failed to run string callback", alpha, "abc");      } + +    template<> template<> +    void object::test<5>() +    { +        set_test_name("postTo with void return"); +        WorkQueue main("main"); +        auto qptr = WorkQueue::getInstance("queue"); +        std::string observe; +        main.postTo( +            qptr, +            // The ONLY reason we can get away with binding a reference to +            // 'observe' in our work callable is because we're directly +            // calling qptr->runOne() on this same thread. It would be a +            // mistake to do that if some other thread were servicing 'queue'. +            [&observe](){ observe = "queue"; }, +            [&observe](){ observe.append(";main"); }); +        qptr->runOne(); +        main.runOne(); +        ensure_equals("failed to run both lambdas", observe, "queue;main"); +    }  } // namespace tut diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index cfae2019dc..deef3c8e84 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -115,62 +115,7 @@ namespace LL          // code.          template <typename CALLABLE, typename FOLLOWUP>          bool postTo(WorkQueue::weak_t target, -                    const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) -        { -            // We're being asked to post to the WorkQueue at target. -            // target is a weak_ptr: have to lock it to check it. -            auto tptr = target.lock(); -            if (! tptr) -                // can't post() if the target WorkQueue has been destroyed -                return false; - -            // Here we believe target WorkQueue still exists. Post to it a -            // lambda that packages our callable, our callback and a weak_ptr -            // to this originating WorkQueue. -            tptr->post( -                time, -                [reply = super::getWeak(), -                 callable = std::move(callable), -                 callback = std::move(callback)] -                () -                { -                    // Call the callable in any case -- but to minimize -                    // copying the result, immediately bind it into a reply -                    // lambda. The reply lambda also binds the original -                    // callback, so that when we, the originating WorkQueue, -                    // finally receive and process the reply lambda, we'll -                    // call the bound callback with the bound result -- on the -                    // same thread that originally called postTo(). -                    auto rlambda = -                        [result = callable(), -                         callback = std::move(callback)] -                        () -                        { callback(std::move(result)); }; -                    // Check if this originating WorkQueue still exists. -                    // Remember, the outer lambda is now running on a thread -                    // servicing the target WorkQueue, and real time has -                    // elapsed since postTo()'s tptr->post() call. -                    // reply is a weak_ptr: have to lock it to check it. -                    auto rptr = reply.lock(); -                    if (rptr) -                    { -                        // Only post reply lambda if the originating WorkQueue -                        // still exists. If not -- who would we tell? Log it? -                        try -                        { -                            rptr->post(std::move(rlambda)); -                        } -                        catch (const Closed&) -                        { -                            // Originating WorkQueue might still exist, but -                            // might be Closed. Same thing: just discard the -                            // callback. -                        } -                    } -                }); -            // looks like we were able to post() -            return true; -        } +                    const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);          /**           * Post work to another WorkQueue, requesting a specific callback to @@ -183,7 +128,8 @@ namespace LL          bool postTo(WorkQueue::weak_t target,                      CALLABLE&& callable, FOLLOWUP&& callback)          { -            return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); +            return postTo(target, TimePoint::clock::now(), +                          std::move(callable), std::move(callback));          }          /*--------------------------- worker API ---------------------------*/ @@ -231,6 +177,17 @@ namespace LL          bool runUntil(const TimePoint& until);      private: +        template <typename CALLABLE, typename FOLLOWUP> +        static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); + +        /// general case: arbitrary C++ return type +        template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> +        struct MakeReplyLambda; + +        /// specialize for CALLABLE returning void +        template <typename CALLABLE, typename FOLLOWUP> +        struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>; +          static void error(const std::string& msg);          static std::string makeName(const std::string& name);          void callWork(const Queue::DataTuple& work); @@ -329,6 +286,102 @@ namespace LL                   getWeak(), TimePoint::clock::now(), interval, std::move(callable)));      } +    template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> +    struct WorkQueue::MakeReplyLambda +    { +        auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) +        { +            // Call the callable in any case -- but to minimize +            // copying the result, immediately bind it into the reply +            // lambda. The reply lambda also binds the original +            // callback, so that when we, the originating WorkQueue, +            // finally receive and process the reply lambda, we'll +            // call the bound callback with the bound result -- on the +            // same thread that originally called postTo(). +            return +                [result = std::forward<CALLABLE>(callable)(), +                 callback = std::move(callback)] +                () +                { callback(std::move(result)); }; +        } +    }; + +    /// specialize for CALLABLE returning void +    template <typename CALLABLE, typename FOLLOWUP> +    struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void> +    { +        auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) +        { +            // Call the callable, which produces no result. +            std::forward<CALLABLE>(callable)(); +            // This reply lambda binds the original callback, so +            // that when we, the originating WorkQueue, finally +            // receive and process the reply lambda, we'll call +            // the bound callback -- on the same thread that +            // originally called postTo(). +            return [callback = std::move(callback)](){ callback(); }; +        } +    }; + +    template <typename CALLABLE, typename FOLLOWUP> +    auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) +    { +        return MakeReplyLambda<CALLABLE, FOLLOWUP, +                               decltype(std::forward<CALLABLE>(callable)())>() +            (std::move(callable), std::move(callback)); +    } + +    template <typename CALLABLE, typename FOLLOWUP> +    bool WorkQueue::postTo(WorkQueue::weak_t target, +                           const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) +    { +        // We're being asked to post to the WorkQueue at target. +        // target is a weak_ptr: have to lock it to check it. +        auto tptr = target.lock(); +        if (! tptr) +            // can't post() if the target WorkQueue has been destroyed +            return false; + +        // Here we believe target WorkQueue still exists. Post to it a +        // lambda that packages our callable, our callback and a weak_ptr +        // to this originating WorkQueue. +        tptr->post( +            time, +            [reply = super::getWeak(), +             callable = std::move(callable), +             callback = std::move(callback)] +            () +            { +                // Make a reply lambda to repost to THIS WorkQueue. +                // Delegate to makeReplyLambda() so we can partially +                // specialize on void return. +                auto rlambda = makeReplyLambda(std::move(callable), std::move(callback)); +                // Check if this originating WorkQueue still exists. +                // Remember, the outer lambda is now running on a thread +                // servicing the target WorkQueue, and real time has +                // elapsed since postTo()'s tptr->post() call. +                // reply is a weak_ptr: have to lock it to check it. +                auto rptr = reply.lock(); +                if (rptr) +                { +                    // Only post reply lambda if the originating WorkQueue +                    // still exists. If not -- who would we tell? Log it? +                    try +                    { +                        rptr->post(std::move(rlambda)); +                    } +                    catch (const Closed&) +                    { +                        // Originating WorkQueue might still exist, but +                        // might be Closed. Same thing: just discard the +                        // callback. +                    } +                } +            }); +        // looks like we were able to post() +        return true; +    } +  } // namespace LL  #endif /* ! defined(LL_WORKQUEUE_H) */  | 
