summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--indra/llcommon/tests/threadsafeschedule_test.cpp4
-rw-r--r--indra/llcommon/tests/workqueue_test.cpp23
-rw-r--r--indra/llcommon/workqueue.h167
3 files changed, 134 insertions, 60 deletions
diff --git a/indra/llcommon/tests/threadsafeschedule_test.cpp b/indra/llcommon/tests/threadsafeschedule_test.cpp
index af67b9f492..c421cc7b1c 100644
--- a/indra/llcommon/tests/threadsafeschedule_test.cpp
+++ b/indra/llcommon/tests/threadsafeschedule_test.cpp
@@ -46,11 +46,11 @@ namespace tut
// the real time required for each push() call. Explicitly increment
// the timestamp for each one -- but since we're passing explicit
// timestamps, make the queue reorder them.
- queue.push(Queue::TimeTuple(Queue::Clock::now() + 20ms, "ghi"));
+ queue.push(Queue::TimeTuple(Queue::Clock::now() + 200ms, "ghi"));
// Given the various push() overloads, you have to match the type
// exactly: conversions are ambiguous.
queue.push("abc"s);
- queue.push(Queue::Clock::now() + 10ms, "def");
+ queue.push(Queue::Clock::now() + 100ms, "def");
queue.close();
auto entry = queue.pop();
ensure_equals("failed to pop first", std::get<0>(entry), "abc"s);
diff --git a/indra/llcommon/tests/workqueue_test.cpp b/indra/llcommon/tests/workqueue_test.cpp
index d5405400fd..b69df49d33 100644
--- a/indra/llcommon/tests/workqueue_test.cpp
+++ b/indra/llcommon/tests/workqueue_test.cpp
@@ -138,7 +138,8 @@ namespace tut
[](){ return 17; },
// Note that a postTo() *callback* can safely bind a reference to
// a variable on the invoking thread, because the callback is run
- // on the invoking thread.
+ // on the invoking thread. (Of course the bound variable must
+ // survive until the callback is called.)
[&result](int i){ result = i; });
// this should post the callback to main
qptr->runOne();
@@ -156,4 +157,24 @@ namespace tut
main.runPending();
ensure_equals("failed to run string callback", alpha, "abc");
}
+
+ template<> template<>
+ void object::test<5>()
+ {
+ set_test_name("postTo with void return");
+ WorkQueue main("main");
+ auto qptr = WorkQueue::getInstance("queue");
+ std::string observe;
+ main.postTo(
+ qptr,
+ // The ONLY reason we can get away with binding a reference to
+ // 'observe' in our work callable is because we're directly
+ // calling qptr->runOne() on this same thread. It would be a
+ // mistake to do that if some other thread were servicing 'queue'.
+ [&observe](){ observe = "queue"; },
+ [&observe](){ observe.append(";main"); });
+ qptr->runOne();
+ main.runOne();
+ ensure_equals("failed to run both lambdas", observe, "queue;main");
+ }
} // namespace tut
diff --git a/indra/llcommon/workqueue.h b/indra/llcommon/workqueue.h
index cfae2019dc..deef3c8e84 100644
--- a/indra/llcommon/workqueue.h
+++ b/indra/llcommon/workqueue.h
@@ -115,62 +115,7 @@ namespace LL
// code.
template <typename CALLABLE, typename FOLLOWUP>
bool postTo(WorkQueue::weak_t target,
- const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
- {
- // We're being asked to post to the WorkQueue at target.
- // target is a weak_ptr: have to lock it to check it.
- auto tptr = target.lock();
- if (! tptr)
- // can't post() if the target WorkQueue has been destroyed
- return false;
-
- // Here we believe target WorkQueue still exists. Post to it a
- // lambda that packages our callable, our callback and a weak_ptr
- // to this originating WorkQueue.
- tptr->post(
- time,
- [reply = super::getWeak(),
- callable = std::move(callable),
- callback = std::move(callback)]
- ()
- {
- // Call the callable in any case -- but to minimize
- // copying the result, immediately bind it into a reply
- // lambda. The reply lambda also binds the original
- // callback, so that when we, the originating WorkQueue,
- // finally receive and process the reply lambda, we'll
- // call the bound callback with the bound result -- on the
- // same thread that originally called postTo().
- auto rlambda =
- [result = callable(),
- callback = std::move(callback)]
- ()
- { callback(std::move(result)); };
- // Check if this originating WorkQueue still exists.
- // Remember, the outer lambda is now running on a thread
- // servicing the target WorkQueue, and real time has
- // elapsed since postTo()'s tptr->post() call.
- // reply is a weak_ptr: have to lock it to check it.
- auto rptr = reply.lock();
- if (rptr)
- {
- // Only post reply lambda if the originating WorkQueue
- // still exists. If not -- who would we tell? Log it?
- try
- {
- rptr->post(std::move(rlambda));
- }
- catch (const Closed&)
- {
- // Originating WorkQueue might still exist, but
- // might be Closed. Same thing: just discard the
- // callback.
- }
- }
- });
- // looks like we were able to post()
- return true;
- }
+ const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback);
/**
* Post work to another WorkQueue, requesting a specific callback to
@@ -183,7 +128,8 @@ namespace LL
bool postTo(WorkQueue::weak_t target,
CALLABLE&& callable, FOLLOWUP&& callback)
{
- return postTo(target, TimePoint::clock::now(), std::move(callable), std::move(callback));
+ return postTo(target, TimePoint::clock::now(),
+ std::move(callable), std::move(callback));
}
/*--------------------------- worker API ---------------------------*/
@@ -231,6 +177,17 @@ namespace LL
bool runUntil(const TimePoint& until);
private:
+ template <typename CALLABLE, typename FOLLOWUP>
+ static auto makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback);
+
+ /// general case: arbitrary C++ return type
+ template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
+ struct MakeReplyLambda;
+
+ /// specialize for CALLABLE returning void
+ template <typename CALLABLE, typename FOLLOWUP>
+ struct MakeReplyLambda<CALLABLE, FOLLOWUP, void>;
+
static void error(const std::string& msg);
static std::string makeName(const std::string& name);
void callWork(const Queue::DataTuple& work);
@@ -329,6 +286,102 @@ namespace LL
getWeak(), TimePoint::clock::now(), interval, std::move(callable)));
}
+ template <typename CALLABLE, typename FOLLOWUP, typename RETURNTYPE>
+ struct WorkQueue::MakeReplyLambda
+ {
+ auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
+ {
+ // Call the callable in any case -- but to minimize
+ // copying the result, immediately bind it into the reply
+ // lambda. The reply lambda also binds the original
+ // callback, so that when we, the originating WorkQueue,
+ // finally receive and process the reply lambda, we'll
+ // call the bound callback with the bound result -- on the
+ // same thread that originally called postTo().
+ return
+ [result = std::forward<CALLABLE>(callable)(),
+ callback = std::move(callback)]
+ ()
+ { callback(std::move(result)); };
+ }
+ };
+
+ /// specialize for CALLABLE returning void
+ template <typename CALLABLE, typename FOLLOWUP>
+ struct WorkQueue::MakeReplyLambda<CALLABLE, FOLLOWUP, void>
+ {
+ auto operator()(CALLABLE&& callable, FOLLOWUP&& callback)
+ {
+ // Call the callable, which produces no result.
+ std::forward<CALLABLE>(callable)();
+ // This reply lambda binds the original callback, so
+ // that when we, the originating WorkQueue, finally
+ // receive and process the reply lambda, we'll call
+ // the bound callback -- on the same thread that
+ // originally called postTo().
+ return [callback = std::move(callback)](){ callback(); };
+ }
+ };
+
+ template <typename CALLABLE, typename FOLLOWUP>
+ auto WorkQueue::makeReplyLambda(CALLABLE&& callable, FOLLOWUP&& callback)
+ {
+ return MakeReplyLambda<CALLABLE, FOLLOWUP,
+ decltype(std::forward<CALLABLE>(callable)())>()
+ (std::move(callable), std::move(callback));
+ }
+
+ template <typename CALLABLE, typename FOLLOWUP>
+ bool WorkQueue::postTo(WorkQueue::weak_t target,
+ const TimePoint& time, CALLABLE&& callable, FOLLOWUP&& callback)
+ {
+ // We're being asked to post to the WorkQueue at target.
+ // target is a weak_ptr: have to lock it to check it.
+ auto tptr = target.lock();
+ if (! tptr)
+ // can't post() if the target WorkQueue has been destroyed
+ return false;
+
+ // Here we believe target WorkQueue still exists. Post to it a
+ // lambda that packages our callable, our callback and a weak_ptr
+ // to this originating WorkQueue.
+ tptr->post(
+ time,
+ [reply = super::getWeak(),
+ callable = std::move(callable),
+ callback = std::move(callback)]
+ ()
+ {
+ // Make a reply lambda to repost to THIS WorkQueue.
+ // Delegate to makeReplyLambda() so we can partially
+ // specialize on void return.
+ auto rlambda = makeReplyLambda(std::move(callable), std::move(callback));
+ // Check if this originating WorkQueue still exists.
+ // Remember, the outer lambda is now running on a thread
+ // servicing the target WorkQueue, and real time has
+ // elapsed since postTo()'s tptr->post() call.
+ // reply is a weak_ptr: have to lock it to check it.
+ auto rptr = reply.lock();
+ if (rptr)
+ {
+ // Only post reply lambda if the originating WorkQueue
+ // still exists. If not -- who would we tell? Log it?
+ try
+ {
+ rptr->post(std::move(rlambda));
+ }
+ catch (const Closed&)
+ {
+ // Originating WorkQueue might still exist, but
+ // might be Closed. Same thing: just discard the
+ // callback.
+ }
+ }
+ });
+ // looks like we were able to post()
+ return true;
+ }
+
} // namespace LL
#endif /* ! defined(LL_WORKQUEUE_H) */