summaryrefslogtreecommitdiff
path: root/indra
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2021-10-07 15:32:51 -0400
committerNat Goodspeed <nat@lindenlab.com>2021-10-07 15:32:51 -0400
commit623ac79120a417ec445ce5c106a907fe46734309 (patch)
treed91b262dae1d4416d71ce78b7089aa1603961dc0 /indra
parent6e06d1db6045df2e4961243f379c4d7695a8190d (diff)
SL-16024: Add LL::WorkQueue for passing work items between threads.
A typical WorkQueue has a string name, which can be used to find it to post work to it. "Work" is a nullary callable. WorkQueue is a multi-producer, multi-consumer thread-safe queue: multiple threads can service the WorkQueue, multiple threads can post work to it. Work can be scheduled in the future by submitting with a timestamp. In addition, a given work item can be scheduled to run on a recurring basis. A requesting thread servicing a WorkQueue of its own, such as the viewer's main thread, can submit work to another WorkQueue along with a callback to be passed the result (of arbitrary type) of the first work item. The callback is posted to the originating WorkQueue, permitting safe data exchange between participating threads. Methods are provided for different kinds of servicing threads. runUntilClose() is useful for a simple worker thread. runFor(duration) devotes no more than a specified time slice to that WorkQueue, e.g. for use by the main thread.
Diffstat (limited to 'indra')
-rw-r--r--indra/llcommon/CMakeLists.txt3
-rw-r--r--indra/llcommon/tests/workqueue_test.cpp158
-rw-r--r--indra/llcommon/threadsafeschedule.h1
-rw-r--r--indra/llcommon/workqueue.cpp114
-rw-r--r--indra/llcommon/workqueue.h325
5 files changed, 601 insertions, 0 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt
index 5efcfabf24..a3dbb6d9d0 100644
--- a/indra/llcommon/CMakeLists.txt
+++ b/indra/llcommon/CMakeLists.txt
@@ -121,6 +121,7 @@ set(llcommon_SOURCE_FILES
llworkerthread.cpp
timing.cpp
u64.cpp
+ workqueue.cpp
StackWalker.cpp
)
@@ -258,6 +259,7 @@ set(llcommon_HEADER_FILES
timer.h
tuple.h
u64.h
+ workqueue.h
StackWalker.h
)
@@ -363,6 +365,7 @@ if (LL_TESTS)
LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(threadsafeschedule "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(tuple "" "${test_libs}")
+ LL_ADD_INTEGRATION_TEST(workqueue "" "${test_libs}")
## llexception_test.cpp isn't a regression test, and doesn't need to be run
## every build. It's to help a developer make implementation choices about
diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp
new file mode 100644
index 0000000000..ab1cae6c14
--- /dev/null
+++ b/indra/llcommon/tests/workqueue_test.cpp
@@ -0,0 +1,158 @@
+/**
+ * @file workqueue_test.cpp
+ * @author Nat Goodspeed
+ * @date 2021-10-07
+ * @brief Test for workqueue.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "workqueue.h"
+// STL headers
+// std headers
+#include <chrono>
+#include <deque>
+// external library headers
+// other Linden headers
+#include "../test/lltut.h"
+#include "llcond.h"
+#include "llstring.h"
+#include "stringize.h"
+
+using namespace LL;
+using namespace std::literals::chrono_literals; // ms suffix
+using namespace std::literals::string_literals; // s suffix
+
+/*****************************************************************************
+* TUT
+*****************************************************************************/
+namespace tut
+{
+ struct workqueue_data
+ {
+ WorkQueue queue{"queue"};
+ };
+ typedef test_group<workqueue_data> workqueue_group;
+ typedef workqueue_group::object object;
+ workqueue_group workqueuegrp("workqueue");
+
+ template<> template<>
+ void object::test<1>()
+ {
+ set_test_name("name");
+ ensure_equals("didn't capture name", queue.getKey(), "queue");
+ ensure("not findable", WorkQueue::getInstance("queue") == queue.getWeak().lock());
+ WorkQueue q2;
+ ensure("has no name", LLStringUtil::startsWith(q2.getKey(), "WorkQueue"));
+ }
+
+ template<> template<>
+ void object::test<2>()
+ {
+ set_test_name("post");
+ bool wasRun{ false };
+ // We only get away with binding a simple bool because we're running
+ // the work on the same thread.
+ queue.post([&wasRun](){ wasRun = true; });
+ queue.close();
+ ensure("ran too soon", ! wasRun);
+ queue.runUntilClose();
+ ensure("didn't run", wasRun);
+ }
+
+ template<> template<>
+ void object::test<3>()
+ {
+ set_test_name("postEvery");
+ // record of runs
+ using Shared = std::deque<WorkQueue::TimePoint>;
+ // This is an example of how to share data between the originator of
+ // postEvery(work) and the work item itself, since usually a WorkQueue
+ // is used to dispatch work to a different thread. Neither of them
+ // should call any of LLCond's wait methods: you don't want to stall
+ // either the worker thread or the originating thread (conventionally
+ // main). Use LLCond or a subclass even if all you want to do is
+ // signal the work item that it can quit; consider LLOneShotCond.
+ LLCond<Shared> data;
+ auto start = WorkQueue::TimePoint::clock::now();
+ auto interval = 100ms;
+ queue.postEvery(
+ interval,
+ [&data, count = 0]
+ () mutable
+ {
+ // record the timestamp at which this instance is running
+ data.update_one(
+ [](Shared& data)
+ {
+ data.push_back(WorkQueue::TimePoint::clock::now());
+ });
+ // by the 3rd call, return false to stop
+ return (++count < 3);
+ });
+ // no convenient way to close() our queue while we've got a
+ // postEvery() running, so run until we think we should have exhausted
+ // the iterations
+ queue.runFor(10*interval);
+ // Take a copy of the captured deque.
+ Shared result = data.get();
+ ensure_equals("called wrong number of times", result.size(), 3);
+ // postEvery() assumes you want the first call to happen right away.
+ // Inject a fake start time that's (interval) earlier than that, to
+ // make our too early/too late tests uniform for all entries.
+ result.push_front(start - interval);
+ for (size_t i = 1; i < result.size(); ++i)
+ {
+ auto diff = (result[i] - result[i-1]);
+ try
+ {
+ ensure(STRINGIZE("call " << i << " too soon"), diff >= interval);
+ ensure(STRINGIZE("call " << i << " too late"), diff < interval*1.5);
+ }
+ catch (const tut::failure&)
+ {
+ auto interval_ms = interval / 1ms;
+ auto diff_ms = diff / 1ms;
+ std::cerr << "interval " << interval_ms
+ << "ms; diff " << diff_ms << "ms" << std::endl;
+ throw;
+ }
+ }
+ }
+
+ template<> template<>
+ void object::test<4>()
+ {
+ set_test_name("postTo");
+ WorkQueue main("main");
+ auto qptr = WorkQueue::getInstance("queue");
+ int result = 0;
+ main.postTo(
+ qptr,
+ [](){ return 17; },
+ // Note that a postTo() *callback* can safely bind a reference to
+ // a variable on the invoking thread, because the callback is run
+ // on the invoking thread.
+ [&result](int i){ result = i; });
+ // this should post the callback to main
+ qptr->runOne();
+ // this should run the callback
+ main.runOne();
+ ensure_equals("failed to run int callback", result, 17);
+
+ std::string alpha;
+ // postTo() handles arbitrary return types
+ main.postTo(
+ qptr,
+ [](){ return "abc"s; },
+ [&alpha](const std::string& s){ alpha = s; });
+ qptr->runPending();
+ main.runPending();
+ ensure_equals("failed to run string callback", alpha, "abc");
+ }
+} // namespace tut
diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h
index 0e70d30714..c8ad23532b 100644
--- a/indra/llcommon/threadsafeschedule.h
+++ b/indra/llcommon/threadsafeschedule.h
@@ -78,6 +78,7 @@ namespace LL
enum pop_result { EMPTY=super::EMPTY, DONE=super::DONE, WAITING=super::WAITING, POPPED=super::POPPED };
public:
+ using Closed = LLThreadSafeQueueInterrupt;
using TimePoint = ThreadSafeSchedulePrivate::TimePoint;
using Clock = TimePoint::clock;
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp
new file mode 100644
index 0000000000..15e292fb43
--- /dev/null
+++ b/indra/llcommon/workqueue.cpp
@@ -0,0 +1,114 @@
+/**
+ * @file workqueue.cpp
+ * @author Nat Goodspeed
+ * @date 2021-10-06
+ * @brief Implementation for WorkQueue.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "workqueue.h"
+// STL headers
+// std headers
+// external library headers
+// other Linden headers
+#include "llerror.h"
+#include "llexception.h"
+#include "stringize.h"
+
+LL::WorkQueue::WorkQueue(const std::string& name):
+ super(makeName(name))
+{
+ // TODO: register for "LLApp" events so we can implicitly close() on
+ // viewer shutdown.
+}
+
+void LL::WorkQueue::close()
+{
+ mQueue.close();
+}
+
+void LL::WorkQueue::runUntilClose()
+{
+ try
+ {
+ for (;;)
+ {
+ callWork(mQueue.pop());
+ }
+ }
+ catch (const Queue::Closed&)
+ {
+ }
+}
+
+bool LL::WorkQueue::runPending()
+{
+ for (Work work; mQueue.tryPop(work); )
+ {
+ callWork(work);
+ }
+ return ! mQueue.done();
+}
+
+bool LL::WorkQueue::runOne()
+{
+ Work work;
+ if (mQueue.tryPop(work))
+ {
+ callWork(work);
+ }
+ return ! mQueue.done();
+}
+
+bool LL::WorkQueue::runUntil(const TimePoint& until)
+{
+ // Should we subtract some slop to allow for typical Work execution time?
+ // How much slop?
+ Work work;
+ while (TimePoint::clock::now() < until && mQueue.tryPopUntil(until, work))
+ {
+ callWork(work);
+ }
+ return ! mQueue.done();
+}
+
+std::string LL::WorkQueue::makeName(const std::string& name)
+{
+ if (! name.empty())
+ return name;
+
+ static thread_local U32 discriminator = 0;
+ return STRINGIZE("WorkQueue" << discriminator++);
+}
+
+void LL::WorkQueue::callWork(const Queue::DataTuple& work)
+{
+ // ThreadSafeSchedule::pop() always delivers a tuple, even when
+ // there's only one data field per item, as for us.
+ callWork(std::get<0>(work));
+}
+
+void LL::WorkQueue::callWork(const Work& work)
+{
+ try
+ {
+ work();
+ }
+ catch (...)
+ {
+ // No matter what goes wrong with any individual work item, the worker
+ // thread must go on! Log our own instance name with the exception.
+ LOG_UNHANDLED_EXCEPTION(getKey());
+ }
+}
+
+void LL::WorkQueue::error(const std::string& msg)
+{
+ LL_ERRS("WorkQueue") << msg << LL_ENDL;
+}
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
new file mode 100644
index 0000000000..a52f7b0e26
--- /dev/null
+++ b/indra/llcommon/workqueue.h
@@ -0,0 +1,325 @@
+/**
+ * @file workqueue.h
+ * @author Nat Goodspeed
+ * @date 2021-09-30
+ * @brief Queue used for inter-thread work passing.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_WORKQUEUE_H)
+#define LL_WORKQUEUE_H
+
+#include "llinstancetracker.h"
+#include "threadsafeschedule.h"
+#include <chrono>
+#include <functional> // std::function
+#include <queue>
+#include <string>
+#include <utility> // std::pair
+#include <vector>
+
+namespace LL
+{
+ /**
+ * A typical WorkQueue has a string name that can be used to find it.
+ */
+ class WorkQueue: public LLInstanceTracker<WorkQueue, std::string>
+ {
+ private:
+ using super = LLInstanceTracker<WorkQueue, std::string>;
+
+ public:
+ using Work = std::function<void()>;
+
+ private:
+ using Queue = ThreadSafeSchedule<Work>;
+ // helper for postEvery()
+ template <typename Rep, typename Period, typename CALLABLE>
+ class BackJack;
+
+ public:
+ using TimePoint = Queue::TimePoint;
+ using TimedWork = Queue::TimeTuple;
+ using Closed = Queue::Closed;
+
+ /**
+ * You may omit the WorkQueue name, in which case a unique name is
+ * synthesized; for practical purposes that makes it anonymous.
+ */
+ WorkQueue(const std::string& name = std::string());
+
+ /**
+ * Since the point of WorkQueue is to pass work to some other worker
+ * thread(s) asynchronously, it's important that the WorkQueue continue
+ * to exist until the worker thread(s) have drained it. To communicate
+ * that it's time for them to quit, close() the queue.
+ */
+ void close();
+
+ /*---------------------- fire and forget API -----------------------*/
+
+ /// fire-and-forget, but at a particular (future?) time
+ template <typename CALLABLE>
+ void post(const TimePoint& time, CALLABLE&& callable)
+ {
+ // Defer reifying an arbitrary CALLABLE until we hit this method.
+ // All other methods should accept CALLABLEs of arbitrary type to
+ // avoid multiple levels of std::function indirection.
+ mQueue.push(TimedWork(time, std::move(callable)));
+ }
+
+ /// fire-and-forget
+ template <typename CALLABLE>
+ void post(CALLABLE&& callable)
+ {
+ // We use TimePoint::clock::now() instead of TimePoint's
+ // representation of the epoch because this WorkQueue may contain
+ // a mix of past-due TimedWork items and TimedWork items scheduled
+ // for the future. Sift this new item into the correct place.
+ post(TimePoint::clock::now(), std::move(callable));
+ }
+
+ /**
+ * Launch a callable returning bool that will trigger repeatedly at
+ * specified interval, until the callable returns false.
+ *
+ * If you need to signal that callable from outside, DO NOT bind a
+ * reference to a simple bool! That's not thread-safe. Instead, bind
+ * an LLCond variant, e.g. LLOneShotCond or LLBoolCond.
+ */
+ template <typename Rep, typename Period, typename CALLABLE>
+ void postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable);
+
+ /*------------------------- handshake API --------------------------*/
+
+ /**
+ * Post work to another WorkQueue to be run at a specified time,
+ * requesting a specific callback to be run on this WorkQueue on
+ * completion.
+ *
+ * Returns true if able to post, false if the other WorkQueue is
+ * inaccessible.
+ */
+ template <typename CALLABLE, typename CALLBACK>
+ bool postTo(std::weak_ptr<WorkQueue> target,
+ const TimePoint& time, CALLABLE&& callable, CALLBACK&& callback)
+ {
+ // We're being asked to post to the WorkQueue at target.
+ // target is a weak_ptr: have to lock it to check it.
+ auto tptr = target.lock();
+ if (! tptr)
+ // can't post() if the target WorkQueue has been destroyed
+ return false;
+
+ // Here we believe target WorkQueue still exists. Post to it a
+ // lambda that packages our callable, our callback and a weak_ptr
+ // to this originating WorkQueue.
+ tptr->post(
+ time,
+ [reply = super::getWeak(),
+ callable = std::move(callable),
+ callback = std::move(callback)]
+ ()
+ {
+ // Call the callable in any case -- but to minimize
+ // copying the result, immediately bind it into a reply
+ // lambda. The reply lambda also binds the original
+ // callback, so that when we, the originating WorkQueue,
+ // finally receive and process the reply lambda, we'll
+ // call the bound callback with the bound result -- on the
+ // same thread that originally called postTo().
+ auto rlambda =
+ [result = callable(),
+ callback = std::move(callback)]
+ ()
+ { callback(std::move(result)); };
+ // Check if this originating WorkQueue still exists.
+ // Remember, the outer lambda is now running on a thread
+ // servicing the target WorkQueue, and real time has
+ // elapsed since postTo()'s tptr->post() call.
+ // reply is a weak_ptr: have to lock it to check it.
+ auto rptr = reply.lock();
+ if (rptr)
+ {
+ // Only post reply lambda if the originating WorkQueue
+ // still exists. If not -- who would we tell? Log it?
+ try
+ {
+ rptr->post(std::move(rlambda));
+ }
+ catch (const Closed&)
+ {
+ // Originating WorkQueue might still exist, but
+ // might be Closed. Same thing: just discard the
+ // callback.
+ }
+ }
+ });
+ // looks like we were able to post()
+ return true;
+ }
+
+ /**
+ * Post work to another WorkQueue, requesting a specific callback to
+ * be run on this WorkQueue on completion.
+ *
+ * Returns true if able to post, false if the other WorkQueue is
+ * inaccessible.
+ */
+ template <typename CALLABLE, typename CALLBACK>
+ bool postTo(std::weak_ptr<WorkQueue> target,
+ CALLABLE&& callable, CALLBACK&& callback)
+ {
+ return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback));
+ }
+
+ /*--------------------------- worker API ---------------------------*/
+
+ /**
+ * runUntilClose() pulls TimedWork items off this WorkQueue until the
+ * queue is closed, at which point it returns. This would be the
+ * typical entry point for a simple worker thread.
+ */
+ void runUntilClose();
+
+ /**
+ * runPending() runs all TimedWork items that are ready to run. It
+ * returns true if the queue remains open, false if the queue has been
+ * closed. This could be used by a thread whose primary purpose is to
+ * serve the queue, but also wants to do other things with its idle time.
+ */
+ bool runPending();
+
+ /**
+ * runOne() runs at most one ready TimedWork item -- zero if none are
+ * ready. It returns true if the queue remains open, false if the
+ * queue has been closed.
+ */
+ bool runOne();
+
+ /**
+ * runFor() runs a subset of ready TimedWork items, until the
+ * timeslice has been exceeded. It returns true if the queue remains
+ * open, false if the queue has been closed. This could be used by a
+ * busy main thread to lend a bounded few CPU cycles to this WorkQueue
+ * without risking the WorkQueue blowing out the length of any one
+ * frame.
+ */
+ template <typename Rep, typename Period>
+ bool runFor(const std::chrono::duration<Rep, Period>& timeslice)
+ {
+ return runUntil(TimePoint::clock::now() + timeslice);
+ }
+
+ /**
+ * runUntil() is just like runFor(), only with a specific end time
+ * instead of a timeslice duration.
+ */
+ bool runUntil(const TimePoint& until);
+
+ private:
+ static void error(const std::string& msg);
+ static std::string makeName(const std::string& name);
+ void callWork(const Queue::DataTuple& work);
+ void callWork(const Work& work);
+ Queue mQueue;
+ };
+
+ /**
+ * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a
+ * CALLABLE that returns bool, a TimePoint and an interval at which to
+ * relaunch it. As long as the callable continues returning true, BackJack
+ * keeps resubmitting it to the target WorkQueue.
+ */
+ // Why is BackJack a class and not a lambda? Because, unlike a lambda, a
+ // class method gets its own 'this' pointer -- which we need to resubmit
+ // the whole BackJack callable.
+ template <typename Rep, typename Period, typename CALLABLE>
+ class WorkQueue::BackJack
+ {
+ public:
+ // bind the desired data
+ BackJack(std::weak_ptr<WorkQueue> target,
+ const WorkQueue::TimePoint& start,
+ const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable):
+ mTarget(target),
+ mStart(start),
+ mInterval(interval),
+ mCallable(std::move(callable))
+ {}
+
+ // Call by target WorkQueue -- note that although WE require a
+ // callable returning bool, WorkQueue wants a void callable. We
+ // consume the bool.
+ void operator()()
+ {
+ // If mCallable() throws an exception, don't catch it here: if it
+ // throws once, it's likely to throw every time, so it's a waste
+ // of time to arrange to call it again.
+ if (mCallable())
+ {
+ // Modify mStart to the new start time we desire. If we simply
+ // added mInterval to now, we'd get actual timings of
+ // (mInterval + slop), where 'slop' is the latency between the
+ // previous mStart and the WorkQueue actually calling us.
+ // Instead, add mInterval to mStart so that at least we
+ // register our intent to fire at exact mIntervals.
+ mStart += mInterval;
+
+ // We're being called at this moment by the target WorkQueue.
+ // Assume it still exists, rather than checking the result of
+ // lock().
+ // Resubmit the whole *this callable: that's why we're a class
+ // rather than a lambda. Allow moving *this so we can carry a
+ // move-only callable; but naturally this statement must be
+ // the last time we reference this instance, which may become
+ // moved-from.
+ try
+ {
+ mTarget.lock()->post(mStart, std::move(*this));
+ }
+ catch (const Closed&)
+ {
+ // Once this queue is closed, oh well, just stop
+ }
+ }
+ }
+
+ private:
+ std::weak_ptr<WorkQueue> mTarget;
+ WorkQueue::TimePoint mStart;
+ std::chrono::duration<Rep, Period> mInterval;
+ CALLABLE mCallable;
+ };
+
+ template <typename Rep, typename Period, typename CALLABLE>
+ void WorkQueue::postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable)
+ {
+ if (interval.count() <= 0)
+ {
+ // It's essential that postEvery() be called with a positive
+ // interval, since each call to BackJack posts another instance of
+ // itself at (start + interval) and we order by target time. A
+ // zero or negative interval would result in that BackJack
+ // instance going to the head of the queue every time, immediately
+ // ready to run. Effectively that would produce an infinite loop,
+ // a denial of service on this WorkQueue.
+ error("postEvery(interval) may not be 0");
+ }
+ // Instantiate and post a suitable BackJack, binding a weak_ptr to
+ // self, the current time, the desired interval and the desired
+ // callable.
+ post(
+ BackJack<Rep, Period, CALLABLE>(
+ getWeak(), TimePoint::clock::now(), interval, std::move(callable)));
+ }
+
+} // namespace LL
+
+#endif /* ! defined(LL_WORKQUEUE_H) */