diff options
Diffstat (limited to 'indra/llcommon/lleventcoro.cpp')
-rw-r--r-- | indra/llcommon/lleventcoro.cpp | 268 |
1 files changed, 81 insertions, 187 deletions
diff --git a/indra/llcommon/lleventcoro.cpp b/indra/llcommon/lleventcoro.cpp index 43e41f250d..47d99f0050 100644 --- a/indra/llcommon/lleventcoro.cpp +++ b/indra/llcommon/lleventcoro.cpp @@ -31,18 +31,15 @@ // associated header #include "lleventcoro.h" // STL headers -#include <map> +#include <chrono> // std headers // external library headers +#include <boost/fiber/operations.hpp> // other Linden headers #include "llsdserialize.h" #include "llsdutil.h" #include "llerror.h" #include "llcoros.h" -#include "llmake.h" -#include "llexception.h" - -#include "lleventfilter.h" namespace { @@ -105,65 +102,47 @@ void storeToLLSDPath(LLSD& dest, const LLSD& path, const LLSD& value) llsd::drill(dest, path) = value; } -/// For LLCoros::Future<LLSD>::make_callback(), the callback has a signature -/// like void callback(LLSD), which isn't a valid LLEventPump listener: such -/// listeners must return bool. -template <typename LISTENER> -class FutureListener -{ -public: - // FutureListener is instantiated on the coroutine stack: the stack, in - // other words, that wants to suspend. - FutureListener(const LISTENER& listener): - mListener(listener), - // Capture the suspending coroutine's flag as a consuming or - // non-consuming listener. - mConsume(LLCoros::get_consuming()) - {} - - // operator()() is called on the main stack: the stack on which the - // expected event is fired. - bool operator()(const LLSD& event) - { - mListener(event); - // tell upstream LLEventPump whether listener consumed - return mConsume; - } - -protected: - LISTENER mListener; - bool mConsume; -}; - } // anonymous void llcoro::suspend() { - // By viewer convention, we post an event on the "mainloop" LLEventPump - // each iteration of the main event-handling loop. So waiting for a single - // event on "mainloop" gives us a one-frame suspend. - suspendUntilEventOn("mainloop"); + boost::this_fiber::yield(); } void llcoro::suspendUntilTimeout(float seconds) { - LLEventTimeout timeout; - - timeout.eventAfter(seconds, LLSD()); - llcoro::suspendUntilEventOn(timeout); + // The fact that we accept non-integer seconds means we should probably + // use granularity finer than one second. However, given the overhead of + // the rest of our processing, it seems silly to use granularity finer + // than a millisecond. + boost::this_fiber::sleep_for(std::chrono::milliseconds(long(seconds * 1000))); } -LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requestPump, - const LLEventPumpOrPumpName& replyPump, const LLSD& replyPumpNamePath) +namespace { - // declare the future - LLCoros::Future<LLSD> future; + +LLBoundListener postAndSuspendSetup(const std::string& callerName, + const std::string& listenerName, + LLCoros::Promise<LLSD>& promise, + const LLSD& event, + const LLEventPumpOrPumpName& requestPump, + const LLEventPumpOrPumpName& replyPump, + const LLSD& replyPumpNamePath) +{ + // Get the consuming attribute for THIS coroutine, the one that's about to + // suspend. Don't call get_consuming() in the lambda body: that would + // return the consuming attribute for some other coroutine, most likely + // the main routine. + bool consuming(LLCoros::get_consuming()); // make a callback that will assign a value to the future, and listen on // the specified LLEventPump with that callback - std::string listenerName(listenerNameForCoro()); - LLTempBoundListener connection( + LLBoundListener connection( replyPump.getPump().listen(listenerName, - llmake<FutureListener>(future.make_callback()))); + [&promise, consuming](const LLSD& result) + { + promise.set_value(result); + return consuming; + })); // skip the "post" part if requestPump is default-constructed if (requestPump) { @@ -171,7 +150,7 @@ LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requ // request event. LLSD modevent(event); storeToLLSDPath(modevent, replyPumpNamePath, replyPump.getPump().getName()); - LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName + LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName << " posting to " << requestPump.getPump().getName() << LL_ENDL; @@ -179,158 +158,73 @@ LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requ // << ": " << modevent << LL_ENDL; requestPump.getPump().post(modevent); } - LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName + LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName << " about to wait on LLEventPump " << replyPump.getPump().getName() << LL_ENDL; - // calling get() on the future makes us wait for it - LLSD value(future.get()); - LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName - << " resuming with " << value << LL_ENDL; - // returning should disconnect the connection - return value; -} - -LLSD llcoro::suspendUntilEventOnWithTimeout(const LLEventPumpOrPumpName& suspendPumpOrName, - F32 timeoutin, const LLSD &timeoutResult) -{ - /** - * The timeout pump is attached upstream of of the waiting pump and will - * pass the timeout event through it. We CAN NOT attach downstream since - * doing so will cause the suspendPump to fire any waiting events immediately - * and they will be lost. This becomes especially problematic with the - * LLEventTimeout(pump) constructor which will also attempt to fire those - * events using the virtual listen_impl method in the not yet fully constructed - * timeoutPump. - */ - LLEventTimeout timeoutPump; - LLEventPump &suspendPump = suspendPumpOrName.getPump(); - - LLTempBoundListener timeoutListener(timeoutPump.listen(suspendPump.getName(), - boost::bind(&LLEventPump::post, &suspendPump, _1))); - - timeoutPump.eventAfter(timeoutin, timeoutResult); - return llcoro::suspendUntilEventOn(suspendPump); + return connection; } -namespace -{ - -/** - * This helper is specifically for postAndSuspend2(). We use a single future - * object, but we want to listen on two pumps with it. Since we must still - * adapt from the callable constructed by boost::dcoroutines::make_callback() - * (void return) to provide an event listener (bool return), we've adapted - * FutureListener for the purpose. The basic idea is that we construct a - * distinct instance of FutureListener2 -- binding different instance data -- - * for each of the pumps. Then, when a pump delivers an LLSD value to either - * FutureListener2, it can combine that LLSD with its discriminator to feed - * the future object. - * - * DISCRIM is a template argument so we can use llmake() rather than - * having to write our own argument-deducing helper function. - */ -template <typename LISTENER, typename DISCRIM> -class FutureListener2: public FutureListener<LISTENER> -{ - typedef FutureListener<LISTENER> super; - -public: - // instantiated on coroutine stack: the stack about to suspend - FutureListener2(const LISTENER& listener, DISCRIM discriminator): - super(listener), - mDiscrim(discriminator) - {} - - // called on main stack: the stack on which event is fired - bool operator()(const LLSD& event) - { - // our future object is defined to accept LLEventWithID - super::mListener(LLEventWithID(event, mDiscrim)); - // tell LLEventPump whether or not event was consumed - return super::mConsume; - } - -private: - const DISCRIM mDiscrim; -}; - } // anonymous -namespace llcoro +LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requestPump, + const LLEventPumpOrPumpName& replyPump, const LLSD& replyPumpNamePath) { + LLCoros::Promise<LLSD> promise; + std::string listenerName(listenerNameForCoro()); + + // Store connection into an LLTempBoundListener so we implicitly + // disconnect on return from this function. + LLTempBoundListener connection = + postAndSuspendSetup("postAndSuspend()", listenerName, promise, + event, requestPump, replyPump, replyPumpNamePath); -LLEventWithID postAndSuspend2(const LLSD& event, - const LLEventPumpOrPumpName& requestPump, - const LLEventPumpOrPumpName& replyPump0, - const LLEventPumpOrPumpName& replyPump1, - const LLSD& replyPump0NamePath, - const LLSD& replyPump1NamePath) -{ // declare the future - LLCoros::Future<LLEventWithID> future; - // either callback will assign a value to this future; listen on - // each specified LLEventPump with a callback - std::string name(listenerNameForCoro()); - LLTempBoundListener connection0( - replyPump0.getPump().listen( - name + "a", - llmake<FutureListener2>(future.make_callback(), 0))); - LLTempBoundListener connection1( - replyPump1.getPump().listen( - name + "b", - llmake<FutureListener2>(future.make_callback(), 1))); - // skip the "post" part if requestPump is default-constructed - if (requestPump) - { - // If either replyPumpNamePath is non-empty, store the corresponding - // replyPump name in the request event. - LLSD modevent(event); - storeToLLSDPath(modevent, replyPump0NamePath, - replyPump0.getPump().getName()); - storeToLLSDPath(modevent, replyPump1NamePath, - replyPump1.getPump().getName()); - LL_DEBUGS("lleventcoro") << "postAndSuspend2(): coroutine " << name - << " posting to " << requestPump.getPump().getName() - << ": " << modevent << LL_ENDL; - requestPump.getPump().post(modevent); - } - LL_DEBUGS("lleventcoro") << "postAndSuspend2(): coroutine " << name - << " about to wait on LLEventPumps " << replyPump0.getPump().getName() - << ", " << replyPump1.getPump().getName() << LL_ENDL; + LLCoros::Future<LLSD> future = LLCoros::getFuture(promise); // calling get() on the future makes us wait for it - LLEventWithID value(future.get()); - LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << name - << " resuming with (" << value.first << ", " << value.second << ")" - << LL_ENDL; - // returning should disconnect both connections + LLSD value(future.get()); + LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName + << " resuming with " << value << LL_ENDL; + // returning should disconnect the connection return value; } -LLSD errorException(const LLEventWithID& result, const std::string& desc) +LLSD llcoro::postAndSuspendWithTimeout(const LLSD& event, + const LLEventPumpOrPumpName& requestPump, + const LLEventPumpOrPumpName& replyPump, + const LLSD& replyPumpNamePath, + F32 timeout, const LLSD& timeoutResult) { - // If the result arrived on the error pump (pump 1), instead of - // returning it, deliver it via exception. - if (result.second) + LLCoros::Promise<LLSD> promise; + std::string listenerName(listenerNameForCoro()); + + // Store connection into an LLTempBoundListener so we implicitly + // disconnect on return from this function. + LLTempBoundListener connection = + postAndSuspendSetup("postAndSuspendWithTimeout()", listenerName, promise, + event, requestPump, replyPump, replyPumpNamePath); + + // declare the future + LLCoros::Future<LLSD> future = LLCoros::getFuture(promise); + // wait for specified timeout + boost::fibers::future_status status = + future.wait_for(std::chrono::milliseconds(long(timeout * 1000))); + // if the future is NOT yet ready, return timeoutResult instead + if (status == boost::fibers::future_status::timeout) { - LLTHROW(LLErrorEvent(desc, result.first)); + LL_DEBUGS("lleventcoro") << "postAndSuspendWithTimeout(): coroutine " << listenerName + << " timed out after " << timeout << " seconds," + << " resuming with " << timeoutResult << LL_ENDL; + return timeoutResult; } - // That way, our caller knows a simple return must be from the reply - // pump (pump 0). - return result.first; -} - -LLSD errorLog(const LLEventWithID& result, const std::string& desc) -{ - // If the result arrived on the error pump (pump 1), log it as a fatal - // error. - if (result.second) + else { - LL_ERRS("errorLog") << desc << ":" << std::endl; - LLSDSerialize::toPrettyXML(result.first, LL_CONT); - LL_CONT << LL_ENDL; + llassert_always(status == boost::fibers::future_status::ready); + + // future is now ready, no more waiting + LLSD value(future.get()); + LL_DEBUGS("lleventcoro") << "postAndSuspendWithTimeout(): coroutine " << listenerName + << " resuming with " << value << LL_ENDL; + // returning should disconnect the connection + return value; } - // A simple return must therefore be from the reply pump (pump 0). - return result.first; } - -} // namespace llcoro |