diff options
author | Dave Houlton <euclid@lindenlab.com> | 2021-11-15 09:25:35 -0700 |
---|---|---|
committer | Dave Houlton <euclid@lindenlab.com> | 2021-11-15 09:25:35 -0700 |
commit | 029b41c0419e975bbb28454538b46dc69ce5d2ba (patch) | |
tree | 4f9a28bb36ee07fe9a7b45a434384afd1f24bb85 /indra/llcommon | |
parent | aeed774ff9cc55c0c1dd2784e23b2366ff367fbe (diff) |
Revert "SL-16220: Merge branch 'origin/DRTVWR-546' into glthread"
This reverts commit 5188a26a8521251dda07ac0140bb129f28417e49, reversing
changes made to 819088563e13f1d75e048311fbaf0df4a79b7e19.
Diffstat (limited to 'indra/llcommon')
-rw-r--r-- | indra/llcommon/CMakeLists.txt | 3 | ||||
-rw-r--r-- | indra/llcommon/llsingleton.h | 24 | ||||
-rw-r--r-- | indra/llcommon/llthreadsafequeue.h | 30 | ||||
-rw-r--r-- | indra/llcommon/tests/threadsafeschedule_test.cpp | 4 | ||||
-rw-r--r-- | indra/llcommon/tests/workqueue_test.cpp | 72 | ||||
-rw-r--r-- | indra/llcommon/threadpool.cpp | 80 | ||||
-rw-r--r-- | indra/llcommon/threadpool.h | 62 | ||||
-rw-r--r-- | indra/llcommon/timing.cpp | 25 | ||||
-rw-r--r-- | indra/llcommon/workqueue.cpp | 30 | ||||
-rw-r--r-- | indra/llcommon/workqueue.h | 378 |
10 files changed, 119 insertions, 589 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt index 78d6ea3090..ad6d3a5049 100644 --- a/indra/llcommon/CMakeLists.txt +++ b/indra/llcommon/CMakeLists.txt @@ -119,8 +119,8 @@ set(llcommon_SOURCE_FILES lluriparser.cpp lluuid.cpp llworkerthread.cpp + timing.cpp u64.cpp - threadpool.cpp workqueue.cpp StackWalker.cpp ) @@ -256,7 +256,6 @@ set(llcommon_HEADER_FILES lockstatic.h stdtypes.h stringize.h - threadpool.h threadsafeschedule.h timer.h tuple.h diff --git a/indra/llcommon/llsingleton.h b/indra/llcommon/llsingleton.h index 6042c0906c..10a8ecfedb 100644 --- a/indra/llcommon/llsingleton.h +++ b/indra/llcommon/llsingleton.h @@ -847,28 +847,22 @@ template<class T> class LLSimpleton { public: - template <typename... ARGS> - static void createInstance(ARGS&&... args) - { + static T* sInstance; + + static void createInstance() + { llassert(sInstance == nullptr); - sInstance = new T(std::forward<ARGS>(args)...); + sInstance = new T(); } - + static inline T* getInstance() { return sInstance; } static inline T& instance() { return *getInstance(); } static inline bool instanceExists() { return sInstance != nullptr; } - static void deleteSingleton() - { - delete sInstance; - sInstance = nullptr; + static void deleteSingleton() { + delete sInstance; + sInstance = nullptr; } - -private: - static T* sInstance; }; -template <class T> -T* LLSimpleton<T>::sInstance{ nullptr }; - #endif diff --git a/indra/llcommon/llthreadsafequeue.h b/indra/llcommon/llthreadsafequeue.h index 5c934791fe..06e8d8f609 100644 --- a/indra/llcommon/llthreadsafequeue.h +++ b/indra/llcommon/llthreadsafequeue.h @@ -85,8 +85,8 @@ public: LLThreadSafeQueue(U32 capacity = 1024); virtual ~LLThreadSafeQueue() {} - // Add an element to the queue (will block if the queue has reached - // capacity). + // Add an element to the queue (will block if the queue has + // reached capacity). // // This call will raise an interrupt error if the queue is closed while // the caller is blocked. @@ -95,11 +95,6 @@ public: // legacy name void pushFront(ElementT const & element) { return push(element); } - // Add an element to the queue (will block if the queue has reached - // capacity). Return false if the queue is closed before push is possible. - template <typename T> - bool pushIfOpen(T&& element); - // Try to add an element to the queue without blocking. Returns // true only if the element was actually added. template <typename T> @@ -316,8 +311,8 @@ bool LLThreadSafeQueue<ElementT, QueueT>::push_(lock_t& lock, T&& element) template <typename ElementT, typename QueueT> -template <typename T> -bool LLThreadSafeQueue<ElementT, QueueT>::pushIfOpen(T&& element) +template<typename T> +void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element) { lock_t lock1(mLock); while (true) @@ -326,10 +321,12 @@ bool LLThreadSafeQueue<ElementT, QueueT>::pushIfOpen(T&& element) // drained or not: the moment either end calls close(), further push() // operations will fail. if (mClosed) - return false; + { + LLTHROW(LLThreadSafeQueueInterrupt()); + } if (push_(lock1, std::forward<T>(element))) - return true; + return; // Storage Full. Wait for signal. mCapacityCond.wait(lock1); @@ -337,17 +334,6 @@ bool LLThreadSafeQueue<ElementT, QueueT>::pushIfOpen(T&& element) } -template <typename ElementT, typename QueueT> -template<typename T> -void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element) -{ - if (! pushIfOpen(std::forward<T>(element))) - { - LLTHROW(LLThreadSafeQueueInterrupt()); - } -} - - template<typename ElementT, typename QueueT> template<typename T> bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(T&& element) diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp index c421cc7b1c..af67b9f492 100644 --- a/indra/llcommon/tests/threadsafeschedule_test.cpp +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -46,11 +46,11 @@ namespace tut // the real time required for each push() call. Explicitly increment // the timestamp for each one -- but since we're passing explicit // timestamps, make the queue reorder them. - queue.push(Queue::TimeTuple(Queue::Clock::now() + 200ms, "ghi")); + queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); // Given the various push() overloads, you have to match the type // exactly: conversions are ambiguous. queue.push("abc"s); - queue.push(Queue::Clock::now() + 100ms, "def"); + queue.push(Queue::Clock::now() + 10ms, "def"); queue.close(); auto entry = queue.pop(); ensure_equals("failed to pop first", std::get<0>(entry), "abc"s); diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp index bea3ad911b..d5405400fd 100644 --- a/indra/llcommon/tests/workqueue_test.cpp +++ b/indra/llcommon/tests/workqueue_test.cpp @@ -20,10 +20,7 @@ // external library headers // other Linden headers #include "../test/lltut.h" -#include "../test/catch_and_store_what_in.h" #include "llcond.h" -#include "llcoros.h" -#include "lleventcoro.h" #include "llstring.h" #include "stringize.h" @@ -141,8 +138,7 @@ namespace tut [](){ return 17; }, // Note that a postTo() *callback* can safely bind a reference to // a variable on the invoking thread, because the callback is run - // on the invoking thread. (Of course the bound variable must - // survive until the callback is called.) + // on the invoking thread. [&result](int i){ result = i; }); // this should post the callback to main qptr->runOne(); @@ -160,70 +156,4 @@ namespace tut main.runPending(); ensure_equals("failed to run string callback", alpha, "abc"); } - - template<> template<> - void object::test<5>() - { - set_test_name("postTo with void return"); - WorkQueue main("main"); - auto qptr = WorkQueue::getInstance("queue"); - std::string observe; - main.postTo( - qptr, - // The ONLY reason we can get away with binding a reference to - // 'observe' in our work callable is because we're directly - // calling qptr->runOne() on this same thread. It would be a - // mistake to do that if some other thread were servicing 'queue'. - [&observe](){ observe = "queue"; }, - [&observe](){ observe.append(";main"); }); - qptr->runOne(); - main.runOne(); - ensure_equals("failed to run both lambdas", observe, "queue;main"); - } - - template<> template<> - void object::test<6>() - { - set_test_name("waitForResult"); - std::string stored; - // Try to call waitForResult() on this thread's main coroutine. It - // should throw because the main coroutine must service the queue. - auto what{ catch_what<WorkQueue::Error>( - [this, &stored](){ stored = queue.waitForResult( - [](){ return "should throw"; }); }) }; - ensure("lambda should not have run", stored.empty()); - ensure_not("waitForResult() should have thrown", what.empty()); - ensure(STRINGIZE("should mention waitForResult: " << what), - what.find("waitForResult") != std::string::npos); - - // Call waitForResult() on a coroutine, with a string result. - LLCoros::instance().launch( - "waitForResult string", - [this, &stored]() - { stored = queue.waitForResult( - [](){ return "string result"; }); }); - llcoro::suspend(); - // Nothing will have happened yet because, even if the coroutine did - // run immediately, all it did was to queue the inner lambda on - // 'queue'. Service it. - queue.runOne(); - llcoro::suspend(); - ensure_equals("bad waitForResult return", stored, "string result"); - - // Call waitForResult() on a coroutine, with a void callable. - stored.clear(); - bool done = false; - LLCoros::instance().launch( - "waitForResult void", - [this, &stored, &done]() - { - queue.waitForResult([&stored](){ stored = "ran"; }); - done = true; - }); - llcoro::suspend(); - queue.runOne(); - llcoro::suspend(); - ensure_equals("didn't run coroutine", stored, "ran"); - ensure("void waitForResult() didn't return", done); - } } // namespace tut diff --git a/indra/llcommon/threadpool.cpp b/indra/llcommon/threadpool.cpp deleted file mode 100644 index cf25cc838e..0000000000 --- a/indra/llcommon/threadpool.cpp +++ /dev/null @@ -1,80 +0,0 @@ -/** - * @file threadpool.cpp - * @author Nat Goodspeed - * @date 2021-10-21 - * @brief Implementation for threadpool. - * - * $LicenseInfo:firstyear=2021&license=viewerlgpl$ - * Copyright (c) 2021, Linden Research, Inc. - * $/LicenseInfo$ - */ - -// Precompiled header -#include "linden_common.h" -// associated header -#include "threadpool.h" -// STL headers -// std headers -// external library headers -// other Linden headers -#include "llerror.h" -#include "llevents.h" -#include "stringize.h" - -LL::ThreadPool::ThreadPool(const std::string& name, size_t threads, size_t capacity): - mQueue(name, capacity), - mName("ThreadPool:" + name) -{ - for (size_t i = 0; i < threads; ++i) - { - std::string tname{ STRINGIZE(mName << ':' << (i+1) << '/' << threads) }; - mThreads.emplace_back(tname, [this, tname](){ run(tname); }); - } - // Listen on "LLApp", and when the app is shutting down, close the queue - // and join the workers. - LLEventPumps::instance().obtain("LLApp").listen( - mName, - [this](const LLSD& stat) - { - std::string status(stat["status"]); - if (status != "running") - { - // viewer is starting shutdown -- proclaim the end is nigh! - LL_DEBUGS("ThreadPool") << mName << " saw " << status << LL_ENDL; - close(); - } - return false; - }); -} - -LL::ThreadPool::~ThreadPool() -{ - close(); -} - -void LL::ThreadPool::close() -{ - if (! mQueue.isClosed()) - { - LL_DEBUGS("ThreadPool") << mName << " closing queue and joining threads" << LL_ENDL; - mQueue.close(); - for (auto& pair: mThreads) - { - LL_DEBUGS("ThreadPool") << mName << " waiting on thread " << pair.first << LL_ENDL; - pair.second.join(); - } - LL_DEBUGS("ThreadPool") << mName << " shutdown complete" << LL_ENDL; - } -} - -void LL::ThreadPool::run(const std::string& name) -{ - LL_DEBUGS("ThreadPool") << name << " starting" << LL_ENDL; - run(); - LL_DEBUGS("ThreadPool") << name << " stopping" << LL_ENDL; -} - -void LL::ThreadPool::run() -{ - mQueue.runUntilClose(); -} diff --git a/indra/llcommon/threadpool.h b/indra/llcommon/threadpool.h deleted file mode 100644 index 1ca24aec58..0000000000 --- a/indra/llcommon/threadpool.h +++ /dev/null @@ -1,62 +0,0 @@ -/** - * @file threadpool.h - * @author Nat Goodspeed - * @date 2021-10-21 - * @brief ThreadPool configures a WorkQueue along with a pool of threads to - * service it. - * - * $LicenseInfo:firstyear=2021&license=viewerlgpl$ - * Copyright (c) 2021, Linden Research, Inc. - * $/LicenseInfo$ - */ - -#if ! defined(LL_THREADPOOL_H) -#define LL_THREADPOOL_H - -#include "workqueue.h" -#include <string> -#include <thread> -#include <utility> // std::pair -#include <vector> - -namespace LL -{ - - class ThreadPool - { - public: - /** - * Pass ThreadPool a string name. This can be used to look up the - * relevant WorkQueue. - */ - ThreadPool(const std::string& name, size_t threads=1, size_t capacity=1024); - virtual ~ThreadPool(); - - /** - * ThreadPool listens for application shutdown messages on the "LLApp" - * LLEventPump. Call close() to shut down this ThreadPool early. - */ - void close(); - - std::string getName() const { return mName; } - size_t getWidth() const { return mThreads.size(); } - /// obtain a non-const reference to the WorkQueue to post work to it - WorkQueue& getQueue() { return mQueue; } - - /** - * Override run() if you need special processing. The default run() - * implementation simply calls WorkQueue::runUntilClose(). - */ - virtual void run(); - - private: - void run(const std::string& name); - - WorkQueue mQueue; - std::string mName; - std::vector<std::pair<std::string, std::thread>> mThreads; - }; - -} // namespace LL - -#endif /* ! defined(LL_THREADPOOL_H) */ diff --git a/indra/llcommon/timing.cpp b/indra/llcommon/timing.cpp new file mode 100644 index 0000000000..c2dc695ef3 --- /dev/null +++ b/indra/llcommon/timing.cpp @@ -0,0 +1,25 @@ +/** + * @file timing.cpp + * @brief This file will be deprecated in the future. + * + * $LicenseInfo:firstyear=2000&license=viewerlgpl$ + * Second Life Viewer Source Code + * Copyright (C) 2010, Linden Research, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; + * version 2.1 of the License only. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + * Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA + * $/LicenseInfo$ + */ diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 633594ceea..b32357e832 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,9 +26,8 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; -LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity): - super(makeName(name)), - mQueue(capacity) +LL::WorkQueue::WorkQueue(const std::string& name): + super(makeName(name)) { // TODO: register for "LLApp" events so we can implicitly close() on // viewer shutdown. @@ -39,21 +38,6 @@ void LL::WorkQueue::close() mQueue.close(); } -size_t LL::WorkQueue::size() -{ - return mQueue.size(); -} - -bool LL::WorkQueue::isClosed() -{ - return mQueue.isClosed(); -} - -bool LL::WorkQueue::done() -{ - return mQueue.done(); -} - void LL::WorkQueue::runUntilClose() { try @@ -144,13 +128,3 @@ void LL::WorkQueue::error(const std::string& msg) { LL_ERRS("WorkQueue") << msg << LL_ENDL; } - -void LL::WorkQueue::checkCoroutine(const std::string& method) -{ - // By convention, the default coroutine on each thread has an empty name - // string. See also LLCoros::logname(). - if (LLCoros::getName().empty()) - { - LLTHROW(Error("Do not call " + method + " from a thread's default coroutine")); - } -} diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index c25d787425..5ec790da79 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,14 +12,14 @@ #if ! defined(LL_WORKQUEUE_H) #define LL_WORKQUEUE_H -#include "llcoros.h" -#include "llexception.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include <chrono> -#include <exception> // std::current_exception #include <functional> // std::function +#include <queue> #include <string> +#include <utility> // std::pair +#include <vector> namespace LL { @@ -45,16 +45,11 @@ namespace LL using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; - struct Error: public LLException - { - Error(const std::string& what): LLException(what) {} - }; - /** * You may omit the WorkQueue name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ - WorkQueue(const std::string& name = std::string(), size_t capacity=1024); + WorkQueue(const std::string& name = std::string()); /** * Since the point of WorkQueue is to pass work to some other worker @@ -64,36 +59,15 @@ namespace LL */ void close(); - /** - * WorkQueue supports multiple producers and multiple consumers. In - * the general case it's misleading to test size(), since any other - * thread might change it the nanosecond the lock is released. On that - * basis, some might argue against publishing a size() method at all. - * - * But there are two specific cases in which a test based on size() - * might be reasonable: - * - * * If you're the only producer, noticing that size() == 0 is - * meaningful. - * * If you're the only consumer, noticing that size() > 0 is - * meaningful. - */ - size_t size(); - /// producer end: are we prevented from pushing any additional items? - bool isClosed(); - /// consumer end: are we done, is the queue entirely drained? - bool done(); - /*---------------------- fire and forget API -----------------------*/ /// fire-and-forget, but at a particular (future?) time template <typename CALLABLE> void post(const TimePoint& time, CALLABLE&& callable) { - // Defer reifying an arbitrary CALLABLE until we hit this or - // postIfOpen(). All other methods should accept CALLABLEs of - // arbitrary type to avoid multiple levels of std::function - // indirection. + // Defer reifying an arbitrary CALLABLE until we hit this method. + // All other methods should accept CALLABLEs of arbitrary type to + // avoid multiple levels of std::function indirection. mQueue.push(TimedWork(time, std::move(callable))); } @@ -109,47 +83,6 @@ namespace LL } /** - * post work for a particular time, unless the queue is closed before - * we can post - */ - template <typename CALLABLE> - bool postIfOpen(const TimePoint& time, CALLABLE&& callable) - { - // Defer reifying an arbitrary CALLABLE until we hit this or - // post(). All other methods should accept CALLABLEs of arbitrary - // type to avoid multiple levels of std::function indirection. - return mQueue.pushIfOpen(TimedWork(time, std::move(callable))); - } - - /** - * post work, unless the queue is closed before we can post - */ - template <typename CALLABLE> - bool postIfOpen(CALLABLE&& callable) - { - return postIfOpen(TimePoint::clock::now(), std::move(callable)); - } - - /** - * Post work to be run at a specified time to another WorkQueue, which - * may or may not still exist and be open. Return true if we were able - * to post. - */ - template <typename CALLABLE> - static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, which may or may not still exist - * and be open. Return true if we were able to post. - */ - template <typename CALLABLE> - static bool postMaybe(weak_t target, CALLABLE&& callable) - { - return postMaybe(target, TimePoint::clock::now(), - std::forward<CALLABLE>(callable)); - } - - /** * Launch a callable returning bool that will trigger repeatedly at * specified interval, until the callable returns false. * @@ -182,8 +115,63 @@ namespace LL // Studio compile errors that seem utterly unrelated to this source // code. template <typename CALLABLE, typename FOLLOWUP> - bool postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); + bool postTo(WorkQueue::weak_t target, + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) + { + // We're being asked to post to the WorkQueue at target. + // target is a weak_ptr: have to lock it to check it. + auto tptr = target.lock(); + if (! tptr) + // can't post() if the target WorkQueue has been destroyed + return false; + + // Here we believe target WorkQueue still exists. Post to it a + // lambda that packages our callable, our callback and a weak_ptr + // to this originating WorkQueue. + tptr->post( + time, + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () + { + // Call the callable in any case -- but to minimize + // copying the result, immediately bind it into a reply + // lambda. The reply lambda also binds the original + // callback, so that when we, the originating WorkQueue, + // finally receive and process the reply lambda, we'll + // call the bound callback with the bound result -- on the + // same thread that originally called postTo(). + auto rlambda = + [result = callable(), + callback = std::move(callback)] + () + { callback(std::move(result)); }; + // Check if this originating WorkQueue still exists. + // Remember, the outer lambda is now running on a thread + // servicing the target WorkQueue, and real time has + // elapsed since postTo()'s tptr->post() call. + // reply is a weak_ptr: have to lock it to check it. + auto rptr = reply.lock(); + if (rptr) + { + // Only post reply lambda if the originating WorkQueue + // still exists. If not -- who would we tell? Log it? + try + { + rptr->post(std::move(rlambda)); + } + catch (const Closed&) + { + // Originating WorkQueue might still exist, but + // might be Closed. Same thing: just discard the + // callback. + } + } + }); + // looks like we were able to post() + return true; + } /** * Post work to another WorkQueue, requesting a specific callback to @@ -193,36 +181,10 @@ namespace LL * inaccessible. */ template <typename CALLABLE, typename FOLLOWUP> - bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) + bool postTo(WorkQueue::weak_t target, + CALLABLE&& callable, FOLLOWUP&& callback) { - return postTo(target, TimePoint::clock::now(), - std::move(callable), std::move(callback)); - } - - /** - * Post work to another WorkQueue to be run at a specified time, - * blocking the calling coroutine until then, returning the result to - * caller on completion. - * - * In general, we assume that each thread's default coroutine is busy - * servicing its WorkQueue or whatever. To try to prevent mistakes, we - * forbid calling waitForResult() from a thread's default coroutine. - */ - template <typename CALLABLE> - auto waitForResult(const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, blocking the calling coroutine - * until then, returning the result to caller on completion. - * - * In general, we assume that each thread's default coroutine is busy - * servicing its WorkQueue or whatever. To try to prevent mistakes, we - * forbid calling waitForResult() from a thread's default coroutine. - */ - template <typename CALLABLE> - auto waitForResult(CALLABLE&& callable) - { - return waitForResult(TimePoint::clock::now(), std::move(callable)); + return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); } /*--------------------------- worker API ---------------------------*/ @@ -270,23 +232,6 @@ namespace LL bool runUntil(const TimePoint& until); private: - template <typename CALLABLE, typename FOLLOWUP> - static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); - /// general case: arbitrary C++ return type - template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> - struct MakeReplyLambda; - /// specialize for CALLABLE returning void - template <typename CALLABLE, typename FOLLOWUP> - struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>; - - /// general case: arbitrary C++ return type - template <typename CALLABLE, typename RETURNTYPE> - struct WaitForResult; - /// specialize for CALLABLE returning void - template <typename CALLABLE> - struct WaitForResult<CALLABLE, void>; - - static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -308,8 +253,8 @@ namespace LL { public: // bind the desired data - BackJack(weak_t target, - const TimePoint& start, + BackJack(WorkQueue::weak_t target, + const WorkQueue::TimePoint& start, const std::chrono::duration<Rep, Period>& interval, CALLABLE&& callable): mTarget(target), @@ -356,8 +301,8 @@ namespace LL } private: - weak_t mTarget; - TimePoint mStart; + WorkQueue::weak_t mTarget; + WorkQueue::TimePoint mStart; std::chrono::duration<Rep, Period> mInterval; CALLABLE mCallable; }; @@ -385,187 +330,6 @@ namespace LL getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } - /// general case: arbitrary C++ return type - template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> - struct WorkQueue::MakeReplyLambda - { - auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) - { - // Call the callable in any case -- but to minimize - // copying the result, immediately bind it into the reply - // lambda. The reply lambda also binds the original - // callback, so that when we, the originating WorkQueue, - // finally receive and process the reply lambda, we'll - // call the bound callback with the bound result -- on the - // same thread that originally called postTo(). - return - [result = std::forward<CALLABLE>(callable)(), - callback = std::move(callback)] - () - { callback(std::move(result)); }; - } - }; - - /// specialize for CALLABLE returning void - template <typename CALLABLE, typename FOLLOWUP> - struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void> - { - auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) - { - // Call the callable, which produces no result. - std::forward<CALLABLE>(callable)(); - // Our completion callback is simply the caller's callback. - return std::move(callback); - } - }; - - template <typename CALLABLE, typename FOLLOWUP> - auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) - { - return MakeReplyLambda<CALLABLE, FOLLOWUP, - decltype(std::forward<CALLABLE>(callable)())>() - (std::move(callable), std::move(callback)); - } - - template <typename CALLABLE, typename FOLLOWUP> - bool WorkQueue::postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) - { - // We're being asked to post to the WorkQueue at target. - // target is a weak_ptr: have to lock it to check it. - auto tptr = target.lock(); - if (! tptr) - // can't post() if the target WorkQueue has been destroyed - return false; - - // Here we believe target WorkQueue still exists. Post to it a - // lambda that packages our callable, our callback and a weak_ptr - // to this originating WorkQueue. - tptr->post( - time, - [reply = super::getWeak(), - callable = std::move(callable), - callback = std::move(callback)] - () - { - // Use postMaybe() below in case this originating WorkQueue - // has been closed or destroyed. Remember, the outer lambda is - // now running on a thread servicing the target WorkQueue, and - // real time has elapsed since postTo()'s tptr->post() call. - try - { - // Make a reply lambda to repost to THIS WorkQueue. - // Delegate to makeReplyLambda() so we can partially - // specialize on void return. - postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback))); - } - catch (...) - { - // Either variant of makeReplyLambda() is responsible for - // calling the caller's callable. If that throws, return - // the exception to the originating thread. - postMaybe( - reply, - // Bind the current exception to transport back to the - // originating WorkQueue. Once there, rethrow it. - [exc = std::current_exception()](){ std::rethrow_exception(exc); }); - } - }); - - // looks like we were able to post() - return true; - } - - template <typename CALLABLE> - bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable) - { - // target is a weak_ptr: have to lock it to check it - auto tptr = target.lock(); - if (tptr) - { - try - { - tptr->post(time, std::forward<CALLABLE>(callable)); - // we were able to post() - return true; - } - catch (const Closed&) - { - // target WorkQueue still exists, but is Closed - } - } - // either target no longer exists, or its WorkQueue is Closed - return false; - } - - /// general case: arbitrary C++ return type - template <typename CALLABLE, typename RETURNTYPE> - struct WorkQueue::WaitForResult - { - auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) - { - LLCoros::Promise<RETURNTYPE> promise; - self->post( - time, - // We dare to bind a reference to Promise because it's - // specifically designed for cross-thread communication. - [&promise, callable = std::move(callable)]() - { - try - { - // call the caller's callable and trigger promise with result - promise.set_value(callable()); - } - catch (...) - { - promise.set_exception(std::current_exception()); - } - }); - auto future{ LLCoros::getFuture(promise) }; - // now, on the calling thread, wait for that result - LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); - return future.get(); - } - }; - - /// specialize for CALLABLE returning void - template <typename CALLABLE> - struct WorkQueue::WaitForResult<CALLABLE, void> - { - void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) - { - LLCoros::Promise<void> promise; - self->post( - time, - // &promise is designed for cross-thread access - [&promise, callable = std::move(callable)]() - { - try - { - callable(); - promise.set_value(); - } - catch (...) - { - promise.set_exception(std::current_exception()); - } - }); - auto future{ LLCoros::getFuture(promise) }; - // block until set_value() - LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); - future.get(); - } - }; - - template <typename CALLABLE> - auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable) - { - checkCoroutine("waitForResult()"); - // derive callable's return type so we can specialize for void - return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() - (this, time, std::forward<CALLABLE>(callable)); - } - } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */ |