diff options
Diffstat (limited to 'indra/llcommon/lleventcoro.cpp')
-rw-r--r-- | indra/llcommon/lleventcoro.cpp | 419 |
1 files changed, 180 insertions, 239 deletions
diff --git a/indra/llcommon/lleventcoro.cpp b/indra/llcommon/lleventcoro.cpp index 56367b8f54..995356dc52 100644 --- a/indra/llcommon/lleventcoro.cpp +++ b/indra/llcommon/lleventcoro.cpp @@ -31,17 +31,17 @@ // associated header #include "lleventcoro.h" // STL headers -#include <map> +#include <chrono> +#include <exception> // std headers // external library headers +#include <boost/fiber/operations.hpp> // other Linden headers #include "llsdserialize.h" +#include "llsdutil.h" #include "llerror.h" #include "llcoros.h" -#include "llmake.h" -#include "llexception.h" - -#include "lleventfilter.h" +#include "stringize.h" namespace { @@ -62,7 +62,7 @@ namespace std::string listenerNameForCoro() { // If this coroutine was launched by LLCoros::launch(), find that name. - std::string name(LLCoros::instance().getName()); + std::string name(LLCoros::getName()); if (! name.empty()) { return name; @@ -92,137 +92,173 @@ std::string listenerNameForCoro() * In the degenerate case in which @a path is an empty array, @a dest will * @em become @a value rather than @em containing it. */ -void storeToLLSDPath(LLSD& dest, const LLSD& rawPath, const LLSD& value) +void storeToLLSDPath(LLSD& dest, const LLSD& path, const LLSD& value) { - if (rawPath.isUndefined()) + if (path.isUndefined()) { // no-op case return; } - // Arrange to treat rawPath uniformly as an array. If it's not already an - // array, store it as the only entry in one. - LLSD path; - if (rawPath.isArray()) - { - path = rawPath; - } - else - { - path.append(rawPath); - } - - // Need to indicate a current destination -- but that current destination - // needs to change as we step through the path array. Where normally we'd - // use an LLSD& to capture a subscripted LLSD lvalue, this time we must - // instead use a pointer -- since it must be reassigned. - LLSD* pdest = &dest; - - // Now loop through that array - for (LLSD::Integer i = 0; i < path.size(); ++i) - { - if (path[i].isString()) - { - // *pdest is an LLSD map - pdest = &((*pdest)[path[i].asString()]); - } - else if (path[i].isInteger()) - { - // *pdest is an LLSD array - pdest = &((*pdest)[path[i].asInteger()]); - } - else - { - // What do we do with Real or Array or Map or ...? - // As it's a coder error -- not a user error -- rub the coder's - // face in it so it gets fixed. - LL_ERRS("lleventcoro") << "storeToLLSDPath(" << dest << ", " << rawPath << ", " << value - << "): path[" << i << "] bad type " << path[i].type() << LL_ENDL; - } - } - - // Here *pdest is where we should store value. - *pdest = value; + // Drill down to where we should store 'value'. + llsd::drill(dest, path) = value; } -/// For LLCoros::Future<LLSD>::make_callback(), the callback has a signature -/// like void callback(LLSD), which isn't a valid LLEventPump listener: such -/// listeners must return bool. -template <typename LISTENER> -class FutureListener -{ -public: - // FutureListener is instantiated on the coroutine stack: the stack, in - // other words, that wants to suspend. - FutureListener(const LISTENER& listener): - mListener(listener), - // Capture the suspending coroutine's flag as a consuming or - // non-consuming listener. - mConsume(LLCoros::get_consuming()) - {} - - // operator()() is called on the main stack: the stack on which the - // expected event is fired. - bool operator()(const LLSD& event) - { - mListener(event); - // tell upstream LLEventPump whether listener consumed - return mConsume; - } - -protected: - LISTENER mListener; - bool mConsume; -}; - } // anonymous void llcoro::suspend() { - // By viewer convention, we post an event on the "mainloop" LLEventPump - // each iteration of the main event-handling loop. So waiting for a single - // event on "mainloop" gives us a one-frame suspend. - suspendUntilEventOn("mainloop"); + LLCoros::checkStop(); + LLCoros::TempStatus st("waiting one tick"); + boost::this_fiber::yield(); } void llcoro::suspendUntilTimeout(float seconds) { - LLEventTimeout timeout; - - timeout.eventAfter(seconds, LLSD()); - llcoro::suspendUntilEventOn(timeout); + LLCoros::checkStop(); + // We used to call boost::this_fiber::sleep_for(). But some coroutines + // (e.g. LLExperienceCache::idleCoro()) sit in a suspendUntilTimeout() + // loop, in which case a sleep_for() call risks sleeping through shutdown. + // So instead, listen for "LLApp" state-changing events -- which + // fortunately is handled for us by suspendUntilEventOnWithTimeout(). + // Wait for an event on a bogus LLEventPump on which nobody ever posts + // events. Don't make it static because that would force instantiation of + // the LLEventPumps LLSingleton registry at static initialization time. + // DO allow tweaking the name for uniqueness, this definitely gets + // re-entered on multiple coroutines! + // We could use an LLUUID if it were important to actively prohibit anyone + // from ever posting on this LLEventPump. + LLEventStream bogus("xyzzy", true); + // Timeout is the NORMAL case for this call! + static LLSD timedout; + // Deliver, but ignore, timedout when (as usual) we did not receive any + // "LLApp" event. The point is that suspendUntilEventOnWithTimeout() will + // itself throw Stopping when "LLApp" starts broadcasting shutdown events. + suspendUntilEventOnWithTimeout(bogus, seconds, timedout); } -LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requestPump, - const LLEventPumpOrPumpName& replyPump, const LLSD& replyPumpNamePath) +namespace { - // declare the future - LLCoros::Future<LLSD> future; - // make a callback that will assign a value to the future, and listen on - // the specified LLEventPump with that callback - std::string listenerName(listenerNameForCoro()); - LLTempBoundListener connection( - replyPump.getPump().listen(listenerName, - llmake<FutureListener>(future.make_callback()))); + +// returns a listener on replyPumpP, also on "mainloop" -- both should be +// stored in LLTempBoundListeners on the caller's stack frame +std::pair<LLBoundListener, LLBoundListener> +postAndSuspendSetup(const std::string& callerName, + const std::string& listenerName, + LLCoros::Promise<LLSD>& promise, + const LLSD& event, + const LLEventPumpOrPumpName& requestPumpP, + const LLEventPumpOrPumpName& replyPumpP, + const LLSD& replyPumpNamePath) +{ + // Before we get any farther -- should we be stopping instead of + // suspending? + LLCoros::checkStop(); + // Get the consuming attribute for THIS coroutine, the one that's about to + // suspend. Don't call get_consuming() in the lambda body: that would + // return the consuming attribute for some other coroutine, most likely + // the main routine. + bool consuming(LLCoros::get_consuming()); + // listen on the specified LLEventPump with a lambda that will assign a + // value to the promise, thus fulfilling its future + llassert_always_msg(replyPumpP, ("replyPump required for " + callerName)); + LLEventPump& replyPump(replyPumpP.getPump()); + // The relative order of the two listen() calls below would only matter if + // "LLApp" were an LLEventMailDrop. But if we ever go there, we'd want to + // notice the pending LLApp status first. + LLBoundListener stopper( + LLEventPumps::instance().obtain("LLApp").listen( + listenerName, + [&promise, listenerName](const LLSD& status) + { + // anything except "running" should wake up the waiting + // coroutine + auto& statsd = status["status"]; + if (statsd.asString() != "running") + { + LL_DEBUGS("lleventcoro") << listenerName + << " spotted status " << statsd + << ", throwing Stopping" << LL_ENDL; + try + { + promise.set_exception( + std::make_exception_ptr( + LLCoros::Stopping("status " + statsd.asString()))); + } + catch (const boost::fibers::promise_already_satisfied&) + { + LL_WARNS("lleventcoro") << listenerName + << " couldn't throw Stopping " + "because promise already set" << LL_ENDL; + } + } + // do not consume -- every listener must see status + return false; + })); + LLBoundListener connection( + replyPump.listen( + listenerName, + [&promise, consuming, listenerName](const LLSD& result) + { + try + { + promise.set_value(result); + // We did manage to propagate the result value to the + // (real) listener. If we're supposed to indicate that + // we've consumed it, do so. + return consuming; + } + catch(boost::fibers::promise_already_satisfied & ex) + { + LL_DEBUGS("lleventcoro") << "promise already satisfied in '" + << listenerName << "': " << ex.what() << LL_ENDL; + // We could not propagate the result value to the + // listener. + return false; + } + })); + // skip the "post" part if requestPump is default-constructed - if (requestPump) + if (requestPumpP) { + LLEventPump& requestPump(requestPumpP.getPump()); // If replyPumpNamePath is non-empty, store the replyPump name in the // request event. LLSD modevent(event); - storeToLLSDPath(modevent, replyPumpNamePath, replyPump.getPump().getName()); - LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName - << " posting to " << requestPump.getPump().getName() + storeToLLSDPath(modevent, replyPumpNamePath, replyPump.getName()); + LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName + << " posting to " << requestPump.getName() << LL_ENDL; // *NOTE:Mani - Removed because modevent could contain user's hashed passwd. // << ": " << modevent << LL_ENDL; - requestPump.getPump().post(modevent); + requestPump.post(modevent); } - LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName - << " about to wait on LLEventPump " << replyPump.getPump().getName() + LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName + << " about to wait on LLEventPump " << replyPump.getName() << LL_ENDL; + return { connection, stopper }; +} + +} // anonymous + +LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requestPump, + const LLEventPumpOrPumpName& replyPump, const LLSD& replyPumpNamePath) +{ + LLCoros::Promise<LLSD> promise; + std::string listenerName(listenerNameForCoro()); + + // Store both connections into LLTempBoundListeners so we implicitly + // disconnect on return from this function. + auto connections = + postAndSuspendSetup("postAndSuspend()", listenerName, promise, + event, requestPump, replyPump, replyPumpNamePath); + LLTempBoundListener connection(connections.first), stopper(connections.second); + + // declare the future + LLCoros::Future<LLSD> future = LLCoros::getFuture(promise); // calling get() on the future makes us wait for it + LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName())); LLSD value(future.get()); LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName << " resuming with " << value << LL_ENDL; @@ -230,147 +266,52 @@ LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requ return value; } -LLSD llcoro::suspendUntilEventOnWithTimeout(const LLEventPumpOrPumpName& suspendPumpOrName, - F32 timeoutin, const LLSD &timeoutResult) -{ - /** - * The timeout pump is attached upstream of of the waiting pump and will - * pass the timeout event through it. We CAN NOT attach downstream since - * doing so will cause the suspendPump to fire any waiting events immediately - * and they will be lost. This becomes especially problematic with the - * LLEventTimeout(pump) constructor which will also attempt to fire those - * events using the virtual listen_impl method in the not yet fully constructed - * timeoutPump. - */ - LLEventTimeout timeoutPump; - LLEventPump &suspendPump = suspendPumpOrName.getPump(); - - LLTempBoundListener timeoutListener(timeoutPump.listen(suspendPump.getName(), - boost::bind(&LLEventPump::post, &suspendPump, _1))); - - timeoutPump.eventAfter(timeoutin, timeoutResult); - return llcoro::suspendUntilEventOn(suspendPump); -} - -namespace -{ - -/** - * This helper is specifically for postAndSuspend2(). We use a single future - * object, but we want to listen on two pumps with it. Since we must still - * adapt from the callable constructed by boost::dcoroutines::make_callback() - * (void return) to provide an event listener (bool return), we've adapted - * FutureListener for the purpose. The basic idea is that we construct a - * distinct instance of FutureListener2 -- binding different instance data -- - * for each of the pumps. Then, when a pump delivers an LLSD value to either - * FutureListener2, it can combine that LLSD with its discriminator to feed - * the future object. - * - * DISCRIM is a template argument so we can use llmake() rather than - * having to write our own argument-deducing helper function. - */ -template <typename LISTENER, typename DISCRIM> -class FutureListener2: public FutureListener<LISTENER> +LLSD llcoro::postAndSuspendWithTimeout(const LLSD& event, + const LLEventPumpOrPumpName& requestPump, + const LLEventPumpOrPumpName& replyPump, + const LLSD& replyPumpNamePath, + F32 timeout, const LLSD& timeoutResult) { - typedef FutureListener<LISTENER> super; - -public: - // instantiated on coroutine stack: the stack about to suspend - FutureListener2(const LISTENER& listener, DISCRIM discriminator): - super(listener), - mDiscrim(discriminator) - {} - - // called on main stack: the stack on which event is fired - bool operator()(const LLSD& event) - { - // our future object is defined to accept LLEventWithID - super::mListener(LLEventWithID(event, mDiscrim)); - // tell LLEventPump whether or not event was consumed - return super::mConsume; - } - -private: - const DISCRIM mDiscrim; -}; + LLCoros::Promise<LLSD> promise; + std::string listenerName(listenerNameForCoro()); -} // anonymous + // Store both connections into LLTempBoundListeners so we implicitly + // disconnect on return from this function. + auto connections = + postAndSuspendSetup("postAndSuspendWithTimeout()", listenerName, promise, + event, requestPump, replyPump, replyPumpNamePath); + LLTempBoundListener connection(connections.first), stopper(connections.second); -namespace llcoro -{ - -LLEventWithID postAndSuspend2(const LLSD& event, - const LLEventPumpOrPumpName& requestPump, - const LLEventPumpOrPumpName& replyPump0, - const LLEventPumpOrPumpName& replyPump1, - const LLSD& replyPump0NamePath, - const LLSD& replyPump1NamePath) -{ // declare the future - LLCoros::Future<LLEventWithID> future; - // either callback will assign a value to this future; listen on - // each specified LLEventPump with a callback - std::string name(listenerNameForCoro()); - LLTempBoundListener connection0( - replyPump0.getPump().listen( - name + "a", - llmake<FutureListener2>(future.make_callback(), 0))); - LLTempBoundListener connection1( - replyPump1.getPump().listen( - name + "b", - llmake<FutureListener2>(future.make_callback(), 1))); - // skip the "post" part if requestPump is default-constructed - if (requestPump) + LLCoros::Future<LLSD> future = LLCoros::getFuture(promise); + // wait for specified timeout + boost::fibers::future_status status; { - // If either replyPumpNamePath is non-empty, store the corresponding - // replyPump name in the request event. - LLSD modevent(event); - storeToLLSDPath(modevent, replyPump0NamePath, - replyPump0.getPump().getName()); - storeToLLSDPath(modevent, replyPump1NamePath, - replyPump1.getPump().getName()); - LL_DEBUGS("lleventcoro") << "postAndSuspend2(): coroutine " << name - << " posting to " << requestPump.getPump().getName() - << ": " << modevent << LL_ENDL; - requestPump.getPump().post(modevent); + LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName() + << " for " << timeout << "s")); + // The fact that we accept non-integer seconds means we should probably + // use granularity finer than one second. However, given the overhead of + // the rest of our processing, it seems silly to use granularity finer + // than a millisecond. + status = future.wait_for(std::chrono::milliseconds(long(timeout * 1000))); } - LL_DEBUGS("lleventcoro") << "postAndSuspend2(): coroutine " << name - << " about to wait on LLEventPumps " << replyPump0.getPump().getName() - << ", " << replyPump1.getPump().getName() << LL_ENDL; - // calling get() on the future makes us wait for it - LLEventWithID value(future.get()); - LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << name - << " resuming with (" << value.first << ", " << value.second << ")" - << LL_ENDL; - // returning should disconnect both connections - return value; -} - -LLSD errorException(const LLEventWithID& result, const std::string& desc) -{ - // If the result arrived on the error pump (pump 1), instead of - // returning it, deliver it via exception. - if (result.second) + // if the future is NOT yet ready, return timeoutResult instead + if (status == boost::fibers::future_status::timeout) { - LLTHROW(LLErrorEvent(desc, result.first)); + LL_DEBUGS("lleventcoro") << "postAndSuspendWithTimeout(): coroutine " << listenerName + << " timed out after " << timeout << " seconds," + << " resuming with " << timeoutResult << LL_ENDL; + return timeoutResult; } - // That way, our caller knows a simple return must be from the reply - // pump (pump 0). - return result.first; -} - -LLSD errorLog(const LLEventWithID& result, const std::string& desc) -{ - // If the result arrived on the error pump (pump 1), log it as a fatal - // error. - if (result.second) + else { - LL_ERRS("errorLog") << desc << ":" << std::endl; - LLSDSerialize::toPrettyXML(result.first, LL_CONT); - LL_CONT << LL_ENDL; + llassert_always(status == boost::fibers::future_status::ready); + + // future is now ready, no more waiting + LLSD value(future.get()); + LL_DEBUGS("lleventcoro") << "postAndSuspendWithTimeout(): coroutine " << listenerName + << " resuming with " << value << LL_ENDL; + // returning should disconnect the connection + return value; } - // A simple return must therefore be from the reply pump (pump 0). - return result.first; } - -} // namespace llcoro |