summaryrefslogtreecommitdiff
path: root/indra/llcommon
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llcommon')
-rw-r--r--indra/llcommon/CMakeLists.txt2
-rw-r--r--indra/llcommon/llinstancetracker.h97
-rw-r--r--indra/llcommon/llinstancetrackersubclass.h98
-rw-r--r--indra/llcommon/tests/workqueue_test.cpp26
-rw-r--r--indra/llcommon/threadpool.cpp29
-rw-r--r--indra/llcommon/threadpool.h60
-rw-r--r--indra/llcommon/threadpool_fwd.h25
-rw-r--r--indra/llcommon/workqueue.cpp194
-rw-r--r--indra/llcommon/workqueue.h416
9 files changed, 654 insertions, 293 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt
index 21998f0b78..96fdb1f924 100644
--- a/indra/llcommon/CMakeLists.txt
+++ b/indra/llcommon/CMakeLists.txt
@@ -189,6 +189,7 @@ set(llcommon_HEADER_FILES
llinitdestroyclass.h
llinitparam.h
llinstancetracker.h
+ llinstancetrackersubclass.h
llkeybind.h
llkeythrottle.h
llleap.h
@@ -261,6 +262,7 @@ set(llcommon_HEADER_FILES
stdtypes.h
stringize.h
threadpool.h
+ threadpool_fwd.h
threadsafeschedule.h
timer.h
tuple.h
diff --git a/indra/llcommon/llinstancetracker.h b/indra/llcommon/llinstancetracker.h
index 02535a59e7..97f7817e74 100644
--- a/indra/llcommon/llinstancetracker.h
+++ b/indra/llcommon/llinstancetracker.h
@@ -104,22 +104,26 @@ public:
return LockStatic()->mMap.size();
}
- // snapshot of std::pair<const KEY, std::shared_ptr<T>> pairs
- class snapshot
+ // snapshot of std::pair<const KEY, std::shared_ptr<SUBCLASS>> pairs, for
+ // some SUBCLASS derived from T
+ template <typename SUBCLASS>
+ class snapshot_of
{
// It's very important that what we store in this snapshot are
// weak_ptrs, NOT shared_ptrs. That's how we discover whether any
// instance has been deleted during the lifespan of a snapshot.
typedef std::vector<std::pair<const KEY, weak_t>> VectorType;
- // Dereferencing our iterator produces a std::shared_ptr for each
- // instance that still exists. Since we store weak_ptrs, that involves
- // two chained transformations:
+ // Dereferencing the iterator we publish produces a
+ // std::shared_ptr<SUBCLASS> for each instance that still exists.
+ // Since we store weak_ptr<T>, that involves two chained
+ // transformations:
// - a transform_iterator to lock the weak_ptr and return a shared_ptr
- // - a filter_iterator to skip any shared_ptr that has become invalid.
+ // - a filter_iterator to skip any shared_ptr<T> that has become
+ // invalid or references any T instance that isn't SUBCLASS.
// It is very important that we filter lazily, that is, during
// traversal. Any one of our stored weak_ptrs might expire during
// traversal.
- typedef std::pair<const KEY, ptr_t> strong_pair;
+ typedef std::pair<const KEY, std::shared_ptr<SUBCLASS>> strong_pair;
// Note for future reference: nat has not yet had any luck (up to
// Boost 1.67) trying to use boost::transform_iterator with a hand-
// coded functor, only with actual functions. In my experience, an
@@ -127,7 +131,7 @@ public:
// result_type typedef. But this works.
static strong_pair strengthen(typename VectorType::value_type& pair)
{
- return { pair.first, pair.second.lock() };
+ return { pair.first, std::dynamic_pointer_cast<SUBCLASS>(pair.second.lock()) };
}
static bool dead_skipper(const strong_pair& pair)
{
@@ -135,7 +139,7 @@ public:
}
public:
- snapshot():
+ snapshot_of():
// populate our vector with a snapshot of (locked!) InstanceMap
// note, this assigns pair<KEY, shared_ptr> to pair<KEY, weak_ptr>
mData(mLock->mMap.begin(), mLock->mMap.end())
@@ -184,44 +188,51 @@ public:
#endif // LL_WINDOWS
VectorType mData;
};
+ using snapshot = snapshot_of<T>;
- // iterate over this for references to each instance
- class instance_snapshot: public snapshot
+ // iterate over this for references to each SUBCLASS instance
+ template <typename SUBCLASS>
+ class instance_snapshot_of: public snapshot_of<SUBCLASS>
{
private:
- static T& instance_getter(typename snapshot::iterator::reference pair)
+ using super = snapshot_of<SUBCLASS>;
+ static T& instance_getter(typename super::iterator::reference pair)
{
return *pair.second;
}
public:
typedef boost::transform_iterator<decltype(instance_getter)*,
- typename snapshot::iterator> iterator;
- iterator begin() { return iterator(snapshot::begin(), instance_getter); }
- iterator end() { return iterator(snapshot::end(), instance_getter); }
+ typename super::iterator> iterator;
+ iterator begin() { return iterator(super::begin(), instance_getter); }
+ iterator end() { return iterator(super::end(), instance_getter); }
void deleteAll()
{
- for (auto it(snapshot::begin()), end(snapshot::end()); it != end; ++it)
+ for (auto it(super::begin()), end(super::end()); it != end; ++it)
{
delete it->second.get();
}
}
- };
+ };
+ using instance_snapshot = instance_snapshot_of<T>;
// iterate over this for each key
- class key_snapshot: public snapshot
+ template <typename SUBCLASS>
+ class key_snapshot_of: public snapshot_of<SUBCLASS>
{
private:
- static KEY key_getter(typename snapshot::iterator::reference pair)
+ using super = snapshot_of<SUBCLASS>;
+ static KEY key_getter(typename super::iterator::reference pair)
{
return pair.first;
}
public:
typedef boost::transform_iterator<decltype(key_getter)*,
- typename snapshot::iterator> iterator;
- iterator begin() { return iterator(snapshot::begin(), key_getter); }
- iterator end() { return iterator(snapshot::end(), key_getter); }
+ typename super::iterator> iterator;
+ iterator begin() { return iterator(super::begin(), key_getter); }
+ iterator end() { return iterator(super::end(), key_getter); }
};
+ using key_snapshot = key_snapshot_of<T>;
static ptr_t getInstance(const KEY& k)
{
@@ -368,22 +379,25 @@ public:
return LockStatic()->mSet.size();
}
- // snapshot of std::shared_ptr<T> pointers
- class snapshot
+ // snapshot of std::shared_ptr<SUBCLASS> pointers
+ template <typename SUBCLASS>
+ class snapshot_of
{
// It's very important that what we store in this snapshot are
// weak_ptrs, NOT shared_ptrs. That's how we discover whether any
// instance has been deleted during the lifespan of a snapshot.
typedef std::vector<weak_t> VectorType;
- // Dereferencing our iterator produces a std::shared_ptr for each
- // instance that still exists. Since we store weak_ptrs, that involves
- // two chained transformations:
+ // Dereferencing the iterator we publish produces a
+ // std::shared_ptr<SUBCLASS> for each instance that still exists.
+ // Since we store weak_ptrs, that involves two chained
+ // transformations:
// - a transform_iterator to lock the weak_ptr and return a shared_ptr
- // - a filter_iterator to skip any shared_ptr that has become invalid.
- typedef std::shared_ptr<T> strong_ptr;
+ // - a filter_iterator to skip any shared_ptr that has become invalid
+ // or references any T instance that isn't SUBCLASS.
+ typedef std::shared_ptr<SUBCLASS> strong_ptr;
static strong_ptr strengthen(typename VectorType::value_type& ptr)
{
- return ptr.lock();
+ return std::dynamic_pointer_cast<SUBCLASS>(ptr.lock());
}
static bool dead_skipper(const strong_ptr& ptr)
{
@@ -391,7 +405,7 @@ public:
}
public:
- snapshot():
+ snapshot_of():
// populate our vector with a snapshot of (locked!) InstanceSet
// note, this assigns stored shared_ptrs to weak_ptrs for snapshot
mData(mLock->mSet.begin(), mLock->mSet.end())
@@ -437,22 +451,33 @@ public:
#endif // LL_WINDOWS
VectorType mData;
};
+ using snapshot = snapshot_of<T>;
// iterate over this for references to each instance
- struct instance_snapshot: public snapshot
+ template <typename SUBCLASS>
+ class instance_snapshot_of: public snapshot_of<SUBCLASS>
{
- typedef boost::indirect_iterator<typename snapshot::iterator> iterator;
- iterator begin() { return iterator(snapshot::begin()); }
- iterator end() { return iterator(snapshot::end()); }
+ private:
+ using super = snapshot_of<SUBCLASS>;
+
+ public:
+ typedef boost::indirect_iterator<typename super::iterator> iterator;
+ iterator begin() { return iterator(super::begin()); }
+ iterator end() { return iterator(super::end()); }
void deleteAll()
{
- for (auto it(snapshot::begin()), end(snapshot::end()); it != end; ++it)
+ for (auto it(super::begin()), end(super::end()); it != end; ++it)
{
delete it->get();
}
}
};
+ using instance_snapshot = instance_snapshot_of<T>;
+ // key_snapshot_of isn't really meaningful, but define it anyway to avoid
+ // requiring two different LLInstanceTrackerSubclass implementations.
+ template <typename SUBCLASS>
+ using key_snapshot_of = instance_snapshot_of<SUBCLASS>;
protected:
LLInstanceTracker()
diff --git a/indra/llcommon/llinstancetrackersubclass.h b/indra/llcommon/llinstancetrackersubclass.h
new file mode 100644
index 0000000000..ea9a38200f
--- /dev/null
+++ b/indra/llcommon/llinstancetrackersubclass.h
@@ -0,0 +1,98 @@
+/**
+ * @file llinstancetrackersubclass.h
+ * @author Nat Goodspeed
+ * @date 2022-12-09
+ * @brief Intermediate class to get subclass-specific types from
+ * LLInstanceTracker instance-retrieval methods.
+ *
+ * $LicenseInfo:firstyear=2022&license=viewerlgpl$
+ * Copyright (c) 2022, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_LLINSTANCETRACKERSUBCLASS_H)
+#define LL_LLINSTANCETRACKERSUBCLASS_H
+
+#include <memory> // std::shared_ptr, std::weak_ptr
+
+/**
+ * Derive your subclass S of a subclass T of LLInstanceTracker<T> from
+ * LLInstanceTrackerSubclass<S, T> to perform appropriate downcasting and
+ * filtering for LLInstanceTracker access methods.
+ *
+ * LLInstanceTracker<T> uses CRTP, so that getWeak(), getInstance(), snapshot
+ * and instance_snapshot return pointers and references to T. The trouble is
+ * that subclasses T0 and T1 derived from T also get pointers and references
+ * to their base class T, requiring explicit downcasting. Moreover,
+ * T0::getInstance() shouldn't find an instance of any T subclass other than
+ * T0. Nor should T0::snapshot.
+ *
+ * @code
+ * class Tracked: public LLInstanceTracker<Tracked, std::string>
+ * {
+ * private:
+ * using super = LLInstanceTracker<Tracked, std::string>;
+ * public:
+ * Tracked(const std::string& name): super(name) {}
+ * // All references to Tracked::ptr_t, Tracked::getInstance() etc.
+ * // appropriately use Tracked.
+ * // ...
+ * };
+ *
+ * // But now we derive SubTracked from Tracked. We need SubTracked::ptr_t,
+ * // SubTracked::getInstance() etc. to use SubTracked, not Tracked.
+ * // This LLInstanceTrackerSubclass specialization is itself derived from
+ * // Tracked.
+ * class SubTracked: public LLInstanceTrackerSubclass<SubTracked, Tracked>
+ * {
+ * private:
+ * using super = LLInstanceTrackerSubclass<SubTracked, Tracked>;
+ * public:
+ * // LLInstanceTrackerSubclass's constructor forwards to Tracked's.
+ * SubTracked(const std::string& name): super(name) {}
+ * // SubTracked::getInstance() returns std::shared_ptr<SubTracked>, etc.
+ * // ...
+ * @endcode
+ */
+template <typename SUBCLASS, typename T>
+class LLInstanceTrackerSubclass: public T
+{
+public:
+ using ptr_t = std::shared_ptr<SUBCLASS>;
+ using weak_t = std::weak_ptr<SUBCLASS>;
+
+ // forward any constructor call to the corresponding T ctor
+ template <typename... ARGS>
+ LLInstanceTrackerSubclass(ARGS&&... args):
+ T(std::forward<ARGS>(args)...)
+ {}
+
+ weak_t getWeak()
+ {
+ // call base-class getWeak(), try to lock, downcast to SUBCLASS
+ return std::dynamic_pointer_cast<SUBCLASS>(T::getWeak().lock());
+ }
+
+ template <typename KEY>
+ static ptr_t getInstance(const KEY& k)
+ {
+ return std::dynamic_pointer_cast<SUBCLASS>(T::getInstance(k));
+ }
+
+ using snapshot = typename T::template snapshot_of<SUBCLASS>;
+ using instance_snapshot = typename T::template instance_snapshot_of<SUBCLASS>;
+ using key_snapshot = typename T::template key_snapshot_of<SUBCLASS>;
+
+ static size_t instanceCount()
+ {
+ // T::instanceCount() lies because our snapshot, et al., won't
+ // necessarily return all the T instances -- only those that are also
+ // SUBCLASS instances. Count those.
+ size_t count = 0;
+ for (const auto& pair : snapshot())
+ ++count;
+ return count;
+ }
+};
+
+#endif /* ! defined(LL_LLINSTANCETRACKERSUBCLASS_H) */
diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp
index 1d73f7aa0d..41aa858084 100644
--- a/indra/llcommon/tests/workqueue_test.cpp
+++ b/indra/llcommon/tests/workqueue_test.cpp
@@ -38,7 +38,7 @@ namespace tut
{
struct workqueue_data
{
- WorkQueue queue{"queue"};
+ WorkSchedule queue{"queue"};
};
typedef test_group<workqueue_data> workqueue_group;
typedef workqueue_group::object object;
@@ -49,8 +49,8 @@ namespace tut
{
set_test_name("name");
ensure_equals("didn't capture name", queue.getKey(), "queue");
- ensure("not findable", WorkQueue::getInstance("queue") == queue.getWeak().lock());
- WorkQueue q2;
+ ensure("not findable", WorkSchedule::getInstance("queue") == queue.getWeak().lock());
+ WorkSchedule q2;
ensure("has no name", LLStringUtil::startsWith(q2.getKey(), "WorkQueue"));
}
@@ -73,16 +73,16 @@ namespace tut
{
set_test_name("postEvery");
// record of runs
- using Shared = std::deque<WorkQueue::TimePoint>;
+ using Shared = std::deque<WorkSchedule::TimePoint>;
// This is an example of how to share data between the originator of
- // postEvery(work) and the work item itself, since usually a WorkQueue
+ // postEvery(work) and the work item itself, since usually a WorkSchedule
// is used to dispatch work to a different thread. Neither of them
// should call any of LLCond's wait methods: you don't want to stall
// either the worker thread or the originating thread (conventionally
// main). Use LLCond or a subclass even if all you want to do is
// signal the work item that it can quit; consider LLOneShotCond.
LLCond<Shared> data;
- auto start = WorkQueue::TimePoint::clock::now();
+ auto start = WorkSchedule::TimePoint::clock::now();
auto interval = 100ms;
queue.postEvery(
interval,
@@ -93,7 +93,7 @@ namespace tut
data.update_one(
[](Shared& data)
{
- data.push_back(WorkQueue::TimePoint::clock::now());
+ data.push_back(WorkSchedule::TimePoint::clock::now());
});
// by the 3rd call, return false to stop
return (++count < 3);
@@ -102,7 +102,7 @@ namespace tut
// postEvery() running, so run until we have exhausted the iterations
// or we time out waiting
for (auto finish = start + 10*interval;
- WorkQueue::TimePoint::clock::now() < finish &&
+ WorkSchedule::TimePoint::clock::now() < finish &&
data.get([](const Shared& data){ return data.size(); }) < 3; )
{
queue.runPending();
@@ -139,8 +139,8 @@ namespace tut
void object::test<4>()
{
set_test_name("postTo");
- WorkQueue main("main");
- auto qptr = WorkQueue::getInstance("queue");
+ WorkSchedule main("main");
+ auto qptr = WorkSchedule::getInstance("queue");
int result = 0;
main.postTo(
qptr,
@@ -171,8 +171,8 @@ namespace tut
void object::test<5>()
{
set_test_name("postTo with void return");
- WorkQueue main("main");
- auto qptr = WorkQueue::getInstance("queue");
+ WorkSchedule main("main");
+ auto qptr = WorkSchedule::getInstance("queue");
std::string observe;
main.postTo(
qptr,
@@ -194,7 +194,7 @@ namespace tut
std::string stored;
// Try to call waitForResult() on this thread's main coroutine. It
// should throw because the main coroutine must service the queue.
- auto what{ catch_what<WorkQueue::Error>(
+ auto what{ catch_what<WorkSchedule::Error>(
[this, &stored](){ stored = queue.waitForResult(
[](){ return "should throw"; }); }) };
ensure("lambda should not have run", stored.empty());
diff --git a/indra/llcommon/threadpool.cpp b/indra/llcommon/threadpool.cpp
index 1b29b8b69f..3a9a5a2062 100644
--- a/indra/llcommon/threadpool.cpp
+++ b/indra/llcommon/threadpool.cpp
@@ -58,16 +58,17 @@ struct sleepy_robin: public boost::fibers::algo::round_robin
};
/*****************************************************************************
-* ThreadPool
+* ThreadPoolBase
*****************************************************************************/
-LL::ThreadPool::ThreadPool(const std::string& name, size_t threads, size_t capacity):
+LL::ThreadPoolBase::ThreadPoolBase(const std::string& name, size_t threads,
+ WorkQueueBase* queue):
super(name),
- mQueue(name, capacity),
mName("ThreadPool:" + name),
- mThreadCount(getConfiguredWidth(name, threads))
+ mThreadCount(getConfiguredWidth(name, threads)),
+ mQueue(queue)
{}
-void LL::ThreadPool::start()
+void LL::ThreadPoolBase::start()
{
for (size_t i = 0; i < mThreadCount; ++i)
{
@@ -95,17 +96,17 @@ void LL::ThreadPool::start()
});
}
-LL::ThreadPool::~ThreadPool()
+LL::ThreadPoolBase::~ThreadPoolBase()
{
close();
}
-void LL::ThreadPool::close()
+void LL::ThreadPoolBase::close()
{
- if (! mQueue.isClosed())
+ if (! mQueue->isClosed())
{
LL_DEBUGS("ThreadPool") << mName << " closing queue and joining threads" << LL_ENDL;
- mQueue.close();
+ mQueue->close();
for (auto& pair: mThreads)
{
LL_DEBUGS("ThreadPool") << mName << " waiting on thread " << pair.first << LL_ENDL;
@@ -115,7 +116,7 @@ void LL::ThreadPool::close()
}
}
-void LL::ThreadPool::run(const std::string& name)
+void LL::ThreadPoolBase::run(const std::string& name)
{
#if LL_WINDOWS
// Try using sleepy_robin fiber scheduler.
@@ -127,13 +128,13 @@ void LL::ThreadPool::run(const std::string& name)
LL_DEBUGS("ThreadPool") << name << " stopping" << LL_ENDL;
}
-void LL::ThreadPool::run()
+void LL::ThreadPoolBase::run()
{
- mQueue.runUntilClose();
+ mQueue->runUntilClose();
}
//static
-size_t LL::ThreadPool::getConfiguredWidth(const std::string& name, size_t dft)
+size_t LL::ThreadPoolBase::getConfiguredWidth(const std::string& name, size_t dft)
{
LLSD poolSizes;
try
@@ -174,7 +175,7 @@ size_t LL::ThreadPool::getConfiguredWidth(const std::string& name, size_t dft)
}
//static
-size_t LL::ThreadPool::getWidth(const std::string& name, size_t dft)
+size_t LL::ThreadPoolBase::getWidth(const std::string& name, size_t dft)
{
auto instance{ getInstance(name) };
if (instance)
diff --git a/indra/llcommon/threadpool.h b/indra/llcommon/threadpool.h
index b49d511257..60f4a0ce1b 100644
--- a/indra/llcommon/threadpool.h
+++ b/indra/llcommon/threadpool.h
@@ -13,7 +13,9 @@
#if ! defined(LL_THREADPOOL_H)
#define LL_THREADPOOL_H
+#include "threadpool_fwd.h"
#include "workqueue.h"
+#include <memory> // std::unique_ptr
#include <string>
#include <thread>
#include <utility> // std::pair
@@ -22,26 +24,24 @@
namespace LL
{
- class ThreadPool: public LLInstanceTracker<ThreadPool, std::string>
+ class ThreadPoolBase: public LLInstanceTracker<ThreadPoolBase, std::string>
{
private:
- using super = LLInstanceTracker<ThreadPool, std::string>;
+ using super = LLInstanceTracker<ThreadPoolBase, std::string>;
+
public:
/**
- * Pass ThreadPool a string name. This can be used to look up the
+ * Pass ThreadPoolBase a string name. This can be used to look up the
* relevant WorkQueue.
*
* The number of threads you pass sets the compile-time default. But
* if the user has overridden the LLSD map in the "ThreadPoolSizes"
* setting with a key matching this ThreadPool name, that setting
* overrides this parameter.
- *
- * Pass an explicit capacity to limit the size of the queue.
- * Constraining the queue can cause a submitter to block. Do not
- * constrain any ThreadPool accepting work from the main thread.
*/
- ThreadPool(const std::string& name, size_t threads=1, size_t capacity=1024*1024);
- virtual ~ThreadPool();
+ ThreadPoolBase(const std::string& name, size_t threads,
+ WorkQueueBase* queue);
+ virtual ~ThreadPoolBase();
/**
* Launch the ThreadPool. Until this call, a constructed ThreadPool
@@ -59,8 +59,6 @@ namespace LL
std::string getName() const { return mName; }
size_t getWidth() const { return mThreads.size(); }
- /// obtain a non-const reference to the WorkQueue to post work to it
- WorkQueue& getQueue() { return mQueue; }
/**
* Override run() if you need special processing. The default run()
@@ -87,15 +85,53 @@ namespace LL
static
size_t getWidth(const std::string& name, size_t dft);
+ protected:
+ std::unique_ptr<WorkQueueBase> mQueue;
+
private:
void run(const std::string& name);
- WorkQueue mQueue;
std::string mName;
size_t mThreadCount;
std::vector<std::pair<std::string, std::thread>> mThreads;
};
+ /**
+ * Specialize with WorkQueue or, for timestamped tasks, WorkSchedule
+ */
+ template <class QUEUE>
+ struct ThreadPoolUsing: public ThreadPoolBase
+ {
+ using queue_t = QUEUE;
+
+ /**
+ * Pass ThreadPoolUsing a string name. This can be used to look up the
+ * relevant WorkQueue.
+ *
+ * The number of threads you pass sets the compile-time default. But
+ * if the user has overridden the LLSD map in the "ThreadPoolSizes"
+ * setting with a key matching this ThreadPool name, that setting
+ * overrides this parameter.
+ *
+ * Pass an explicit capacity to limit the size of the queue.
+ * Constraining the queue can cause a submitter to block. Do not
+ * constrain any ThreadPool accepting work from the main thread.
+ */
+ ThreadPoolUsing(const std::string& name, size_t threads=1, size_t capacity=1024*1024):
+ ThreadPoolBase(name, threads, new queue_t(name, capacity))
+ {}
+ ~ThreadPoolUsing() override {}
+
+ /**
+ * obtain a non-const reference to the specific WorkQueue subclass to
+ * post work to it
+ */
+ queue_t& getQueue() { return static_cast<queue_t&>(*mQueue); }
+ };
+
+ /// ThreadPool is shorthand for using the simpler WorkQueue
+ using ThreadPool = ThreadPoolUsing<WorkQueue>;
+
} // namespace LL
#endif /* ! defined(LL_THREADPOOL_H) */
diff --git a/indra/llcommon/threadpool_fwd.h b/indra/llcommon/threadpool_fwd.h
new file mode 100644
index 0000000000..1aa3c4a0e2
--- /dev/null
+++ b/indra/llcommon/threadpool_fwd.h
@@ -0,0 +1,25 @@
+/**
+ * @file threadpool_fwd.h
+ * @author Nat Goodspeed
+ * @date 2022-12-09
+ * @brief Forward declarations for ThreadPool et al.
+ *
+ * $LicenseInfo:firstyear=2022&license=viewerlgpl$
+ * Copyright (c) 2022, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_THREADPOOL_FWD_H)
+#define LL_THREADPOOL_FWD_H
+
+#include "workqueue.h"
+
+namespace LL
+{
+ template <class QUEUE>
+ struct ThreadPoolUsing;
+
+ using ThreadPool = ThreadPoolUsing<WorkQueue>;
+} // namespace LL
+
+#endif /* ! defined(LL_THREADPOOL_FWD_H) */
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp
index eb06890468..83e0216ae7 100644
--- a/indra/llcommon/workqueue.cpp
+++ b/indra/llcommon/workqueue.cpp
@@ -26,83 +26,65 @@
using Mutex = LLCoros::Mutex;
using Lock = LLCoros::LockType;
-LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity):
- super(makeName(name)),
- mQueue(capacity)
+/*****************************************************************************
+* WorkQueueBase
+*****************************************************************************/
+LL::WorkQueueBase::WorkQueueBase(const std::string& name):
+ super(makeName(name))
{
// TODO: register for "LLApp" events so we can implicitly close() on
// viewer shutdown.
}
-void LL::WorkQueue::close()
-{
- mQueue.close();
-}
-
-size_t LL::WorkQueue::size()
-{
- return mQueue.size();
-}
-
-bool LL::WorkQueue::isClosed()
-{
- return mQueue.isClosed();
-}
-
-bool LL::WorkQueue::done()
-{
- return mQueue.done();
-}
-
-void LL::WorkQueue::runUntilClose()
+void LL::WorkQueueBase::runUntilClose()
{
try
{
for (;;)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
- callWork(mQueue.pop());
+ callWork(pop_());
}
}
- catch (const Queue::Closed&)
+ catch (const Closed&)
{
}
}
-bool LL::WorkQueue::runPending()
+bool LL::WorkQueueBase::runPending()
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
- for (Work work; mQueue.tryPop(work); )
+ for (Work work; tryPop_(work); )
{
callWork(work);
}
- return ! mQueue.done();
+ return ! done();
}
-bool LL::WorkQueue::runOne()
+bool LL::WorkQueueBase::runOne()
{
Work work;
- if (mQueue.tryPop(work))
+ if (tryPop_(work))
{
callWork(work);
}
- return ! mQueue.done();
+ return ! done();
}
-bool LL::WorkQueue::runUntil(const TimePoint& until)
+bool LL::WorkQueueBase::runUntil(const TimePoint& until)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
// Should we subtract some slop to allow for typical Work execution time?
// How much slop?
// runUntil() is simply a time-bounded runPending().
- for (Work work; TimePoint::clock::now() < until && mQueue.tryPop(work); )
+ for (Work work; TimePoint::clock::now() < until && tryPop_(work); )
{
callWork(work);
}
- return ! mQueue.done();
+ return ! done();
}
-std::string LL::WorkQueue::makeName(const std::string& name)
+std::string LL::WorkQueueBase::makeName(const std::string& name)
{
if (! name.empty())
return name;
@@ -120,14 +102,7 @@ std::string LL::WorkQueue::makeName(const std::string& name)
return STRINGIZE("WorkQueue" << num);
}
-void LL::WorkQueue::callWork(const Queue::DataTuple& work)
-{
- // ThreadSafeSchedule::pop() always delivers a tuple, even when
- // there's only one data field per item, as for us.
- callWork(std::get<0>(work));
-}
-
-void LL::WorkQueue::callWork(const Work& work)
+void LL::WorkQueueBase::callWork(const Work& work)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
try
@@ -142,12 +117,12 @@ void LL::WorkQueue::callWork(const Work& work)
}
}
-void LL::WorkQueue::error(const std::string& msg)
+void LL::WorkQueueBase::error(const std::string& msg)
{
LL_ERRS("WorkQueue") << msg << LL_ENDL;
}
-void LL::WorkQueue::checkCoroutine(const std::string& method)
+void LL::WorkQueueBase::checkCoroutine(const std::string& method)
{
// By convention, the default coroutine on each thread has an empty name
// string. See also LLCoros::logname().
@@ -156,3 +131,130 @@ void LL::WorkQueue::checkCoroutine(const std::string& method)
LLTHROW(Error("Do not call " + method + " from a thread's default coroutine"));
}
}
+
+/*****************************************************************************
+* WorkQueue
+*****************************************************************************/
+LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity):
+ super(name),
+ mQueue(capacity)
+{
+}
+
+void LL::WorkQueue::close()
+{
+ mQueue.close();
+}
+
+size_t LL::WorkQueue::size()
+{
+ return mQueue.size();
+}
+
+bool LL::WorkQueue::isClosed()
+{
+ return mQueue.isClosed();
+}
+
+bool LL::WorkQueue::done()
+{
+ return mQueue.done();
+}
+
+void LL::WorkQueue::post(const Work& callable)
+{
+ mQueue.push(callable);
+}
+
+bool LL::WorkQueue::postIfOpen(const Work& callable)
+{
+ return mQueue.pushIfOpen(callable);
+}
+
+bool LL::WorkQueue::tryPost(const Work& callable)
+{
+ return mQueue.tryPush(callable);
+}
+
+LL::WorkQueue::Work LL::WorkQueue::pop_()
+{
+ return mQueue.pop();
+}
+
+bool LL::WorkQueue::tryPop_(Work& work)
+{
+ return mQueue.tryPop(work);
+}
+
+/*****************************************************************************
+* WorkSchedule
+*****************************************************************************/
+LL::WorkSchedule::WorkSchedule(const std::string& name, size_t capacity):
+ super(name),
+ mQueue(capacity)
+{
+}
+
+void LL::WorkSchedule::close()
+{
+ mQueue.close();
+}
+
+size_t LL::WorkSchedule::size()
+{
+ return mQueue.size();
+}
+
+bool LL::WorkSchedule::isClosed()
+{
+ return mQueue.isClosed();
+}
+
+bool LL::WorkSchedule::done()
+{
+ return mQueue.done();
+}
+
+void LL::WorkSchedule::post(const Work& callable)
+{
+ // Use TimePoint::clock::now() instead of TimePoint's representation of
+ // the epoch because this WorkSchedule may contain a mix of past-due
+ // TimedWork items and TimedWork items scheduled for the future. Sift this
+ // new item into the correct place.
+ post(callable, TimePoint::clock::now());
+}
+
+void LL::WorkSchedule::post(const Work& callable, const TimePoint& time)
+{
+ mQueue.push(TimedWork(time, callable));
+}
+
+bool LL::WorkSchedule::postIfOpen(const Work& callable)
+{
+ return postIfOpen(callable, TimePoint::clock::now());
+}
+
+bool LL::WorkSchedule::postIfOpen(const Work& callable, const TimePoint& time)
+{
+ return mQueue.pushIfOpen(TimedWork(time, callable));
+}
+
+bool LL::WorkSchedule::tryPost(const Work& callable)
+{
+ return tryPost(callable, TimePoint::clock::now());
+}
+
+bool LL::WorkSchedule::tryPost(const Work& callable, const TimePoint& time)
+{
+ return mQueue.tryPush(TimedWork(time, callable));
+}
+
+LL::WorkSchedule::Work LL::WorkSchedule::pop_()
+{
+ return std::get<0>(mQueue.pop());
+}
+
+bool LL::WorkSchedule::tryPop_(Work& work)
+{
+ return mQueue.tryPop(work);
+}
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
index 28a0b5e040..eea8886a7a 100644
--- a/indra/llcommon/workqueue.h
+++ b/indra/llcommon/workqueue.h
@@ -15,6 +15,7 @@
#include "llcoros.h"
#include "llexception.h"
#include "llinstancetracker.h"
+#include "llinstancetrackersubclass.h"
#include "threadsafeschedule.h"
#include <chrono>
#include <exception> // std::current_exception
@@ -23,27 +24,23 @@
namespace LL
{
+
+/*****************************************************************************
+* WorkQueueBase: API for WorkQueue and WorkSchedule
+*****************************************************************************/
/**
* A typical WorkQueue has a string name that can be used to find it.
*/
- class WorkQueue: public LLInstanceTracker<WorkQueue, std::string>
+ class WorkQueueBase: public LLInstanceTracker<WorkQueueBase, std::string>
{
private:
- using super = LLInstanceTracker<WorkQueue, std::string>;
+ using super = LLInstanceTracker<WorkQueueBase, std::string>;
public:
using Work = std::function<void()>;
-
- private:
- using Queue = ThreadSafeSchedule<Work>;
- // helper for postEvery()
- template <typename Rep, typename Period, typename CALLABLE>
- class BackJack;
-
- public:
- using TimePoint = Queue::TimePoint;
- using TimedWork = Queue::TimeTuple;
- using Closed = Queue::Closed;
+ using Closed = LLThreadSafeQueueInterrupt;
+ // for runFor()
+ using TimePoint = std::chrono::steady_clock::time_point;
struct Error: public LLException
{
@@ -51,18 +48,18 @@ namespace LL
};
/**
- * You may omit the WorkQueue name, in which case a unique name is
+ * You may omit the WorkQueueBase name, in which case a unique name is
* synthesized; for practical purposes that makes it anonymous.
*/
- WorkQueue(const std::string& name = std::string(), size_t capacity=1024);
+ WorkQueueBase(const std::string& name);
/**
* Since the point of WorkQueue is to pass work to some other worker
- * thread(s) asynchronously, it's important that the WorkQueue continue
- * to exist until the worker thread(s) have drained it. To communicate
- * that it's time for them to quit, close() the queue.
+ * thread(s) asynchronously, it's important that it continue to exist
+ * until the worker thread(s) have drained it. To communicate that
+ * it's time for them to quit, close() the queue.
*/
- void close();
+ virtual void close() = 0;
/**
* WorkQueue supports multiple producers and multiple consumers. In
@@ -78,158 +75,60 @@ namespace LL
* * If you're the only consumer, noticing that size() > 0 is
* meaningful.
*/
- size_t size();
+ virtual size_t size() = 0;
/// producer end: are we prevented from pushing any additional items?
- bool isClosed();
+ virtual bool isClosed() = 0;
/// consumer end: are we done, is the queue entirely drained?
- bool done();
+ virtual bool done() = 0;
/*---------------------- fire and forget API -----------------------*/
- /// fire-and-forget, but at a particular (future?) time
- template <typename CALLABLE>
- void post(const TimePoint& time, CALLABLE&& callable)
- {
- // Defer reifying an arbitrary CALLABLE until we hit this or
- // postIfOpen(). All other methods should accept CALLABLEs of
- // arbitrary type to avoid multiple levels of std::function
- // indirection.
- mQueue.push(TimedWork(time, std::move(callable)));
- }
-
/// fire-and-forget
- template <typename CALLABLE>
- void post(CALLABLE&& callable)
- {
- // We use TimePoint::clock::now() instead of TimePoint's
- // representation of the epoch because this WorkQueue may contain
- // a mix of past-due TimedWork items and TimedWork items scheduled
- // for the future. Sift this new item into the correct place.
- post(TimePoint::clock::now(), std::move(callable));
- }
-
- /**
- * post work for a particular time, unless the queue is closed before
- * we can post
- */
- template <typename CALLABLE>
- bool postIfOpen(const TimePoint& time, CALLABLE&& callable)
- {
- // Defer reifying an arbitrary CALLABLE until we hit this or
- // post(). All other methods should accept CALLABLEs of arbitrary
- // type to avoid multiple levels of std::function indirection.
- return mQueue.pushIfOpen(TimedWork(time, std::move(callable)));
- }
+ virtual void post(const Work&) = 0;
/**
* post work, unless the queue is closed before we can post
*/
- template <typename CALLABLE>
- bool postIfOpen(CALLABLE&& callable)
- {
- return postIfOpen(TimePoint::clock::now(), std::move(callable));
- }
+ virtual bool postIfOpen(const Work&) = 0;
/**
- * Post work to be run at a specified time to another WorkQueue, which
- * may or may not still exist and be open. Return true if we were able
- * to post.
+ * post work, unless the queue is full
*/
- template <typename CALLABLE>
- static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable);
+ virtual bool tryPost(const Work&) = 0;
/**
* Post work to another WorkQueue, which may or may not still exist
- * and be open. Return true if we were able to post.
- */
- template <typename CALLABLE>
- static bool postMaybe(weak_t target, CALLABLE&& callable)
- {
- return postMaybe(target, TimePoint::clock::now(),
- std::forward<CALLABLE>(callable));
- }
-
- /**
- * Launch a callable returning bool that will trigger repeatedly at
- * specified interval, until the callable returns false.
- *
- * If you need to signal that callable from outside, DO NOT bind a
- * reference to a simple bool! That's not thread-safe. Instead, bind
- * an LLCond variant, e.g. LLOneShotCond or LLBoolCond.
+ * and be open. Support any post() overload. Return true if we were
+ * able to post.
*/
- template <typename Rep, typename Period, typename CALLABLE>
- void postEvery(const std::chrono::duration<Rep, Period>& interval,
- CALLABLE&& callable);
-
- template <typename CALLABLE>
- bool tryPost(const TimePoint& time, CALLABLE&& callable)
- {
- return mQueue.tryPush(TimedWork(time, std::move(callable)));
- }
-
- template <typename CALLABLE>
- bool tryPost(CALLABLE&& callable)
- {
- return mQueue.tryPush(TimePoint::clock::now(), std::move(callable));
- }
+ template <typename... ARGS>
+ static bool postMaybe(weak_t target, ARGS&&... args);
/*------------------------- handshake API --------------------------*/
/**
- * Post work to another WorkQueue to be run at a specified time,
- * requesting a specific callback to be run on this WorkQueue on
- * completion.
- *
- * Returns true if able to post, false if the other WorkQueue is
- * inaccessible.
- */
- // Apparently some Microsoft header file defines a macro CALLBACK? The
- // natural template argument name CALLBACK produces very weird Visual
- // Studio compile errors that seem utterly unrelated to this source
- // code.
- template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);
-
- /**
* Post work to another WorkQueue, requesting a specific callback to
- * be run on this WorkQueue on completion.
+ * be run on this WorkQueue on completion. Optional final argument is
+ * TimePoint for WorkSchedule.
*
* Returns true if able to post, false if the other WorkQueue is
* inaccessible.
*/
- template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback)
- {
- return postTo(target, TimePoint::clock::now(),
- std::move(callable), std::move(callback));
- }
-
- /**
- * Post work to another WorkQueue to be run at a specified time,
- * blocking the calling coroutine until then, returning the result to
- * caller on completion.
- *
- * In general, we assume that each thread's default coroutine is busy
- * servicing its WorkQueue or whatever. To try to prevent mistakes, we
- * forbid calling waitForResult() from a thread's default coroutine.
- */
- template <typename CALLABLE>
- auto waitForResult(const TimePoint& time, CALLABLE&& callable);
+ template <typename CALLABLE, typename FOLLOWUP, typename... ARGS>
+ bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback,
+ ARGS&&... args);
/**
* Post work to another WorkQueue, blocking the calling coroutine
- * until then, returning the result to caller on completion.
+ * until then, returning the result to caller on completion. Optional
+ * final argument is TimePoint for WorkSchedule.
*
* In general, we assume that each thread's default coroutine is busy
* servicing its WorkQueue or whatever. To try to prevent mistakes, we
* forbid calling waitForResult() from a thread's default coroutine.
*/
- template <typename CALLABLE>
- auto waitForResult(CALLABLE&& callable)
- {
- return waitForResult(TimePoint::clock::now(), std::move(callable));
- }
+ template <typename CALLABLE, typename... ARGS>
+ auto waitForResult(CALLABLE&& callable, ARGS&&... args);
/*--------------------------- worker API ---------------------------*/
@@ -276,7 +175,7 @@ namespace LL
*/
bool runUntil(const TimePoint& until);
- private:
+ protected:
template <typename CALLABLE, typename FOLLOWUP>
static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback);
/// general case: arbitrary C++ return type
@@ -296,13 +195,179 @@ namespace LL
static void checkCoroutine(const std::string& method);
static void error(const std::string& msg);
static std::string makeName(const std::string& name);
- void callWork(const Queue::DataTuple& work);
void callWork(const Work& work);
+
+ private:
+ virtual Work pop_() = 0;
+ virtual bool tryPop_(Work&) = 0;
+ };
+
+/*****************************************************************************
+* WorkQueue: no timestamped task support
+*****************************************************************************/
+ class WorkQueue: public LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase>
+ {
+ private:
+ using super = LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase>;
+
+ public:
+ /**
+ * You may omit the WorkQueue name, in which case a unique name is
+ * synthesized; for practical purposes that makes it anonymous.
+ */
+ WorkQueue(const std::string& name = std::string(), size_t capacity=1024);
+
+ /**
+ * Since the point of WorkQueue is to pass work to some other worker
+ * thread(s) asynchronously, it's important that it continue to exist
+ * until the worker thread(s) have drained it. To communicate that
+ * it's time for them to quit, close() the queue.
+ */
+ void close() override;
+
+ /**
+ * WorkQueue supports multiple producers and multiple consumers. In
+ * the general case it's misleading to test size(), since any other
+ * thread might change it the nanosecond the lock is released. On that
+ * basis, some might argue against publishing a size() method at all.
+ *
+ * But there are two specific cases in which a test based on size()
+ * might be reasonable:
+ *
+ * * If you're the only producer, noticing that size() == 0 is
+ * meaningful.
+ * * If you're the only consumer, noticing that size() > 0 is
+ * meaningful.
+ */
+ size_t size() override;
+ /// producer end: are we prevented from pushing any additional items?
+ bool isClosed() override;
+ /// consumer end: are we done, is the queue entirely drained?
+ bool done() override;
+
+ /*---------------------- fire and forget API -----------------------*/
+
+ /// fire-and-forget
+ void post(const Work&) override;
+
+ /**
+ * post work, unless the queue is closed before we can post
+ */
+ bool postIfOpen(const Work&) override;
+
+ /**
+ * post work, unless the queue is full
+ */
+ bool tryPost(const Work&) override;
+
+ private:
+ using Queue = LLThreadSafeQueue<Work>;
Queue mQueue;
+
+ Work pop_() override;
+ bool tryPop_(Work&) override;
+ };
+
+/*****************************************************************************
+* WorkSchedule: add support for timestamped tasks
+*****************************************************************************/
+ class WorkSchedule: public LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase>
+ {
+ private:
+ using super = LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase>;
+ using Queue = ThreadSafeSchedule<Work>;
+ // helper for postEvery()
+ template <typename Rep, typename Period, typename CALLABLE>
+ class BackJack;
+
+ public:
+ using TimePoint = Queue::TimePoint;
+ using TimedWork = Queue::TimeTuple;
+
+ /**
+ * You may omit the WorkSchedule name, in which case a unique name is
+ * synthesized; for practical purposes that makes it anonymous.
+ */
+ WorkSchedule(const std::string& name = std::string(), size_t capacity=1024);
+
+ /**
+ * Since the point of WorkSchedule is to pass work to some other worker
+ * thread(s) asynchronously, it's important that the WorkSchedule continue
+ * to exist until the worker thread(s) have drained it. To communicate
+ * that it's time for them to quit, close() the queue.
+ */
+ void close() override;
+
+ /**
+ * WorkSchedule supports multiple producers and multiple consumers. In
+ * the general case it's misleading to test size(), since any other
+ * thread might change it the nanosecond the lock is released. On that
+ * basis, some might argue against publishing a size() method at all.
+ *
+ * But there are two specific cases in which a test based on size()
+ * might be reasonable:
+ *
+ * * If you're the only producer, noticing that size() == 0 is
+ * meaningful.
+ * * If you're the only consumer, noticing that size() > 0 is
+ * meaningful.
+ */
+ size_t size() override;
+ /// producer end: are we prevented from pushing any additional items?
+ bool isClosed() override;
+ /// consumer end: are we done, is the queue entirely drained?
+ bool done() override;
+
+ /*---------------------- fire and forget API -----------------------*/
+
+ /// fire-and-forget
+ void post(const Work& callable) override;
+
+ /// fire-and-forget, but at a particular (future?) time
+ void post(const Work& callable, const TimePoint& time);
+
+ /**
+ * post work, unless the queue is closed before we can post
+ */
+ bool postIfOpen(const Work& callable) override;
+
+ /**
+ * post work for a particular time, unless the queue is closed before
+ * we can post
+ */
+ bool postIfOpen(const Work& callable, const TimePoint& time);
+
+ /**
+ * post work, unless the queue is full
+ */
+ bool tryPost(const Work& callable) override;
+
+ /**
+ * post work for a particular time, unless the queue is full
+ */
+ bool tryPost(const Work& callable, const TimePoint& time);
+
+ /**
+ * Launch a callable returning bool that will trigger repeatedly at
+ * specified interval, until the callable returns false.
+ *
+ * If you need to signal that callable from outside, DO NOT bind a
+ * reference to a simple bool! That's not thread-safe. Instead, bind
+ * an LLCond variant, e.g. LLOneShotCond or LLBoolCond.
+ */
+ template <typename Rep, typename Period, typename CALLABLE>
+ void postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable);
+
+ private:
+ Queue mQueue;
+
+ Work pop_() override;
+ bool tryPop_(Work&) override;
};
/**
- * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a
+ * BackJack is, in effect, a hand-rolled lambda, binding a WorkSchedule, a
* CALLABLE that returns bool, a TimePoint and an interval at which to
* relaunch it. As long as the callable continues returning true, BackJack
* keeps resubmitting it to the target WorkQueue.
@@ -311,7 +376,7 @@ namespace LL
// class method gets its own 'this' pointer -- which we need to resubmit
// the whole BackJack callable.
template <typename Rep, typename Period, typename CALLABLE>
- class WorkQueue::BackJack
+ class WorkSchedule::BackJack
{
public:
// bind the desired data
@@ -325,9 +390,10 @@ namespace LL
mCallable(std::move(callable))
{}
- // Call by target WorkQueue -- note that although WE require a
- // callable returning bool, WorkQueue wants a void callable. We
- // consume the bool.
+ // This operator() method, called by target WorkSchedule, is what
+ // makes this object a Work item. Although WE require a callable
+ // returning bool, WorkSchedule wants a void callable. We consume the
+ // bool.
void operator()()
{
// If mCallable() throws an exception, don't catch it here: if it
@@ -343,7 +409,7 @@ namespace LL
// register our intent to fire at exact mIntervals.
mStart += mInterval;
- // We're being called at this moment by the target WorkQueue.
+ // We're being called at this moment by the target WorkSchedule.
// Assume it still exists, rather than checking the result of
// lock().
// Resubmit the whole *this callable: that's why we're a class
@@ -353,7 +419,8 @@ namespace LL
// moved-from.
try
{
- mTarget.lock()->post(mStart, std::move(*this));
+ std::dynamic_pointer_cast<WorkSchedule>(mTarget.lock())->
+ post(std::move(*this), mStart);
}
catch (const Closed&)
{
@@ -370,8 +437,8 @@ namespace LL
};
template <typename Rep, typename Period, typename CALLABLE>
- void WorkQueue::postEvery(const std::chrono::duration<Rep, Period>& interval,
- CALLABLE&& callable)
+ void WorkSchedule::postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable)
{
if (interval.count() <= 0)
{
@@ -394,7 +461,7 @@ namespace LL
/// general case: arbitrary C++ return type
template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
- struct WorkQueue::MakeReplyLambda
+ struct WorkQueueBase::MakeReplyLambda
{
auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
{
@@ -415,7 +482,7 @@ namespace LL
/// specialize for CALLABLE returning void
template <typename CALLABLE, typename FOLLOWUP>
- struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void>
+ struct WorkQueueBase::MakeReplyLambda<CALLABLE, FOLLOWUP, void>
{
auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
{
@@ -427,16 +494,16 @@ namespace LL
};
template <typename CALLABLE, typename FOLLOWUP>
- auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback)
+ auto WorkQueueBase::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback)
{
return MakeReplyLambda<CALLABLE, FOLLOWUP,
decltype(std::forward<CALLABLE>(callable)())>()
(std::move(callable), std::move(callback));
}
- template <typename CALLABLE, typename FOLLOWUP>
- bool WorkQueue::postTo(weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
+ template <typename CALLABLE, typename FOLLOWUP, typename... ARGS>
+ bool WorkQueueBase::postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback,
+ ARGS&&... args)
{
LL_PROFILE_ZONE_SCOPED;
// We're being asked to post to the WorkQueue at target.
@@ -450,12 +517,11 @@ namespace LL
// lambda that packages our callable, our callback and a weak_ptr
// to this originating WorkQueue.
tptr->post(
- time,
[reply = super::getWeak(),
callable = std::move(callable),
callback = std::move(callback)]
- ()
- mutable {
+ () mutable
+ {
// Use postMaybe() below in case this originating WorkQueue
// has been closed or destroyed. Remember, the outer lambda is
// now running on a thread servicing the target WorkQueue, and
@@ -478,14 +544,16 @@ namespace LL
// originating WorkQueue. Once there, rethrow it.
[exc = std::current_exception()](){ std::rethrow_exception(exc); });
}
- });
+ },
+ // if caller passed a TimePoint, pass it along to post()
+ std::forward<ARGS>(args)...);
// looks like we were able to post()
return true;
}
- template <typename CALLABLE>
- bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable)
+ template <typename... ARGS>
+ bool WorkQueueBase::postMaybe(weak_t target, ARGS&&... args)
{
LL_PROFILE_ZONE_SCOPED;
// target is a weak_ptr: have to lock it to check it
@@ -494,7 +562,7 @@ namespace LL
{
try
{
- tptr->post(time, std::forward<CALLABLE>(callable));
+ tptr->post(std::forward<ARGS>(args)...);
// we were able to post()
return true;
}
@@ -509,13 +577,13 @@ namespace LL
/// general case: arbitrary C++ return type
template <typename CALLABLE, typename RETURNTYPE>
- struct WorkQueue::WaitForResult
+ struct WorkQueueBase::WaitForResult
{
- auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)
+ template <typename... ARGS>
+ auto operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args)
{
LLCoros::Promise<RETURNTYPE> promise;
self->post(
- time,
// We dare to bind a reference to Promise because it's
// specifically designed for cross-thread communication.
[&promise, callable = std::move(callable)]()
@@ -529,7 +597,9 @@ namespace LL
{
promise.set_exception(std::current_exception());
}
- });
+ },
+ // if caller passed a TimePoint, pass it to post()
+ std::forward<ARGS>(args)...);
auto future{ LLCoros::getFuture(promise) };
// now, on the calling thread, wait for that result
LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()");
@@ -539,13 +609,13 @@ namespace LL
/// specialize for CALLABLE returning void
template <typename CALLABLE>
- struct WorkQueue::WaitForResult<CALLABLE, void>
+ struct WorkQueueBase::WaitForResult<CALLABLE, void>
{
- void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)
+ template <typename... ARGS>
+ void operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args)
{
LLCoros::Promise<void> promise;
self->post(
- time,
// &promise is designed for cross-thread access
[&promise, callable = std::move(callable)]()
mutable {
@@ -558,7 +628,9 @@ namespace LL
{
promise.set_exception(std::current_exception());
}
- });
+ },
+ // if caller passed a TimePoint, pass it to post()
+ std::forward<ARGS>(args)...);
auto future{ LLCoros::getFuture(promise) };
// block until set_value()
LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()");
@@ -566,13 +638,13 @@ namespace LL
}
};
- template <typename CALLABLE>
- auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable)
+ template <typename CALLABLE, typename... ARGS>
+ auto WorkQueueBase::waitForResult(CALLABLE&& callable, ARGS&&... args)
{
checkCoroutine("waitForResult()");
// derive callable's return type so we can specialize for void
return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>()
- (this, time, std::forward<CALLABLE>(callable));
+ (this, std::forward<CALLABLE>(callable), std::forward<ARGS>(args)...);
}
} // namespace LL