summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--indra/llcommon/CMakeLists.txt3
-rw-r--r--indra/llcommon/tests/workqueue_test.cpp158
-rw-r--r--indra/llcommon/threadsafeschedule.h1
-rw-r--r--indra/llcommon/workqueue.cpp114
-rw-r--r--indra/llcommon/workqueue.h325
5 files changed, 601 insertions, 0 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt
index 5efcfabf24..a3dbb6d9d0 100644
--- a/indra/llcommon/CMakeLists.txt
+++ b/indra/llcommon/CMakeLists.txt
@@ -121,6 +121,7 @@ set(llcommon_SOURCE_FILES
llworkerthread.cpp
timing.cpp
u64.cpp
+ workqueue.cpp
StackWalker.cpp
)
@@ -258,6 +259,7 @@ set(llcommon_HEADER_FILES
timer.h
tuple.h
u64.h
+ workqueue.h
StackWalker.h
)
@@ -363,6 +365,7 @@ if (LL_TESTS)
LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(threadsafeschedule "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(tuple "" "${test_libs}")
+ LL_ADD_INTEGRATION_TEST(workqueue "" "${test_libs}")
## llexception_test.cpp isn't a regression test, and doesn't need to be run
## every build. It's to help a developer make implementation choices about
diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp
new file mode 100644
index 0000000000..ab1cae6c14
--- /dev/null
+++ b/indra/llcommon/tests/workqueue_test.cpp
@@ -0,0 +1,158 @@
+/**
+ * @file workqueue_test.cpp
+ * @author Nat Goodspeed
+ * @date 2021-10-07
+ * @brief Test for workqueue.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "workqueue.h"
+// STL headers
+// std headers
+#include <chrono>
+#include <deque>
+// external library headers
+// other Linden headers
+#include "../test/lltut.h"
+#include "llcond.h"
+#include "llstring.h"
+#include "stringize.h"
+
+using namespace LL;
+using namespace std::literals::chrono_literals; // ms suffix
+using namespace std::literals::string_literals; // s suffix
+
+/*****************************************************************************
+* TUT
+*****************************************************************************/
+namespace tut
+{
+ struct workqueue_data
+ {
+ WorkQueue queue{"queue"};
+ };
+ typedef test_group<workqueue_data> workqueue_group;
+ typedef workqueue_group::object object;
+ workqueue_group workqueuegrp("workqueue");
+
+ template<> template<>
+ void object::test<1>()
+ {
+ set_test_name("name");
+ ensure_equals("didn't capture name", queue.getKey(), "queue");
+ ensure("not findable", WorkQueue::getInstance("queue") == queue.getWeak().lock());
+ WorkQueue q2;
+ ensure("has no name", LLStringUtil::startsWith(q2.getKey(), "WorkQueue"));
+ }
+
+ template<> template<>
+ void object::test<2>()
+ {
+ set_test_name("post");
+ bool wasRun{ false };
+ // We only get away with binding a simple bool because we're running
+ // the work on the same thread.
+ queue.post([&wasRun](){ wasRun = true; });
+ queue.close();
+ ensure("ran too soon", ! wasRun);
+ queue.runUntilClose();
+ ensure("didn't run", wasRun);
+ }
+
+ template<> template<>
+ void object::test<3>()
+ {
+ set_test_name("postEvery");
+ // record of runs
+ using Shared = std::deque<WorkQueue::TimePoint>;
+ // This is an example of how to share data between the originator of
+ // postEvery(work) and the work item itself, since usually a WorkQueue
+ // is used to dispatch work to a different thread. Neither of them
+ // should call any of LLCond's wait methods: you don't want to stall
+ // either the worker thread or the originating thread (conventionally
+ // main). Use LLCond or a subclass even if all you want to do is
+ // signal the work item that it can quit; consider LLOneShotCond.
+ LLCond<Shared> data;
+ auto start = WorkQueue::TimePoint::clock::now();
+ auto interval = 100ms;
+ queue.postEvery(
+ interval,
+ [&data, count = 0]
+ () mutable
+ {
+ // record the timestamp at which this instance is running
+ data.update_one(
+ [](Shared& data)
+ {
+ data.push_back(WorkQueue::TimePoint::clock::now());
+ });
+ // by the 3rd call, return false to stop
+ return (++count < 3);
+ });
+ // no convenient way to close() our queue while we've got a
+ // postEvery() running, so run until we think we should have exhausted
+ // the iterations
+ queue.runFor(10*interval);
+ // Take a copy of the captured deque.
+ Shared result = data.get();
+ ensure_equals("called wrong number of times", result.size(), 3);
+ // postEvery() assumes you want the first call to happen right away.
+ // Inject a fake start time that's (interval) earlier than that, to
+ // make our too early/too late tests uniform for all entries.
+ result.push_front(start - interval);
+ for (size_t i = 1; i < result.size(); ++i)
+ {
+ auto diff = (result[i] - result[i-1]);
+ try
+ {
+ ensure(STRINGIZE("call " << i << " too soon"), diff >= interval);
+ ensure(STRINGIZE("call " << i << " too late"), diff < interval*1.5);
+ }
+ catch (const tut::failure&)
+ {
+ auto interval_ms = interval / 1ms;
+ auto diff_ms = diff / 1ms;
+ std::cerr << "interval " << interval_ms
+ << "ms; diff " << diff_ms << "ms" << std::endl;
+ throw;
+ }
+ }
+ }
+
+ template<> template<>
+ void object::test<4>()
+ {
+ set_test_name("postTo");
+ WorkQueue main("main");
+ auto qptr = WorkQueue::getInstance("queue");
+ int result = 0;
+ main.postTo(
+ qptr,
+ [](){ return 17; },
+ // Note that a postTo() *callback* can safely bind a reference to
+ // a variable on the invoking thread, because the callback is run
+ // on the invoking thread.
+ [&result](int i){ result = i; });
+ // this should post the callback to main
+ qptr->runOne();
+ // this should run the callback
+ main.runOne();
+ ensure_equals("failed to run int callback", result, 17);
+
+ std::string alpha;
+ // postTo() handles arbitrary return types
+ main.postTo(
+ qptr,
+ [](){ return "abc"s; },
+ [&alpha](const std::string& s){ alpha = s; });
+ qptr->runPending();
+ main.runPending();
+ ensure_equals("failed to run string callback", alpha, "abc");
+ }
+} // namespace tut
diff --git a/indra/llcommon/threadsafeschedule.h b/indra/llcommon/threadsafeschedule.h
index 0e70d30714..c8ad23532b 100644
--- a/indra/llcommon/threadsafeschedule.h
+++ b/indra/llcommon/threadsafeschedule.h
@@ -78,6 +78,7 @@ namespace LL
enum pop_result { EMPTY=super::EMPTY, DONE=super::DONE, WAITING=super::WAITING, POPPED=super::POPPED };
public:
+ using Closed = LLThreadSafeQueueInterrupt;
using TimePoint = ThreadSafeSchedulePrivate::TimePoint;
using Clock = TimePoint::clock;
diff --git a/indra/llcommon/workqueue.cpp b/indra/llcommon/workqueue.cpp
new file mode 100644
index 0000000000..15e292fb43
--- /dev/null
+++ b/indra/llcommon/workqueue.cpp
@@ -0,0 +1,114 @@
+/**
+ * @file workqueue.cpp
+ * @author Nat Goodspeed
+ * @date 2021-10-06
+ * @brief Implementation for WorkQueue.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "workqueue.h"
+// STL headers
+// std headers
+// external library headers
+// other Linden headers
+#include "llerror.h"
+#include "llexception.h"
+#include "stringize.h"
+
+LL::WorkQueue::WorkQueue(const std::string& name):
+ super(makeName(name))
+{
+ // TODO: register for "LLApp" events so we can implicitly close() on
+ // viewer shutdown.
+}
+
+void LL::WorkQueue::close()
+{
+ mQueue.close();
+}
+
+void LL::WorkQueue::runUntilClose()
+{
+ try
+ {
+ for (;;)
+ {
+ callWork(mQueue.pop());
+ }
+ }
+ catch (const Queue::Closed&)
+ {
+ }
+}
+
+bool LL::WorkQueue::runPending()
+{
+ for (Work work; mQueue.tryPop(work); )
+ {
+ callWork(work);
+ }
+ return ! mQueue.done();
+}
+
+bool LL::WorkQueue::runOne()
+{
+ Work work;
+ if (mQueue.tryPop(work))
+ {
+ callWork(work);
+ }
+ return ! mQueue.done();
+}
+
+bool LL::WorkQueue::runUntil(const TimePoint& until)
+{
+ // Should we subtract some slop to allow for typical Work execution time?
+ // How much slop?
+ Work work;
+ while (TimePoint::clock::now() < until && mQueue.tryPopUntil(until, work))
+ {
+ callWork(work);
+ }
+ return ! mQueue.done();
+}
+
+std::string LL::WorkQueue::makeName(const std::string& name)
+{
+ if (! name.empty())
+ return name;
+
+ static thread_local U32 discriminator = 0;
+ return STRINGIZE("WorkQueue" << discriminator++);
+}
+
+void LL::WorkQueue::callWork(const Queue::DataTuple& work)
+{
+ // ThreadSafeSchedule::pop() always delivers a tuple, even when
+ // there's only one data field per item, as for us.
+ callWork(std::get<0>(work));
+}
+
+void LL::WorkQueue::callWork(const Work& work)
+{
+ try
+ {
+ work();
+ }
+ catch (...)
+ {
+ // No matter what goes wrong with any individual work item, the worker
+ // thread must go on! Log our own instance name with the exception.
+ LOG_UNHANDLED_EXCEPTION(getKey());
+ }
+}
+
+void LL::WorkQueue::error(const std::string& msg)
+{
+ LL_ERRS("WorkQueue") << msg << LL_ENDL;
+}
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
new file mode 100644
index 0000000000..a52f7b0e26
--- /dev/null
+++ b/indra/llcommon/workqueue.h
@@ -0,0 +1,325 @@
+/**
+ * @file workqueue.h
+ * @author Nat Goodspeed
+ * @date 2021-09-30
+ * @brief Queue used for inter-thread work passing.
+ *
+ * $LicenseInfo:firstyear=2021&license=viewerlgpl$
+ * Copyright (c) 2021, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_WORKQUEUE_H)
+#define LL_WORKQUEUE_H
+
+#include "llinstancetracker.h"
+#include "threadsafeschedule.h"
+#include <chrono>
+#include <functional> // std::function
+#include <queue>
+#include <string>
+#include <utility> // std::pair
+#include <vector>
+
+namespace LL
+{
+ /**
+ * A typical WorkQueue has a string name that can be used to find it.
+ */
+ class WorkQueue: public LLInstanceTracker<WorkQueue, std::string>
+ {
+ private:
+ using super = LLInstanceTracker<WorkQueue, std::string>;
+
+ public:
+ using Work = std::function<void()>;
+
+ private:
+ using Queue = ThreadSafeSchedule<Work>;
+ // helper for postEvery()
+ template <typename Rep, typename Period, typename CALLABLE>
+ class BackJack;
+
+ public:
+ using TimePoint = Queue::TimePoint;
+ using TimedWork = Queue::TimeTuple;
+ using Closed = Queue::Closed;
+
+ /**
+ * You may omit the WorkQueue name, in which case a unique name is
+ * synthesized; for practical purposes that makes it anonymous.
+ */
+ WorkQueue(const std::string& name = std::string());
+
+ /**
+ * Since the point of WorkQueue is to pass work to some other worker
+ * thread(s) asynchronously, it's important that the WorkQueue continue
+ * to exist until the worker thread(s) have drained it. To communicate
+ * that it's time for them to quit, close() the queue.
+ */
+ void close();
+
+ /*---------------------- fire and forget API -----------------------*/
+
+ /// fire-and-forget, but at a particular (future?) time
+ template <typename CALLABLE>
+ void post(const TimePoint& time, CALLABLE&& callable)
+ {
+ // Defer reifying an arbitrary CALLABLE until we hit this method.
+ // All other methods should accept CALLABLEs of arbitrary type to
+ // avoid multiple levels of std::function indirection.
+ mQueue.push(TimedWork(time, std::move(callable)));
+ }
+
+ /// fire-and-forget
+ template <typename CALLABLE>
+ void post(CALLABLE&& callable)
+ {
+ // We use TimePoint::clock::now() instead of TimePoint's
+ // representation of the epoch because this WorkQueue may contain
+ // a mix of past-due TimedWork items and TimedWork items scheduled
+ // for the future. Sift this new item into the correct place.
+ post(TimePoint::clock::now(), std::move(callable));
+ }
+
+ /**
+ * Launch a callable returning bool that will trigger repeatedly at
+ * specified interval, until the callable returns false.
+ *
+ * If you need to signal that callable from outside, DO NOT bind a
+ * reference to a simple bool! That's not thread-safe. Instead, bind
+ * an LLCond variant, e.g. LLOneShotCond or LLBoolCond.
+ */
+ template <typename Rep, typename Period, typename CALLABLE>
+ void postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable);
+
+ /*------------------------- handshake API --------------------------*/
+
+ /**
+ * Post work to another WorkQueue to be run at a specified time,
+ * requesting a specific callback to be run on this WorkQueue on
+ * completion.
+ *
+ * Returns true if able to post, false if the other WorkQueue is
+ * inaccessible.
+ */
+ template <typename CALLABLE, typename CALLBACK>
+ bool postTo(std::weak_ptr<WorkQueue> target,
+ const TimePoint& time, CALLABLE&& callable, CALLBACK&& callback)
+ {
+ // We're being asked to post to the WorkQueue at target.
+ // target is a weak_ptr: have to lock it to check it.
+ auto tptr = target.lock();
+ if (! tptr)
+ // can't post() if the target WorkQueue has been destroyed
+ return false;
+
+ // Here we believe target WorkQueue still exists. Post to it a
+ // lambda that packages our callable, our callback and a weak_ptr
+ // to this originating WorkQueue.
+ tptr->post(
+ time,
+ [reply = super::getWeak(),
+ callable = std::move(callable),
+ callback = std::move(callback)]
+ ()
+ {
+ // Call the callable in any case -- but to minimize
+ // copying the result, immediately bind it into a reply
+ // lambda. The reply lambda also binds the original
+ // callback, so that when we, the originating WorkQueue,
+ // finally receive and process the reply lambda, we'll
+ // call the bound callback with the bound result -- on the
+ // same thread that originally called postTo().
+ auto rlambda =
+ [result = callable(),
+ callback = std::move(callback)]
+ ()
+ { callback(std::move(result)); };
+ // Check if this originating WorkQueue still exists.
+ // Remember, the outer lambda is now running on a thread
+ // servicing the target WorkQueue, and real time has
+ // elapsed since postTo()'s tptr->post() call.
+ // reply is a weak_ptr: have to lock it to check it.
+ auto rptr = reply.lock();
+ if (rptr)
+ {
+ // Only post reply lambda if the originating WorkQueue
+ // still exists. If not -- who would we tell? Log it?
+ try
+ {
+ rptr->post(std::move(rlambda));
+ }
+ catch (const Closed&)
+ {
+ // Originating WorkQueue might still exist, but
+ // might be Closed. Same thing: just discard the
+ // callback.
+ }
+ }
+ });
+ // looks like we were able to post()
+ return true;
+ }
+
+ /**
+ * Post work to another WorkQueue, requesting a specific callback to
+ * be run on this WorkQueue on completion.
+ *
+ * Returns true if able to post, false if the other WorkQueue is
+ * inaccessible.
+ */
+ template <typename CALLABLE, typename CALLBACK>
+ bool postTo(std::weak_ptr<WorkQueue> target,
+ CALLABLE&& callable, CALLBACK&& callback)
+ {
+ return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback));
+ }
+
+ /*--------------------------- worker API ---------------------------*/
+
+ /**
+ * runUntilClose() pulls TimedWork items off this WorkQueue until the
+ * queue is closed, at which point it returns. This would be the
+ * typical entry point for a simple worker thread.
+ */
+ void runUntilClose();
+
+ /**
+ * runPending() runs all TimedWork items that are ready to run. It
+ * returns true if the queue remains open, false if the queue has been
+ * closed. This could be used by a thread whose primary purpose is to
+ * serve the queue, but also wants to do other things with its idle time.
+ */
+ bool runPending();
+
+ /**
+ * runOne() runs at most one ready TimedWork item -- zero if none are
+ * ready. It returns true if the queue remains open, false if the
+ * queue has been closed.
+ */
+ bool runOne();
+
+ /**
+ * runFor() runs a subset of ready TimedWork items, until the
+ * timeslice has been exceeded. It returns true if the queue remains
+ * open, false if the queue has been closed. This could be used by a
+ * busy main thread to lend a bounded few CPU cycles to this WorkQueue
+ * without risking the WorkQueue blowing out the length of any one
+ * frame.
+ */
+ template <typename Rep, typename Period>
+ bool runFor(const std::chrono::duration<Rep, Period>& timeslice)
+ {
+ return runUntil(TimePoint::clock::now() + timeslice);
+ }
+
+ /**
+ * runUntil() is just like runFor(), only with a specific end time
+ * instead of a timeslice duration.
+ */
+ bool runUntil(const TimePoint& until);
+
+ private:
+ static void error(const std::string& msg);
+ static std::string makeName(const std::string& name);
+ void callWork(const Queue::DataTuple& work);
+ void callWork(const Work& work);
+ Queue mQueue;
+ };
+
+ /**
+ * BackJack is, in effect, a hand-rolled lambda, binding a WorkQueue, a
+ * CALLABLE that returns bool, a TimePoint and an interval at which to
+ * relaunch it. As long as the callable continues returning true, BackJack
+ * keeps resubmitting it to the target WorkQueue.
+ */
+ // Why is BackJack a class and not a lambda? Because, unlike a lambda, a
+ // class method gets its own 'this' pointer -- which we need to resubmit
+ // the whole BackJack callable.
+ template <typename Rep, typename Period, typename CALLABLE>
+ class WorkQueue::BackJack
+ {
+ public:
+ // bind the desired data
+ BackJack(std::weak_ptr<WorkQueue> target,
+ const WorkQueue::TimePoint& start,
+ const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable):
+ mTarget(target),
+ mStart(start),
+ mInterval(interval),
+ mCallable(std::move(callable))
+ {}
+
+ // Call by target WorkQueue -- note that although WE require a
+ // callable returning bool, WorkQueue wants a void callable. We
+ // consume the bool.
+ void operator()()
+ {
+ // If mCallable() throws an exception, don't catch it here: if it
+ // throws once, it's likely to throw every time, so it's a waste
+ // of time to arrange to call it again.
+ if (mCallable())
+ {
+ // Modify mStart to the new start time we desire. If we simply
+ // added mInterval to now, we'd get actual timings of
+ // (mInterval + slop), where 'slop' is the latency between the
+ // previous mStart and the WorkQueue actually calling us.
+ // Instead, add mInterval to mStart so that at least we
+ // register our intent to fire at exact mIntervals.
+ mStart += mInterval;
+
+ // We're being called at this moment by the target WorkQueue.
+ // Assume it still exists, rather than checking the result of
+ // lock().
+ // Resubmit the whole *this callable: that's why we're a class
+ // rather than a lambda. Allow moving *this so we can carry a
+ // move-only callable; but naturally this statement must be
+ // the last time we reference this instance, which may become
+ // moved-from.
+ try
+ {
+ mTarget.lock()->post(mStart, std::move(*this));
+ }
+ catch (const Closed&)
+ {
+ // Once this queue is closed, oh well, just stop
+ }
+ }
+ }
+
+ private:
+ std::weak_ptr<WorkQueue> mTarget;
+ WorkQueue::TimePoint mStart;
+ std::chrono::duration<Rep, Period> mInterval;
+ CALLABLE mCallable;
+ };
+
+ template <typename Rep, typename Period, typename CALLABLE>
+ void WorkQueue::postEvery(const std::chrono::duration<Rep, Period>& interval,
+ CALLABLE&& callable)
+ {
+ if (interval.count() <= 0)
+ {
+ // It's essential that postEvery() be called with a positive
+ // interval, since each call to BackJack posts another instance of
+ // itself at (start + interval) and we order by target time. A
+ // zero or negative interval would result in that BackJack
+ // instance going to the head of the queue every time, immediately
+ // ready to run. Effectively that would produce an infinite loop,
+ // a denial of service on this WorkQueue.
+ error("postEvery(interval) may not be 0");
+ }
+ // Instantiate and post a suitable BackJack, binding a weak_ptr to
+ // self, the current time, the desired interval and the desired
+ // callable.
+ post(
+ BackJack<Rep, Period, CALLABLE>(
+ getWeak(), TimePoint::clock::now(), interval, std::move(callable)));
+ }
+
+} // namespace LL
+
+#endif /* ! defined(LL_WORKQUEUE_H) */