From fc424a0db90fd2d2e44e85a19750ad6eaa57b28a Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 9 Dec 2022 13:21:45 -0500 Subject: SL-18809: Add WorkSchedule; remove timestamps from WorkQueue. For work queues that don't need timestamped tasks, eliminate the overhead of a priority queue ordered by timestamp. Timestamped task support moves to WorkSchedule. WorkQueue is a simpler queue that just waits for work. Both WorkQueue and WorkSchedule can be accessed via new WorkQueueBase API. Of course the WorkQueueBase API doesn't deal with timestamps, but a WorkSchedule can be accessed directly to post timestamped tasks and then handled normally (e.g. by ThreadPool) to run them. Most ThreadPool functionality migrates to new ThreadPoolBase class, with template subclass ThreadPoolUsing or ThreadPoolUsing depending on need. ThreadPool is now an alias for ThreadPoolUsing. Importantly, ThreadPoolUsing::getQueue() delivers a reference to the specific queue subclass type, so you can post timestamped tasks on a queue retrieved from ThreadPoolUsing::getQueue(). Since ThreadPool is no longer a simple class but an alias for a particular template specialization, introduce threadpool_fwd.h to forward-declare it. Recast workqueue_test.cpp to exercise WorkSchedule, since some of the tests are time-based. A future todo would be to exercise each applicable test with both WorkQueue and WorkSchedule. --- indra/llcommon/workqueue.cpp | 194 +++++++++++++++++++++++++++++++++---------- 1 file changed, 148 insertions(+), 46 deletions(-) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index eb06890468..83e0216ae7 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,83 +26,65 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; -LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity): - super(makeName(name)), - mQueue(capacity) +/***************************************************************************** +* WorkQueueBase +*****************************************************************************/ +LL::WorkQueueBase::WorkQueueBase(const std::string& name): + super(makeName(name)) { // TODO: register for "LLApp" events so we can implicitly close() on // viewer shutdown. } -void LL::WorkQueue::close() -{ - mQueue.close(); -} - -size_t LL::WorkQueue::size() -{ - return mQueue.size(); -} - -bool LL::WorkQueue::isClosed() -{ - return mQueue.isClosed(); -} - -bool LL::WorkQueue::done() -{ - return mQueue.done(); -} - -void LL::WorkQueue::runUntilClose() +void LL::WorkQueueBase::runUntilClose() { try { for (;;) { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; - callWork(mQueue.pop()); + callWork(pop_()); } } - catch (const Queue::Closed&) + catch (const Closed&) { } } -bool LL::WorkQueue::runPending() +bool LL::WorkQueueBase::runPending() { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; - for (Work work; mQueue.tryPop(work); ) + for (Work work; tryPop_(work); ) { callWork(work); } - return ! mQueue.done(); + return ! done(); } -bool LL::WorkQueue::runOne() +bool LL::WorkQueueBase::runOne() { Work work; - if (mQueue.tryPop(work)) + if (tryPop_(work)) { callWork(work); } - return ! mQueue.done(); + return ! done(); } -bool LL::WorkQueue::runUntil(const TimePoint& until) +bool LL::WorkQueueBase::runUntil(const TimePoint& until) { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; // Should we subtract some slop to allow for typical Work execution time? // How much slop? // runUntil() is simply a time-bounded runPending(). - for (Work work; TimePoint::clock::now() < until && mQueue.tryPop(work); ) + for (Work work; TimePoint::clock::now() < until && tryPop_(work); ) { callWork(work); } - return ! mQueue.done(); + return ! done(); } -std::string LL::WorkQueue::makeName(const std::string& name) +std::string LL::WorkQueueBase::makeName(const std::string& name) { if (! name.empty()) return name; @@ -120,14 +102,7 @@ std::string LL::WorkQueue::makeName(const std::string& name) return STRINGIZE("WorkQueue" << num); } -void LL::WorkQueue::callWork(const Queue::DataTuple& work) -{ - // ThreadSafeSchedule::pop() always delivers a tuple, even when - // there's only one data field per item, as for us. - callWork(std::get<0>(work)); -} - -void LL::WorkQueue::callWork(const Work& work) +void LL::WorkQueueBase::callWork(const Work& work) { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; try @@ -142,12 +117,12 @@ void LL::WorkQueue::callWork(const Work& work) } } -void LL::WorkQueue::error(const std::string& msg) +void LL::WorkQueueBase::error(const std::string& msg) { LL_ERRS("WorkQueue") << msg << LL_ENDL; } -void LL::WorkQueue::checkCoroutine(const std::string& method) +void LL::WorkQueueBase::checkCoroutine(const std::string& method) { // By convention, the default coroutine on each thread has an empty name // string. See also LLCoros::logname(). @@ -156,3 +131,130 @@ void LL::WorkQueue::checkCoroutine(const std::string& method) LLTHROW(Error("Do not call " + method + " from a thread's default coroutine")); } } + +/***************************************************************************** +* WorkQueue +*****************************************************************************/ +LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity): + super(name), + mQueue(capacity) +{ +} + +void LL::WorkQueue::close() +{ + mQueue.close(); +} + +size_t LL::WorkQueue::size() +{ + return mQueue.size(); +} + +bool LL::WorkQueue::isClosed() +{ + return mQueue.isClosed(); +} + +bool LL::WorkQueue::done() +{ + return mQueue.done(); +} + +void LL::WorkQueue::post(const Work& callable) +{ + mQueue.push(callable); +} + +bool LL::WorkQueue::postIfOpen(const Work& callable) +{ + return mQueue.pushIfOpen(callable); +} + +bool LL::WorkQueue::tryPost(const Work& callable) +{ + return mQueue.tryPush(callable); +} + +LL::WorkQueue::Work LL::WorkQueue::pop_() +{ + return mQueue.pop(); +} + +bool LL::WorkQueue::tryPop_(Work& work) +{ + return mQueue.tryPop(work); +} + +/***************************************************************************** +* WorkSchedule +*****************************************************************************/ +LL::WorkSchedule::WorkSchedule(const std::string& name, size_t capacity): + super(name), + mQueue(capacity) +{ +} + +void LL::WorkSchedule::close() +{ + mQueue.close(); +} + +size_t LL::WorkSchedule::size() +{ + return mQueue.size(); +} + +bool LL::WorkSchedule::isClosed() +{ + return mQueue.isClosed(); +} + +bool LL::WorkSchedule::done() +{ + return mQueue.done(); +} + +void LL::WorkSchedule::post(const Work& callable) +{ + // Use TimePoint::clock::now() instead of TimePoint's representation of + // the epoch because this WorkSchedule may contain a mix of past-due + // TimedWork items and TimedWork items scheduled for the future. Sift this + // new item into the correct place. + post(callable, TimePoint::clock::now()); +} + +void LL::WorkSchedule::post(const Work& callable, const TimePoint& time) +{ + mQueue.push(TimedWork(time, callable)); +} + +bool LL::WorkSchedule::postIfOpen(const Work& callable) +{ + return postIfOpen(callable, TimePoint::clock::now()); +} + +bool LL::WorkSchedule::postIfOpen(const Work& callable, const TimePoint& time) +{ + return mQueue.pushIfOpen(TimedWork(time, callable)); +} + +bool LL::WorkSchedule::tryPost(const Work& callable) +{ + return tryPost(callable, TimePoint::clock::now()); +} + +bool LL::WorkSchedule::tryPost(const Work& callable, const TimePoint& time) +{ + return mQueue.tryPush(TimedWork(time, callable)); +} + +LL::WorkSchedule::Work LL::WorkSchedule::pop_() +{ + return std::get<0>(mQueue.pop()); +} + +bool LL::WorkSchedule::tryPop_(Work& work) +{ + return mQueue.tryPop(work); +} -- cgit v1.3 From 026ef1935dbdb21ab79159d38fb78e126dd6ac95 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 8 May 2023 12:07:31 -0400 Subject: SL-19690: Follow up on Rye Mutt's fix for shutdown crashes. Rather than continuing to propagate try/catch (Closed) (aka LLThreadSafeQueueInterrupt) constructs through the code base, make WorkQueueBase::post() return bool indicating success (i.e. ! isClosed()). This obviates postIfOpen(), which no one was using anyway. In effect, postIfOpen() is renamed post(), bypassing the exception when isClosed(). Review existing try/catch blocks of that sort, changing to test for post() returning false. --- indra/llaudio/llaudiodecodemgr.cpp | 61 ++++++++--------- indra/llcommon/llqueuedthread.cpp | 2 +- indra/llcommon/workqueue.cpp | 21 +----- indra/llcommon/workqueue.h | 135 +++++++++++++++---------------------- indra/llimage/llimageworker.cpp | 28 ++++---- indra/llrender/llimagegl.h | 2 +- indra/llwindow/llwindowwin32.cpp | 13 ++-- 7 files changed, 104 insertions(+), 158 deletions(-) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llaudio/llaudiodecodemgr.cpp b/indra/llaudio/llaudiodecodemgr.cpp index 38a6b41afe..190c5290cb 100755 --- a/indra/llaudio/llaudiodecodemgr.cpp +++ b/indra/llaudio/llaudiodecodemgr.cpp @@ -607,40 +607,37 @@ void LLAudioDecodeMgr::Impl::startMoreDecodes() // Kick off a decode mDecodes[decode_id] = LLPointer(NULL); - try - { - main_queue->postTo( - general_queue, - [decode_id]() // Work done on general queue - { - LLPointer decode_state = beginDecodingAndWritingAudio(decode_id); - - if (!decode_state) - { - // Audio decode has errored - return decode_state; - } + bool posted = main_queue->postTo( + general_queue, + [decode_id]() // Work done on general queue + { + LLPointer decode_state = beginDecodingAndWritingAudio(decode_id); - // Disk write of decoded audio is now in progress off-thread + if (!decode_state) + { + // Audio decode has errored return decode_state; - }, - [decode_id, this](LLPointer decode_state) // Callback to main thread - mutable { - if (!gAudiop) - { - // There is no LLAudioEngine anymore. This might happen if - // an audio decode is enqueued just before shutdown. - return; - } - - // At this point, we can be certain that the pointer to "this" - // is valid because the lifetime of "this" is dependent upon - // the lifetime of gAudiop. - - enqueueFinishAudio(decode_id, decode_state); - }); - } - catch (const LLThreadSafeQueueInterrupt&) + } + + // Disk write of decoded audio is now in progress off-thread + return decode_state; + }, + [decode_id, this](LLPointer decode_state) // Callback to main thread + mutable { + if (!gAudiop) + { + // There is no LLAudioEngine anymore. This might happen if + // an audio decode is enqueued just before shutdown. + return; + } + + // At this point, we can be certain that the pointer to "this" + // is valid because the lifetime of "this" is dependent upon + // the lifetime of gAudiop. + + enqueueFinishAudio(decode_id, decode_state); + }); + if (! posted) { // Shutdown // Consider making processQueue() do a cleanup instead diff --git a/indra/llcommon/llqueuedthread.cpp b/indra/llcommon/llqueuedthread.cpp index 9b1de2e9a5..7da7c1e026 100644 --- a/indra/llcommon/llqueuedthread.cpp +++ b/indra/llcommon/llqueuedthread.cpp @@ -146,7 +146,7 @@ S32 LLQueuedThread::updateQueue(F32 max_time_ms) // schedule a call to threadedUpdate for every call to updateQueue if (!isQuitting()) { - mRequestQueue.postIfOpen([=]() + mRequestQueue.post([=]() { LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qt - update"); mIdleThread = FALSE; diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 83e0216ae7..5a77d517dd 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -161,12 +161,7 @@ bool LL::WorkQueue::done() return mQueue.done(); } -void LL::WorkQueue::post(const Work& callable) -{ - mQueue.push(callable); -} - -bool LL::WorkQueue::postIfOpen(const Work& callable) +bool LL::WorkQueue::post(const Work& callable) { return mQueue.pushIfOpen(callable); } @@ -215,26 +210,16 @@ bool LL::WorkSchedule::done() return mQueue.done(); } -void LL::WorkSchedule::post(const Work& callable) +bool LL::WorkSchedule::post(const Work& callable) { // Use TimePoint::clock::now() instead of TimePoint's representation of // the epoch because this WorkSchedule may contain a mix of past-due // TimedWork items and TimedWork items scheduled for the future. Sift this // new item into the correct place. - post(callable, TimePoint::clock::now()); -} - -void LL::WorkSchedule::post(const Work& callable, const TimePoint& time) -{ - mQueue.push(TimedWork(time, callable)); -} - -bool LL::WorkSchedule::postIfOpen(const Work& callable) -{ return postIfOpen(callable, TimePoint::clock::now()); } -bool LL::WorkSchedule::postIfOpen(const Work& callable, const TimePoint& time) +bool LL::WorkSchedule::post(const Work& callable, const TimePoint& time) { return mQueue.pushIfOpen(TimedWork(time, callable)); } diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 5b704e7198..87fdd1517f 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -83,13 +83,10 @@ namespace LL /*---------------------- fire and forget API -----------------------*/ - /// fire-and-forget - virtual void post(const Work&) = 0; - /** * post work, unless the queue is closed before we can post */ - virtual bool postIfOpen(const Work&) = 0; + virtual bool post(const Work&) = 0; /** * post work, unless the queue is full @@ -247,13 +244,10 @@ namespace LL /*---------------------- fire and forget API -----------------------*/ - /// fire-and-forget - void post(const Work&) override; - /** * post work, unless the queue is closed before we can post */ - bool postIfOpen(const Work&) override; + bool post(const Work&) override; /** * post work, unless the queue is full @@ -320,22 +314,16 @@ namespace LL /*---------------------- fire and forget API -----------------------*/ - /// fire-and-forget - void post(const Work& callable) override; - - /// fire-and-forget, but at a particular (future?) time - void post(const Work& callable, const TimePoint& time); - /** * post work, unless the queue is closed before we can post */ - bool postIfOpen(const Work& callable) override; + bool post(const Work& callable) override; /** * post work for a particular time, unless the queue is closed before * we can post */ - bool postIfOpen(const Work& callable, const TimePoint& time); + bool post(const Work& callable, const TimePoint& time); /** * post work, unless the queue is full @@ -356,7 +344,7 @@ namespace LL * an LLCond variant, e.g. LLOneShotCond or LLBoolCond. */ template - void postEvery(const std::chrono::duration& interval, + bool postEvery(const std::chrono::duration& interval, CALLABLE&& callable); private: @@ -417,15 +405,10 @@ namespace LL // move-only callable; but naturally this statement must be // the last time we reference this instance, which may become // moved-from. - try - { - auto target{ std::dynamic_pointer_cast(mTarget.lock()) }; - target->post(std::move(*this), mStart); - } - catch (const Closed&) - { - // Once this queue is closed, oh well, just stop - } + auto target{ std::dynamic_pointer_cast(mTarget.lock()) }; + // Discard bool return: once this queue is closed, oh well, + // just stop + target->post(std::move(*this), mStart); } } @@ -437,7 +420,7 @@ namespace LL }; template - void WorkSchedule::postEvery(const std::chrono::duration& interval, + bool WorkSchedule::postEvery(const std::chrono::duration& interval, CALLABLE&& callable) { if (interval.count() <= 0) @@ -454,7 +437,7 @@ namespace LL // Instantiate and post a suitable BackJack, binding a weak_ptr to // self, the current time, the desired interval and the desired // callable. - post( + return post( BackJack( getWeak(), TimePoint::clock::now(), interval, std::move(callable))); } @@ -516,48 +499,37 @@ namespace LL // Here we believe target WorkQueue still exists. Post to it a // lambda that packages our callable, our callback and a weak_ptr // to this originating WorkQueue. - try - { - tptr->post( - [reply = super::getWeak(), - callable = std::move(callable), - callback = std::move(callback)] - () mutable + return tptr->post( + [reply = super::getWeak(), + callable = std::move(callable), + callback = std::move(callback)] + () mutable + { + // Use postMaybe() below in case this originating WorkQueue + // has been closed or destroyed. Remember, the outer lambda is + // now running on a thread servicing the target WorkQueue, and + // real time has elapsed since postTo()'s tptr->post() call. + try { - // Use postMaybe() below in case this originating WorkQueue - // has been closed or destroyed. Remember, the outer lambda is - // now running on a thread servicing the target WorkQueue, and - // real time has elapsed since postTo()'s tptr->post() call. - try - { - // Make a reply lambda to repost to THIS WorkQueue. - // Delegate to makeReplyLambda() so we can partially - // specialize on void return. - postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback))); - } - catch (...) - { - // Either variant of makeReplyLambda() is responsible for - // calling the caller's callable. If that throws, return - // the exception to the originating thread. - postMaybe( - reply, - // Bind the current exception to transport back to the - // originating WorkQueue. Once there, rethrow it. - [exc = std::current_exception()](){ std::rethrow_exception(exc); }); - } - }, - // if caller passed a TimePoint, pass it along to post() - std::forward(args)...); - } - catch (const Closed&) - { - // target WorkQueue still exists, but is Closed - return false; - } - - // looks like we were able to post() - return true; + // Make a reply lambda to repost to THIS WorkQueue. + // Delegate to makeReplyLambda() so we can partially + // specialize on void return. + postMaybe(reply, makeReplyLambda(std::move(callable), std::move(callback))); + } + catch (...) + { + // Either variant of makeReplyLambda() is responsible for + // calling the caller's callable. If that throws, return + // the exception to the originating thread. + postMaybe( + reply, + // Bind the current exception to transport back to the + // originating WorkQueue. Once there, rethrow it. + [exc = std::current_exception()](){ std::rethrow_exception(exc); }); + } + }, + // if caller passed a TimePoint, pass it along to post() + std::forward(args)...); } template @@ -568,18 +540,9 @@ namespace LL auto tptr = target.lock(); if (tptr) { - try - { - tptr->post(std::forward(args)...); - // we were able to post() - return true; - } - catch (const Closed&) - { - // target WorkQueue still exists, but is Closed - } + return tptr->post(std::forward(args)...); } - // either target no longer exists, or its WorkQueue is Closed + // target no longer exists return false; } @@ -591,7 +554,7 @@ namespace LL auto operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args) { LLCoros::Promise promise; - self->post( + bool posted = self->post( // We dare to bind a reference to Promise because it's // specifically designed for cross-thread communication. [&promise, callable = std::move(callable)]() @@ -608,6 +571,10 @@ namespace LL }, // if caller passed a TimePoint, pass it to post() std::forward(args)...); + if (! posted) + { + LLTHROW(Closed); + } auto future{ LLCoros::getFuture(promise) }; // now, on the calling thread, wait for that result LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); @@ -623,7 +590,7 @@ namespace LL void operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args) { LLCoros::Promise promise; - self->post( + bool posted = self->post( // &promise is designed for cross-thread access [&promise, callable = std::move(callable)]() mutable { @@ -639,6 +606,10 @@ namespace LL }, // if caller passed a TimePoint, pass it to post() std::forward(args)...); + if (! posted) + { + LLTHROW(Closed); + } auto future{ LLCoros::getFuture(promise) }; // block until set_value() LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); diff --git a/indra/llimage/llimageworker.cpp b/indra/llimage/llimageworker.cpp index 9358a0ae2c..520c81a8ec 100644 --- a/indra/llimage/llimageworker.cpp +++ b/indra/llimage/llimageworker.cpp @@ -92,21 +92,19 @@ LLImageDecodeThread::handle_t LLImageDecodeThread::decodeImage( { LL_PROFILE_ZONE_SCOPED_CATEGORY_TEXTURE; - try - { - // Instantiate the ImageRequest right in the lambda, why not? - mThreadPool->getQueue().post( - [req = ImageRequest(image, discard, needs_aux, responder)] - () mutable - { - auto done = req.processRequest(); - req.finishRequest(done); - }); - } - catch (const LLThreadSafeQueueInterrupt&) - { - LL_DEBUGS() << "Tried to start decoding on shutdown" << LL_ENDL; - } + // Instantiate the ImageRequest right in the lambda, why not? + bool posted = mThreadPool->getQueue().post( + [req = ImageRequest(image, discard, needs_aux, responder)] + () mutable + { + auto done = req.processRequest(); + req.finishRequest(done); + }); + if (! posted) + { + LL_DEBUGS() << "Tried to start decoding on shutdown" << LL_ENDL; + // should this return 0? + } // It's important to our consumer (LLTextureFetchWorker) that we return a // nonzero handle. It is NOT important that the nonzero handle be unique: diff --git a/indra/llrender/llimagegl.h b/indra/llrender/llimagegl.h index 87fb9e363e..243aeaea25 100644 --- a/indra/llrender/llimagegl.h +++ b/indra/llrender/llimagegl.h @@ -340,7 +340,7 @@ public: template bool post(CALLABLE&& func) { - return getQueue().postIfOpen(std::forward(func)); + return getQueue().post(std::forward(func)); } void run() override; diff --git a/indra/llwindow/llwindowwin32.cpp b/indra/llwindow/llwindowwin32.cpp index 43bef5ff68..c5a6a3fa8f 100644 --- a/indra/llwindow/llwindowwin32.cpp +++ b/indra/llwindow/llwindowwin32.cpp @@ -370,15 +370,10 @@ struct LLWindowWin32::LLWindowWin32Thread : public LL::ThreadPool template void post(CALLABLE&& func) { - try - { - getQueue().post(std::forward(func)); - } - catch (const LLThreadSafeQueueInterrupt&) - { - // Shutdown timing is tricky. The main thread can end up trying - // to post a cursor position after having closed the WorkQueue. - } + // Ignore bool return. Shutdown timing is tricky: the main thread can + // end up trying to post a cursor position after having closed the + // WorkQueue. + getQueue().post(std::forward(func)); } /** -- cgit v1.3 From f728808d938666a01f73a039c861358fc4b02a9e Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Mon, 8 May 2023 12:45:27 -0400 Subject: SL-19690: Fix a lingering reference to WorkSchedule::postIfOpen() --- indra/llcommon/workqueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'indra/llcommon/workqueue.cpp') diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index 5a77d517dd..cf80ce0656 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -216,7 +216,7 @@ bool LL::WorkSchedule::post(const Work& callable) // the epoch because this WorkSchedule may contain a mix of past-due // TimedWork items and TimedWork items scheduled for the future. Sift this // new item into the correct place. - return postIfOpen(callable, TimePoint::clock::now()); + return post(callable, TimePoint::clock::now()); } bool LL::WorkSchedule::post(const Work& callable, const TimePoint& time) -- cgit v1.3