From 623ac79120a417ec445ce5c106a907fe46734309 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 7 Oct 2021 15:32:51 -0400 Subject: SL-16024: Add LL::WorkQueue for passing work items between threads. A typical WorkQueue has a string name, which can be used to find it to post work to it. "Work" is a nullary callable. WorkQueue is a multi-producer, multi-consumer thread-safe queue: multiple threads can service the WorkQueue, multiple threads can post work to it. Work can be scheduled in the future by submitting with a timestamp. In addition, a given work item can be scheduled to run on a recurring basis. A requesting thread servicing a WorkQueue of its own, such as the viewer's main thread, can submit work to another WorkQueue along with a callback to be passed the result (of arbitrary type) of the first work item. The callback is posted to the originating WorkQueue, permitting safe data exchange between participating threads. Methods are provided for different kinds of servicing threads. runUntilClose() is useful for a simple worker thread. runFor(duration) devotes no more than a specified time slice to that WorkQueue, e.g. for use by the main thread. --- indra/llcommon/workqueue.cpp | 114 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 indra/llcommon/workqueue.cpp (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp new file mode 100644 index 0000000000..15e292fb43 --- /dev/null +++ b/indra/llcommon/workqueue.cpp @@ -0,0 +1,114 @@ +/** + * @file workqueue.cpp + * @author Nat Goodspeed + * @date 2021-10-06 + * @brief Implementation for WorkQueue. + * + * $LicenseInfo:firstyear=2021&license=viewerlgpl$ + * Copyright (c) 2021, Linden Research, Inc. + * $/LicenseInfo$ + */ + +// Precompiled header +#include "linden_common.h" +// associated header +#include "workqueue.h" +// STL headers +// std headers +// external library headers +// other Linden headers +#include "llerror.h" +#include "llexception.h" +#include "stringize.h" + +LL::WorkQueue::WorkQueue(const std::string& name): + super(makeName(name)) +{ + // TODO: register for "LLApp" events so we can implicitly close() on + // viewer shutdown. +} + +void LL::WorkQueue::close() +{ + mQueue.close(); +} + +void LL::WorkQueue::runUntilClose() +{ + try + { + for (;;) + { + callWork(mQueue.pop()); + } + } + catch (const Queue::Closed&) + { + } +} + +bool LL::WorkQueue::runPending() +{ + for (Work work; mQueue.tryPop(work); ) + { + callWork(work); + } + return ! mQueue.done(); +} + +bool LL::WorkQueue::runOne() +{ + Work work; + if (mQueue.tryPop(work)) + { + callWork(work); + } + return ! mQueue.done(); +} + +bool LL::WorkQueue::runUntil(const TimePoint& until) +{ + // Should we subtract some slop to allow for typical Work execution time? + // How much slop? + Work work; + while (TimePoint::clock::now() < until && mQueue.tryPopUntil(until, work)) + { + callWork(work); + } + return ! mQueue.done(); +} + +std::string LL::WorkQueue::makeName(const std::string& name) +{ + if (! name.empty()) + return name; + + static thread_local U32 discriminator = 0; + return STRINGIZE("WorkQueue" << discriminator++); +} + +void LL::WorkQueue::callWork(const Queue::DataTuple& work) +{ + // ThreadSafeSchedule::pop() always delivers a tuple, even when + // there's only one data field per item, as for us. + callWork(std::get<0>(work)); +} + +void LL::WorkQueue::callWork(const Work& work) +{ + try + { + work(); + } + catch (...) + { + // No matter what goes wrong with any individual work item, the worker + // thread must go on! Log our own instance name with the exception. + LOG_UNHANDLED_EXCEPTION(getKey()); + } +} + +void LL::WorkQueue::error(const std::string& msg) +{ + LL_ERRS("WorkQueue") << msg << LL_ENDL; +} -- cgit v1.2.3 From c585ddb75e383cdd994d0d99fed8f2de8f955e3c Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 7 Oct 2021 16:45:15 -0400 Subject: SL-16024: Defend against two threads making "anonymous" WorkQueues. Also make workqueue_test.cpp more robust. --- indra/llcommon/workqueue.cpp | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 15e292fb43..ffc9a97dc0 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -17,10 +17,15 @@ // std headers // external library headers // other Linden headers +#include "llcoros.h" +#include LLCOROS_MUTEX_HEADER #include "llerror.h" #include "llexception.h" #include "stringize.h" +using Mutex = LLCoros::Mutex; +using Lock = LLCoros::LockType; + LL::WorkQueue::WorkQueue(const std::string& name): super(makeName(name)) { @@ -83,8 +88,17 @@ std::string LL::WorkQueue::makeName(const std::string& name) if (! name.empty()) return name; - static thread_local U32 discriminator = 0; - return STRINGIZE("WorkQueue" << discriminator++); + static U32 discriminator = 0; + static Mutex mutex; + U32 num; + { + // Protect discriminator from concurrent access by different threads. + // It can't be thread_local, else two racing threads will come up with + // the same name. + Lock lk(mutex); + num = discriminator++; + } + return STRINGIZE("WorkQueue" << num); } void LL::WorkQueue::callWork(const Queue::DataTuple& work) -- cgit v1.2.3 From e774bffb28a71730792931aeb1ed6a46d3cfe67b Mon Sep 17 00:00:00 2001 From: Dave Parks Date: Thu, 21 Oct 2021 21:19:48 +0000 Subject: SL-16202 Fix for textures appearing black or flashing white due to optimization bugs. --- indra/llcommon/workqueue.cpp | 2 ++ 1 file changed, 2 insertions(+) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index ffc9a97dc0..b32357e832 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -54,6 +54,7 @@ void LL::WorkQueue::runUntilClose() bool LL::WorkQueue::runPending() { + LL_PROFILE_ZONE_SCOPED; for (Work work; mQueue.tryPop(work); ) { callWork(work); @@ -110,6 +111,7 @@ void LL::WorkQueue::callWork(const Queue::DataTuple& work) void LL::WorkQueue::callWork(const Work& work) { + LL_PROFILE_ZONE_SCOPED; try { work(); -- cgit v1.2.3 From 11afa09ea3f56c0e20eb195ae1520a88602ceaca Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 22 Oct 2021 11:36:31 -0400 Subject: SL-16220: Add LL::ThreadPool class and a "General" instance. ThreadPool bundles a WorkQueue with the specified number of worker threads to service it. Each ThreadPool has a name that can be used to locate its WorkQueue. Each worker thread calls WorkQueue::runUntilClose(). ThreadPool listens on the "LLApp" LLEventPump for shutdown notification. On receiving that, it closes its WorkQueue and then join()s each of its worker threads for orderly shutdown. Add a settings.xml entry "ThreadPoolSizes", the first LLSD-valued settings entry to expect a map: pool name->size. The expectation is that usually code instantiating a particular ThreadPool will have a default size in mind, but it should check "ThreadPoolSizes" for a user override. Make idle_startup()'s STATE_SEED_CAP_GRANTED state instantiate a "General" ThreadPool. This is function-static for lazy initialization. Eliminate LLMainLoopRepeater, which is completely unreferenced. Any potential future use cases are better addressed by posting to the main loop's WorkQueue. Eliminate llappviewer.cpp's private LLDeferredTaskList class, which implemented LLAppViewer::addOnIdleCallback(). Make addOnIdleCallback() post work to the main loop's WorkQueue instead. --- indra/llcommon/workqueue.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index ffc9a97dc0..114aeea1f3 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -38,6 +38,16 @@ void LL::WorkQueue::close() mQueue.close(); } +bool LL::WorkQueue::isClosed() +{ + return mQueue.isClosed(); +} + +bool LL::WorkQueue::done() +{ + return mQueue.done(); +} + void LL::WorkQueue::runUntilClose() { try -- cgit v1.2.3 From 023d39963e850356e1af6eec7f857e2534ce8d38 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 25 Oct 2021 17:31:27 -0400 Subject: SL-16220: WorkQueue::runOn() methods submit work, wait for result. The idea is that you can call runOn(target, callable) from a (non-default) coroutine and block that coroutine until the result becomes available. As a safety check, we forbid calling runOn() from a thread's default coroutine, assuming that a given thread's default coroutine is the one servicing the relevant WorkQueue. --- indra/llcommon/workqueue.cpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 114aeea1f3..f7ffc8233c 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,6 +26,11 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; +struct NotOnDftCoro: public LLException +{ + NotOnDftCoro(const std::string& what): LLException(what) {} +}; + LL::WorkQueue::WorkQueue(const std::string& name): super(makeName(name)) { @@ -136,3 +141,13 @@ void LL::WorkQueue::error(const std::string& msg) { LL_ERRS("WorkQueue") << msg << LL_ENDL; } + +void LL::WorkQueue::checkCoroutine(const std::string& method) +{ + // By convention, the default coroutine on each thread has an empty name + // string. See also LLCoros::logname(). + if (LLCoros::getName().empty()) + { + LLTHROW(NotOnDftCoro("Do not call " + method + " from a thread's default coroutine")); + } +} -- cgit v1.2.3 From e6eebea8da545350f6684c191c633dd2fbc6f6f1 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 26 Oct 2021 11:49:53 -0400 Subject: SL-16220: Change WorkQueue::runOn() to waitForResult(). In addition to the name making the blocking explicit, we changed the signature: instead of specifying a target WorkQueue on which to run, waitForResult() runs the passed callable on its own WorkQueue. Why is that? Because, unlike postTo(), we do not require a handshake between two different WorkQueues. postTo() allows running arbitrary callback code, setting variables or whatever, on the originating WorkQueue (presumably on the originating thread). waitForResult() synchronizes using Promise/Future, which are explicitly designed for cross-thread communication. We need not call set_value() on the originating thread, so we don't need a postTo() callback lambda. --- indra/llcommon/workqueue.cpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index f7ffc8233c..ac3086aac5 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,11 +26,6 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; -struct NotOnDftCoro: public LLException -{ - NotOnDftCoro(const std::string& what): LLException(what) {} -}; - LL::WorkQueue::WorkQueue(const std::string& name): super(makeName(name)) { @@ -148,6 +143,6 @@ void LL::WorkQueue::checkCoroutine(const std::string& method) // string. See also LLCoros::logname(). if (LLCoros::getName().empty()) { - LLTHROW(NotOnDftCoro("Do not call " + method + " from a thread's default coroutine")); + LLTHROW(Error("Do not call " + method + " from a thread's default coroutine")); } } -- cgit v1.2.3 From 834e7ca088b5f417235327cd290b42459c733594 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 4 Nov 2021 17:18:57 -0400 Subject: SL-16202: Use large WorkQueue size limits for mainloop and General. Give ThreadPool and WorkQueue the ability to override default ThreadSafeSchedule capacity. Instantiate "mainloop" WorkQueue and "General" ThreadPool with very large capacity because we never want to have to block trying to push to either. --- indra/llcommon/workqueue.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 9808757b0a..14ae4c4ab8 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,8 +26,9 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; -LL::WorkQueue::WorkQueue(const std::string& name): - super(makeName(name)) +LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity): + super(makeName(name)), + mQueue(capacity) { // TODO: register for "LLApp" events so we can implicitly close() on // viewer shutdown. -- cgit v1.2.3 From df8e17d8e851c34a83de6c508aba07f6bde12a10 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 10 Nov 2021 10:13:38 -0500 Subject: SL-16094: Add WorkQueue::size() method to support changeset 08336bb. We want to skip calling PostMessage() to bump the window thread out of GetMessage() in any frame with no work functions pending for that thread. That test depends on being able to sense the size() of the queue. Having converted to WorkQueue, we need that queue to support size(). --- indra/llcommon/workqueue.cpp | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 14ae4c4ab8..633594ceea 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -39,6 +39,11 @@ void LL::WorkQueue::close() mQueue.close(); } +size_t LL::WorkQueue::size() +{ + return mQueue.size(); +} + bool LL::WorkQueue::isClosed() { return mQueue.isClosed(); -- cgit v1.2.3 From 029b41c0419e975bbb28454538b46dc69ce5d2ba Mon Sep 17 00:00:00 2001 From: Dave Houlton Date: Mon, 15 Nov 2021 09:25:35 -0700 Subject: Revert "SL-16220: Merge branch 'origin/DRTVWR-546' into glthread" This reverts commit 5188a26a8521251dda07ac0140bb129f28417e49, reversing changes made to 819088563e13f1d75e048311fbaf0df4a79b7e19. --- indra/llcommon/workqueue.cpp | 30 ++---------------------------- 1 file changed, 2 insertions(+), 28 deletions(-) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 633594ceea..b32357e832 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,9 +26,8 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; -LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity): - super(makeName(name)), - mQueue(capacity) +LL::WorkQueue::WorkQueue(const std::string& name): + super(makeName(name)) { // TODO: register for "LLApp" events so we can implicitly close() on // viewer shutdown. @@ -39,21 +38,6 @@ void LL::WorkQueue::close() mQueue.close(); } -size_t LL::WorkQueue::size() -{ - return mQueue.size(); -} - -bool LL::WorkQueue::isClosed() -{ - return mQueue.isClosed(); -} - -bool LL::WorkQueue::done() -{ - return mQueue.done(); -} - void LL::WorkQueue::runUntilClose() { try @@ -144,13 +128,3 @@ void LL::WorkQueue::error(const std::string& msg) { LL_ERRS("WorkQueue") << msg << LL_ENDL; } - -void LL::WorkQueue::checkCoroutine(const std::string& method) -{ - // By convention, the default coroutine on each thread has an empty name - // string. See also LLCoros::logname(). - if (LLCoros::getName().empty()) - { - LLTHROW(Error("Do not call " + method + " from a thread's default coroutine")); - } -} -- cgit v1.2.3 From cc34e26ef7e74845e4af9e5c5d450c0b12a268e0 Mon Sep 17 00:00:00 2001 From: Runitai Linden Date: Mon, 22 Nov 2021 11:51:03 -0600 Subject: SL-16094 Add WorkQueue profile hooks --- indra/llcommon/workqueue.cpp | 2 ++ 1 file changed, 2 insertions(+) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 633594ceea..fbdbea2051 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -60,6 +60,7 @@ void LL::WorkQueue::runUntilClose() { for (;;) { + LL_PROFILE_ZONE_SCOPED; callWork(mQueue.pop()); } } @@ -90,6 +91,7 @@ bool LL::WorkQueue::runOne() bool LL::WorkQueue::runUntil(const TimePoint& until) { + LL_PROFILE_ZONE_SCOPED; // Should we subtract some slop to allow for typical Work execution time? // How much slop? Work work; -- cgit v1.2.3 From 78d837789a3741c65c3334934d96a505a522ee43 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 24 Nov 2021 09:43:37 -0500 Subject: SL-16400: Make WorkQueue::runFor() and runUntil() stop when done. runFor(interval) and runUntil(timestamp) are intended, and documented, to run *no longer than* the specified time. Instead, the initial implementation always waited the full specified time, hoping for work to arrive. Fix that: once we clear work that's already pending, return right away. --- indra/llcommon/workqueue.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 1e89d87cff..e7d40354aa 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -78,8 +78,8 @@ bool LL::WorkQueue::runUntil(const TimePoint& until) LL_PROFILE_ZONE_SCOPED; // Should we subtract some slop to allow for typical Work execution time? // How much slop? - Work work; - while (TimePoint::clock::now() < until && mQueue.tryPopUntil(until, work)) + // runUntil() is simply a time-bounded runPending(). + for (Work work; TimePoint::clock::now() < until && mQueue.tryPop(work); ) { callWork(work); } -- cgit v1.2.3 From 0b066539fe68dc5750900c3452189645c40adb45 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Wed, 24 Nov 2021 10:47:54 -0500 Subject: DRTVWR-546, SL-16220, SL-16094: Undo previous glthread branch revert. Reverting a merge is sticky: it tells git you never want to see that branch again. Merging the DRTVWR-546 branch, which contained the revert, into the glthread branch undid much of the development work on that branch. To restore it we must revert the revert. This reverts commit 029b41c0419e975bbb28454538b46dc69ce5d2ba. --- indra/llcommon/workqueue.cpp | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index e7d40354aa..c74dada2e4 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,8 +26,9 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; -LL::WorkQueue::WorkQueue(const std::string& name): - super(makeName(name)) +LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity): + super(makeName(name)), + mQueue(capacity) { // TODO: register for "LLApp" events so we can implicitly close() on // viewer shutdown. @@ -38,6 +39,21 @@ void LL::WorkQueue::close() mQueue.close(); } +size_t LL::WorkQueue::size() +{ + return mQueue.size(); +} + +bool LL::WorkQueue::isClosed() +{ + return mQueue.isClosed(); +} + +bool LL::WorkQueue::done() +{ + return mQueue.done(); +} + void LL::WorkQueue::runUntilClose() { try @@ -130,3 +146,13 @@ void LL::WorkQueue::error(const std::string& msg) { LL_ERRS("WorkQueue") << msg << LL_ENDL; } + +void LL::WorkQueue::checkCoroutine(const std::string& method) +{ + // By convention, the default coroutine on each thread has an empty name + // string. See also LLCoros::logname(). + if (LLCoros::getName().empty()) + { + LLTHROW(Error("Do not call " + method + " from a thread's default coroutine")); + } +} -- cgit v1.2.3 From b504c692554d492113a10ef45427fe0ab0d8a85d Mon Sep 17 00:00:00 2001 From: Ptolemy Date: Thu, 13 Jan 2022 12:55:53 -0800 Subject: SL-16606: Add profiler category THREAD --- indra/llcommon/workqueue.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index c74dada2e4..eb06890468 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -60,7 +60,7 @@ void LL::WorkQueue::runUntilClose() { for (;;) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; callWork(mQueue.pop()); } } @@ -71,7 +71,7 @@ void LL::WorkQueue::runUntilClose() bool LL::WorkQueue::runPending() { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; for (Work work; mQueue.tryPop(work); ) { callWork(work); @@ -91,7 +91,7 @@ bool LL::WorkQueue::runOne() bool LL::WorkQueue::runUntil(const TimePoint& until) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; // Should we subtract some slop to allow for typical Work execution time? // How much slop? // runUntil() is simply a time-bounded runPending(). @@ -129,7 +129,7 @@ void LL::WorkQueue::callWork(const Queue::DataTuple& work) void LL::WorkQueue::callWork(const Work& work) { - LL_PROFILE_ZONE_SCOPED; + LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; try { work(); -- cgit v1.2.3