diff options
Diffstat (limited to 'indra')
-rw-r--r-- | indra/llcommon/CMakeLists.txt | 2 | ||||
-rw-r--r-- | indra/llcommon/llinstancetracker.h | 97 | ||||
-rw-r--r-- | indra/llcommon/llinstancetrackersubclass.h | 98 | ||||
-rw-r--r-- | indra/llcommon/tests/workqueue_test.cpp | 26 | ||||
-rw-r--r-- | indra/llcommon/threadpool.cpp | 29 | ||||
-rw-r--r-- | indra/llcommon/threadpool.h | 60 | ||||
-rw-r--r-- | indra/llcommon/threadpool_fwd.h | 25 | ||||
-rw-r--r-- | indra/llcommon/workqueue.cpp | 194 | ||||
-rw-r--r-- | indra/llcommon/workqueue.h | 416 | ||||
-rw-r--r-- | indra/llimage/llimageworker.h | 6 | ||||
-rw-r--r-- | indra/llrender/llimagegl.cpp | 6 | ||||
-rw-r--r-- | indra/newview/llappviewer.h | 6 | ||||
-rw-r--r-- | indra/newview/llxmlrpctransaction.cpp | 6 |
13 files changed, 665 insertions, 306 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt index 21998f0b78..96fdb1f924 100644 --- a/indra/llcommon/CMakeLists.txt +++ b/indra/llcommon/CMakeLists.txt @@ -189,6 +189,7 @@ set(llcommon_HEADER_FILES llinitdestroyclass.h llinitparam.h llinstancetracker.h + llinstancetrackersubclass.h llkeybind.h llkeythrottle.h llleap.h @@ -261,6 +262,7 @@ set(llcommon_HEADER_FILES stdtypes.h stringize.h threadpool.h + threadpool_fwd.h threadsafeschedule.h timer.h tuple.h diff --git a/indra/llcommon/llinstancetracker.h b/indra/llcommon/llinstancetracker.h index 02535a59e7..97f7817e74 100644 --- a/indra/llcommon/llinstancetracker.h +++ b/indra/llcommon/llinstancetracker.h @@ -104,22 +104,26 @@ public: return LockStatic()->mMap.size(); } - // snapshot of std::pair<const KEY, std::shared_ptr<T>> pairs - class snapshot + // snapshot of std::pair<const KEY, std::shared_ptr<SUBCLASS>> pairs, for + // some SUBCLASS derived from T + template <typename SUBCLASS> + class snapshot_of { // It's very important that what we store in this snapshot are // weak_ptrs, NOT shared_ptrs. That's how we discover whether any // instance has been deleted during the lifespan of a snapshot. typedef std::vector<std::pair<const KEY, weak_t>> VectorType; - // Dereferencing our iterator produces a std::shared_ptr for each - // instance that still exists. Since we store weak_ptrs, that involves - // two chained transformations: + // Dereferencing the iterator we publish produces a + // std::shared_ptr<SUBCLASS> for each instance that still exists. + // Since we store weak_ptr<T>, that involves two chained + // transformations: // - a transform_iterator to lock the weak_ptr and return a shared_ptr - // - a filter_iterator to skip any shared_ptr that has become invalid. + // - a filter_iterator to skip any shared_ptr<T> that has become + // invalid or references any T instance that isn't SUBCLASS. // It is very important that we filter lazily, that is, during // traversal. Any one of our stored weak_ptrs might expire during // traversal. - typedef std::pair<const KEY, ptr_t> strong_pair; + typedef std::pair<const KEY, std::shared_ptr<SUBCLASS>> strong_pair; // Note for future reference: nat has not yet had any luck (up to // Boost 1.67) trying to use boost::transform_iterator with a hand- // coded functor, only with actual functions. In my experience, an @@ -127,7 +131,7 @@ public: // result_type typedef. But this works. static strong_pair strengthen(typename VectorType::value_type& pair) { - return { pair.first, pair.second.lock() }; + return { pair.first, std::dynamic_pointer_cast<SUBCLASS>(pair.second.lock()) }; } static bool dead_skipper(const strong_pair& pair) { @@ -135,7 +139,7 @@ public: } public: - snapshot(): + snapshot_of(): // populate our vector with a snapshot of (locked!) InstanceMap // note, this assigns pair<KEY, shared_ptr> to pair<KEY, weak_ptr> mData(mLock->mMap.begin(), mLock->mMap.end()) @@ -184,44 +188,51 @@ public: #endif // LL_WINDOWS VectorType mData; }; + using snapshot = snapshot_of<T>; - // iterate over this for references to each instance - class instance_snapshot: public snapshot + // iterate over this for references to each SUBCLASS instance + template <typename SUBCLASS> + class instance_snapshot_of: public snapshot_of<SUBCLASS> { private: - static T& instance_getter(typename snapshot::iterator::reference pair) + using super = snapshot_of<SUBCLASS>; + static T& instance_getter(typename super::iterator::reference pair) { return *pair.second; } public: typedef boost::transform_iterator<decltype(instance_getter)*, - typename snapshot::iterator> iterator; - iterator begin() { return iterator(snapshot::begin(), instance_getter); } - iterator end() { return iterator(snapshot::end(), instance_getter); } + typename super::iterator> iterator; + iterator begin() { return iterator(super::begin(), instance_getter); } + iterator end() { return iterator(super::end(), instance_getter); } void deleteAll() { - for (auto it(snapshot::begin()), end(snapshot::end()); it != end; ++it) + for (auto it(super::begin()), end(super::end()); it != end; ++it) { delete it->second.get(); } } - }; + }; + using instance_snapshot = instance_snapshot_of<T>; // iterate over this for each key - class key_snapshot: public snapshot + template <typename SUBCLASS> + class key_snapshot_of: public snapshot_of<SUBCLASS> { private: - static KEY key_getter(typename snapshot::iterator::reference pair) + using super = snapshot_of<SUBCLASS>; + static KEY key_getter(typename super::iterator::reference pair) { return pair.first; } public: typedef boost::transform_iterator<decltype(key_getter)*, - typename snapshot::iterator> iterator; - iterator begin() { return iterator(snapshot::begin(), key_getter); } - iterator end() { return iterator(snapshot::end(), key_getter); } + typename super::iterator> iterator; + iterator begin() { return iterator(super::begin(), key_getter); } + iterator end() { return iterator(super::end(), key_getter); } }; + using key_snapshot = key_snapshot_of<T>; static ptr_t getInstance(const KEY& k) { @@ -368,22 +379,25 @@ public: return LockStatic()->mSet.size(); } - // snapshot of std::shared_ptr<T> pointers - class snapshot + // snapshot of std::shared_ptr<SUBCLASS> pointers + template <typename SUBCLASS> + class snapshot_of { // It's very important that what we store in this snapshot are // weak_ptrs, NOT shared_ptrs. That's how we discover whether any // instance has been deleted during the lifespan of a snapshot. typedef std::vector<weak_t> VectorType; - // Dereferencing our iterator produces a std::shared_ptr for each - // instance that still exists. Since we store weak_ptrs, that involves - // two chained transformations: + // Dereferencing the iterator we publish produces a + // std::shared_ptr<SUBCLASS> for each instance that still exists. + // Since we store weak_ptrs, that involves two chained + // transformations: // - a transform_iterator to lock the weak_ptr and return a shared_ptr - // - a filter_iterator to skip any shared_ptr that has become invalid. - typedef std::shared_ptr<T> strong_ptr; + // - a filter_iterator to skip any shared_ptr that has become invalid + // or references any T instance that isn't SUBCLASS. + typedef std::shared_ptr<SUBCLASS> strong_ptr; static strong_ptr strengthen(typename VectorType::value_type& ptr) { - return ptr.lock(); + return std::dynamic_pointer_cast<SUBCLASS>(ptr.lock()); } static bool dead_skipper(const strong_ptr& ptr) { @@ -391,7 +405,7 @@ public: } public: - snapshot(): + snapshot_of(): // populate our vector with a snapshot of (locked!) InstanceSet // note, this assigns stored shared_ptrs to weak_ptrs for snapshot mData(mLock->mSet.begin(), mLock->mSet.end()) @@ -437,22 +451,33 @@ public: #endif // LL_WINDOWS VectorType mData; }; + using snapshot = snapshot_of<T>; // iterate over this for references to each instance - struct instance_snapshot: public snapshot + template <typename SUBCLASS> + class instance_snapshot_of: public snapshot_of<SUBCLASS> { - typedef boost::indirect_iterator<typename snapshot::iterator> iterator; - iterator begin() { return iterator(snapshot::begin()); } - iterator end() { return iterator(snapshot::end()); } + private: + using super = snapshot_of<SUBCLASS>; + + public: + typedef boost::indirect_iterator<typename super::iterator> iterator; + iterator begin() { return iterator(super::begin()); } + iterator end() { return iterator(super::end()); } void deleteAll() { - for (auto it(snapshot::begin()), end(snapshot::end()); it != end; ++it) + for (auto it(super::begin()), end(super::end()); it != end; ++it) { delete it->get(); } } }; + using instance_snapshot = instance_snapshot_of<T>; + // key_snapshot_of isn't really meaningful, but define it anyway to avoid + // requiring two different LLInstanceTrackerSubclass implementations. + template <typename SUBCLASS> + using key_snapshot_of = instance_snapshot_of<SUBCLASS>; protected: LLInstanceTracker() diff --git a/indra/llcommon/llinstancetrackersubclass.h b/indra/llcommon/llinstancetrackersubclass.h new file mode 100644 index 0000000000..ea9a38200f --- /dev/null +++ b/indra/llcommon/llinstancetrackersubclass.h @@ -0,0 +1,98 @@ +/** + * @file llinstancetrackersubclass.h + * @author Nat Goodspeed + * @date 2022-12-09 + * @brief Intermediate class to get subclass-specific types from + * LLInstanceTracker instance-retrieval methods. + * + * $LicenseInfo:firstyear=2022&license=viewerlgpl$ + * Copyright (c) 2022, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_LLINSTANCETRACKERSUBCLASS_H) +#define LL_LLINSTANCETRACKERSUBCLASS_H + +#include <memory> // std::shared_ptr, std::weak_ptr + +/** + * Derive your subclass S of a subclass T of LLInstanceTracker<T> from + * LLInstanceTrackerSubclass<S, T> to perform appropriate downcasting and + * filtering for LLInstanceTracker access methods. + * + * LLInstanceTracker<T> uses CRTP, so that getWeak(), getInstance(), snapshot + * and instance_snapshot return pointers and references to T. The trouble is + * that subclasses T0 and T1 derived from T also get pointers and references + * to their base class T, requiring explicit downcasting. Moreover, + * T0::getInstance() shouldn't find an instance of any T subclass other than + * T0. Nor should T0::snapshot. + * + * @code + * class Tracked: public LLInstanceTracker<Tracked, std::string> + * { + * private: + * using super = LLInstanceTracker<Tracked, std::string>; + * public: + * Tracked(const std::string& name): super(name) {} + * // All references to Tracked::ptr_t, Tracked::getInstance() etc. + * // appropriately use Tracked. + * // ... + * }; + * + * // But now we derive SubTracked from Tracked. We need SubTracked::ptr_t, + * // SubTracked::getInstance() etc. to use SubTracked, not Tracked. + * // This LLInstanceTrackerSubclass specialization is itself derived from + * // Tracked. + * class SubTracked: public LLInstanceTrackerSubclass<SubTracked, Tracked> + * { + * private: + * using super = LLInstanceTrackerSubclass<SubTracked, Tracked>; + * public: + * // LLInstanceTrackerSubclass's constructor forwards to Tracked's. + * SubTracked(const std::string& name): super(name) {} + * // SubTracked::getInstance() returns std::shared_ptr<SubTracked>, etc. + * // ... + * @endcode + */ +template <typename SUBCLASS, typename T> +class LLInstanceTrackerSubclass: public T +{ +public: + using ptr_t = std::shared_ptr<SUBCLASS>; + using weak_t = std::weak_ptr<SUBCLASS>; + + // forward any constructor call to the corresponding T ctor + template <typename... ARGS> + LLInstanceTrackerSubclass(ARGS&&... args): + T(std::forward<ARGS>(args)...) + {} + + weak_t getWeak() + { + // call base-class getWeak(), try to lock, downcast to SUBCLASS + return std::dynamic_pointer_cast<SUBCLASS>(T::getWeak().lock()); + } + + template <typename KEY> + static ptr_t getInstance(const KEY& k) + { + return std::dynamic_pointer_cast<SUBCLASS>(T::getInstance(k)); + } + + using snapshot = typename T::template snapshot_of<SUBCLASS>; + using instance_snapshot = typename T::template instance_snapshot_of<SUBCLASS>; + using key_snapshot = typename T::template key_snapshot_of<SUBCLASS>; + + static size_t instanceCount() + { + // T::instanceCount() lies because our snapshot, et al., won't + // necessarily return all the T instances -- only those that are also + // SUBCLASS instances. Count those. + size_t count = 0; + for (const auto& pair : snapshot()) + ++count; + return count; + } +}; + +#endif /* ! defined(LL_LLINSTANCETRACKERSUBCLASS_H) */ diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp index 1d73f7aa0d..41aa858084 100644 --- a/indra/llcommon/tests/workqueue_test.cpp +++ b/indra/llcommon/tests/workqueue_test.cpp @@ -38,7 +38,7 @@ namespace tut { struct workqueue_data { - WorkQueue queue{"queue"}; + WorkSchedule queue{"queue"}; }; typedef test_group<workqueue_data> workqueue_group; typedef workqueue_group::object object; @@ -49,8 +49,8 @@ namespace tut { set_test_name("name"); ensure_equals("didn't capture name", queue.getKey(), "queue"); - ensure("not findable", WorkQueue::getInstance("queue") == queue.getWeak().lock()); - WorkQueue q2; + ensure("not findable", WorkSchedule::getInstance("queue") == queue.getWeak().lock()); + WorkSchedule q2; ensure("has no name", LLStringUtil::startsWith(q2.getKey(), "WorkQueue")); } @@ -73,16 +73,16 @@ namespace tut { set_test_name("postEvery"); // record of runs - using Shared = std::deque<WorkQueue::TimePoint>; + using Shared = std::deque<WorkSchedule::TimePoint>; // This is an example of how to share data between the originator of - // postEvery(work) and the work item itself, since usually a WorkQueue + // postEvery(work) and the work item itself, since usually a WorkSchedule // is used to dispatch work to a different thread. Neither of them // should call any of LLCond's wait methods: you don't want to stall // either the worker thread or the originating thread (conventionally // main). Use LLCond or a subclass even if all you want to do is // signal the work item that it can quit; consider LLOneShotCond. LLCond<Shared> data; - auto start = WorkQueue::TimePoint::clock::now(); + auto start = WorkSchedule::TimePoint::clock::now(); auto interval = 100ms; queue.postEvery( interval, @@ -93,7 +93,7 @@ namespace tut data.update_one( [](Shared& data) { - data.push_back(WorkQueue::TimePoint::clock::now()); + data.push_back(WorkSchedule::TimePoint::clock::now()); }); // by the 3rd call, return false to stop return (++count < 3); @@ -102,7 +102,7 @@ namespace tut // postEvery() running, so run until we have exhausted the iterations // or we time out waiting for (auto finish = start + 10*interval; - WorkQueue::TimePoint::clock::now() < finish && + WorkSchedule::TimePoint::clock::now() < finish && data.get([](const Shared& data){ return data.size(); }) < 3; ) { queue.runPending(); @@ -139,8 +139,8 @@ namespace tut void object::test<4>() { set_test_name("postTo"); - WorkQueue main("main"); - auto qptr = WorkQueue::getInstance("queue"); + WorkSchedule main("main"); + auto qptr = WorkSchedule::getInstance("queue"); int result = 0; main.postTo( qptr, @@ -171,8 +171,8 @@ namespace tut void object::test<5>() { set_test_name("postTo with void return"); - WorkQueue main("main"); - auto qptr = WorkQueue::getInstance("queue"); + WorkSchedule main("main"); + auto qptr = WorkSchedule::getInstance("queue"); std::string observe; main.postTo( qptr, @@ -194,7 +194,7 @@ namespace tut std::string stored; // Try to call waitForResult() on this thread's main coroutine. It // should throw because the main coroutine must service the queue. - auto what{ catch_what<WorkQueue::Error>( + auto what{ catch_what<WorkSchedule::Error>( [this, &stored](){ stored = queue.waitForResult( [](){ return "should throw"; }); }) }; ensure("lambda should not have run", stored.empty()); diff --git a/indra/llcommon/threadpool.cpp b/indra/llcommon/threadpool.cpp index 1b29b8b69f..3a9a5a2062 100644 --- a/indra/llcommon/threadpool.cpp +++ b/indra/llcommon/threadpool.cpp @@ -58,16 +58,17 @@ struct sleepy_robin: public boost::fibers::algo::round_robin }; /***************************************************************************** -* ThreadPool +* ThreadPoolBase *****************************************************************************/ -LL::ThreadPool::ThreadPool(const std::string& name, size_t threads, size_t capacity): +LL::ThreadPoolBase::ThreadPoolBase(const std::string& name, size_t threads, + WorkQueueBase* queue): super(name), - mQueue(name, capacity), mName("ThreadPool:" + name), - mThreadCount(getConfiguredWidth(name, threads)) + mThreadCount(getConfiguredWidth(name, threads)), + mQueue(queue) {} -void LL::ThreadPool::start() +void LL::ThreadPoolBase::start() { for (size_t i = 0; i < mThreadCount; ++i) { @@ -95,17 +96,17 @@ void LL::ThreadPool::start() }); } -LL::ThreadPool::~ThreadPool() +LL::ThreadPoolBase::~ThreadPoolBase() { close(); } -void LL::ThreadPool::close() +void LL::ThreadPoolBase::close() { - if (! mQueue.isClosed()) + if (! mQueue->isClosed()) { LL_DEBUGS("ThreadPool") << mName << " closing queue and joining threads" << LL_ENDL; - mQueue.close(); + mQueue->close(); for (auto& pair: mThreads) { LL_DEBUGS("ThreadPool") << mName << " waiting on thread " << pair.first << LL_ENDL; @@ -115,7 +116,7 @@ void LL::ThreadPool::close() } } -void LL::ThreadPool::run(const std::string& name) +void LL::ThreadPoolBase::run(const std::string& name) { #if LL_WINDOWS // Try using sleepy_robin fiber scheduler. @@ -127,13 +128,13 @@ void LL::ThreadPool::run(const std::string& name) LL_DEBUGS("ThreadPool") << name << " stopping" << LL_ENDL; } -void LL::ThreadPool::run() +void LL::ThreadPoolBase::run() { - mQueue.runUntilClose(); + mQueue->runUntilClose(); } //static -size_t LL::ThreadPool::getConfiguredWidth(const std::string& name, size_t dft) +size_t LL::ThreadPoolBase::getConfiguredWidth(const std::string& name, size_t dft) { LLSD poolSizes; try @@ -174,7 +175,7 @@ size_t LL::ThreadPool::getConfiguredWidth(const std::string& name, size_t dft) } //static -size_t LL::ThreadPool::getWidth(const std::string& name, size_t dft) +size_t LL::ThreadPoolBase::getWidth(const std::string& name, size_t dft) { auto instance{ getInstance(name) }; if (instance) diff --git a/indra/llcommon/threadpool.h b/indra/llcommon/threadpool.h index b49d511257..60f4a0ce1b 100644 --- a/indra/llcommon/threadpool.h +++ b/indra/llcommon/threadpool.h @@ -13,7 +13,9 @@ #if ! defined(LL_THREADPOOL_H) #define LL_THREADPOOL_H +#include "threadpool_fwd.h" #include "workqueue.h" +#include <memory> // std::unique_ptr #include <string> #include <thread> #include <utility> // std::pair @@ -22,26 +24,24 @@ namespace LL { - class ThreadPool: public LLInstanceTracker<ThreadPool, std::string> + class ThreadPoolBase: public LLInstanceTracker<ThreadPoolBase, std::string> { private: - using super = LLInstanceTracker<ThreadPool, std::string>; + using super = LLInstanceTracker<ThreadPoolBase, std::string>; + public: /** - * Pass ThreadPool a string name. This can be used to look up the + * Pass ThreadPoolBase a string name. This can be used to look up the * relevant WorkQueue. * * The number of threads you pass sets the compile-time default. But * if the user has overridden the LLSD map in the "ThreadPoolSizes" * setting with a key matching this ThreadPool name, that setting * overrides this parameter. - * - * Pass an explicit capacity to limit the size of the queue. - * Constraining the queue can cause a submitter to block. Do not - * constrain any ThreadPool accepting work from the main thread. */ - ThreadPool(const std::string& name, size_t threads=1, size_t capacity=1024*1024); - virtual ~ThreadPool(); + ThreadPoolBase(const std::string& name, size_t threads, + WorkQueueBase* queue); + virtual ~ThreadPoolBase(); /** * Launch the ThreadPool. Until this call, a constructed ThreadPool @@ -59,8 +59,6 @@ namespace LL std::string getName() const { return mName; } size_t getWidth() const { return mThreads.size(); } - /// obtain a non-const reference to the WorkQueue to post work to it - WorkQueue& getQueue() { return mQueue; } /** * Override run() if you need special processing. The default run() @@ -87,15 +85,53 @@ namespace LL static size_t getWidth(const std::string& name, size_t dft); + protected: + std::unique_ptr<WorkQueueBase> mQueue; + private: void run(const std::string& name); - WorkQueue mQueue; std::string mName; size_t mThreadCount; std::vector<std::pair<std::string, std::thread>> mThreads; }; + /** + * Specialize with WorkQueue or, for timestamped tasks, WorkSchedule + */ + template <class QUEUE> + struct ThreadPoolUsing: public ThreadPoolBase + { + using queue_t = QUEUE; + + /** + * Pass ThreadPoolUsing a string name. This can be used to look up the + * relevant WorkQueue. + * + * The number of threads you pass sets the compile-time default. But + * if the user has overridden the LLSD map in the "ThreadPoolSizes" + * setting with a key matching this ThreadPool name, that setting + * overrides this parameter. + * + * Pass an explicit capacity to limit the size of the queue. + * Constraining the queue can cause a submitter to block. Do not + * constrain any ThreadPool accepting work from the main thread. + */ + ThreadPoolUsing(const std::string& name, size_t threads=1, size_t capacity=1024*1024): + ThreadPoolBase(name, threads, new queue_t(name, capacity)) + {} + ~ThreadPoolUsing() override {} + + /** + * obtain a non-const reference to the specific WorkQueue subclass to + * post work to it + */ + queue_t& getQueue() { return static_cast<queue_t&>(*mQueue); } + }; + + /// ThreadPool is shorthand for using the simpler WorkQueue + using ThreadPool = ThreadPoolUsing<WorkQueue>; + } // namespace LL #endif /* ! defined(LL_THREADPOOL_H) */ diff --git a/indra/llcommon/threadpool_fwd.h b/indra/llcommon/threadpool_fwd.h new file mode 100644 index 0000000000..1aa3c4a0e2 --- /dev/null +++ b/indra/llcommon/threadpool_fwd.h @@ -0,0 +1,25 @@ +/** + * @file threadpool_fwd.h + * @author Nat Goodspeed + * @date 2022-12-09 + * @brief Forward declarations for ThreadPool et al. + * + * $LicenseInfo:firstyear=2022&license=viewerlgpl$ + * Copyright (c) 2022, Linden Research, Inc. + * $/LicenseInfo$ + */ + +#if ! defined(LL_THREADPOOL_FWD_H) +#define LL_THREADPOOL_FWD_H + +#include "workqueue.h" + +namespace LL +{ + template <class QUEUE> + struct ThreadPoolUsing; + + using ThreadPool = ThreadPoolUsing<WorkQueue>; +} // namespace LL + +#endif /* ! defined(LL_THREADPOOL_FWD_H) */ diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp index eb06890468..83e0216ae7 100644 --- a/indra/llcommon/workqueue.cpp +++ b/indra/llcommon/workqueue.cpp @@ -26,83 +26,65 @@ using Mutex = LLCoros::Mutex; using Lock = LLCoros::LockType; -LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity): - super(makeName(name)), - mQueue(capacity) +/***************************************************************************** +* WorkQueueBase +*****************************************************************************/ +LL::WorkQueueBase::WorkQueueBase(const std::string& name): + super(makeName(name)) { // TODO: register for "LLApp" events so we can implicitly close() on // viewer shutdown. } -void LL::WorkQueue::close() -{ - mQueue.close(); -} - -size_t LL::WorkQueue::size() -{ - return mQueue.size(); -} - -bool LL::WorkQueue::isClosed() -{ - return mQueue.isClosed(); -} - -bool LL::WorkQueue::done() -{ - return mQueue.done(); -} - -void LL::WorkQueue::runUntilClose() +void LL::WorkQueueBase::runUntilClose() { try { for (;;) { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; - callWork(mQueue.pop()); + callWork(pop_()); } } - catch (const Queue::Closed&) + catch (const Closed&) { } } -bool LL::WorkQueue::runPending() +bool LL::WorkQueueBase::runPending() { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; - for (Work work; mQueue.tryPop(work); ) + for (Work work; tryPop_(work); ) { callWork(work); } - return ! mQueue.done(); + return ! done(); } -bool LL::WorkQueue::runOne() +bool LL::WorkQueueBase::runOne() { Work work; - if (mQueue.tryPop(work)) + if (tryPop_(work)) { callWork(work); } - return ! mQueue.done(); + return ! done(); } -bool LL::WorkQueue::runUntil(const TimePoint& until) +bool LL::WorkQueueBase::runUntil(const TimePoint& until) { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; // Should we subtract some slop to allow for typical Work execution time? // How much slop? // runUntil() is simply a time-bounded runPending(). - for (Work work; TimePoint::clock::now() < until && mQueue.tryPop(work); ) + for (Work work; TimePoint::clock::now() < until && tryPop_(work); ) { callWork(work); } - return ! mQueue.done(); + return ! done(); } -std::string LL::WorkQueue::makeName(const std::string& name) +std::string LL::WorkQueueBase::makeName(const std::string& name) { if (! name.empty()) return name; @@ -120,14 +102,7 @@ std::string LL::WorkQueue::makeName(const std::string& name) return STRINGIZE("WorkQueue" << num); } -void LL::WorkQueue::callWork(const Queue::DataTuple& work) -{ - // ThreadSafeSchedule::pop() always delivers a tuple, even when - // there's only one data field per item, as for us. - callWork(std::get<0>(work)); -} - -void LL::WorkQueue::callWork(const Work& work) +void LL::WorkQueueBase::callWork(const Work& work) { LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD; try @@ -142,12 +117,12 @@ void LL::WorkQueue::callWork(const Work& work) } } -void LL::WorkQueue::error(const std::string& msg) +void LL::WorkQueueBase::error(const std::string& msg) { LL_ERRS("WorkQueue") << msg << LL_ENDL; } -void LL::WorkQueue::checkCoroutine(const std::string& method) +void LL::WorkQueueBase::checkCoroutine(const std::string& method) { // By convention, the default coroutine on each thread has an empty name // string. See also LLCoros::logname(). @@ -156,3 +131,130 @@ void LL::WorkQueue::checkCoroutine(const std::string& method) LLTHROW(Error("Do not call " + method + " from a thread's default coroutine")); } } + +/***************************************************************************** +* WorkQueue +*****************************************************************************/ +LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity): + super(name), + mQueue(capacity) +{ +} + +void LL::WorkQueue::close() +{ + mQueue.close(); +} + +size_t LL::WorkQueue::size() +{ + return mQueue.size(); +} + +bool LL::WorkQueue::isClosed() +{ + return mQueue.isClosed(); +} + +bool LL::WorkQueue::done() +{ + return mQueue.done(); +} + +void LL::WorkQueue::post(const Work& callable) +{ + mQueue.push(callable); +} + +bool LL::WorkQueue::postIfOpen(const Work& callable) +{ + return mQueue.pushIfOpen(callable); +} + +bool LL::WorkQueue::tryPost(const Work& callable) +{ + return mQueue.tryPush(callable); +} + +LL::WorkQueue::Work LL::WorkQueue::pop_() +{ + return mQueue.pop(); +} + +bool LL::WorkQueue::tryPop_(Work& work) +{ + return mQueue.tryPop(work); +} + +/***************************************************************************** +* WorkSchedule +*****************************************************************************/ +LL::WorkSchedule::WorkSchedule(const std::string& name, size_t capacity): + super(name), + mQueue(capacity) +{ +} + +void LL::WorkSchedule::close() +{ + mQueue.close(); +} + +size_t LL::WorkSchedule::size() +{ + return mQueue.size(); +} + +bool LL::WorkSchedule::isClosed() +{ + return mQueue.isClosed(); +} + +bool LL::WorkSchedule::done() +{ + return mQueue.done(); +} + +void LL::WorkSchedule::post(const Work& callable) +{ + // Use TimePoint::clock::now() instead of TimePoint's representation of + // the epoch because this WorkSchedule may contain a mix of past-due + // TimedWork items and TimedWork items scheduled for the future. Sift this + // new item into the correct place. + post(callable, TimePoint::clock::now()); +} + +void LL::WorkSchedule::post(const Work& callable, const TimePoint& time) +{ + mQueue.push(TimedWork(time, callable)); +} + +bool LL::WorkSchedule::postIfOpen(const Work& callable) +{ + return postIfOpen(callable, TimePoint::clock::now()); +} + +bool LL::WorkSchedule::postIfOpen(const Work& callable, const TimePoint& time) +{ + return mQueue.pushIfOpen(TimedWork(time, callable)); +} + +bool LL::WorkSchedule::tryPost(const Work& callable) +{ + return tryPost(callable, TimePoint::clock::now()); +} + +bool LL::WorkSchedule::tryPost(const Work& callable, const TimePoint& time) +{ + return mQueue.tryPush(TimedWork(time, callable)); +} + +LL::WorkSchedule::Work LL::WorkSchedule::pop_() +{ + return std::get<0>(mQueue.pop()); +} + +bool LL::WorkSchedule::tryPop_(Work& work) +{ + return mQueue.tryPop(work); +} diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h index 28a0b5e040..eea8886a7a 100644 --- a/indra/llcommon/workqueue.h +++ b/indra/llcommon/workqueue.h @@ -15,6 +15,7 @@ #include "llcoros.h" #include "llexception.h" #include "llinstancetracker.h" +#include "llinstancetrackersubclass.h" #include "threadsafeschedule.h" #include <chrono> #include <exception> // std::current_exception @@ -23,27 +24,23 @@ namespace LL { + +/***************************************************************************** +* WorkQueueBase: API for WorkQueue and WorkSchedule +*****************************************************************************/ /** * A typical WorkQueue has a string name that can be used to find it. */ - class WorkQueue: public LLInstanceTracker<WorkQueue, std::string> + class WorkQueueBase: public LLInstanceTracker<WorkQueueBase, std::string> { private: - using super = LLInstanceTracker<WorkQueue, std::string>; + using super = LLInstanceTracker<WorkQueueBase, std::string>; public: using Work = std::function<void()>; - - private: - using Queue = ThreadSafeSchedule<Work>; - // helper for postEvery() - template <typename Rep, typename Period, typename CALLABLE> - class BackJack; - - public: - using TimePoint = Queue::TimePoint; - using TimedWork = Queue::TimeTuple; - using Closed = Queue::Closed; + using Closed = LLThreadSafeQueueInterrupt; + // for runFor() + using TimePoint = std::chrono::steady_clock::time_point; struct Error: public LLException { @@ -51,18 +48,18 @@ namespace LL }; /** - * You may omit the WorkQueue name, in which case a unique name is + * You may omit the WorkQueueBase name, in which case a unique name is * synthesized; for practical purposes that makes it anonymous. */ - WorkQueue(const std::string& name = std::string(), size_t capacity=1024); + WorkQueueBase(const std::string& name); /** * Since the point of WorkQueue is to pass work to some other worker - * thread(s) asynchronously, it's important that the WorkQueue continue - * to exist until the worker thread(s) have drained it. To communicate - * that it's time for them to quit, close() the queue. + * thread(s) asynchronously, it's important that it continue to exist + * until the worker thread(s) have drained it. To communicate that + * it's time for them to quit, close() the queue. */ - void close(); + virtual void close() = 0; /** * WorkQueue supports multiple producers and multiple consumers. In @@ -78,158 +75,60 @@ namespace LL * * If you're the only consumer, noticing that size() > 0 is * meaningful. */ - size_t size(); + virtual size_t size() = 0; /// producer end: are we prevented from pushing any additional items? - bool isClosed(); + virtual bool isClosed() = 0; /// consumer end: are we done, is the queue entirely drained? - bool done(); + virtual bool done() = 0; /*---------------------- fire and forget API -----------------------*/ - /// fire-and-forget, but at a particular (future?) time - template <typename CALLABLE> - void post(const TimePoint& time, CALLABLE&& callable) - { - // Defer reifying an arbitrary CALLABLE until we hit this or - // postIfOpen(). All other methods should accept CALLABLEs of - // arbitrary type to avoid multiple levels of std::function - // indirection. - mQueue.push(TimedWork(time, std::move(callable))); - } - /// fire-and-forget - template <typename CALLABLE> - void post(CALLABLE&& callable) - { - // We use TimePoint::clock::now() instead of TimePoint's - // representation of the epoch because this WorkQueue may contain - // a mix of past-due TimedWork items and TimedWork items scheduled - // for the future. Sift this new item into the correct place. - post(TimePoint::clock::now(), std::move(callable)); - } - - /** - * post work for a particular time, unless the queue is closed before - * we can post - */ - template <typename CALLABLE> - bool postIfOpen(const TimePoint& time, CALLABLE&& callable) - { - // Defer reifying an arbitrary CALLABLE until we hit this or - // post(). All other methods should accept CALLABLEs of arbitrary - // type to avoid multiple levels of std::function indirection. - return mQueue.pushIfOpen(TimedWork(time, std::move(callable))); - } + virtual void post(const Work&) = 0; /** * post work, unless the queue is closed before we can post */ - template <typename CALLABLE> - bool postIfOpen(CALLABLE&& callable) - { - return postIfOpen(TimePoint::clock::now(), std::move(callable)); - } + virtual bool postIfOpen(const Work&) = 0; /** - * Post work to be run at a specified time to another WorkQueue, which - * may or may not still exist and be open. Return true if we were able - * to post. + * post work, unless the queue is full */ - template <typename CALLABLE> - static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable); + virtual bool tryPost(const Work&) = 0; /** * Post work to another WorkQueue, which may or may not still exist - * and be open. Return true if we were able to post. - */ - template <typename CALLABLE> - static bool postMaybe(weak_t target, CALLABLE&& callable) - { - return postMaybe(target, TimePoint::clock::now(), - std::forward<CALLABLE>(callable)); - } - - /** - * Launch a callable returning bool that will trigger repeatedly at - * specified interval, until the callable returns false. - * - * If you need to signal that callable from outside, DO NOT bind a - * reference to a simple bool! That's not thread-safe. Instead, bind - * an LLCond variant, e.g. LLOneShotCond or LLBoolCond. + * and be open. Support any post() overload. Return true if we were + * able to post. */ - template <typename Rep, typename Period, typename CALLABLE> - void postEvery(const std::chrono::duration<Rep, Period>& interval, - CALLABLE&& callable); - - template <typename CALLABLE> - bool tryPost(const TimePoint& time, CALLABLE&& callable) - { - return mQueue.tryPush(TimedWork(time, std::move(callable))); - } - - template <typename CALLABLE> - bool tryPost(CALLABLE&& callable) - { - return mQueue.tryPush(TimePoint::clock::now(), std::move(callable)); - } + template <typename... ARGS> + static bool postMaybe(weak_t target, ARGS&&... args); /*------------------------- handshake API --------------------------*/ /** - * Post work to another WorkQueue to be run at a specified time, - * requesting a specific callback to be run on this WorkQueue on - * completion. - * - * Returns true if able to post, false if the other WorkQueue is - * inaccessible. - */ - // Apparently some Microsoft header file defines a macro CALLBACK? The - // natural template argument name CALLBACK produces very weird Visual - // Studio compile errors that seem utterly unrelated to this source - // code. - template <typename CALLABLE, typename FOLLOWUP> - bool postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback); - - /** * Post work to another WorkQueue, requesting a specific callback to - * be run on this WorkQueue on completion. + * be run on this WorkQueue on completion. Optional final argument is + * TimePoint for WorkSchedule. * * Returns true if able to post, false if the other WorkQueue is * inaccessible. */ - template <typename CALLABLE, typename FOLLOWUP> - bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback) - { - return postTo(target, TimePoint::clock::now(), - std::move(callable), std::move(callback)); - } - - /** - * Post work to another WorkQueue to be run at a specified time, - * blocking the calling coroutine until then, returning the result to - * caller on completion. - * - * In general, we assume that each thread's default coroutine is busy - * servicing its WorkQueue or whatever. To try to prevent mistakes, we - * forbid calling waitForResult() from a thread's default coroutine. - */ - template <typename CALLABLE> - auto waitForResult(const TimePoint& time, CALLABLE&& callable); + template <typename CALLABLE, typename FOLLOWUP, typename... ARGS> + bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback, + ARGS&&... args); /** * Post work to another WorkQueue, blocking the calling coroutine - * until then, returning the result to caller on completion. + * until then, returning the result to caller on completion. Optional + * final argument is TimePoint for WorkSchedule. * * In general, we assume that each thread's default coroutine is busy * servicing its WorkQueue or whatever. To try to prevent mistakes, we * forbid calling waitForResult() from a thread's default coroutine. */ - template <typename CALLABLE> - auto waitForResult(CALLABLE&& callable) - { - return waitForResult(TimePoint::clock::now(), std::move(callable)); - } + template <typename CALLABLE, typename... ARGS> + auto waitForResult(CALLABLE&& callable, ARGS&&... args); /*--------------------------- worker API ---------------------------*/ @@ -276,7 +175,7 @@ namespace LL */ bool runUntil(const TimePoint& until); - private: + protected: template <typename CALLABLE, typename FOLLOWUP> static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback); /// general case: arbitrary C++ return type @@ -296,13 +195,179 @@ namespace LL static void checkCoroutine(const std::string& method); static void error(const std::string& msg); static std::string makeName(const std::string& name); - void callWork(const Queue::DataTuple& work); void callWork(const Work& work); + + private: + virtual Work pop_() = 0; + virtual bool tryPop_(Work&) = 0; + }; + +/***************************************************************************** +* WorkQueue: no timestamped task support +*****************************************************************************/ + class WorkQueue: public LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase> + { + private: + using super = LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase>; + + public: + /** + * You may omit the WorkQueue name, in which case a unique name is + * synthesized; for practical purposes that makes it anonymous. + */ + WorkQueue(const std::string& name = std::string(), size_t capacity=1024); + + /** + * Since the point of WorkQueue is to pass work to some other worker + * thread(s) asynchronously, it's important that it continue to exist + * until the worker thread(s) have drained it. To communicate that + * it's time for them to quit, close() the queue. + */ + void close() override; + + /** + * WorkQueue supports multiple producers and multiple consumers. In + * the general case it's misleading to test size(), since any other + * thread might change it the nanosecond the lock is released. On that + * basis, some might argue against publishing a size() method at all. + * + * But there are two specific cases in which a test based on size() + * might be reasonable: + * + * * If you're the only producer, noticing that size() == 0 is + * meaningful. + * * If you're the only consumer, noticing that size() > 0 is + * meaningful. + */ + size_t size() override; + /// producer end: are we prevented from pushing any additional items? + bool isClosed() override; + /// consumer end: are we done, is the queue entirely drained? + bool done() override; + + /*---------------------- fire and forget API -----------------------*/ + + /// fire-and-forget + void post(const Work&) override; + + /** + * post work, unless the queue is closed before we can post + */ + bool postIfOpen(const Work&) override; + + /** + * post work, unless the queue is full + */ + bool tryPost(const Work&) override; + + private: + using Queue = LLThreadSafeQueue<Work>; Queue mQueue; + + Work pop_() override; + bool tryPop_(Work&) override; + }; + +/***************************************************************************** +* WorkSchedule: add support for timestamped tasks +*****************************************************************************/ + class WorkSchedule: public LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase> + { + private: + using super = LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase>; + using Queue = ThreadSafeSchedule<Work>; + // helper for postEvery() + template <typename Rep, typename Period, typename CALLABLE> + class BackJack; + + public: + using TimePoint = Queue::TimePoint; + using TimedWork = Queue::TimeTuple; + + /** + * You may omit the WorkSchedule name, in which case a unique name is + * synthesized; for practical purposes that makes it anonymous. + */ + WorkSchedule(const std::string& name = std::string(), size_t capacity=1024); + + /** + * Since the point of WorkSchedule is to pass work to some other worker + * thread(s) asynchronously, it's important that the WorkSchedule continue + * to exist until the worker thread(s) have drained it. To communicate + * that it's time for them to quit, close() the queue. + */ + void close() override; + + /** + * WorkSchedule supports multiple producers and multiple consumers. In + * the general case it's misleading to test size(), since any other + * thread might change it the nanosecond the lock is released. On that + * basis, some might argue against publishing a size() method at all. + * + * But there are two specific cases in which a test based on size() + * might be reasonable: + * + * * If you're the only producer, noticing that size() == 0 is + * meaningful. + * * If you're the only consumer, noticing that size() > 0 is + * meaningful. + */ + size_t size() override; + /// producer end: are we prevented from pushing any additional items? + bool isClosed() override; + /// consumer end: are we done, is the queue entirely drained? + bool done() override; + + /*---------------------- fire and forget API -----------------------*/ + + /// fire-and-forget + void post(const Work& callable) override; + + /// fire-and-forget, but at a particular (future?) time + void post(const Work& callable, const TimePoint& time); + + /** + * post work, unless the queue is closed before we can post + */ + bool postIfOpen(const Work& callable) override; + + /** + * post work for a particular time, unless the queue is closed before + * we can post + */ + bool postIfOpen(const Work& callable, const TimePoint& time); + + /** + * post work, unless the queue is full + */ + bool tryPost(const Work& callable) override; + + /** + * post work for a particular time, unless the queue is full + */ + bool tryPost(const Work& callable, const TimePoint& time); + + /** + * Launch a callable returning bool that will trigger repeatedly at + * specified interval, until the callable returns false. + * + * If you need to signal that callable from outside, DO NOT bind a + * reference to a simple bool! That's not thread-safe. Instead, bind + * an LLCond variant, e.g. LLOneShotCond or LLBoolCond. + */ + template <typename Rep, typename Period, typename CALLABLE> + void postEvery(const std::chrono::duration<Rep, Period>& interval, + CALLABLE&& callable); + + private: + Queue mQueue; + + Work pop_() override; + bool tryPop_(Work&) override; }; /** - * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a + * BackJack is, in effect, a hand-rolled lambda, binding a WorkSchedule, a * CALLABLE that returns bool, a TimePoint and an interval at which to * relaunch it. As long as the callable continues returning true, BackJack * keeps resubmitting it to the target WorkQueue. @@ -311,7 +376,7 @@ namespace LL // class method gets its own 'this' pointer -- which we need to resubmit // the whole BackJack callable. template <typename Rep, typename Period, typename CALLABLE> - class WorkQueue::BackJack + class WorkSchedule::BackJack { public: // bind the desired data @@ -325,9 +390,10 @@ namespace LL mCallable(std::move(callable)) {} - // Call by target WorkQueue -- note that although WE require a - // callable returning bool, WorkQueue wants a void callable. We - // consume the bool. + // This operator() method, called by target WorkSchedule, is what + // makes this object a Work item. Although WE require a callable + // returning bool, WorkSchedule wants a void callable. We consume the + // bool. void operator()() { // If mCallable() throws an exception, don't catch it here: if it @@ -343,7 +409,7 @@ namespace LL // register our intent to fire at exact mIntervals. mStart += mInterval; - // We're being called at this moment by the target WorkQueue. + // We're being called at this moment by the target WorkSchedule. // Assume it still exists, rather than checking the result of // lock(). // Resubmit the whole *this callable: that's why we're a class @@ -353,7 +419,8 @@ namespace LL // moved-from. try { - mTarget.lock()->post(mStart, std::move(*this)); + std::dynamic_pointer_cast<WorkSchedule>(mTarget.lock())-> + post(std::move(*this), mStart); } catch (const Closed&) { @@ -370,8 +437,8 @@ namespace LL }; template <typename Rep, typename Period, typename CALLABLE> - void WorkQueue::postEvery(const std::chrono::duration<Rep, Period>& interval, - CALLABLE&& callable) + void WorkSchedule::postEvery(const std::chrono::duration<Rep, Period>& interval, + CALLABLE&& callable) { if (interval.count() <= 0) { @@ -394,7 +461,7 @@ namespace LL /// general case: arbitrary C++ return type template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE> - struct WorkQueue::MakeReplyLambda + struct WorkQueueBase::MakeReplyLambda { auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) { @@ -415,7 +482,7 @@ namespace LL /// specialize for CALLABLE returning void template <typename CALLABLE, typename FOLLOWUP> - struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void> + struct WorkQueueBase::MakeReplyLambda<CALLABLE, FOLLOWUP, void> { auto operator()(CALLABLE&& callable, FOLLOWUP&& callback) { @@ -427,16 +494,16 @@ namespace LL }; template <typename CALLABLE, typename FOLLOWUP> - auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) + auto WorkQueueBase::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback) { return MakeReplyLambda<CALLABLE, FOLLOWUP, decltype(std::forward<CALLABLE>(callable)())>() (std::move(callable), std::move(callback)); } - template <typename CALLABLE, typename FOLLOWUP> - bool WorkQueue::postTo(weak_t target, - const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback) + template <typename CALLABLE, typename FOLLOWUP, typename... ARGS> + bool WorkQueueBase::postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback, + ARGS&&... args) { LL_PROFILE_ZONE_SCOPED; // We're being asked to post to the WorkQueue at target. @@ -450,12 +517,11 @@ namespace LL // lambda that packages our callable, our callback and a weak_ptr // to this originating WorkQueue. tptr->post( - time, [reply = super::getWeak(), callable = std::move(callable), callback = std::move(callback)] - () - mutable { + () mutable + { // Use postMaybe() below in case this originating WorkQueue // has been closed or destroyed. Remember, the outer lambda is // now running on a thread servicing the target WorkQueue, and @@ -478,14 +544,16 @@ namespace LL // originating WorkQueue. Once there, rethrow it. [exc = std::current_exception()](){ std::rethrow_exception(exc); }); } - }); + }, + // if caller passed a TimePoint, pass it along to post() + std::forward<ARGS>(args)...); // looks like we were able to post() return true; } - template <typename CALLABLE> - bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable) + template <typename... ARGS> + bool WorkQueueBase::postMaybe(weak_t target, ARGS&&... args) { LL_PROFILE_ZONE_SCOPED; // target is a weak_ptr: have to lock it to check it @@ -494,7 +562,7 @@ namespace LL { try { - tptr->post(time, std::forward<CALLABLE>(callable)); + tptr->post(std::forward<ARGS>(args)...); // we were able to post() return true; } @@ -509,13 +577,13 @@ namespace LL /// general case: arbitrary C++ return type template <typename CALLABLE, typename RETURNTYPE> - struct WorkQueue::WaitForResult + struct WorkQueueBase::WaitForResult { - auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) + template <typename... ARGS> + auto operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args) { LLCoros::Promise<RETURNTYPE> promise; self->post( - time, // We dare to bind a reference to Promise because it's // specifically designed for cross-thread communication. [&promise, callable = std::move(callable)]() @@ -529,7 +597,9 @@ namespace LL { promise.set_exception(std::current_exception()); } - }); + }, + // if caller passed a TimePoint, pass it to post() + std::forward<ARGS>(args)...); auto future{ LLCoros::getFuture(promise) }; // now, on the calling thread, wait for that result LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()"); @@ -539,13 +609,13 @@ namespace LL /// specialize for CALLABLE returning void template <typename CALLABLE> - struct WorkQueue::WaitForResult<CALLABLE, void> + struct WorkQueueBase::WaitForResult<CALLABLE, void> { - void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable) + template <typename... ARGS> + void operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args) { LLCoros::Promise<void> promise; self->post( - time, // &promise is designed for cross-thread access [&promise, callable = std::move(callable)]() mutable { @@ -558,7 +628,9 @@ namespace LL { promise.set_exception(std::current_exception()); } - }); + }, + // if caller passed a TimePoint, pass it to post() + std::forward<ARGS>(args)...); auto future{ LLCoros::getFuture(promise) }; // block until set_value() LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()"); @@ -566,13 +638,13 @@ namespace LL } }; - template <typename CALLABLE> - auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable) + template <typename CALLABLE, typename... ARGS> + auto WorkQueueBase::waitForResult(CALLABLE&& callable, ARGS&&... args) { checkCoroutine("waitForResult()"); // derive callable's return type so we can specialize for void return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>() - (this, time, std::forward<CALLABLE>(callable)); + (this, std::forward<CALLABLE>(callable), std::forward<ARGS>(args)...); } } // namespace LL diff --git a/indra/llimage/llimageworker.h b/indra/llimage/llimageworker.h index 18398d9ae2..fb35108a7a 100644 --- a/indra/llimage/llimageworker.h +++ b/indra/llimage/llimageworker.h @@ -29,11 +29,7 @@ #include "llimage.h" #include "llpointer.h" - -namespace LL -{ - class ThreadPool; -} // namespace LL +#include "threadpool_fwd.h" class LLImageDecodeThread { diff --git a/indra/llrender/llimagegl.cpp b/indra/llrender/llimagegl.cpp index d16757d0ed..7162134c92 100644 --- a/indra/llrender/llimagegl.cpp +++ b/indra/llrender/llimagegl.cpp @@ -2404,7 +2404,7 @@ void LLImageGL::checkActiveThread() LLImageGLThread::LLImageGLThread(LLWindow* window) // We want exactly one thread. - : ThreadPool("LLImageGL", 1) + : LL::ThreadPool("LLImageGL", 1) , mWindow(window) { LL_PROFILE_ZONE_SCOPED_CATEGORY_TEXTURE; @@ -2412,7 +2412,7 @@ LLImageGLThread::LLImageGLThread(LLWindow* window) mFinished = false; mContext = mWindow->createSharedContext(); - ThreadPool::start(); + LL::ThreadPool::start(); } void LLImageGLThread::run() @@ -2422,7 +2422,7 @@ void LLImageGLThread::run() // WorkQueue, likewise cleanup afterwards. mWindow->makeContextCurrent(mContext); gGL.init(false); - ThreadPool::run(); + LL::ThreadPool::run(); gGL.shutdown(); mWindow->destroySharedContext(mContext); } diff --git a/indra/newview/llappviewer.h b/indra/newview/llappviewer.h index 3888fa8ae3..6e35da1b10 100644 --- a/indra/newview/llappviewer.h +++ b/indra/newview/llappviewer.h @@ -49,6 +49,7 @@ #include "llsys.h" // for LLOSInfo #include "lltimer.h" #include "llappcorehttp.h" +#include "threadpool_fwd.h" #include <boost/signals2.hpp> @@ -63,11 +64,6 @@ class LLViewerJoystick; class LLPurgeDiskCacheThread; class LLViewerRegion; -namespace LL -{ - class ThreadPool; -} - extern LLTrace::BlockTimerStatHandle FTM_FRAME; class LLAppViewer : public LLApp diff --git a/indra/newview/llxmlrpctransaction.cpp b/indra/newview/llxmlrpctransaction.cpp index 80e94c37f2..20084aa5d1 100644 --- a/indra/newview/llxmlrpctransaction.cpp +++ b/indra/newview/llxmlrpctransaction.cpp @@ -45,6 +45,12 @@ // Have to include these last to avoid queue redefinition! #include <xmlrpc-epi/xmlrpc.h> +// <xmlrpc-epi/queue.h> contains a harmful #define queue xmlrpc_queue. This +// breaks any use of std::queue. Ditch that #define: if any of our code wants +// to reference xmlrpc_queue, let it reference it directly. +#if defined(queue) +#undef queue +#endif #include "llappviewer.h" #include "lltrans.h" |