summaryrefslogtreecommitdiff
path: root/indra/llcommon/workqueue.cpp
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2022-12-09 13:21:45 -0500
committerNat Goodspeed <nat@lindenlab.com>2022-12-09 13:21:45 -0500
commitfc424a0db90fd2d2e44e85a19750ad6eaa57b28a (patch)
treea6e6fff4723d085dd96e0e30bae6823aa65da5ec /indra/llcommon/workqueue.cpp
parent00478b1e7671cb109771a1ad4fb40d47d15ab756 (diff)
SL-18809: Add WorkSchedule; remove timestamps from WorkQueue.
For work queues that don't need timestamped tasks, eliminate the overhead of a priority queue ordered by timestamp. Timestamped task support moves to WorkSchedule. WorkQueue is a simpler queue that just waits for work. Both WorkQueue and WorkSchedule can be accessed via new WorkQueueBase API. Of course the WorkQueueBase API doesn't deal with timestamps, but a WorkSchedule can be accessed directly to post timestamped tasks and then handled normally (e.g. by ThreadPool) to run them. Most ThreadPool functionality migrates to new ThreadPoolBase class, with template subclass ThreadPoolUsing<WorkQueue> or ThreadPoolUsing<WorkSchedule> depending on need. ThreadPool is now an alias for ThreadPoolUsing<WorkQueue>. Importantly, ThreadPoolUsing::getQueue() delivers a reference to the specific queue subclass type, so you can post timestamped tasks on a queue retrieved from ThreadPoolUsing<WorkSchedule>::getQueue(). Since ThreadPool is no longer a simple class but an alias for a particular template specialization, introduce threadpool_fwd.h to forward-declare it. Recast workqueue_test.cpp to exercise WorkSchedule, since some of the tests are time-based. A future todo would be to exercise each applicable test with both WorkQueue and WorkSchedule.
Diffstat (limited to 'indra/llcommon/workqueue.cpp')
-rw-r--r--indra/llcommon/workqueue.cpp194
1 files changed, 148 insertions, 46 deletions
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp
index eb06890468..83e0216ae7 100644
--- a/indra/llcommon/workqueue.cpp
+++ b/indra/llcommon/workqueue.cpp
@@ -26,83 +26,65 @@
using Mutex = LLCoros::Mutex;
using Lock = LLCoros::LockType;
-LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity):
- super(makeName(name)),
- mQueue(capacity)
+/*****************************************************************************
+* WorkQueueBase
+*****************************************************************************/
+LL::WorkQueueBase::WorkQueueBase(const std::string& name):
+ super(makeName(name))
{
// TODO: register for "LLApp" events so we can implicitly close() on
// viewer shutdown.
}
-void LL::WorkQueue::close()
-{
- mQueue.close();
-}
-
-size_t LL::WorkQueue::size()
-{
- return mQueue.size();
-}
-
-bool LL::WorkQueue::isClosed()
-{
- return mQueue.isClosed();
-}
-
-bool LL::WorkQueue::done()
-{
- return mQueue.done();
-}
-
-void LL::WorkQueue::runUntilClose()
+void LL::WorkQueueBase::runUntilClose()
{
try
{
for (;;)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
- callWork(mQueue.pop());
+ callWork(pop_());
}
}
- catch (const Queue::Closed&)
+ catch (const Closed&)
{
}
}
-bool LL::WorkQueue::runPending()
+bool LL::WorkQueueBase::runPending()
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
- for (Work work; mQueue.tryPop(work); )
+ for (Work work; tryPop_(work); )
{
callWork(work);
}
- return ! mQueue.done();
+ return ! done();
}
-bool LL::WorkQueue::runOne()
+bool LL::WorkQueueBase::runOne()
{
Work work;
- if (mQueue.tryPop(work))
+ if (tryPop_(work))
{
callWork(work);
}
- return ! mQueue.done();
+ return ! done();
}
-bool LL::WorkQueue::runUntil(const TimePoint& until)
+bool LL::WorkQueueBase::runUntil(const TimePoint& until)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
// Should we subtract some slop to allow for typical Work execution time?
// How much slop?
// runUntil() is simply a time-bounded runPending().
- for (Work work; TimePoint::clock::now() < until && mQueue.tryPop(work); )
+ for (Work work; TimePoint::clock::now() < until && tryPop_(work); )
{
callWork(work);
}
- return ! mQueue.done();
+ return ! done();
}
-std::string LL::WorkQueue::makeName(const std::string& name)
+std::string LL::WorkQueueBase::makeName(const std::string& name)
{
if (! name.empty())
return name;
@@ -120,14 +102,7 @@ std::string LL::WorkQueue::makeName(const std::string& name)
return STRINGIZE("WorkQueue" << num);
}
-void LL::WorkQueue::callWork(const Queue::DataTuple& work)
-{
- // ThreadSafeSchedule::pop() always delivers a tuple, even when
- // there's only one data field per item, as for us.
- callWork(std::get<0>(work));
-}
-
-void LL::WorkQueue::callWork(const Work& work)
+void LL::WorkQueueBase::callWork(const Work& work)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
try
@@ -142,12 +117,12 @@ void LL::WorkQueue::callWork(const Work& work)
}
}
-void LL::WorkQueue::error(const std::string& msg)
+void LL::WorkQueueBase::error(const std::string& msg)
{
LL_ERRS("WorkQueue") << msg << LL_ENDL;
}
-void LL::WorkQueue::checkCoroutine(const std::string& method)
+void LL::WorkQueueBase::checkCoroutine(const std::string& method)
{
// By convention, the default coroutine on each thread has an empty name
// string. See also LLCoros::logname().
@@ -156,3 +131,130 @@ void LL::WorkQueue::checkCoroutine(const std::string& method)
LLTHROW(Error("Do not call " + method + " from a thread's default coroutine"));
}
}
+
+/*****************************************************************************
+* WorkQueue
+*****************************************************************************/
+LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity):
+ super(name),
+ mQueue(capacity)
+{
+}
+
+void LL::WorkQueue::close()
+{
+ mQueue.close();
+}
+
+size_t LL::WorkQueue::size()
+{
+ return mQueue.size();
+}
+
+bool LL::WorkQueue::isClosed()
+{
+ return mQueue.isClosed();
+}
+
+bool LL::WorkQueue::done()
+{
+ return mQueue.done();
+}
+
+void LL::WorkQueue::post(const Work& callable)
+{
+ mQueue.push(callable);
+}
+
+bool LL::WorkQueue::postIfOpen(const Work& callable)
+{
+ return mQueue.pushIfOpen(callable);
+}
+
+bool LL::WorkQueue::tryPost(const Work& callable)
+{
+ return mQueue.tryPush(callable);
+}
+
+LL::WorkQueue::Work LL::WorkQueue::pop_()
+{
+ return mQueue.pop();
+}
+
+bool LL::WorkQueue::tryPop_(Work& work)
+{
+ return mQueue.tryPop(work);
+}
+
+/*****************************************************************************
+* WorkSchedule
+*****************************************************************************/
+LL::WorkSchedule::WorkSchedule(const std::string& name, size_t capacity):
+ super(name),
+ mQueue(capacity)
+{
+}
+
+void LL::WorkSchedule::close()
+{
+ mQueue.close();
+}
+
+size_t LL::WorkSchedule::size()
+{
+ return mQueue.size();
+}
+
+bool LL::WorkSchedule::isClosed()
+{
+ return mQueue.isClosed();
+}
+
+bool LL::WorkSchedule::done()
+{
+ return mQueue.done();
+}
+
+void LL::WorkSchedule::post(const Work& callable)
+{
+ // Use TimePoint::clock::now() instead of TimePoint's representation of
+ // the epoch because this WorkSchedule may contain a mix of past-due
+ // TimedWork items and TimedWork items scheduled for the future. Sift this
+ // new item into the correct place.
+ post(callable, TimePoint::clock::now());
+}
+
+void LL::WorkSchedule::post(const Work& callable, const TimePoint& time)
+{
+ mQueue.push(TimedWork(time, callable));
+}
+
+bool LL::WorkSchedule::postIfOpen(const Work& callable)
+{
+ return postIfOpen(callable, TimePoint::clock::now());
+}
+
+bool LL::WorkSchedule::postIfOpen(const Work& callable, const TimePoint& time)
+{
+ return mQueue.pushIfOpen(TimedWork(time, callable));
+}
+
+bool LL::WorkSchedule::tryPost(const Work& callable)
+{
+ return tryPost(callable, TimePoint::clock::now());
+}
+
+bool LL::WorkSchedule::tryPost(const Work& callable, const TimePoint& time)
+{
+ return mQueue.tryPush(TimedWork(time, callable));
+}
+
+LL::WorkSchedule::Work LL::WorkSchedule::pop_()
+{
+ return std::get<0>(mQueue.pop());
+}
+
+bool LL::WorkSchedule::tryPop_(Work& work)
+{
+ return mQueue.tryPop(work);
+}