/** * @file workqueue.cpp * @author Nat Goodspeed * @date 2021-10-06 * @brief Implementation for WorkQueue. * * $LicenseInfo:firstyear=2021&license=viewerlgpl$ * Copyright (c) 2021, Linden Research, Inc. * $/LicenseInfo$ */ // Precompiled header #include "linden_common.h" // associated header #include "workqueue.h" // STL headers // std headers // external library headers // other Linden headers #include "llcoros.h" #include LLCOROS_MUTEX_HEADER #include "llerror.h" #include "llexception.h" #include "stringize.h" using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; /***************************************************************************** * WorkQueueBase *****************************************************************************/ LL::WorkQueueBase::WorkQueueBase(const std::string& name): super(makeName(name)) { // Register for status change events so we'll implicitly close() on viewer // shutdown. mStopListener = LLCoros::getStopListener( "WorkQueue:" + getKey(), [this](const LLSD&){ close(); }); } void LL::WorkQueueBase::runUntilClose() { try { for (;;) { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; callWork(pop_()); } } catch (const Closed&) { } } bool LL::WorkQueueBase::runPending() { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; for (Work work; tryPop_(work); ) { callWork(work); } return ! done(); } bool LL::WorkQueueBase::runOne() { Work work; if (tryPop_(work)) { callWork(work); } return ! done(); } bool LL::WorkQueueBase::runUntil(const TimePoint& until) { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; // Should we subtract some slop to allow for typical Work execution time? // How much slop? // runUntil() is simply a time-bounded runPending(). for (Work work; TimePoint::clock::now() < until && tryPop_(work); ) { callWork(work); } return ! done(); } std::string LL::WorkQueueBase::makeName(const std::string& name) { if (! name.empty()) return name; static U32 discriminator = 0; static Mutex mutex; U32 num; { // Protect discriminator from concurrent access by different threads. // It can't be thread_local, else two racing threads will come up with // the same name. Lock lk(mutex); num = discriminator++; } return STRINGIZE("WorkQueue" << num); } void LL::WorkQueueBase::callWork(const Work& work) { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; try { work(); } catch (...) { // No matter what goes wrong with any individual work item, the worker // thread must go on! Log our own instance name with the exception. LOG_UNHANDLED_EXCEPTION(getKey()); } } void LL::WorkQueueBase::error(const std::string& msg) { LL_ERRS("WorkQueue") << msg << LL_ENDL; } void LL::WorkQueueBase::checkCoroutine(const std::string& method) { // By convention, the default coroutine on each thread has an empty name // string. See also LLCoros::logname(). if (LLCoros::getName().empty()) { LLTHROW(Error("Do not call " + method + " from a thread's default coroutine")); } } /***************************************************************************** * WorkQueue *****************************************************************************/ LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity): super(name), mQueue(capacity) { } void LL::WorkQueue::close() { mQueue.close(); } size_t LL::WorkQueue::size() { return mQueue.size(); } bool LL::WorkQueue::isClosed() { return mQueue.isClosed(); } bool LL::WorkQueue::done() { return mQueue.done(); } bool LL::WorkQueue::post(const Work& callable) { return mQueue.pushIfOpen(callable); } bool LL::WorkQueue::tryPost(const Work& callable) { return mQueue.tryPush(callable); } LL::WorkQueue::Work LL::WorkQueue::pop_() { return mQueue.pop(); } bool LL::WorkQueue::tryPop_(Work& work) { return mQueue.tryPop(work); } /***************************************************************************** * WorkSchedule *****************************************************************************/ LL::WorkSchedule::WorkSchedule(const std::string& name, size_t capacity): super(name), mQueue(capacity) { } void LL::WorkSchedule::close() { mQueue.close(); } size_t LL::WorkSchedule::size() { return mQueue.size(); } bool LL::WorkSchedule::isClosed() { return mQueue.isClosed(); } bool LL::WorkSchedule::done() { return mQueue.done(); } bool LL::WorkSchedule::post(const Work& callable) { // Use TimePoint::clock::now() instead of TimePoint's representation of // the epoch because this WorkSchedule may contain a mix of past-due // TimedWork items and TimedWork items scheduled for the future. Sift this // new item into the correct place. return post(callable, TimePoint::clock::now()); } bool LL::WorkSchedule::post(const Work& callable, const TimePoint& time) { return mQueue.pushIfOpen(TimedWork(time, callable)); } bool LL::WorkSchedule::tryPost(const Work& callable) { return tryPost(callable, TimePoint::clock::now()); } bool LL::WorkSchedule::tryPost(const Work& callable, const TimePoint& time) { return mQueue.tryPush(TimedWork(time, callable)); } LL::WorkSchedule::Work LL::WorkSchedule::pop_() { return std::get<0>(mQueue.pop()); } bool LL::WorkSchedule::tryPop_(Work& work) { return mQueue.tryPop(work); }