summaryrefslogtreecommitdiff
path: root/indra/llcommon
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2022-12-09 13:21:45 -0500
committerNat Goodspeed <nat@lindenlab.com>2022-12-09 13:21:45 -0500
commitfc424a0db90fd2d2e44e85a19750ad6eaa57b28a (patch)
treea6e6fff4723d085dd96e0e30bae6823aa65da5ec /indra/llcommon
parent00478b1e7671cb109771a1ad4fb40d47d15ab756 (diff)
SL-18809: Add WorkSchedule; remove timestamps from WorkQueue.
For work queues that don't need timestamped tasks, eliminate the overhead of a priority queue ordered by timestamp. Timestamped task support moves to WorkSchedule. WorkQueue is a simpler queue that just waits for work. Both WorkQueue and WorkSchedule can be accessed via new WorkQueueBase API. Of course the WorkQueueBase API doesn't deal with timestamps, but a WorkSchedule can be accessed directly to post timestamped tasks and then handled normally (e.g. by ThreadPool) to run them. Most ThreadPool functionality migrates to new ThreadPoolBase class, with template subclass ThreadPoolUsing<WorkQueue> or ThreadPoolUsing<WorkSchedule> depending on need. ThreadPool is now an alias for ThreadPoolUsing<WorkQueue>. Importantly, ThreadPoolUsing::getQueue() delivers a reference to the specific queue subclass type, so you can post timestamped tasks on a queue retrieved from ThreadPoolUsing<WorkSchedule>::getQueue(). Since ThreadPool is no longer a simple class but an alias for a particular template specialization, introduce threadpool_fwd.h to forward-declare it. Recast workqueue_test.cpp to exercise WorkSchedule, since some of the tests are time-based. A future todo would be to exercise each applicable test with both WorkQueue and WorkSchedule.
Diffstat (limited to 'indra/llcommon')
-rw-r--r--indra/llcommon/CMakeLists.txt1
-rw-r--r--indra/llcommon/tests/workqueue_test.cpp26
-rw-r--r--indra/llcommon/threadpool.cpp27
-rw-r--r--indra/llcommon/threadpool.h60
-rw-r--r--indra/llcommon/threadpool_fwd.h25
-rw-r--r--indra/llcommon/workqueue.cpp194
-rw-r--r--indra/llcommon/workqueue.h416
7 files changed, 493 insertions, 256 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt
index 0fe61108ff..96fdb1f924 100644
--- a/indra/llcommon/CMakeLists.txt
+++ b/indra/llcommon/CMakeLists.txt
@@ -262,6 +262,7 @@ set(llcommon_HEADER_FILES
stdtypes.h
stringize.h
threadpool.h
+ threadpool_fwd.h
threadsafeschedule.h
timer.h
tuple.h
diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp
index 1d73f7aa0d..41aa858084 100644
--- a/indra/llcommon/tests/workqueue_test.cpp
+++ b/indra/llcommon/tests/workqueue_test.cpp
@@ -38,7 +38,7 @@ namespace tut
{
struct workqueue_data
{
- WorkQueue queue{"queue"};
+ WorkSchedule queue{"queue"};
};
typedef test_group<workqueue_data> workqueue_group;
typedef workqueue_group::object object;
@@ -49,8 +49,8 @@ namespace tut
{
set_test_name("name");
ensure_equals("didn't capture name", queue.getKey(), "queue");
- ensure("not findable", WorkQueue::getInstance("queue") == queue.getWeak().lock());
- WorkQueue q2;
+ ensure("not findable", WorkSchedule::getInstance("queue") == queue.getWeak().lock());
+ WorkSchedule q2;
ensure("has no name", LLStringUtil::startsWith(q2.getKey(), "WorkQueue"));
}
@@ -73,16 +73,16 @@ namespace tut
{
set_test_name("postEvery");
// record of runs
- using Shared = std::deque<WorkQueue::TimePoint>;
+ using Shared = std::deque<WorkSchedule::TimePoint>;
// This is an example of how to share data between the originator of
- // postEvery(work) and the work item itself, since usually a WorkQueue
+ // postEvery(work) and the work item itself, since usually a WorkSchedule
// is used to dispatch work to a different thread. Neither of them
// should call any of LLCond's wait methods: you don't want to stall
// either the worker thread or the originating thread (conventionally
// main). Use LLCond or a subclass even if all you want to do is
// signal the work item that it can quit; consider LLOneShotCond.
LLCond<Shared> data;
- auto start = WorkQueue::TimePoint::clock::now();
+ auto start = WorkSchedule::TimePoint::clock::now();
auto interval = 100ms;
queue.postEvery(
interval,
@@ -93,7 +93,7 @@ namespace tut
data.update_one(
[](Shared& data)
{
- data.push_back(WorkQueue::TimePoint::clock::now());
+ data.push_back(WorkSchedule::TimePoint::clock::now());
});
// by the 3rd call, return false to stop
return (++count < 3);
@@ -102,7 +102,7 @@ namespace tut
// postEvery() running, so run until we have exhausted the iterations
// or we time out waiting
for (auto finish = start + 10*interval;
- WorkQueue::TimePoint::clock::now() < finish &&
+ WorkSchedule::TimePoint::clock::now() < finish &&
data.get([](const Shared& data){ return data.size(); }) < 3; )
{
queue.runPending();
@@ -139,8 +139,8 @@ namespace tut
void object::test<4>()
{
set_test_name("postTo");
- WorkQueue main("main");
- auto qptr = WorkQueue::getInstance("queue");
+ WorkSchedule main("main");
+ auto qptr = WorkSchedule::getInstance("queue");
int result = 0;
main.postTo(
qptr,
@@ -171,8 +171,8 @@ namespace tut
void object::test<5>()
{
set_test_name("postTo with void return");
- WorkQueue main("main");
- auto qptr = WorkQueue::getInstance("queue");
+ WorkSchedule main("main");
+ auto qptr = WorkSchedule::getInstance("queue");
std::string observe;
main.postTo(
qptr,
@@ -194,7 +194,7 @@ namespace tut
std::string stored;
// Try to call waitForResult() on this thread's main coroutine. It
// should throw because the main coroutine must service the queue.
- auto what{ catch_what<WorkQueue::Error>(
+ auto what{ catch_what<WorkSchedule::Error>(
[this, &stored](){ stored = queue.waitForResult(
[](){ return "should throw"; }); }) };
ensure("lambda should not have run", stored.empty());
diff --git a/indra/llcommon/threadpool.cpp b/indra/llcommon/threadpool.cpp
index f49dd40a8b..856306d8f4 100644
--- a/indra/llcommon/threadpool.cpp
+++ b/indra/llcommon/threadpool.cpp
@@ -23,14 +23,15 @@
#include "llsd.h"
#include "stringize.h"
-LL::ThreadPool::ThreadPool(const std::string& name, size_t threads, size_t capacity):
+LL::ThreadPoolBase::ThreadPoolBase(const std::string& name, size_t threads,
+ WorkQueueBase* queue):
super(name),
- mQueue(name, capacity),
mName("ThreadPool:" + name),
- mThreadCount(getConfiguredWidth(name, threads))
+ mThreadCount(getConfiguredWidth(name, threads)),
+ mQueue(queue)
{}
-void LL::ThreadPool::start()
+void LL::ThreadPoolBase::start()
{
for (size_t i = 0; i < mThreadCount; ++i)
{
@@ -58,17 +59,17 @@ void LL::ThreadPool::start()
});
}
-LL::ThreadPool::~ThreadPool()
+LL::ThreadPoolBase::~ThreadPoolBase()
{
close();
}
-void LL::ThreadPool::close()
+void LL::ThreadPoolBase::close()
{
- if (! mQueue.isClosed())
+ if (! mQueue->isClosed())
{
LL_DEBUGS("ThreadPool") << mName << " closing queue and joining threads" << LL_ENDL;
- mQueue.close();
+ mQueue->close();
for (auto& pair: mThreads)
{
LL_DEBUGS("ThreadPool") << mName << " waiting on thread " << pair.first << LL_ENDL;
@@ -78,20 +79,20 @@ void LL::ThreadPool::close()
}
}
-void LL::ThreadPool::run(const std::string& name)
+void LL::ThreadPoolBase::run(const std::string& name)
{
LL_DEBUGS("ThreadPool") << name << " starting" << LL_ENDL;
run();
LL_DEBUGS("ThreadPool") << name << " stopping" << LL_ENDL;
}
-void LL::ThreadPool::run()
+void LL::ThreadPoolBase::run()
{
- mQueue.runUntilClose();
+ mQueue->runUntilClose();
}
//static
-size_t LL::ThreadPool::getConfiguredWidth(const std::string& name, size_t dft)
+size_t LL::ThreadPoolBase::getConfiguredWidth(const std::string& name, size_t dft)
{
LLSD poolSizes;
try
@@ -132,7 +133,7 @@ size_t LL::ThreadPool::getConfiguredWidth(const std::string& name, size_t dft)
}
//static
-size_t LL::ThreadPool::getWidth(const std::string& name, size_t dft)
+size_t LL::ThreadPoolBase::getWidth(const std::string& name, size_t dft)
{
auto instance{ getInstance(name) };
if (instance)
diff --git a/indra/llcommon/threadpool.h b/indra/llcommon/threadpool.h
index b49d511257..60f4a0ce1b 100644
--- a/indra/llcommon/threadpool.h
+++ b/indra/llcommon/threadpool.h
@@ -13,7 +13,9 @@
#if ! defined(LL_THREADPOOL_H)
#define LL_THREADPOOL_H
+#include "threadpool_fwd.h"
#include "workqueue.h"
+#include <memory> // std::unique_ptr
#include <string>
#include <thread>
#include <utility> // std::pair
@@ -22,26 +24,24 @@
namespace LL
{
- class ThreadPool: public LLInstanceTracker<ThreadPool, std::string>
+ class ThreadPoolBase: public LLInstanceTracker<ThreadPoolBase, std::string>
{
private:
- using super = LLInstanceTracker<ThreadPool, std::string>;
+ using super = LLInstanceTracker<ThreadPoolBase, std::string>;
+
public:
/**
- * Pass ThreadPool a string name. This can be used to look up the
+ * Pass ThreadPoolBase a string name. This can be used to look up the
* relevant WorkQueue.
*
* The number of threads you pass sets the compile-time default. But
* if the user has overridden the LLSD map in the "ThreadPoolSizes"
* setting with a key matching this ThreadPool name, that setting
* overrides this parameter.
- *
- * Pass an explicit capacity to limit the size of the queue.
- * Constraining the queue can cause a submitter to block. Do not
- * constrain any ThreadPool accepting work from the main thread.
*/
- ThreadPool(const std::string& name, size_t threads=1, size_t capacity=1024*1024);
- virtual ~ThreadPool();
+ ThreadPoolBase(const std::string& name, size_t threads,
+ WorkQueueBase* queue);
+ virtual ~ThreadPoolBase();
/**
* Launch the ThreadPool. Until this call, a constructed ThreadPool
@@ -59,8 +59,6 @@ namespace LL
std::string getName() const { return mName; }
size_t getWidth() const { return mThreads.size(); }
- /// obtain a non-const reference to the WorkQueue to post work to it
- WorkQueue& getQueue() { return mQueue; }
/**
* Override run() if you need special processing. The default run()
@@ -87,15 +85,53 @@ namespace LL
static
size_t getWidth(const std::string& name, size_t dft);
+ protected:
+ std::unique_ptr<WorkQueueBase> mQueue;
+
private:
void run(const std::string& name);
- WorkQueue mQueue;
std::string mName;
size_t mThreadCount;
std::vector<std::pair<std::string, std::thread>> mThreads;
};
+ /**
+ * Specialize with WorkQueue or, for timestamped tasks, WorkSchedule
+ */
+ template <class QUEUE>
+ struct ThreadPoolUsing: public ThreadPoolBase
+ {
+ using queue_t = QUEUE;
+
+ /**
+ * Pass ThreadPoolUsing a string name. This can be used to look up the
+ * relevant WorkQueue.
+ *
+ * The number of threads you pass sets the compile-time default. But
+ * if the user has overridden the LLSD map in the "ThreadPoolSizes"
+ * setting with a key matching this ThreadPool name, that setting
+ * overrides this parameter.
+ *
+ * Pass an explicit capacity to limit the size of the queue.
+ * Constraining the queue can cause a submitter to block. Do not
+ * constrain any ThreadPool accepting work from the main thread.
+ */
+ ThreadPoolUsing(const std::string& name, size_t threads=1, size_t capacity=1024*1024):
+ ThreadPoolBase(name, threads, new queue_t(name, capacity))
+ {}
+ ~ThreadPoolUsing() override {}
+
+ /**
+ * obtain a non-const reference to the specific WorkQueue subclass to
+ * post work to it
+ */
+ queue_t& getQueue() { return static_cast<queue_t&>(*mQueue); }
+ };
+
+ /// ThreadPool is shorthand for using the simpler WorkQueue
+ using ThreadPool = ThreadPoolUsing<WorkQueue>;
+
} // namespace LL
#endif /* ! defined(LL_THREADPOOL_H) */
diff --git a/indra/llcommon/threadpool_fwd.h b/indra/llcommon/threadpool_fwd.h
new file mode 100644
index 0000000000..1aa3c4a0e2
--- /dev/null
+++ b/indra/llcommon/threadpool_fwd.h
@@ -0,0 +1,25 @@
+/**
+ * @file threadpool_fwd.h
+ * @author Nat Goodspeed
+ * @date 2022-12-09
+ * @brief Forward declarations for ThreadPool et al.
+ *
+ * $LicenseInfo:firstyear=2022&license=viewerlgpl$
+ * Copyright (c) 2022, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_THREADPOOL_FWD_H)
+#define LL_THREADPOOL_FWD_H
+
+#include "workqueue.h"
+
+namespace LL
+{
+ template <class QUEUE>
+ struct ThreadPoolUsing;
+
+ using ThreadPool = ThreadPoolUsing<WorkQueue>;
+} // namespace LL
+
+#endif /* ! defined(LL_THREADPOOL_FWD_H) */
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp
index eb06890468..83e0216ae7 100644
--- a/indra/llcommon/workqueue.cpp
+++ b/indra/llcommon/workqueue.cpp
@@ -26,83 +26,65 @@
using Mutex = LLCoros::Mutex;
using Lock = LLCoros::LockType;
-LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity):
- super(makeName(name)),
- mQueue(capacity)
+/*****************************************************************************
+* WorkQueueBase
+*****************************************************************************/
+LL::WorkQueueBase::WorkQueueBase(const std::string& name):
+ super(makeName(name))
{
// TODO: register for "LLApp" events so we can implicitly close() on
// viewer shutdown.
}
-void LL::WorkQueue::close()
-{
- mQueue.close();
-}
-
-size_t LL::WorkQueue::size()
-{
- return mQueue.size();
-}
-
-bool LL::WorkQueue::isClosed()
-{
- return mQueue.isClosed();
-}
-
-bool LL::WorkQueue::done()
-{
- return mQueue.done();
-}
-
-void LL::WorkQueue::runUntilClose()
+void LL::WorkQueueBase::runUntilClose()
{
try
{
for (;;)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
- callWork(mQueue.pop());
+ callWork(pop_());
}
}
- catch (const Queue::Closed&)
+ catch (const Closed&)
{
}
}
-bool LL::WorkQueue::runPending()
+bool LL::WorkQueueBase::runPending()
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
- for (Work work; mQueue.tryPop(work); )
+ for (Work work; tryPop_(work); )
{
callWork(work);
}
- return ! mQueue.done();
+ return ! done();
}
-bool LL::WorkQueue::runOne()
+bool LL::WorkQueueBase::runOne()
{
Work work;
- if (mQueue.tryPop(work))
+ if (tryPop_(work))
{
callWork(work);
}
- return ! mQueue.done();
+ return ! done();
}
-bool LL::WorkQueue::runUntil(const TimePoint& until)
+bool LL::WorkQueueBase::runUntil(const TimePoint& until)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
// Should we subtract some slop to allow for typical Work execution time?
// How much slop?
// runUntil() is simply a time-bounded runPending().
- for (Work work; TimePoint::clock::now() < until && mQueue.tryPop(work); )
+ for (Work work; TimePoint::clock::now() < until && tryPop_(work); )
{
callWork(work);
}
- return ! mQueue.done();
+ return ! done();
}
-std::string LL::WorkQueue::makeName(const std::string& name)
+std::string LL::WorkQueueBase::makeName(const std::string& name)
{
if (! name.empty())
return name;
@@ -120,14 +102,7 @@ std::string LL::WorkQueue::makeName(const std::string& name)
return STRINGIZE("WorkQueue" << num);
}
-void LL::WorkQueue::callWork(const Queue::DataTuple& work)
-{
- // ThreadSafeSchedule::pop() always delivers a tuple, even when
- // there's only one data field per item, as for us.
- callWork(std::get<0>(work));
-}
-
-void LL::WorkQueue::callWork(const Work& work)
+void LL::WorkQueueBase::callWork(const Work& work)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
try
@@ -142,12 +117,12 @@ void LL::WorkQueue::callWork(const Work& work)
}
}
-void LL::WorkQueue::error(const std::string& msg)
+void LL::WorkQueueBase::error(const std::string& msg)
{
LL_ERRS("WorkQueue") << msg << LL_ENDL;
}
-void LL::WorkQueue::checkCoroutine(const std::string& method)
+void LL::WorkQueueBase::checkCoroutine(const std::string& method)
{
// By convention, the default coroutine on each thread has an empty name
// string. See also LLCoros::logname().
@@ -156,3 +131,130 @@ void LL::WorkQueue::checkCoroutine(const std::string& method)
LLTHROW(Error("Do not call " + method + " from a thread's default coroutine"));
}
}
+
+/*****************************************************************************
+* WorkQueue
+*****************************************************************************/
+LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity):
+ super(name),
+ mQueue(capacity)
+{
+}
+
+void LL::WorkQueue::close()
+{
+ mQueue.close();
+}
+
+size_t LL::WorkQueue::size()
+{
+ return mQueue.size();
+}
+
+bool LL::WorkQueue::isClosed()
+{
+ return mQueue.isClosed();
+}
+
+bool LL::WorkQueue::done()
+{
+ return mQueue.done();
+}
+
+void LL::WorkQueue::post(const Work& callable)
+{
+ mQueue.push(callable);
+}
+
+bool LL::WorkQueue::postIfOpen(const Work& callable)
+{
+ return mQueue.pushIfOpen(callable);
+}
+
+bool LL::WorkQueue::tryPost(const Work& callable)
+{
+ return mQueue.tryPush(callable);
+}
+
+LL::WorkQueue::Work LL::WorkQueue::pop_()
+{
+ return mQueue.pop();
+}
+
+bool LL::WorkQueue::tryPop_(Work& work)
+{
+ return mQueue.tryPop(work);
+}
+
+/*****************************************************************************
+* WorkSchedule
+*****************************************************************************/
+LL::WorkSchedule::WorkSchedule(const std::string& name, size_t capacity):
+ super(name),
+ mQueue(capacity)
+{
+}
+
+void LL::WorkSchedule::close()
+{
+ mQueue.close();
+}
+
+size_t LL::WorkSchedule::size()
+{
+ return mQueue.size();
+}
+
+bool LL::WorkSchedule::isClosed()
+{
+ return mQueue.isClosed();
+}
+
+bool LL::WorkSchedule::done()
+{
+ return mQueue.done();
+}
+
+void LL::WorkSchedule::post(const Work& callable)
+{
+ // Use TimePoint::clock::now() instead of TimePoint's representation of
+ // the epoch because this WorkSchedule may contain a mix of past-due
+ // TimedWork items and TimedWork items scheduled for the future. Sift this
+ // new item into the correct place.
+ post(callable, TimePoint::clock::now());
+}
+
+void LL::WorkSchedule::post(const Work& callable, const TimePoint& time)
+{
+ mQueue.push(TimedWork(time, callable));
+}
+
+bool LL::WorkSchedule::postIfOpen(const Work& callable)
+{
+ return postIfOpen(callable, TimePoint::clock::now());
+}
+
+bool LL::WorkSchedule::postIfOpen(const Work& callable, const TimePoint& time)
+{
+ return mQueue.pushIfOpen(TimedWork(time, callable));
+}
+
+bool LL::WorkSchedule::tryPost(const Work& callable)
+{
+ return tryPost(callable, TimePoint::clock::now());
+}
+
+bool LL::WorkSchedule::tryPost(const Work& callable, const TimePoint& time)
+{
+ return mQueue.tryPush(TimedWork(time, callable));
+}
+
+LL::WorkSchedule::Work LL::WorkSchedule::pop_()
+{
+ return std::get<0>(mQueue.pop());
+}
+
+bool LL::WorkSchedule::tryPop_(Work& work)
+{
+ return mQueue.tryPop(work);
+}
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
index 28a0b5e040..eea8886a7a 100644
--- a/indra/llcommon/workqueue.h
+++ b/indra/llcommon/workqueue.h
@@ -15,6 +15,7 @@
#include "llcoros.h"
#include "llexception.h"
#include "llinstancetracker.h"
+#include "llinstancetrackersubclass.h"
#include "threadsafeschedule.h"
#include <chrono>
#include <exception> // std::current_exception
@@ -23,27 +24,23 @@
namespace LL
{
+
+/*****************************************************************************
+* WorkQueueBase: API for WorkQueue and WorkSchedule
+*****************************************************************************/
/**
* A typical WorkQueue has a string name that can be used to find it.
*/
- class WorkQueue: public LLInstanceTracker<WorkQueue, std::string>
+ class WorkQueueBase: public LLInstanceTracker<WorkQueueBase, std::string>
{
private:
- using super = LLInstanceTracker<WorkQueue, std::string>;
+ using super = LLInstanceTracker<WorkQueueBase, std::string>;
public:
using Work = std::function<void()>;
-
- private:
- using Queue = ThreadSafeSchedule<Work>;
- // helper for postEvery()
- template <typename Rep, typename Period, typename CALLABLE>
- class BackJack;
-
- public:
- using TimePoint = Queue::TimePoint;
- using TimedWork = Queue::TimeTuple;
- using Closed = Queue::Closed;
+ using Closed = LLThreadSafeQueueInterrupt;
+ // for runFor()
+ using TimePoint = std::chrono::steady_clock::time_point;
struct Error: public LLException
{
@@ -51,18 +48,18 @@ namespace LL
};
/**
- * You may omit the WorkQueue name, in which case a unique name is
+ * You may omit the WorkQueueBase name, in which case a unique name is
* synthesized; for practical purposes that makes it anonymous.
*/
- WorkQueue(const std::string& name = std::string(), size_t capacity=1024);
+ WorkQueueBase(const std::string& name);
/**
* Since the point of WorkQueue is to pass work to some other worker
- * thread(s) asynchronously, it's important that the WorkQueue continue
- * to exist until the worker thread(s) have drained it. To communicate
- * that it's time for them to quit, close() the queue.
+ * thread(s) asynchronously, it's important that it continue to exist
+ * until the worker thread(s) have drained it. To communicate that
+ * it's time for them to quit, close() the queue.
*/
- void close();
+ virtual void close() = 0;
/**
* WorkQueue supports multiple producers and multiple consumers. In
@@ -78,158 +75,60 @@ namespace LL
* * If you're the only consumer, noticing that size() > 0 is
* meaningful.
*/
- size_t size();
+ virtual size_t size() = 0;
/// producer end: are we prevented from pushing any additional items?
- bool isClosed();
+ virtual bool isClosed() = 0;
/// consumer end: are we done, is the queue entirely drained?
- bool done();
+ virtual bool done() = 0;
/*---------------------- fire and forget API -----------------------*/
- /// fire-and-forget, but at a particular (future?) time
- template <typename CALLABLE>
- void post(const TimePoint& time, CALLABLE&& callable)
- {
- // Defer reifying an arbitrary CALLABLE until we hit this or
- // postIfOpen(). All other methods should accept CALLABLEs of
- // arbitrary type to avoid multiple levels of std::function
- // indirection.
- mQueue.push(TimedWork(time, std::move(callable)));
- }
-
/// fire-and-forget
- template <typename CALLABLE>
- void post(CALLABLE&& callable)
- {
- // We use TimePoint::clock::now() instead of TimePoint's
- // representation of the epoch because this WorkQueue may contain
- // a mix of past-due TimedWork items and TimedWork items scheduled
- // for the future. Sift this new item into the correct place.
- post(TimePoint::clock::now(), std::move(callable));
- }
-
- /**
- * post work for a particular time, unless the queue is closed before
- * we can post
- */
- template <typename CALLABLE>
- bool postIfOpen(const TimePoint& time, CALLABLE&& callable)
- {
- // Defer reifying an arbitrary CALLABLE until we hit this or
- // post(). All other methods should accept CALLABLEs of arbitrary
- // type to avoid multiple levels of std::function indirection.
- return mQueue.pushIfOpen(TimedWork(time, std::move(callable)));
- }
+ virtual void post(const Work&) = 0;
/**
* post work, unless the queue is closed before we can post
*/
- template <typename CALLABLE>
- bool postIfOpen(CALLABLE&& callable)
- {
- return postIfOpen(TimePoint::clock::now(), std::move(callable));
- }
+ virtual bool postIfOpen(const Work&) = 0;
/**
- * Post work to be run at a specified time to another WorkQueue, which
- * may or may not still exist and be open. Return true if we were able
- * to post.
+ * post work, unless the queue is full
*/
- template <typename CALLABLE>
- static bool postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable);
+ virtual bool tryPost(const Work&) = 0;
/**
* Post work to another WorkQueue, which may or may not still exist
- * and be open. Return true if we were able to post.
- */
- template <typename CALLABLE>
- static bool postMaybe(weak_t target, CALLABLE&& callable)
- {
- return postMaybe(target, TimePoint::clock::now(),
- std::forward<CALLABLE>(callable));
- }
-
- /**
- * Launch a callable returning bool that will trigger repeatedly at
- * specified interval, until the callable returns false.
- *
- * If you need to signal that callable from outside, DO NOT bind a
- * reference to a simple bool! That's not thread-safe. Instead, bind
- * an LLCond variant, e.g. LLOneShotCond or LLBoolCond.
+ * and be open. Support any post() overload. Return true if we were
+ * able to post.
*/
- template <typename Rep, typename Period, typename CALLABLE>
- void postEvery(const std::chrono::duration<Rep, Period>& interval,
- CALLABLE&& callable);
-
- template <typename CALLABLE>
- bool tryPost(const TimePoint& time, CALLABLE&& callable)
- {
- return mQueue.tryPush(TimedWork(time, std::move(callable)));
- }
-
- template <typename CALLABLE>
- bool tryPost(CALLABLE&& callable)
- {
- return mQueue.tryPush(TimePoint::clock::now(), std::move(callable));
- }
+ template <typename... ARGS>
+ static bool postMaybe(weak_t target, ARGS&&... args);
/*------------------------- handshake API --------------------------*/
/**
- * Post work to another WorkQueue to be run at a specified time,
- * requesting a specific callback to be run on this WorkQueue on
- * completion.
- *
- * Returns true if able to post, false if the other WorkQueue is
- * inaccessible.
- */
- // Apparently some Microsoft header file defines a macro CALLBACK? The
- // natural template argument name CALLBACK produces very weird Visual
- // Studio compile errors that seem utterly unrelated to this source
- // code.
- template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);
-
- /**
* Post work to another WorkQueue, requesting a specific callback to
- * be run on this WorkQueue on completion.
+ * be run on this WorkQueue on completion. Optional final argument is
+ * TimePoint for WorkSchedule.
*
* Returns true if able to post, false if the other WorkQueue is
* inaccessible.
*/
- template <typename CALLABLE, typename FOLLOWUP>
- bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback)
- {
- return postTo(target, TimePoint::clock::now(),
- std::move(callable), std::move(callback));
- }
-
- /**
- * Post work to another WorkQueue to be run at a specified time,
- * blocking the calling coroutine until then, returning the result to
- * caller on completion.
- *
- * In general, we assume that each thread's default coroutine is busy
- * servicing its WorkQueue or whatever. To try to prevent mistakes, we
- * forbid calling waitForResult() from a thread's default coroutine.
- */
- template <typename CALLABLE>
- auto waitForResult(const TimePoint& time, CALLABLE&& callable);
+ template <typename CALLABLE, typename FOLLOWUP, typename... ARGS>
+ bool postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback,
+ ARGS&&... args);
/**
* Post work to another WorkQueue, blocking the calling coroutine
- * until then, returning the result to caller on completion.
+ * until then, returning the result to caller on completion. Optional
+ * final argument is TimePoint for WorkSchedule.
*
* In general, we assume that each thread's default coroutine is busy
* servicing its WorkQueue or whatever. To try to prevent mistakes, we
* forbid calling waitForResult() from a thread's default coroutine.
*/
- template <typename CALLABLE>
- auto waitForResult(CALLABLE&& callable)
- {
- return waitForResult(TimePoint::clock::now(), std::move(callable));
- }
+ template <typename CALLABLE, typename... ARGS>
+ auto waitForResult(CALLABLE&& callable, ARGS&&... args);
/*--------------------------- worker API ---------------------------*/
@@ -276,7 +175,7 @@ namespace LL
*/
bool runUntil(const TimePoint& until);
- private:
+ protected:
template <typename CALLABLE, typename FOLLOWUP>
static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback);
/// general case: arbitrary C++ return type
@@ -296,13 +195,179 @@ namespace LL
static void checkCoroutine(const std::string& method);
static void error(const std::string& msg);
static std::string makeName(const std::string& name);
- void callWork(const Queue::DataTuple& work);
void callWork(const Work& work);
+
+ private:
+ virtual Work pop_() = 0;
+ virtual bool tryPop_(Work&) = 0;
+ };
+
+/*****************************************************************************
+* WorkQueue: no timestamped task support
+*****************************************************************************/
+ class WorkQueue: public LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase>
+ {
+ private:
+ using super = LLInstanceTrackerSubclass<WorkQueue, WorkQueueBase>;
+
+ public:
+ /**
+ * You may omit the WorkQueue name, in which case a unique name is
+ * synthesized; for practical purposes that makes it anonymous.
+ */
+ WorkQueue(const std::string& name = std::string(), size_t capacity=1024);
+
+ /**
+ * Since the point of WorkQueue is to pass work to some other worker
+ * thread(s) asynchronously, it's important that it continue to exist
+ * until the worker thread(s) have drained it. To communicate that
+ * it's time for them to quit, close() the queue.
+ */
+ void close() override;
+
+ /**
+ * WorkQueue supports multiple producers and multiple consumers. In
+ * the general case it's misleading to test size(), since any other
+ * thread might change it the nanosecond the lock is released. On that
+ * basis, some might argue against publishing a size() method at all.
+ *
+ * But there are two specific cases in which a test based on size()
+ * might be reasonable:
+ *
+ * * If you're the only producer, noticing that size() == 0 is
+ * meaningful.
+ * * If you're the only consumer, noticing that size() > 0 is
+ * meaningful.
+ */
+ size_t size() override;
+ /// producer end: are we prevented from pushing any additional items?
+ bool isClosed() override;
+ /// consumer end: are we done, is the queue entirely drained?
+ bool done() override;
+
+ /*---------------------- fire and forget API -----------------------*/
+
+ /// fire-and-forget
+ void post(const Work&) override;
+
+ /**
+ * post work, unless the queue is closed before we can post
+ */
+ bool postIfOpen(const Work&) override;
+
+ /**
+ * post work, unless the queue is full
+ */
+ bool tryPost(const Work&) override;
+
+ private:
+ using Queue = LLThreadSafeQueue<Work>;
Queue mQueue;
+
+ Work pop_() override;
+ bool tryPop_(Work&) override;
+ };
+
+/*****************************************************************************
+* WorkSchedule: add support for timestamped tasks
+*****************************************************************************/
+ class WorkSchedule: public LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase>
+ {
+ private:
+ using super = LLInstanceTrackerSubclass<WorkSchedule, WorkQueueBase>;
+ using Queue = ThreadSafeSchedule<Work>;
+ // helper for postEvery()
+ template <typename Rep, typename Period, typename CALLABLE>
+ class BackJack;
+
+ public:
+ using TimePoint = Queue::TimePoint;
+ using TimedWork = Queue::TimeTuple;
+
+ /**
+ * You may omit the WorkSchedule name, in which case a unique name is
+ * synthesized; for practical purposes that makes it anonymous.
+ */
+ WorkSchedule(const std::string& name = std::string(), size_t capacity=1024);
+
+ /**
+ * Since the point of WorkSchedule is to pass work to some other worker
+ * thread(s) asynchronously, it's important that the WorkSchedule continue
+ * to exist until the worker thread(s) have drained it. To communicate
+ * that it's time for them to quit, close() the queue.
+ */
+ void close() override;
+
+ /**
+ * WorkSchedule supports multiple producers and multiple consumers. In
+ * the general case it's misleading to test size(), since any other
+ * thread might change it the nanosecond the lock is released. On that
+ * basis, some might argue against publishing a size() method at all.
+ *
+ * But there are two specific cases in which a test based on size()
+ * might be reasonable:
+ *
+ * * If you're the only producer, noticing that size() == 0 is
+ * meaningful.
+ * * If you're the only consumer, noticing that size() > 0 is
+ * meaningful.
+ */
+ size_t size() override;
+ /// producer end: are we prevented from pushing any additional items?
+ bool isClosed() override;
+ /// consumer end: are we done, is the queue entirely drained?
+ bool done() override;
+
+ /*---------------------- fire and forget API -----------------------*/
+
+ /// fire-and-forget
+ void post(const Work& callable) override;
+
+ /// fire-and-forget, but at a particular (future?) time
+ void post(const Work& callable, const TimePoint& time);
+
+ /**
+ * post work, unless the queue is closed before we can post
+ */
+ bool postIfOpen(const Work& callable) override;
+
+ /**
+ * post work for a particular time, unless the queue is closed before
+ * we can post
+ */
+ bool postIfOpen(const Work& callable, const TimePoint& time);
+
+ /**
+ * post work, unless the queue is full
+ */
+ bool tryPost(const Work& callable) override;
+
+ /**
+ * post work for a particular time, unless the queue is full
+ */
+ bool tryPost(const Work& callable, const TimePoint& time);
+
+ /**
+ * Launch a callable returning bool that will trigger repeatedly at
+ * specified interval, until the callable returns false.
+ *
+ * If you need to signal that callable from outside, DO NOT bind a
+ * reference to a simple bool! That's not thread-safe. Instead, bind
+ * an LLCond variant, e.g. LLOneShotCond or LLBoolCond.
+ */
+ template <typename Rep, typename Period, typename CALLABLE>
+ void postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable);
+
+ private:
+ Queue mQueue;
+
+ Work pop_() override;
+ bool tryPop_(Work&) override;
};
/**
- * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a
+ * BackJack is, in effect, a hand-rolled lambda, binding a WorkSchedule, a
* CALLABLE that returns bool, a TimePoint and an interval at which to
* relaunch it. As long as the callable continues returning true, BackJack
* keeps resubmitting it to the target WorkQueue.
@@ -311,7 +376,7 @@ namespace LL
// class method gets its own 'this' pointer -- which we need to resubmit
// the whole BackJack callable.
template <typename Rep, typename Period, typename CALLABLE>
- class WorkQueue::BackJack
+ class WorkSchedule::BackJack
{
public:
// bind the desired data
@@ -325,9 +390,10 @@ namespace LL
mCallable(std::move(callable))
{}
- // Call by target WorkQueue -- note that although WE require a
- // callable returning bool, WorkQueue wants a void callable. We
- // consume the bool.
+ // This operator() method, called by target WorkSchedule, is what
+ // makes this object a Work item. Although WE require a callable
+ // returning bool, WorkSchedule wants a void callable. We consume the
+ // bool.
void operator()()
{
// If mCallable() throws an exception, don't catch it here: if it
@@ -343,7 +409,7 @@ namespace LL
// register our intent to fire at exact mIntervals.
mStart += mInterval;
- // We're being called at this moment by the target WorkQueue.
+ // We're being called at this moment by the target WorkSchedule.
// Assume it still exists, rather than checking the result of
// lock().
// Resubmit the whole *this callable: that's why we're a class
@@ -353,7 +419,8 @@ namespace LL
// moved-from.
try
{
- mTarget.lock()->post(mStart, std::move(*this));
+ std::dynamic_pointer_cast<WorkSchedule>(mTarget.lock())->
+ post(std::move(*this), mStart);
}
catch (const Closed&)
{
@@ -370,8 +437,8 @@ namespace LL
};
template <typename Rep, typename Period, typename CALLABLE>
- void WorkQueue::postEvery(const std::chrono::duration<Rep, Period>& interval,
- CALLABLE&& callable)
+ void WorkSchedule::postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable)
{
if (interval.count() <= 0)
{
@@ -394,7 +461,7 @@ namespace LL
/// general case: arbitrary C++ return type
template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
- struct WorkQueue::MakeReplyLambda
+ struct WorkQueueBase::MakeReplyLambda
{
auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
{
@@ -415,7 +482,7 @@ namespace LL
/// specialize for CALLABLE returning void
template <typename CALLABLE, typename FOLLOWUP>
- struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void>
+ struct WorkQueueBase::MakeReplyLambda<CALLABLE, FOLLOWUP, void>
{
auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
{
@@ -427,16 +494,16 @@ namespace LL
};
template <typename CALLABLE, typename FOLLOWUP>
- auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback)
+ auto WorkQueueBase::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback)
{
return MakeReplyLambda<CALLABLE, FOLLOWUP,
decltype(std::forward<CALLABLE>(callable)())>()
(std::move(callable), std::move(callback));
}
- template <typename CALLABLE, typename FOLLOWUP>
- bool WorkQueue::postTo(weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
+ template <typename CALLABLE, typename FOLLOWUP, typename... ARGS>
+ bool WorkQueueBase::postTo(weak_t target, CALLABLE&& callable, FOLLOWUP&& callback,
+ ARGS&&... args)
{
LL_PROFILE_ZONE_SCOPED;
// We're being asked to post to the WorkQueue at target.
@@ -450,12 +517,11 @@ namespace LL
// lambda that packages our callable, our callback and a weak_ptr
// to this originating WorkQueue.
tptr->post(
- time,
[reply = super::getWeak(),
callable = std::move(callable),
callback = std::move(callback)]
- ()
- mutable {
+ () mutable
+ {
// Use postMaybe() below in case this originating WorkQueue
// has been closed or destroyed. Remember, the outer lambda is
// now running on a thread servicing the target WorkQueue, and
@@ -478,14 +544,16 @@ namespace LL
// originating WorkQueue. Once there, rethrow it.
[exc = std::current_exception()](){ std::rethrow_exception(exc); });
}
- });
+ },
+ // if caller passed a TimePoint, pass it along to post()
+ std::forward<ARGS>(args)...);
// looks like we were able to post()
return true;
}
- template <typename CALLABLE>
- bool WorkQueue::postMaybe(weak_t target, const TimePoint& time, CALLABLE&& callable)
+ template <typename... ARGS>
+ bool WorkQueueBase::postMaybe(weak_t target, ARGS&&... args)
{
LL_PROFILE_ZONE_SCOPED;
// target is a weak_ptr: have to lock it to check it
@@ -494,7 +562,7 @@ namespace LL
{
try
{
- tptr->post(time, std::forward<CALLABLE>(callable));
+ tptr->post(std::forward<ARGS>(args)...);
// we were able to post()
return true;
}
@@ -509,13 +577,13 @@ namespace LL
/// general case: arbitrary C++ return type
template <typename CALLABLE, typename RETURNTYPE>
- struct WorkQueue::WaitForResult
+ struct WorkQueueBase::WaitForResult
{
- auto operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)
+ template <typename... ARGS>
+ auto operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args)
{
LLCoros::Promise<RETURNTYPE> promise;
self->post(
- time,
// We dare to bind a reference to Promise because it's
// specifically designed for cross-thread communication.
[&promise, callable = std::move(callable)]()
@@ -529,7 +597,9 @@ namespace LL
{
promise.set_exception(std::current_exception());
}
- });
+ },
+ // if caller passed a TimePoint, pass it to post()
+ std::forward<ARGS>(args)...);
auto future{ LLCoros::getFuture(promise) };
// now, on the calling thread, wait for that result
LLCoros::TempStatus st("waiting for WorkQueue::waitForResult()");
@@ -539,13 +609,13 @@ namespace LL
/// specialize for CALLABLE returning void
template <typename CALLABLE>
- struct WorkQueue::WaitForResult<CALLABLE, void>
+ struct WorkQueueBase::WaitForResult<CALLABLE, void>
{
- void operator()(WorkQueue* self, const TimePoint& time, CALLABLE&& callable)
+ template <typename... ARGS>
+ void operator()(WorkQueueBase* self, CALLABLE&& callable, ARGS&&... args)
{
LLCoros::Promise<void> promise;
self->post(
- time,
// &promise is designed for cross-thread access
[&promise, callable = std::move(callable)]()
mutable {
@@ -558,7 +628,9 @@ namespace LL
{
promise.set_exception(std::current_exception());
}
- });
+ },
+ // if caller passed a TimePoint, pass it to post()
+ std::forward<ARGS>(args)...);
auto future{ LLCoros::getFuture(promise) };
// block until set_value()
LLCoros::TempStatus st("waiting for void WorkQueue::waitForResult()");
@@ -566,13 +638,13 @@ namespace LL
}
};
- template <typename CALLABLE>
- auto WorkQueue::waitForResult(const TimePoint& time, CALLABLE&& callable)
+ template <typename CALLABLE, typename... ARGS>
+ auto WorkQueueBase::waitForResult(CALLABLE&& callable, ARGS&&... args)
{
checkCoroutine("waitForResult()");
// derive callable's return type so we can specialize for void
return WaitForResult<CALLABLE, decltype(std::forward<CALLABLE>(callable)())>()
- (this, time, std::forward<CALLABLE>(callable));
+ (this, std::forward<CALLABLE>(callable), std::forward<ARGS>(args)...);
}
} // namespace LL