From eda264c2821a86505b4ec2a898ff3169ab82f895 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 20 Oct 2021 18:38:36 -0400 Subject: SL-16220: Add a WorkQueue to be serviced by mainloop. Make LLAppViewer::idle() call LL::WorkQueue::runFor() to dequeue and run some or all of the pending responses from worker threads. Add a MainWorkTime setting to specify the time slice the main loop may devote each frame to servicing such responses. --- indra/newview/app_settings/settings.xml | 11 +++++++++++ indra/newview/llappviewer.cpp | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+) (limited to 'indra') diff --git a/indra/newview/app_settings/settings.xml b/indra/newview/app_settings/settings.xml index 05c3fc3bfe..802453d508 100644 --- a/indra/newview/app_settings/settings.xml +++ b/indra/newview/app_settings/settings.xml @@ -3858,6 +3858,17 @@ Value 1 + MainWorkTime + + Comment + Max time per frame devoted to mainloop work queue (in milliseconds) + Persist + 1 + Type + F32 + Value + 0.1 + QueueInventoryFetchTimeout Comment diff --git a/indra/newview/llappviewer.cpp b/indra/newview/llappviewer.cpp index 722a6caa65..7c932a3959 100644 --- a/indra/newview/llappviewer.cpp +++ b/indra/newview/llappviewer.cpp @@ -233,6 +233,8 @@ #include "llavatariconctrl.h" #include "llgroupiconctrl.h" #include "llviewerassetstats.h" +#include "workqueue.h" +using namespace LL; // Include for security api initialization #include "llsecapi.h" @@ -366,6 +368,8 @@ BOOL gLogoutInProgress = FALSE; BOOL gSimulateMemLeak = FALSE; +WorkQueue gMainloopWork("mainloop"); + //////////////////////////////////////////////////////////// // Internal globals... that should be removed. static std::string gArgs; @@ -5210,6 +5214,20 @@ void LLAppViewer::idle() // Execute deferred tasks. LLDeferredTaskList::instance().run(); + // Service the WorkQueue we use for replies from worker threads. + // Use function statics for the timeslice setting so we only have to fetch + // and convert MainWorkTime once. + static F32 MainWorkTimeRaw = gSavedSettings.getF32("MainWorkTime"); + static F32Milliseconds MainWorkTimeMs(MainWorkTimeRaw); + // MainWorkTime is specified in fractional milliseconds, but std::chrono + // uses integer representations. What if we want less than a microsecond? + // Use nanoseconds. We're very sure we will never need to specify a + // MainWorkTime that would be larger than we could express in + // std::chrono::nanoseconds. + static std::chrono::nanoseconds MainWorkTimeNanoSec{ + std::chrono::nanoseconds::rep(MainWorkTimeMs.value() * 1000000)}; + gMainloopWork.runFor(MainWorkTimeNanoSec); + // Handle shutdown process, for example, // wait for floaters to close, send quit message, // forcibly quit if it has taken too long -- cgit v1.2.3 From 11afa09ea3f56c0e20eb195ae1520a88602ceaca Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 22 Oct 2021 11:36:31 -0400 Subject: SL-16220: Add LL::ThreadPool class and a "General" instance. ThreadPool bundles a WorkQueue with the specified number of worker threads to service it. Each ThreadPool has a name that can be used to locate its WorkQueue. Each worker thread calls WorkQueue::runUntilClose(). ThreadPool listens on the "LLApp" LLEventPump for shutdown notification. On receiving that, it closes its WorkQueue and then join()s each of its worker threads for orderly shutdown. Add a settings.xml entry "ThreadPoolSizes", the first LLSD-valued settings entry to expect a map: pool name->size. 
The expectation is that usually code instantiating a particular ThreadPool will have a default size in mind, but it should check "ThreadPoolSizes" for a user override. Make idle_startup()'s STATE_SEED_CAP_GRANTED state instantiate a "General" ThreadPool. This is function-static for lazy initialization. Eliminate LLMainLoopRepeater, which is completely unreferenced. Any potential future use cases are better addressed by posting to the main loop's WorkQueue. Eliminate llappviewer.cpp's private LLDeferredTaskList class, which implemented LLAppViewer::addOnIdleCallback(). Make addOnIdleCallback() post work to the main loop's WorkQueue instead. --- indra/llcommon/CMakeLists.txt | 3 +- indra/llcommon/threadpool.cpp | 75 ++++++++++++++++++++++++++++ indra/llcommon/threadpool.h | 46 +++++++++++++++++ indra/llcommon/timing.cpp | 25 ---------- indra/llcommon/workqueue.cpp | 10 ++++ indra/llcommon/workqueue.h | 5 ++ indra/newview/CMakeLists.txt | 2 - indra/newview/app_settings/settings.xml | 14 ++++++ indra/newview/llappviewer.cpp | 47 +----------------- indra/newview/llmainlooprepeater.cpp | 88 --------------------------------- indra/newview/llmainlooprepeater.h | 64 ------------------------ indra/newview/llstartup.cpp | 18 +++++++ 12 files changed, 171 insertions(+), 226 deletions(-) create mode 100644 indra/llcommon/threadpool.cpp create mode 100644 indra/llcommon/threadpool.h delete mode 100644 indra/llcommon/timing.cpp delete mode 100644 indra/newview/llmainlooprepeater.cpp delete mode 100644 indra/newview/llmainlooprepeater.h (limited to 'indra') diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt index fda43dd24c..c374f1135c 100644 --- a/indra/llcommon/CMakeLists.txt +++ b/indra/llcommon/CMakeLists.txt @@ -121,8 +121,8 @@ set(llcommon_SOURCE_FILES lluriparser.cpp lluuid.cpp llworkerthread.cpp - timing.cpp u64.cpp + threadpool.cpp workqueue.cpp StackWalker.cpp ) @@ -258,6 +258,7 @@ set(llcommon_HEADER_FILES lockstatic.h stdtypes.h stringize.h + threadpool.h threadsafeschedule.h timer.h tuple.h diff --git a/indra/llcommon/threadpool.cpp b/indra/llcommon/threadpool.cpp new file mode 100644 index 0000000000..aa7d4179a2 --- /dev/null +++ b/indra/llcommon/threadpool.cpp @@ -0,0 +1,75 @@ +/** + * @file threadpool.cpp + * @author Nat Goodspeed + * @date 2021-10-21 + * @brief Implementation for threadpool. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "threadpool.h" +// STL headers +// std headers +// external library headers +// other Linden headers +#include "llerror.h" +#include "llevents.h" +#include "stringize.h" + +LL::ThreadPool::ThreadPool(const std::string& name, size_t threads): + mQueue(name), + mName("ThreadPool:" + name) +{ + for (size_t i = 0; i < threads; ++i) + { + std::string tname{ STRINGIZE(mName << ':' << (i+i) << '/' << threads) }; + mThreads.emplace_back(tname, [this, tname](){ run(tname); }); + } + // Listen on "LLApp", and when the app is shutting down, close the queue + // and join the workers. + LLEventPumps::instance().obtain("LLApp").listen( + mName, + [this](const LLSD& stat) + { + std::string status(stat["status"]); + if (status != "running") + { + // viewer is starting shutdown -- proclaim the end is nigh! 
+ LL_DEBUGS("ThreadPool") << mName << " saw " << status << LL_ENDL; + close(); + } + return false; + }); +} + +LL::ThreadPool::~ThreadPool() +{ + close(); +} + +void LL::ThreadPool::close() +{ + if (! mQueue.isClosed()) + { + LL_DEBUGS("ThreadPool") << mName << " closing queue and joining threads" << LL_ENDL; + mQueue.close(); + for (auto& pair: mThreads) + { + LL_DEBUGS("ThreadPool") << mName << " waiting on thread " << pair.first << LL_ENDL; + pair.second.join(); + } + LL_DEBUGS("ThreadPool") << mName << " shutdown complete" << LL_ENDL; + } +} + +void LL::ThreadPool::run(const std::string& name) +{ + LL_DEBUGS("ThreadPool") << name << " starting" << LL_ENDL; + mQueue.runUntilClose(); + LL_DEBUGS("ThreadPool") << name << " stopping" << LL_ENDL; +} diff --git a/indra/llcommon/threadpool.h b/indra/llcommon/threadpool.h new file mode 100644 index 0000000000..8f3c8514b5 --- /dev/null +++ b/indra/llcommon/threadpool.h @@ -0,0 +1,46 @@ +/** + * @file threadpool.h + * @author Nat Goodspeed + * @date 2021-10-21 + * @brief ThreadPool configures a WorkQueue along with a pool of threads to + * service it. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_THREADPOOL_H) +#define LL_THREADPOOL_H + +#include "workqueue.h" +#include +#include +#include // std::pair +#include + +namespace LL +{ + + class ThreadPool + { + public: + /** + * Pass ThreadPool a string name. This can be used to look up the + * relevant WorkQueue. + */ + ThreadPool(const std::string& name, size_t threads=1); + ~ThreadPool(); + void close(); + + private: + void run(const std::string& name); + + WorkQueue mQueue; + std::string mName; + std::vector> mThreads; + }; + +} // namespace LL + +#endif /* ! defined(LL_THREADPOOL_H) */ diff --git a/indra/llcommon/timing.cpp b/indra/llcommon/timing.cpp deleted file mode 100644 index c2dc695ef3..0000000000 --- a/indra/llcommon/timing.cpp +++ /dev/null @@ -1,25 +0,0 @@ -/** - * @file timing.cpp - * @brief This file will be deprecated in the future. - * - * $LicenseInfo:firstyear=2000&license=viewerlgpl$ - * Second Life Viewer Source Code - * Copyright (C) 2010, Linden Research, Inc. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; - * version 2.1 of the License only. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - * - * Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA - * $/LicenseInfo$ - */ diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index ffc9a97dc0..114aeea1f3 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -38,6 +38,16 @@ void LL::WorkQueue::close() mQueue.close(); } +bool LL::WorkQueue::isClosed() +{ + return mQueue.isClosed(); +} + +bool LL::WorkQueue::done() +{ + return mQueue.done(); +} + void LL::WorkQueue::runUntilClose() { try diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index b88aef989a..cfae2019dc 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -59,6 +59,11 @@ namespace LL */ void close(); + /// producer end: are we prevented from pushing any additional items? + bool isClosed(); + /// consumer end: are we done, is the queue entirely drained? + bool done(); + /*---------------------- fire and forget API -----------------------*/ /// fire-and-forget, but at a particular (future?) time diff --git a/indra/newview/CMakeLists.txt b/indra/newview/CMakeLists.txt index fbe75af712..bad36505d1 100644 --- a/indra/newview/CMakeLists.txt +++ b/indra/newview/CMakeLists.txt @@ -393,7 +393,6 @@ set(viewer_SOURCE_FILES llloginhandler.cpp lllogininstance.cpp llmachineid.cpp - llmainlooprepeater.cpp llmanip.cpp llmaniprotate.cpp llmanipscale.cpp @@ -1032,7 +1031,6 @@ set(viewer_HEADER_FILES llloginhandler.h lllogininstance.h llmachineid.h - llmainlooprepeater.h llmanip.h llmaniprotate.h llmanipscale.h diff --git a/indra/newview/app_settings/settings.xml b/indra/newview/app_settings/settings.xml index 802453d508..3c7fe174fd 100644 --- a/indra/newview/app_settings/settings.xml +++ b/indra/newview/app_settings/settings.xml @@ -12663,6 +12663,20 @@ Value 50 + ThreadPoolSizes + + Comment + Map of size overrides for specific thread pools. + Persist + 1 + Type + LLSD + Value + + General + 4 + + ThrottleBandwidthKBPS Comment diff --git a/indra/newview/llappviewer.cpp b/indra/newview/llappviewer.cpp index 7c932a3959..7c363eea5e 100644 --- a/indra/newview/llappviewer.cpp +++ b/indra/newview/llappviewer.cpp @@ -239,7 +239,6 @@ using namespace LL; // Include for security api initialization #include "llsecapi.h" #include "llmachineid.h" -#include "llmainlooprepeater.h" #include "llcleanup.h" #include "llcoproceduremanager.h" @@ -385,42 +384,6 @@ static std::string gLaunchFileOnQuit; // Used on Win32 for other apps to identify our window (eg, win_setup) const char* const VIEWER_WINDOW_CLASSNAME = "Second Life"; -//-- LLDeferredTaskList ------------------------------------------------------ - -/** - * A list of deferred tasks. - * - * We sometimes need to defer execution of some code until the viewer gets idle, - * e.g. removing an inventory item from within notifyObservers() may not work out. - * - * Tasks added to this list will be executed in the next LLAppViewer::idle() iteration. - * All tasks are executed only once. 
- */ -class LLDeferredTaskList: public LLSingleton -{ - LLSINGLETON_EMPTY_CTOR(LLDeferredTaskList); - LOG_CLASS(LLDeferredTaskList); - - friend class LLAppViewer; - typedef boost::signals2::signal signal_t; - - void addTask(const signal_t::slot_type& cb) - { - mSignal.connect(cb); - } - - void run() - { - if (!mSignal.empty()) - { - mSignal(); - mSignal.disconnect_all_slots(); - } - } - - signal_t mSignal; -}; - //---------------------------------------------------------------------------- // List of entries from strings.xml to always replace @@ -980,9 +943,6 @@ bool LLAppViewer::init() } LL_INFOS("InitInfo") << "Cache initialization is done." << LL_ENDL ; - // Initialize the repeater service. - LLMainLoopRepeater::instance().start(); - // // Initialize the window // @@ -2171,8 +2131,6 @@ bool LLAppViewer::cleanup() SUBSYSTEM_CLEANUP(LLProxy); LLCore::LLHttp::cleanup(); - LLMainLoopRepeater::instance().stop(); - ll_close_fail_log(); LLError::LLCallStacks::cleanup(); @@ -4437,7 +4395,7 @@ bool LLAppViewer::initCache() void LLAppViewer::addOnIdleCallback(const boost::function& cb) { - LLDeferredTaskList::instance().addTask(cb); + gMainloopWork.post(cb); } void LLAppViewer::loadKeyBindings() @@ -5211,9 +5169,6 @@ void LLAppViewer::idle() } } - // Execute deferred tasks. - LLDeferredTaskList::instance().run(); - // Service the WorkQueue we use for replies from worker threads. // Use function statics for the timeslice setting so we only have to fetch // and convert MainWorkTime once. diff --git a/indra/newview/llmainlooprepeater.cpp b/indra/newview/llmainlooprepeater.cpp deleted file mode 100644 index 6736e9a950..0000000000 --- a/indra/newview/llmainlooprepeater.cpp +++ /dev/null @@ -1,88 +0,0 @@ -/** - * @file llmachineid.cpp - * @brief retrieves unique machine ids - * - * $LicenseInfo:firstyear=2009&license=viewerlgpl$ - * Second Life Viewer Source Code - * Copyright (C) 2010, Linden Research, Inc. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; - * version 2.1 of the License only. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - * - * Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA - * $/LicenseInfo$ - */ - -#include "llviewerprecompiledheaders.h" -#include "llapr.h" -#include "llevents.h" -#include "llmainlooprepeater.h" - - - -// LLMainLoopRepeater -//----------------------------------------------------------------------------- - - -LLMainLoopRepeater::LLMainLoopRepeater(void): - mQueue(0) -{ - ; // No op. -} - - -void LLMainLoopRepeater::start(void) -{ - if(mQueue != 0) return; - - mQueue = new LLThreadSafeQueue(1024); - mMainLoopConnection = LLEventPumps::instance(). - obtain("mainloop").listen(LLEventPump::inventName(), boost::bind(&LLMainLoopRepeater::onMainLoop, this, _1)); - mRepeaterConnection = LLEventPumps::instance(). 
- obtain("mainlooprepeater").listen(LLEventPump::inventName(), boost::bind(&LLMainLoopRepeater::onMessage, this, _1)); -} - - -void LLMainLoopRepeater::stop(void) -{ - mMainLoopConnection.release(); - mRepeaterConnection.release(); - - delete mQueue; - mQueue = 0; -} - - -bool LLMainLoopRepeater::onMainLoop(LLSD const &) -{ - LLSD message; - while(mQueue->tryPopBack(message)) { - std::string pump = message["pump"].asString(); - if(pump.length() == 0 ) continue; // No pump. - LLEventPumps::instance().obtain(pump).post(message["payload"]); - } - return false; -} - - -bool LLMainLoopRepeater::onMessage(LLSD const & event) -{ - try { - mQueue->pushFront(event); - } catch(LLThreadSafeQueueError & e) { - LL_WARNS() << "could not repeat message (" << e.what() << ")" << - event.asString() << LL_ENDL; - } - return false; -} diff --git a/indra/newview/llmainlooprepeater.h b/indra/newview/llmainlooprepeater.h deleted file mode 100644 index 2ec3a74e4a..0000000000 --- a/indra/newview/llmainlooprepeater.h +++ /dev/null @@ -1,64 +0,0 @@ -/** - * @file llmainlooprepeater.h - * @brief a service for repeating messages on the main loop. - * - * $LicenseInfo:firstyear=2010&license=viewerlgpl$ - * Second Life Viewer Source Code - * Copyright (C) 2010, Linden Research, Inc. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; - * version 2.1 of the License only. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - * - * Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA - * $/LicenseInfo$ - */ - -#ifndef LL_LLMAINLOOPREPEATER_H -#define LL_LLMAINLOOPREPEATER_H - - -#include "llsd.h" -#include "llthreadsafequeue.h" - - -// -// A service which creates the pump 'mainlooprepeater' to which any thread can -// post a message that will be re-posted on the main loop. -// -// The posted message should contain two map elements: pump and payload. The -// pump value is a string naming the pump to which the message should be -// re-posted. The payload value is what will be posted to the designated pump. -// -class LLMainLoopRepeater: - public LLSingleton -{ - LLSINGLETON(LLMainLoopRepeater); -public: - // Start the repeater service. - void start(void); - - // Stop the repeater service. 
- void stop(void); - -private: - LLTempBoundListener mMainLoopConnection; - LLTempBoundListener mRepeaterConnection; - LLThreadSafeQueue * mQueue; - - bool onMainLoop(LLSD const &); - bool onMessage(LLSD const & event); -}; - - -#endif diff --git a/indra/newview/llstartup.cpp b/indra/newview/llstartup.cpp index 57c5074804..13e7fcb6e4 100644 --- a/indra/newview/llstartup.cpp +++ b/indra/newview/llstartup.cpp @@ -205,6 +205,9 @@ #include "llstacktrace.h" +#include "threadpool.h" + + #if LL_WINDOWS #include "lldxhardware.h" #endif @@ -301,6 +304,18 @@ void callback_cache_name(const LLUUID& id, const std::string& full_name, bool is // local classes // +void launchThreadPool() +{ + LLSD poolSizes{ gSavedSettings.getLLSD("ThreadPoolSizes") }; + LLSD sizeSpec{ poolSizes["General"] }; + LLSD::Integer size{ sizeSpec.isInteger()? sizeSpec.asInteger() : 3 }; + LL_DEBUGS("ThreadPool") << "Instantiating General pool with " + << size << " threads" << LL_ENDL; + // Use a function-static ThreadPool: static duration, but instantiated + // only on demand. + static LL::ThreadPool pool("General", size); +} + void update_texture_fetch() { LLAppViewer::getTextureCache()->update(1); // unpauses the texture cache thread @@ -1489,6 +1504,9 @@ bool idle_startup() gAgentCamera.resetCamera(); display_startup(); + // start up the ThreadPool we'll use for textures et al. + launchThreadPool(); + // Initialize global class data needed for surfaces (i.e. textures) LL_DEBUGS("AppInit") << "Initializing sky..." << LL_ENDL; // Initialize all of the viewer object classes for the first time (doing things like texture fetches. -- cgit v1.2.3 From d2763897f22e3d7789f97fe68000662ecd4a3548 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 22 Oct 2021 21:51:44 -0400 Subject: SL-16220: Fix thread name expression. --- indra/llcommon/threadpool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'indra') diff --git a/indra/llcommon/threadpool.cpp b/indra/llcommon/threadpool.cpp index aa7d4179a2..1899f9a20a 100644 --- a/indra/llcommon/threadpool.cpp +++ b/indra/llcommon/threadpool.cpp @@ -27,7 +27,7 @@ LL::ThreadPool::ThreadPool(const std::string& name, size_t threads): { for (size_t i = 0; i < threads; ++i) { - std::string tname{ STRINGIZE(mName << ':' << (i+i) << '/' << threads) }; + std::string tname{ STRINGIZE(mName << ':' << (i+1) << '/' << threads) }; mThreads.emplace_back(tname, [this, tname](){ run(tname); }); } // Listen on "LLApp", and when the app is shutting down, close the queue -- cgit v1.2.3 From e7b8c27741201528bf78f95c96ba820833923dab Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 25 Oct 2021 15:55:49 -0400 Subject: SL-16220: Specialize WorkQueue for callable with void return. Add a test exercising this feature. --- indra/llcommon/tests/threadsafeschedule_test.cpp | 4 +- indra/llcommon/tests/workqueue_test.cpp | 23 +++- indra/llcommon/workqueue.h | 167 +++++++++++++++-------- 3 files changed, 134 insertions(+), 60 deletions(-) (limited to 'indra') diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp index af67b9f492..c421cc7b1c 100644 --- a/indra/llcommon/tests/threadsafeschedule_test.cpp +++ b/indra/llcommon/tests/threadsafeschedule_test.cpp @@ -46,11 +46,11 @@ namespace tut // the real time required for each push() call. Explicitly increment // the timestamp for each one -- but since we're passing explicit // timestamps, make the queue reorder them. 
- queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi")); + queue.push(Queue::TimeTuple(Queue::Clock::now() + 200ms, "ghi")); // Given the various push() overloads, you have to match the type // exactly: conversions are ambiguous. queue.push("abc"s); - queue.push(Queue::Clock::now() + 10ms, "def"); + queue.push(Queue::Clock::now() + 100ms, "def"); queue.close(); auto entry = queue.pop(); ensure_equals("failed to pop first", std::get<0>(entry), "abc"s); diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp index d5405400fd..b69df49d33 100644 --- a/indra/llcommon/tests/workqueue_test.cpp +++ b/indra/llcommon/tests/workqueue_test.cpp @@ -138,7 +138,8 @@ namespace tut [](){ return 17; }, // Note that a postTo() *callback* can safely bind a reference to // a variable on the invoking thread, because the callback is run - // on the invoking thread. + // on the invoking thread. (Of course the bound variable must + // survive until the callback is called.) [&result](int i){ result = i; }); // this should post the callback to main qptr->runOne(); @@ -156,4 +157,24 @@ namespace tut main.runPending(); ensure_equals("failed to run string callback", alpha, "abc"); } + + template<> template<> + void object::test<5>() + { + set_test_name("postTo with void return"); + WorkQueue main("main"); + auto qptr = WorkQueue::getInstance("queue"); + std::string observe; + main.postTo( + qptr, + // The ONLY reason we can get away with binding a reference to + // 'observe' in our work callable is because we're directly + // calling qptr->runOne() on this same thread. It would be a + // mistake to do that if some other thread were servicing 'queue'. + [&observe](){ observe = "queue"; }, + [&observe](){ observe.append(";main"); }); + qptr->runOne(); + main.runOne(); + ensure_equals("failed to run both lambdas", observe, "queue;main"); + } } // namespace tut diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index cfae2019dc..deef3c8e84 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -115,62 +115,7 @@ namespace LL // code. template bool postTo(WorkQueue::weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) - { - // We're being asked to post to the WorkQueue at target. - // target is a weak_ptr: have to lock it to check it. - auto tptr = target.lock(); - if (! tptr) - // can't post() if the target WorkQueue has been destroyed - return false; - - // Here we believe target WorkQueue still exists. Post to it a - // lambda that packages our callable, our callback and a weak_ptr - // to this originating WorkQueue. - tptr->post( - time, - [reply = super::getWeak(), - callable = std::move(callable), - callback = std::move(callback)] - () - { - // Call the callable in any case -- but to minimize - // copying the result, immediately bind it into a reply - // lambda. The reply lambda also binds the original - // callback, so that when we, the originating WorkQueue, - // finally receive and process the reply lambda, we'll - // call the bound callback with the bound result -- on the - // same thread that originally called postTo(). - auto rlambda = - [result = callable(), - callback = std::move(callback)] - () - { callback(std::move(result)); }; - // Check if this originating WorkQueue still exists. - // Remember, the outer lambda is now running on a thread - // servicing the target WorkQueue, and real time has - // elapsed since postTo()'s tptr->post() call. 
- // reply is a weak_ptr: have to lock it to check it. - auto rptr = reply.lock(); - if (rptr) - { - // Only post reply lambda if the originating WorkQueue - // still exists. If not -- who would we tell? Log it? - try - { - rptr->post(std::move(rlambda)); - } - catch (const Closed&) - { - // Originating WorkQueue might still exist, but - // might be Closed. Same thing: just discard the - // callback. - } - } - }); - // looks like we were able to post() - return true; - } + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); /** * Post work to another WorkQueue, requesting a specific callback to @@ -183,7 +128,8 @@ namespace LL bool postTo(WorkQueue::weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) { - return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); + return postTo(target, TimePoint::clock::now(), + std::move(callable), std::move(callback)); } /*--------------------------- worker API ---------------------------*/ @@ -231,6 +177,17 @@ namespace LL bool runUntil(const TimePoint& until); private: + template + static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); + + /// general case: arbitrary C++ return type + template + struct MakeReplyLambda; + + /// specialize for CALLABLE returning void + template + struct MakeReplyLambda; + static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -329,6 +286,102 @@ namespace LL getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } + template + struct WorkQueue::MakeReplyLambda + { + auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) + { + // Call the callable in any case -- but to minimize + // copying the result, immediately bind it into the reply + // lambda. The reply lambda also binds the original + // callback, so that when we, the originating WorkQueue, + // finally receive and process the reply lambda, we'll + // call the bound callback with the bound result -- on the + // same thread that originally called postTo(). + return + [result = std::forward(callable)(), + callback = std::move(callback)] + () + { callback(std::move(result)); }; + } + }; + + /// specialize for CALLABLE returning void + template + struct WorkQueue::MakeReplyLambda + { + auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) + { + // Call the callable, which produces no result. + std::forward(callable)(); + // This reply lambda binds the original callback, so + // that when we, the originating WorkQueue, finally + // receive and process the reply lambda, we'll call + // the bound callback -- on the same thread that + // originally called postTo(). + return [callback = std::move(callback)](){ callback(); }; + } + }; + + template + auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) + { + return MakeReplyLambda(callable)())>() + (std::move(callable), std::move(callback)); + } + + template + bool WorkQueue::postTo(WorkQueue::weak_t target, + const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) + { + // We're being asked to post to the WorkQueue at target. + // target is a weak_ptr: have to lock it to check it. + auto tptr = target.lock(); + if (! tptr) + // can't post() if the target WorkQueue has been destroyed + return false; + + // Here we believe target WorkQueue still exists. Post to it a + // lambda that packages our callable, our callback and a weak_ptr + // to this originating WorkQueue. 
+ tptr->post( + time, + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () + { + // Make a reply lambda to repost to THIS WorkQueue. + // Delegate to makeReplyLambda() so we can partially + // specialize on void return. + auto rlambda = makeReplyLambda(std::move(callable), std::move(callback)); + // Check if this originating WorkQueue still exists. + // Remember, the outer lambda is now running on a thread + // servicing the target WorkQueue, and real time has + // elapsed since postTo()'s tptr->post() call. + // reply is a weak_ptr: have to lock it to check it. + auto rptr = reply.lock(); + if (rptr) + { + // Only post reply lambda if the originating WorkQueue + // still exists. If not -- who would we tell? Log it? + try + { + rptr->post(std::move(rlambda)); + } + catch (const Closed&) + { + // Originating WorkQueue might still exist, but + // might be Closed. Same thing: just discard the + // callback. + } + } + }); + // looks like we were able to post() + return true; + } + } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */ -- cgit v1.2.3 From 023d39963e850356e1af6eec7f857e2534ce8d38 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 25 Oct 2021 17:31:27 -0400 Subject: SL-16220: WorkQueue::runOn() methods submit work, wait for result. The idea is that you can call runOn(target, callable) from a (non-default) coroutine and block that coroutine until the result becomes available. As a safety check, we forbid calling runOn() from a thread's default coroutine, assuming that a given thread's default coroutine is the one servicing the relevant WorkQueue. --- indra/llcommon/workqueue.cpp | 15 +++++ indra/llcommon/workqueue.h | 150 +++++++++++++++++++++++++++++++++++++++---- 2 files changed, 154 insertions(+), 11 deletions(-) (limited to 'indra') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 114aeea1f3..f7ffc8233c 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,6 +26,11 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; +struct NotOnDftCoro: public LLException +{ + NotOnDftCoro(const std::string& what): LLException(what) {} +}; + LL::WorkQueue::WorkQueue(const std::string& name): super(makeName(name)) { @@ -136,3 +141,13 @@ void LL::WorkQueue::error(const std::string& msg) { LL_ERRS("WorkQueue") << msg << LL_ENDL; } + +void LL::WorkQueue::checkCoroutine(const std::string& method) +{ + // By convention, the default coroutine on each thread has an empty name + // string. See also LLCoros::logname(). + if (LLCoros::getName().empty()) + { + LLTHROW(NotOnDftCoro("Do not call " + method + " from a thread's default coroutine")); + } +} diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index deef3c8e84..b17c666172 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -12,11 +12,18 @@ #if ! 
defined(LL_WORKQUEUE_H) #define LL_WORKQUEUE_H +#include "llcoros.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include #include // std::function -#include +#if __cplusplus >= 201703 +#include +namespace stdopt = std; +#else +#include +namespace stdopt = boost; +#endif #include #include // std::pair #include @@ -44,6 +51,8 @@ namespace LL using TimePoint = Queue::TimePoint; using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; + template + using optional = stdopt::optional; /** * You may omit the WorkQueue name, in which case a unique name is @@ -114,7 +123,7 @@ namespace LL // Studio compile errors that seem utterly unrelated to this source // code. template - bool postTo(WorkQueue::weak_t target, + bool postTo(weak_t target, const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); /** @@ -125,13 +134,62 @@ namespace LL * inaccessible. */ template - bool postTo(WorkQueue::weak_t target, - CALLABLE&& callable, FOLLOWUP&& callback) + bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) { return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback)); } + /** + * Post work to another WorkQueue to be run at a specified time, + * blocking the calling coroutine until then, returning the result to + * caller on completion. + * + * REQUIRED: + * + * * The calling thread is the thread servicing 'this' WorkQueue. + * * The calling coroutine is not the @em coroutine servicing this + * WorkQueue. We block the calling coroutine until the result is + * available. If this same coroutine is responsible for checking the + * local WorkQueue, the result will never be dequeued. In practice, + * to try to prevent mistakes, we forbid calling runOn() from a + * thread's default coroutine. + * + * Returns result if able to post, empty optional if the other + * WorkQueue is inaccessible. + * + * If the passed callable has void return, runOn() returns bool true + * if able to post, false if the other WorkQueue is inaccessible. + */ + template + auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, blocking the calling coroutine + * until then, returning the result to caller on completion. + * + * REQUIRED: + * + * * The calling thread is the thread servicing 'this' WorkQueue. + * * The calling coroutine is not the @em coroutine servicing this + * WorkQueue. We block the calling coroutine until the result is + * available. If this same coroutine is responsible for checking the + * local WorkQueue, the result will never be dequeued. In practice, + * to try to prevent mistakes, we forbid calling runOn() from a + * thread's default coroutine. + * + * Returns result if able to post, empty optional if the other + * WorkQueue is inaccessible. + * + * If the passed callable has void return, runOn() returns bool true + * if able to post, false if the other WorkQueue is inaccessible. 
+ */ + template + auto runOn(weak_t target, CALLABLE&& callable) + { + return runOn(target, TimePoint::clock::now(), std::move(callable)); + } + /*--------------------------- worker API ---------------------------*/ /** @@ -179,15 +237,21 @@ namespace LL private: template static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); - /// general case: arbitrary C++ return type template struct MakeReplyLambda; - /// specialize for CALLABLE returning void template struct MakeReplyLambda; + /// general case: arbitrary C++ return type + template + struct RunOn; + /// specialize for CALLABLE returning void + template + struct RunOn; + + static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); void callWork(const Queue::DataTuple& work); @@ -209,8 +273,8 @@ namespace LL { public: // bind the desired data - BackJack(WorkQueue::weak_t target, - const WorkQueue::TimePoint& start, + BackJack(weak_t target, + const TimePoint& start, const std::chrono::duration& interval, CALLABLE&& callable): mTarget(target), @@ -257,8 +321,8 @@ namespace LL } private: - WorkQueue::weak_t mTarget; - WorkQueue::TimePoint mStart; + weak_t mTarget; + TimePoint mStart; std::chrono::duration mInterval; CALLABLE mCallable; }; @@ -286,6 +350,7 @@ namespace LL getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } + /// general case: arbitrary C++ return type template struct WorkQueue::MakeReplyLambda { @@ -332,7 +397,7 @@ namespace LL } template - bool WorkQueue::postTo(WorkQueue::weak_t target, + bool WorkQueue::postTo(weak_t target, const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) { // We're being asked to post to the WorkQueue at target. @@ -382,6 +447,69 @@ namespace LL return true; } + /// general case: arbitrary C++ return type + template + struct WorkQueue::RunOn + { + optional operator()(WorkQueue* self, weak_t target, + const TimePoint& time, CALLABLE&& callable) + { + LLCoros::Promise promise; + if (! self->postTo( + target, + time, + std::forward(callable), + // We dare to bind a reference to Promise because it's + // specifically intended for cross-thread synchronization. + [&promise] + (RETURNTYPE&& result) + { + promise.set_value(std::forward(result)); + })) + { + // we couldn't even postTo(): return empty optional + return {}; + } + // we were able to post + auto future{ LLCoros::getFuture(promise) }; + return { future.get(); } + } + }; + + /// specialize for CALLABLE returning void + template + struct WorkQueue::RunOn + { + bool operator()(WorkQueue* self, weak_t target, + const TimePoint& time, CALLABLE&& callable) + { + LLCoros::Promise promise; + if (! self->postTo( + target, + time, + std::forward(callable), + // &promise is designed for cross-thread access + [&promise](){ promise.set_value(); })) + { + // we couldn't postTo() + return false; + } + // we were able to post + auto future{ LLCoros::getFuture(promise) }; + // block until set_value() + future.get(); + return true; + } + }; + + template + auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) + { + checkCoroutine("runOn()"); + return RunOn(callable)())>() + (this, target, time, std::forward(callable)); + } + } // namespace LL #endif /* ! defined(LL_WORKQUEUE_H) */ -- cgit v1.2.3 From e6eebea8da545350f6684c191c633dd2fbc6f6f1 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 26 Oct 2021 11:49:53 -0400 Subject: SL-16220: Change WorkQueue::runOn() to waitForResult(). 
In addition to the name making the blocking explicit, we changed the signature: instead of specifying a target WorkQueue on which to run, waitForResult() runs the passed callable on its own WorkQueue. Why is that? Because, unlike postTo(), we do not require a handshake between two different WorkQueues. postTo() allows running arbitrary callback code, setting variables or whatever, on the originating WorkQueue (presumably on the originating thread). waitForResult() synchronizes using Promise/Future, which are explicitly designed for cross-thread communication. We need not call set_value() on the originating thread, so we don't need a postTo() callback lambda. --- indra/llcommon/workqueue.cpp | 7 +-- indra/llcommon/workqueue.h | 145 ++++++++++++++++++------------------------- 2 files changed, 62 insertions(+), 90 deletions(-) (limited to 'indra') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index f7ffc8233c..ac3086aac5 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,11 +26,6 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; -struct NotOnDftCoro: public LLException -{ - NotOnDftCoro(const std::string& what): LLException(what) {} -}; - LL::WorkQueue::WorkQueue(const std::string& name): super(makeName(name)) { @@ -148,6 +143,6 @@ void LL::WorkQueue::checkCoroutine(const std::string& method) // string. See also LLCoros::logname(). if (LLCoros::getName().empty()) { - LLTHROW(NotOnDftCoro("Do not call " + method + " from a thread's default coroutine")); + LLTHROW(Error("Do not call " + method + " from a thread's default coroutine")); } } diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index b17c666172..869f5d9a82 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -13,20 +13,13 @@ #define LL_WORKQUEUE_H #include "llcoros.h" +#include "llexception.h" #include "llinstancetracker.h" #include "threadsafeschedule.h" #include +#include // std::current_exception #include // std::function -#if __cplusplus >= 201703 -#include -namespace stdopt = std; -#else -#include -namespace stdopt = boost; -#endif #include -#include // std::pair -#include namespace LL { @@ -51,8 +44,11 @@ namespace LL using TimePoint = Queue::TimePoint; using TimedWork = Queue::TimeTuple; using Closed = Queue::Closed; - template - using optional = stdopt::optional; + + struct Error: public LLException + { + Error(const std::string& what): LLException(what) {} + }; /** * You may omit the WorkQueue name, in which case a unique name is @@ -145,49 +141,25 @@ namespace LL * blocking the calling coroutine until then, returning the result to * caller on completion. * - * REQUIRED: - * - * * The calling thread is the thread servicing 'this' WorkQueue. - * * The calling coroutine is not the @em coroutine servicing this - * WorkQueue. We block the calling coroutine until the result is - * available. If this same coroutine is responsible for checking the - * local WorkQueue, the result will never be dequeued. In practice, - * to try to prevent mistakes, we forbid calling runOn() from a - * thread's default coroutine. - * - * Returns result if able to post, empty optional if the other - * WorkQueue is inaccessible. - * - * If the passed callable has void return, runOn() returns bool true - * if able to post, false if the other WorkQueue is inaccessible. + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. 
To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. */ template - auto runOn(weak_t target, const TimePoint& time, CALLABLE&& callable); + auto waitForResult(const TimePoint& time, CALLABLE&& callable); /** * Post work to another WorkQueue, blocking the calling coroutine * until then, returning the result to caller on completion. * - * REQUIRED: - * - * * The calling thread is the thread servicing 'this' WorkQueue. - * * The calling coroutine is not the @em coroutine servicing this - * WorkQueue. We block the calling coroutine until the result is - * available. If this same coroutine is responsible for checking the - * local WorkQueue, the result will never be dequeued. In practice, - * to try to prevent mistakes, we forbid calling runOn() from a - * thread's default coroutine. - * - * Returns result if able to post, empty optional if the other - * WorkQueue is inaccessible. - * - * If the passed callable has void return, runOn() returns bool true - * if able to post, false if the other WorkQueue is inaccessible. + * In general, we assume that each thread's default coroutine is busy + * servicing its WorkQueue or whatever. To try to prevent mistakes, we + * forbid calling waitForResult() from a thread's default coroutine. */ template - auto runOn(weak_t target, CALLABLE&& callable) + auto waitForResult(CALLABLE&& callable) { - return runOn(target, TimePoint::clock::now(), std::move(callable)); + return waitForResult(TimePoint::clock::now(), std::move(callable)); } /*--------------------------- worker API ---------------------------*/ @@ -246,10 +218,10 @@ namespace LL /// general case: arbitrary C++ return type template - struct RunOn; + struct WaitForResult; /// specialize for CALLABLE returning void template - struct RunOn; + struct WaitForResult; static void checkCoroutine(const std::string& method); static void error(const std::string& msg); @@ -449,65 +421,70 @@ namespace LL /// general case: arbitrary C++ return type template - struct WorkQueue::RunOn + struct WorkQueue::WaitForResult { - optional operator()(WorkQueue* self, weak_t target, - const TimePoint& time, CALLABLE&& callable) + auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) { LLCoros::Promise promise; - if (! self->postTo( - target, - time, - std::forward(callable), - // We dare to bind a reference to Promise because it's - // specifically intended for cross-thread synchronization. - [&promise] - (RETURNTYPE&& result) + self->post( + time, + // We dare to bind a reference to Promise because it's + // specifically designed for cross-thread communication. + [&promise, callable = std::move(callable)]() + { + try { - promise.set_value(std::forward(result)); - })) - { - // we couldn't even postTo(): return empty optional - return {}; - } - // we were able to post + // call the caller's callable and trigger promise with result + promise.set_value(callable()); + } + catch (...) 
+ { + promise.set_exception(std::current_exception()); + } + }); auto future{ LLCoros::getFuture(promise) }; - return { future.get(); } + // now, on the calling thread, wait for that result + LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); + return future.get(); } }; /// specialize for CALLABLE returning void template - struct WorkQueue::RunOn + struct WorkQueue::WaitForResult { - bool operator()(WorkQueue* self, weak_t target, - const TimePoint& time, CALLABLE&& callable) + void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) { LLCoros::Promise promise; - if (! self->postTo( - target, - time, - std::forward(callable), - // &promise is designed for cross-thread access - [&promise](){ promise.set_value(); })) - { - // we couldn't postTo() - return false; - } - // we were able to post + self->post( + time, + // &promise is designed for cross-thread access + [&promise, callable = std::move(callable)]() + { + try + { + callable(); + promise.set_value(); + } + catch (...) + { + promise.set_exception(std::current_exception()); + } + }); auto future{ LLCoros::getFuture(promise) }; // block until set_value() + LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); future.get(); - return true; } }; template - auto WorkQueue::runOn(weak_t target, const TimePoint& time, CALLABLE&& callable) + auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable) { - checkCoroutine("runOn()"); - return RunOn(callable)())>() - (this, target, time, std::forward(callable)); + checkCoroutine("waitForResult()"); + // derive callable's return type so we can specialize for void + return WaitForResult(callable)())>() + (this, time, std::forward(callable)); } } // namespace LL -- cgit v1.2.3 From f06765cba868679492934452354d16f9f3af9ade Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 26 Oct 2021 12:29:49 -0400 Subject: SL-16220: Make WorkQueue::postTo() return exception to caller. postTo() sets up two-way communication: the caller asks to run work on some other WorkQueue, expecting an eventual callback on the originating WorkQueue. That permits us to transport any exception thrown by the work callable back to rethrow on the originating WorkQueue. --- indra/llcommon/workqueue.h | 93 +++++++++++++++++++++++++++++++--------------- 1 file changed, 64 insertions(+), 29 deletions(-) (limited to 'indra') diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 869f5d9a82..42f5d78ba3 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -136,6 +136,25 @@ namespace LL std::move(callable), std::move(callback)); } + /** + * Post work to be run at a specified time to another WorkQueue, which + * may or may not still exist and be open. Return true if we were able + * to post. + */ + template + static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, which may or may not still exist + * and be open. Return true if we were able to post. + */ + template + static bool postMaybe(weak_t target, CALLABLE&& callable) + { + return postMaybe(target, TimePoint::clock::now(), + std::forward(callable)); + } + /** * Post work to another WorkQueue to be run at a specified time, * blocking the calling coroutine until then, returning the result to @@ -351,12 +370,8 @@ namespace LL { // Call the callable, which produces no result. 
std::forward(callable)(); - // This reply lambda binds the original callback, so - // that when we, the originating WorkQueue, finally - // receive and process the reply lambda, we'll call - // the bound callback -- on the same thread that - // originally called postTo(). - return [callback = std::move(callback)](){ callback(); }; + // Our completion callback is simply the caller's callback. + return std::move(callback); } }; @@ -389,36 +404,56 @@ namespace LL callback = std::move(callback)] () { - // Make a reply lambda to repost to THIS WorkQueue. - // Delegate to makeReplyLambda() so we can partially - // specialize on void return. - auto rlambda = makeReplyLambda(std::move(callable), std::move(callback)); - // Check if this originating WorkQueue still exists. - // Remember, the outer lambda is now running on a thread - // servicing the target WorkQueue, and real time has - // elapsed since postTo()'s tptr->post() call. - // reply is a weak_ptr: have to lock it to check it. - auto rptr = reply.lock(); - if (rptr) + // Use postMaybe() below in case this originating WorkQueue + // has been closed or destroyed. Remember, the outer lambda is + // now running on a thread servicing the target WorkQueue, and + // real time has elapsed since postTo()'s tptr->post() call. + try { - // Only post reply lambda if the originating WorkQueue - // still exists. If not -- who would we tell? Log it? - try - { - rptr->post(std::move(rlambda)); - } - catch (const Closed&) - { - // Originating WorkQueue might still exist, but - // might be Closed. Same thing: just discard the - // callback. - } + // Make a reply lambda to repost to THIS WorkQueue. + // Delegate to makeReplyLambda() so we can partially + // specialize on void return. + postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback))); + } + catch (...) + { + // Either variant of makeReplyLambda() is responsible for + // calling the caller's callable. If that throws, return + // the exception to the originating thread. + postMaybe( + reply, + // Bind the current exception to transport back to the + // originating WorkQueue. Once there, rethrow it. + [exc = std::current_exception()](){ std::rethrow_exception(exc); }); } }); + // looks like we were able to post() return true; } + template + bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable) + { + // target is a weak_ptr: have to lock it to check it + auto tptr = target.lock(); + if (tptr) + { + try + { + tptr->post(time, std::forward(callable)); + // we were able to post() + return true; + } + catch (const Closed&) + { + // target WorkQueue still exists, but is Closed + } + } + // either target no longer exists, or its WorkQueue is Closed + return false; + } + /// general case: arbitrary C++ return type template struct WorkQueue::WaitForResult -- cgit v1.2.3 From 8b16ecb9cfb4917fe38e4e5b0e4f40a23dd4ffbf Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 27 Oct 2021 15:31:54 -0400 Subject: SL-16220: Add tests for WorkQueue::waitForResult(), void & non-void. 
--- indra/llcommon/tests/workqueue_test.cpp | 49 +++++++++++++++++++++++++++++++++ indra/llcommon/workqueue.h | 38 ++++++++++++------------- 2 files changed, 68 insertions(+), 19 deletions(-) (limited to 'indra') diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp index b69df49d33..bea3ad911b 100644 --- a/indra/llcommon/tests/workqueue_test.cpp +++ b/indra/llcommon/tests/workqueue_test.cpp @@ -20,7 +20,10 @@ // external library headers // other Linden headers #include "../test/lltut.h" +#include "../test/catch_and_store_what_in.h" #include "llcond.h" +#include "llcoros.h" +#include "lleventcoro.h" #include "llstring.h" #include "stringize.h" @@ -177,4 +180,50 @@ namespace tut main.runOne(); ensure_equals("failed to run both lambdas", observe, "queue;main"); } + + template<> template<> + void object::test<6>() + { + set_test_name("waitForResult"); + std::string stored; + // Try to call waitForResult() on this thread's main coroutine. It + // should throw because the main coroutine must service the queue. + auto what{ catch_what( + [this, &stored](){ stored = queue.waitForResult( + [](){ return "should throw"; }); }) }; + ensure("lambda should not have run", stored.empty()); + ensure_not("waitForResult() should have thrown", what.empty()); + ensure(STRINGIZE("should mention waitForResult: " << what), + what.find("waitForResult") != std::string::npos); + + // Call waitForResult() on a coroutine, with a string result. + LLCoros::instance().launch( + "waitForResult string", + [this, &stored]() + { stored = queue.waitForResult( + [](){ return "string result"; }); }); + llcoro::suspend(); + // Nothing will have happened yet because, even if the coroutine did + // run immediately, all it did was to queue the inner lambda on + // 'queue'. Service it. + queue.runOne(); + llcoro::suspend(); + ensure_equals("bad waitForResult return", stored, "string result"); + + // Call waitForResult() on a coroutine, with a void callable. + stored.clear(); + bool done = false; + LLCoros::instance().launch( + "waitForResult void", + [this, &stored, &done]() + { + queue.waitForResult([&stored](){ stored = "ran"; }); + done = true; + }); + llcoro::suspend(); + queue.runOne(); + llcoro::suspend(); + ensure_equals("didn't run coroutine", stored, "ran"); + ensure("void waitForResult() didn't return", done); + } } // namespace tut diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 42f5d78ba3..7dbc735c6d 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -92,6 +92,25 @@ namespace LL post(TimePoint::clock::now(), std::move(callable)); } + /** + * Post work to be run at a specified time to another WorkQueue, which + * may or may not still exist and be open. Return true if we were able + * to post. + */ + template + static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); + + /** + * Post work to another WorkQueue, which may or may not still exist + * and be open. Return true if we were able to post. + */ + template + static bool postMaybe(weak_t target, CALLABLE&& callable) + { + return postMaybe(target, TimePoint::clock::now(), + std::forward(callable)); + } + /** * Launch a callable returning bool that will trigger repeatedly at * specified interval, until the callable returns false. @@ -136,25 +155,6 @@ namespace LL std::move(callable), std::move(callback)); } - /** - * Post work to be run at a specified time to another WorkQueue, which - * may or may not still exist and be open. 
Return true if we were able - * to post. - */ - template - static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); - - /** - * Post work to another WorkQueue, which may or may not still exist - * and be open. Return true if we were able to post. - */ - template - static bool postMaybe(weak_t target, CALLABLE&& callable) - { - return postMaybe(target, TimePoint::clock::now(), - std::forward(callable)); - } - /** * Post work to another WorkQueue to be run at a specified time, * blocking the calling coroutine until then, returning the result to -- cgit v1.2.3
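
[Usage sketch -- not part of the patches above.] The commits in this series add LL::ThreadPool, the "General" pool, and the main loop's "mainloop" WorkQueue, but no end-to-end caller. As a rough, hedged illustration of how client code might combine them: the queue names and the calls used (WorkQueue::getInstance(), post(), postTo()) appear in the patches and in workqueue_test.cpp above, while exampleUsage(), the placeholder lambdas, and the null checks are invented for this sketch and assume both queues have already been constructed.

#include "workqueue.h"

void exampleUsage()
{
    // Look up queues by name; WorkQueue instances are registered by name, as
    // shown by WorkQueue::getInstance("queue") in workqueue_test.cpp.
    auto general = LL::WorkQueue::getInstance("General");   // serviced by the ThreadPool's worker threads
    auto mainq   = LL::WorkQueue::getInstance("mainloop");  // serviced by LLAppViewer::idle() via runFor()
    if (! general || ! mainq)
        return;  // sketch only: bail if either queue isn't up yet

    // Fire and forget: run work on a ThreadPool worker thread.
    general->post([](){ /* expensive work off the main thread */ });

    // Run work on a worker thread and hand the result back to the main
    // loop; the callback runs when idle() services gMainloopWork.
    mainq->postTo(
        general,
        [](){ return 42; },                                  // runs on a worker thread
        [](int result){ /* consume result on the main thread */ (void)result; });
}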