summaryrefslogtreecommitdiff
path: root/indra/llcommon/lleventcoro.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llcommon/lleventcoro.cpp')
-rw-r--r--indra/llcommon/lleventcoro.cpp419
1 files changed, 180 insertions, 239 deletions
diff --git a/indra/llcommon/lleventcoro.cpp b/indra/llcommon/lleventcoro.cpp
index 56367b8f54..995356dc52 100644
--- a/indra/llcommon/lleventcoro.cpp
+++ b/indra/llcommon/lleventcoro.cpp
@@ -31,17 +31,17 @@
// associated header
#include "lleventcoro.h"
// STL headers
-#include <map>
+#include <chrono>
+#include <exception>
// std headers
// external library headers
+#include <boost/fiber/operations.hpp>
// other Linden headers
#include "llsdserialize.h"
+#include "llsdutil.h"
#include "llerror.h"
#include "llcoros.h"
-#include "llmake.h"
-#include "llexception.h"
-
-#include "lleventfilter.h"
+#include "stringize.h"
namespace
{
@@ -62,7 +62,7 @@ namespace
std::string listenerNameForCoro()
{
// If this coroutine was launched by LLCoros::launch(), find that name.
- std::string name(LLCoros::instance().getName());
+ std::string name(LLCoros::getName());
if (! name.empty())
{
return name;
@@ -92,137 +92,173 @@ std::string listenerNameForCoro()
* In the degenerate case in which @a path is an empty array, @a dest will
* @em become @a value rather than @em containing it.
*/
-void storeToLLSDPath(LLSD& dest, const LLSD& rawPath, const LLSD& value)
+void storeToLLSDPath(LLSD& dest, const LLSD& path, const LLSD& value)
{
- if (rawPath.isUndefined())
+ if (path.isUndefined())
{
// no-op case
return;
}
- // Arrange to treat rawPath uniformly as an array. If it's not already an
- // array, store it as the only entry in one.
- LLSD path;
- if (rawPath.isArray())
- {
- path = rawPath;
- }
- else
- {
- path.append(rawPath);
- }
-
- // Need to indicate a current destination -- but that current destination
- // needs to change as we step through the path array. Where normally we'd
- // use an LLSD& to capture a subscripted LLSD lvalue, this time we must
- // instead use a pointer -- since it must be reassigned.
- LLSD* pdest = &dest;
-
- // Now loop through that array
- for (LLSD::Integer i = 0; i < path.size(); ++i)
- {
- if (path[i].isString())
- {
- // *pdest is an LLSD map
- pdest = &((*pdest)[path[i].asString()]);
- }
- else if (path[i].isInteger())
- {
- // *pdest is an LLSD array
- pdest = &((*pdest)[path[i].asInteger()]);
- }
- else
- {
- // What do we do with Real or Array or Map or ...?
- // As it's a coder error -- not a user error -- rub the coder's
- // face in it so it gets fixed.
- LL_ERRS("lleventcoro") << "storeToLLSDPath(" << dest << ", " << rawPath << ", " << value
- << "): path[" << i << "] bad type " << path[i].type() << LL_ENDL;
- }
- }
-
- // Here *pdest is where we should store value.
- *pdest = value;
+ // Drill down to where we should store 'value'.
+ llsd::drill(dest, path) = value;
}
-/// For LLCoros::Future<LLSD>::make_callback(), the callback has a signature
-/// like void callback(LLSD), which isn't a valid LLEventPump listener: such
-/// listeners must return bool.
-template <typename LISTENER>
-class FutureListener
-{
-public:
- // FutureListener is instantiated on the coroutine stack: the stack, in
- // other words, that wants to suspend.
- FutureListener(const LISTENER& listener):
- mListener(listener),
- // Capture the suspending coroutine's flag as a consuming or
- // non-consuming listener.
- mConsume(LLCoros::get_consuming())
- {}
-
- // operator()() is called on the main stack: the stack on which the
- // expected event is fired.
- bool operator()(const LLSD& event)
- {
- mListener(event);
- // tell upstream LLEventPump whether listener consumed
- return mConsume;
- }
-
-protected:
- LISTENER mListener;
- bool mConsume;
-};
-
} // anonymous
void llcoro::suspend()
{
- // By viewer convention, we post an event on the "mainloop" LLEventPump
- // each iteration of the main event-handling loop. So waiting for a single
- // event on "mainloop" gives us a one-frame suspend.
- suspendUntilEventOn("mainloop");
+ LLCoros::checkStop();
+ LLCoros::TempStatus st("waiting one tick");
+ boost::this_fiber::yield();
}
void llcoro::suspendUntilTimeout(float seconds)
{
- LLEventTimeout timeout;
-
- timeout.eventAfter(seconds, LLSD());
- llcoro::suspendUntilEventOn(timeout);
+ LLCoros::checkStop();
+ // We used to call boost::this_fiber::sleep_for(). But some coroutines
+ // (e.g. LLExperienceCache::idleCoro()) sit in a suspendUntilTimeout()
+ // loop, in which case a sleep_for() call risks sleeping through shutdown.
+ // So instead, listen for "LLApp" state-changing events -- which
+ // fortunately is handled for us by suspendUntilEventOnWithTimeout().
+ // Wait for an event on a bogus LLEventPump on which nobody ever posts
+ // events. Don't make it static because that would force instantiation of
+ // the LLEventPumps LLSingleton registry at static initialization time.
+ // DO allow tweaking the name for uniqueness, this definitely gets
+ // re-entered on multiple coroutines!
+ // We could use an LLUUID if it were important to actively prohibit anyone
+ // from ever posting on this LLEventPump.
+ LLEventStream bogus("xyzzy", true);
+ // Timeout is the NORMAL case for this call!
+ static LLSD timedout;
+ // Deliver, but ignore, timedout when (as usual) we did not receive any
+ // "LLApp" event. The point is that suspendUntilEventOnWithTimeout() will
+ // itself throw Stopping when "LLApp" starts broadcasting shutdown events.
+ suspendUntilEventOnWithTimeout(bogus, seconds, timedout);
}
-LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requestPump,
- const LLEventPumpOrPumpName& replyPump, const LLSD& replyPumpNamePath)
+namespace
{
- // declare the future
- LLCoros::Future<LLSD> future;
- // make a callback that will assign a value to the future, and listen on
- // the specified LLEventPump with that callback
- std::string listenerName(listenerNameForCoro());
- LLTempBoundListener connection(
- replyPump.getPump().listen(listenerName,
- llmake<FutureListener>(future.make_callback())));
+
+// returns a listener on replyPumpP, also on "mainloop" -- both should be
+// stored in LLTempBoundListeners on the caller's stack frame
+std::pair<LLBoundListener, LLBoundListener>
+postAndSuspendSetup(const std::string& callerName,
+ const std::string& listenerName,
+ LLCoros::Promise<LLSD>& promise,
+ const LLSD& event,
+ const LLEventPumpOrPumpName& requestPumpP,
+ const LLEventPumpOrPumpName& replyPumpP,
+ const LLSD& replyPumpNamePath)
+{
+ // Before we get any farther -- should we be stopping instead of
+ // suspending?
+ LLCoros::checkStop();
+ // Get the consuming attribute for THIS coroutine, the one that's about to
+ // suspend. Don't call get_consuming() in the lambda body: that would
+ // return the consuming attribute for some other coroutine, most likely
+ // the main routine.
+ bool consuming(LLCoros::get_consuming());
+ // listen on the specified LLEventPump with a lambda that will assign a
+ // value to the promise, thus fulfilling its future
+ llassert_always_msg(replyPumpP, ("replyPump required for " + callerName));
+ LLEventPump& replyPump(replyPumpP.getPump());
+ // The relative order of the two listen() calls below would only matter if
+ // "LLApp" were an LLEventMailDrop. But if we ever go there, we'd want to
+ // notice the pending LLApp status first.
+ LLBoundListener stopper(
+ LLEventPumps::instance().obtain("LLApp").listen(
+ listenerName,
+ [&promise, listenerName](const LLSD& status)
+ {
+ // anything except "running" should wake up the waiting
+ // coroutine
+ auto& statsd = status["status"];
+ if (statsd.asString() != "running")
+ {
+ LL_DEBUGS("lleventcoro") << listenerName
+ << " spotted status " << statsd
+ << ", throwing Stopping" << LL_ENDL;
+ try
+ {
+ promise.set_exception(
+ std::make_exception_ptr(
+ LLCoros::Stopping("status " + statsd.asString())));
+ }
+ catch (const boost::fibers::promise_already_satisfied&)
+ {
+ LL_WARNS("lleventcoro") << listenerName
+ << " couldn't throw Stopping "
+ "because promise already set" << LL_ENDL;
+ }
+ }
+ // do not consume -- every listener must see status
+ return false;
+ }));
+ LLBoundListener connection(
+ replyPump.listen(
+ listenerName,
+ [&promise, consuming, listenerName](const LLSD& result)
+ {
+ try
+ {
+ promise.set_value(result);
+ // We did manage to propagate the result value to the
+ // (real) listener. If we're supposed to indicate that
+ // we've consumed it, do so.
+ return consuming;
+ }
+ catch(boost::fibers::promise_already_satisfied & ex)
+ {
+ LL_DEBUGS("lleventcoro") << "promise already satisfied in '"
+ << listenerName << "': " << ex.what() << LL_ENDL;
+ // We could not propagate the result value to the
+ // listener.
+ return false;
+ }
+ }));
+
// skip the "post" part if requestPump is default-constructed
- if (requestPump)
+ if (requestPumpP)
{
+ LLEventPump& requestPump(requestPumpP.getPump());
// If replyPumpNamePath is non-empty, store the replyPump name in the
// request event.
LLSD modevent(event);
- storeToLLSDPath(modevent, replyPumpNamePath, replyPump.getPump().getName());
- LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName
- << " posting to " << requestPump.getPump().getName()
+ storeToLLSDPath(modevent, replyPumpNamePath, replyPump.getName());
+ LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName
+ << " posting to " << requestPump.getName()
<< LL_ENDL;
// *NOTE:Mani - Removed because modevent could contain user's hashed passwd.
// << ": " << modevent << LL_ENDL;
- requestPump.getPump().post(modevent);
+ requestPump.post(modevent);
}
- LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName
- << " about to wait on LLEventPump " << replyPump.getPump().getName()
+ LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName
+ << " about to wait on LLEventPump " << replyPump.getName()
<< LL_ENDL;
+ return { connection, stopper };
+}
+
+} // anonymous
+
+LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requestPump,
+ const LLEventPumpOrPumpName& replyPump, const LLSD& replyPumpNamePath)
+{
+ LLCoros::Promise<LLSD> promise;
+ std::string listenerName(listenerNameForCoro());
+
+ // Store both connections into LLTempBoundListeners so we implicitly
+ // disconnect on return from this function.
+ auto connections =
+ postAndSuspendSetup("postAndSuspend()", listenerName, promise,
+ event, requestPump, replyPump, replyPumpNamePath);
+ LLTempBoundListener connection(connections.first), stopper(connections.second);
+
+ // declare the future
+ LLCoros::Future<LLSD> future = LLCoros::getFuture(promise);
// calling get() on the future makes us wait for it
+ LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName()));
LLSD value(future.get());
LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName
<< " resuming with " << value << LL_ENDL;
@@ -230,147 +266,52 @@ LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requ
return value;
}
-LLSD llcoro::suspendUntilEventOnWithTimeout(const LLEventPumpOrPumpName& suspendPumpOrName,
- F32 timeoutin, const LLSD &timeoutResult)
-{
- /**
- * The timeout pump is attached upstream of of the waiting pump and will
- * pass the timeout event through it. We CAN NOT attach downstream since
- * doing so will cause the suspendPump to fire any waiting events immediately
- * and they will be lost. This becomes especially problematic with the
- * LLEventTimeout(pump) constructor which will also attempt to fire those
- * events using the virtual listen_impl method in the not yet fully constructed
- * timeoutPump.
- */
- LLEventTimeout timeoutPump;
- LLEventPump &suspendPump = suspendPumpOrName.getPump();
-
- LLTempBoundListener timeoutListener(timeoutPump.listen(suspendPump.getName(),
- boost::bind(&LLEventPump::post, &suspendPump, _1)));
-
- timeoutPump.eventAfter(timeoutin, timeoutResult);
- return llcoro::suspendUntilEventOn(suspendPump);
-}
-
-namespace
-{
-
-/**
- * This helper is specifically for postAndSuspend2(). We use a single future
- * object, but we want to listen on two pumps with it. Since we must still
- * adapt from the callable constructed by boost::dcoroutines::make_callback()
- * (void return) to provide an event listener (bool return), we've adapted
- * FutureListener for the purpose. The basic idea is that we construct a
- * distinct instance of FutureListener2 -- binding different instance data --
- * for each of the pumps. Then, when a pump delivers an LLSD value to either
- * FutureListener2, it can combine that LLSD with its discriminator to feed
- * the future object.
- *
- * DISCRIM is a template argument so we can use llmake() rather than
- * having to write our own argument-deducing helper function.
- */
-template <typename LISTENER, typename DISCRIM>
-class FutureListener2: public FutureListener<LISTENER>
+LLSD llcoro::postAndSuspendWithTimeout(const LLSD& event,
+ const LLEventPumpOrPumpName& requestPump,
+ const LLEventPumpOrPumpName& replyPump,
+ const LLSD& replyPumpNamePath,
+ F32 timeout, const LLSD& timeoutResult)
{
- typedef FutureListener<LISTENER> super;
-
-public:
- // instantiated on coroutine stack: the stack about to suspend
- FutureListener2(const LISTENER& listener, DISCRIM discriminator):
- super(listener),
- mDiscrim(discriminator)
- {}
-
- // called on main stack: the stack on which event is fired
- bool operator()(const LLSD& event)
- {
- // our future object is defined to accept LLEventWithID
- super::mListener(LLEventWithID(event, mDiscrim));
- // tell LLEventPump whether or not event was consumed
- return super::mConsume;
- }
-
-private:
- const DISCRIM mDiscrim;
-};
+ LLCoros::Promise<LLSD> promise;
+ std::string listenerName(listenerNameForCoro());
-} // anonymous
+ // Store both connections into LLTempBoundListeners so we implicitly
+ // disconnect on return from this function.
+ auto connections =
+ postAndSuspendSetup("postAndSuspendWithTimeout()", listenerName, promise,
+ event, requestPump, replyPump, replyPumpNamePath);
+ LLTempBoundListener connection(connections.first), stopper(connections.second);
-namespace llcoro
-{
-
-LLEventWithID postAndSuspend2(const LLSD& event,
- const LLEventPumpOrPumpName& requestPump,
- const LLEventPumpOrPumpName& replyPump0,
- const LLEventPumpOrPumpName& replyPump1,
- const LLSD& replyPump0NamePath,
- const LLSD& replyPump1NamePath)
-{
// declare the future
- LLCoros::Future<LLEventWithID> future;
- // either callback will assign a value to this future; listen on
- // each specified LLEventPump with a callback
- std::string name(listenerNameForCoro());
- LLTempBoundListener connection0(
- replyPump0.getPump().listen(
- name + "a",
- llmake<FutureListener2>(future.make_callback(), 0)));
- LLTempBoundListener connection1(
- replyPump1.getPump().listen(
- name + "b",
- llmake<FutureListener2>(future.make_callback(), 1)));
- // skip the "post" part if requestPump is default-constructed
- if (requestPump)
+ LLCoros::Future<LLSD> future = LLCoros::getFuture(promise);
+ // wait for specified timeout
+ boost::fibers::future_status status;
{
- // If either replyPumpNamePath is non-empty, store the corresponding
- // replyPump name in the request event.
- LLSD modevent(event);
- storeToLLSDPath(modevent, replyPump0NamePath,
- replyPump0.getPump().getName());
- storeToLLSDPath(modevent, replyPump1NamePath,
- replyPump1.getPump().getName());
- LL_DEBUGS("lleventcoro") << "postAndSuspend2(): coroutine " << name
- << " posting to " << requestPump.getPump().getName()
- << ": " << modevent << LL_ENDL;
- requestPump.getPump().post(modevent);
+ LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName()
+ << " for " << timeout << "s"));
+ // The fact that we accept non-integer seconds means we should probably
+ // use granularity finer than one second. However, given the overhead of
+ // the rest of our processing, it seems silly to use granularity finer
+ // than a millisecond.
+ status = future.wait_for(std::chrono::milliseconds(long(timeout * 1000)));
}
- LL_DEBUGS("lleventcoro") << "postAndSuspend2(): coroutine " << name
- << " about to wait on LLEventPumps " << replyPump0.getPump().getName()
- << ", " << replyPump1.getPump().getName() << LL_ENDL;
- // calling get() on the future makes us wait for it
- LLEventWithID value(future.get());
- LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << name
- << " resuming with (" << value.first << ", " << value.second << ")"
- << LL_ENDL;
- // returning should disconnect both connections
- return value;
-}
-
-LLSD errorException(const LLEventWithID& result, const std::string& desc)
-{
- // If the result arrived on the error pump (pump 1), instead of
- // returning it, deliver it via exception.
- if (result.second)
+ // if the future is NOT yet ready, return timeoutResult instead
+ if (status == boost::fibers::future_status::timeout)
{
- LLTHROW(LLErrorEvent(desc, result.first));
+ LL_DEBUGS("lleventcoro") << "postAndSuspendWithTimeout(): coroutine " << listenerName
+ << " timed out after " << timeout << " seconds,"
+ << " resuming with " << timeoutResult << LL_ENDL;
+ return timeoutResult;
}
- // That way, our caller knows a simple return must be from the reply
- // pump (pump 0).
- return result.first;
-}
-
-LLSD errorLog(const LLEventWithID& result, const std::string& desc)
-{
- // If the result arrived on the error pump (pump 1), log it as a fatal
- // error.
- if (result.second)
+ else
{
- LL_ERRS("errorLog") << desc << ":" << std::endl;
- LLSDSerialize::toPrettyXML(result.first, LL_CONT);
- LL_CONT << LL_ENDL;
+ llassert_always(status == boost::fibers::future_status::ready);
+
+ // future is now ready, no more waiting
+ LLSD value(future.get());
+ LL_DEBUGS("lleventcoro") << "postAndSuspendWithTimeout(): coroutine " << listenerName
+ << " resuming with " << value << LL_ENDL;
+ // returning should disconnect the connection
+ return value;
}
- // A simple return must therefore be from the reply pump (pump 0).
- return result.first;
}
-
-} // namespace llcoro