diff options
Diffstat (limited to 'indra/llcommon')
-rwxr-xr-x | indra/llcommon/llcoros.cpp | 26 | ||||
-rwxr-xr-x | indra/llcommon/llcoros.h | 15 | ||||
-rwxr-xr-x | indra/llcommon/lleventcoro.cpp | 44 | ||||
-rwxr-xr-x | indra/llcommon/lleventcoro.h | 16 | ||||
-rwxr-xr-x | indra/llcommon/llevents.cpp | 49 | ||||
-rwxr-xr-x | indra/llcommon/llevents.h | 61 | ||||
-rwxr-xr-x | indra/llcommon/tests/lleventcoro_test.cpp | 4 |
7 files changed, 189 insertions, 26 deletions
diff --git a/indra/llcommon/llcoros.cpp b/indra/llcommon/llcoros.cpp index 548a6d22be..d16bf0160b 100755 --- a/indra/llcommon/llcoros.cpp +++ b/indra/llcommon/llcoros.cpp @@ -51,14 +51,32 @@ boost::thread_specific_ptr<LLCoros::CoroData> LLCoros::sCurrentCoro(LLCoros::no_cleanup); //static -LLCoros::coro::self& LLCoros::get_self() +LLCoros::CoroData& LLCoros::get_CoroData(const std::string& caller) { CoroData* current = sCurrentCoro.get(); if (! current) { - LL_ERRS("LLCoros") << "Calling get_self() from non-coroutine context!" << LL_ENDL; + LL_ERRS("LLCoros") << "Calling " << caller << " from non-coroutine context!" << LL_ENDL; } - return *current->mSelf; + return *current; +} + +//static +LLCoros::coro::self& LLCoros::get_self() +{ + return *get_CoroData("get_self()").mSelf; +} + +//static +void LLCoros::set_consuming(bool consuming) +{ + get_CoroData("set_consuming()").mConsuming = consuming; +} + +//static +bool LLCoros::get_consuming() +{ + return get_CoroData("get_consuming()").mConsuming; } llcoro::Suspending::Suspending(): @@ -245,6 +263,8 @@ LLCoros::CoroData::CoroData(CoroData* prev, const std::string& name, // Wrap the caller's callable in our toplevel() function so we can manage // sCurrentCoro appropriately at startup and shutdown of each coroutine. mCoro(boost::bind(toplevel, _1, this, callable), stacksize), + // don't consume events unless specifically directed + mConsuming(false), mSelf(0) { } diff --git a/indra/llcommon/llcoros.h b/indra/llcommon/llcoros.h index 56eed8cafe..0b1f58f48e 100755 --- a/indra/llcommon/llcoros.h +++ b/indra/llcommon/llcoros.h @@ -150,6 +150,18 @@ public: /// get the current coro::self& for those who really really care static coro::self& get_self(); + /** + * Most coroutines, most of the time, don't "consume" the events for which + * they're suspending. This way, an arbitrary number of listeners (whether + * coroutines or simple callbacks) can be registered on a particular + * LLEventPump, every listener responding to each of the events on that + * LLEventPump. But a particular coroutine can assert that it will consume + * each event for which it suspends. (See also llcoro::postAndSuspend(), + * llcoro::VoidListener) + */ + static void set_consuming(bool consuming); + static bool get_consuming(); + private: LLCoros(); friend class LLSingleton<LLCoros>; @@ -159,6 +171,7 @@ private: struct CoroData; static void no_cleanup(CoroData*); static void toplevel(coro::self& self, CoroData* data, const callable_t& callable); + static CoroData& get_CoroData(const std::string& caller); S32 mStackSize; @@ -178,6 +191,8 @@ private: const std::string mName; // the actual coroutine instance LLCoros::coro mCoro; + // set_consuming() state + bool mConsuming; // When the dcoroutine library calls a top-level callable, it implicitly // passes coro::self& as the first parameter. All our consumer code used // to explicitly pass coro::self& down through all levels of call stack, diff --git a/indra/llcommon/lleventcoro.cpp b/indra/llcommon/lleventcoro.cpp index c9bfcacedc..9b7e8fb65f 100755 --- a/indra/llcommon/lleventcoro.cpp +++ b/indra/llcommon/lleventcoro.cpp @@ -41,6 +41,8 @@ #include "llerror.h" #include "llcoros.h" +#include "lleventfilter.h" + namespace { @@ -153,11 +155,19 @@ void llcoro::suspend() suspendUntilEventOn("mainloop"); } +void llcoro::suspendUntilTimeout(float seconds) +{ + LLEventTimeout timeout; + + timeout.eventAfter(seconds, LLSD()); + llcoro::suspendUntilEventOn(timeout); +} + LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requestPump, const LLEventPumpOrPumpName& replyPump, const LLSD& replyPumpNamePath) { // declare the future - boost::dcoroutines::future<LLSD> future(LLCoros::get_self()); + boost::dcoroutines::future<LLSD_consumed> future(LLCoros::get_self()); // make a callback that will assign a value to the future, and listen on // the specified LLEventPump with that callback std::string listenerName(listenerNameForCoro()); @@ -183,21 +193,25 @@ LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requ << " about to wait on LLEventPump " << replyPump.getPump().getName() << LL_ENDL; // trying to dereference ("resolve") the future makes us wait for it - LLSD value; + LLSD_consumed value; { // instantiate Suspending to manage the "current" coroutine llcoro::Suspending suspended; value = *future; } // destroy Suspending as soon as we're back LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName - << " resuming with " << value << LL_ENDL; + << " resuming with " << value.first << LL_ENDL; + // immediately set consumed according to consuming + *value.second = LLCoros::get_consuming(); // returning should disconnect the connection - return value; + return value.first; } namespace { +typedef std::pair<LLEventWithID, bool*> LLEventWithID_consumed; + /** * This helper is specifically for the two-pump version of suspendUntilEventOn(). * We use a single future object, but we want to listen on two pumps with it. @@ -217,13 +231,15 @@ public: mListener(listener), mDiscrim(discriminator) {} + // this signature is required for an LLEventPump listener bool operator()(const LLSD& event) { - // our future object is defined to accept LLEventWithID - mListener(LLEventWithID(event, mDiscrim)); - // don't swallow the event, let other listeners see it - return false; + bool consumed = false; + // our future object is defined to accept LLEventWithID_consumed + mListener(LLEventWithID_consumed(LLEventWithID(event, mDiscrim), &consumed)); + // tell LLEventPump whether or not event was consumed + return consumed; } private: LISTENER mListener; @@ -250,7 +266,7 @@ LLEventWithID postAndSuspend2(const LLSD& event, const LLSD& replyPump1NamePath) { // declare the future - boost::dcoroutines::future<LLEventWithID> future(LLCoros::get_self()); + boost::dcoroutines::future<LLEventWithID_consumed> future(LLCoros::get_self()); // either callback will assign a value to this future; listen on // each specified LLEventPump with a callback std::string name(listenerNameForCoro()); @@ -279,17 +295,19 @@ LLEventWithID postAndSuspend2(const LLSD& event, << " about to wait on LLEventPumps " << replyPump0.getPump().getName() << ", " << replyPump1.getPump().getName() << LL_ENDL; // trying to dereference ("resolve") the future makes us wait for it - LLEventWithID value; + LLEventWithID_consumed value; { // instantiate Suspending to manage "current" coroutine llcoro::Suspending suspended; value = *future; } // destroy Suspending as soon as we're back LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << name - << " resuming with (" << value.first << ", " << value.second << ")" - << LL_ENDL; + << " resuming with (" << value.first.first + << ", " << value.first.second << ")" << LL_ENDL; + // tell LLEventPump whether we're consuming + *value.second = LLCoros::get_consuming(); // returning should disconnect both connections - return value; + return value.first; } LLSD errorException(const LLEventWithID& result, const std::string& desc) diff --git a/indra/llcommon/lleventcoro.h b/indra/llcommon/lleventcoro.h index 6fda3d2572..acf2ad24a4 100755 --- a/indra/llcommon/lleventcoro.h +++ b/indra/llcommon/lleventcoro.h @@ -32,6 +32,7 @@ #include <boost/optional.hpp> #include <string> #include <stdexcept> +#include <utility> // std::pair #include "llevents.h" #include "llerror.h" @@ -75,6 +76,8 @@ private: namespace llcoro { +typedef std::pair<LLSD, bool*> LLSD_consumed; + /// This is an adapter for a signature like void LISTENER(const LLSD&), which /// isn't a valid LLEventPump listener: such listeners should return bool. template <typename LISTENER> @@ -84,11 +87,13 @@ public: VoidListener(const LISTENER& listener): mListener(listener) {} + bool operator()(const LLSD& event) { - mListener(event); - // don't swallow the event, let other listeners see it - return false; + bool consumed = false; + mListener(LLSD_consumed(event, &consumed)); + // tell upstream LLEventPump whether listener consumed + return consumed; } private: LISTENER mListener; @@ -109,6 +114,11 @@ VoidListener<LISTENER> voidlistener(const LISTENER& listener) void suspend(); /** + * Yield control from a coroutine for at least the specified number of seconds + */ +void suspendUntilTimeout(float seconds); + +/** * Post specified LLSD event on the specified LLEventPump, then suspend for a * response on specified other LLEventPump. This is more than mere * convenience: the difference between this function and the sequence diff --git a/indra/llcommon/llevents.cpp b/indra/llcommon/llevents.cpp index 1c928b3db8..0c5e55dc76 100755 --- a/indra/llcommon/llevents.cpp +++ b/indra/llcommon/llevents.cpp @@ -57,6 +57,7 @@ #include "stringize.h" #include "llerror.h" #include "llsdutil.h" +#include "llcoros.h" #if LL_MSVC #pragma warning (disable : 4702) #endif @@ -132,6 +133,17 @@ LLEventPump& LLEventPumps::obtain(const std::string& name) return *newInstance; } +bool LLEventPumps::post(const std::string&name, const LLSD&message) +{ + PumpMap::iterator found = mPumpMap.find(name); + + if (found == mPumpMap.end()) + return false; + + return (*found).second->post(message); +} + + void LLEventPumps::flush() { // Flush every known LLEventPump instance. Leave it up to each instance to @@ -497,6 +509,43 @@ bool LLEventStream::post(const LLSD& event) } /***************************************************************************** + * LLEventMailDrop + *****************************************************************************/ +bool LLEventMailDrop::post(const LLSD& event) +{ + bool posted = false; + + if (!mSignal->empty()) + posted = LLEventStream::post(event); + + if (!posted) + { // if the event was not handled we will save it for later so that it can + // be posted to any future listeners when they attach. + mEventHistory.push_back(event); + } + + return posted; +} + +LLBoundListener LLEventMailDrop::listen_impl(const std::string& name, + const LLEventListener& listener, + const NameList& after, + const NameList& before) +{ + if (!mEventHistory.empty()) + { + if (listener(mEventHistory.front()) || LLCoros::get_consuming()) + { + mEventHistory.pop_front(); + } + } + + return LLEventStream::listen_impl(name, listener, after, before); + +} + + +/***************************************************************************** * LLEventQueue *****************************************************************************/ bool LLEventQueue::post(const LLSD& event) diff --git a/indra/llcommon/llevents.h b/indra/llcommon/llevents.h index 0cbd1da32d..6175329a9d 100755 --- a/indra/llcommon/llevents.h +++ b/indra/llcommon/llevents.h @@ -217,6 +217,18 @@ public: * an instance without conferring @em ownership. */ LLEventPump& obtain(const std::string& name); + + /** + * Find the named LLEventPump instance. If it exists post the message to it. + * If the pump does not exist, do nothing. + * + * returns the result of the LLEventPump::post. If no pump exists returns false. + * + * This is syntactically similar to LLEventPumps::instance().post(name, message), + * however if the pump does not already exist it will not be created. + */ + bool post(const std::string&, const LLSD&); + /** * Flush all known LLEventPump instances */ @@ -496,7 +508,7 @@ public: // at the boost::bind object itself before that happens. return LLEventDetail::visit_and_connect(name, listener, - boost::bind(&LLEventPump::listen_impl, + boost::bind(&LLEventPump::listen_invoke, this, name, _1, @@ -541,13 +553,23 @@ private: virtual void reset(); + + private: - virtual LLBoundListener listen_impl(const std::string& name, const LLEventListener&, - const NameList& after, - const NameList& before); + LLBoundListener listen_invoke(const std::string& name, const LLEventListener& listener, + const NameList& after, + const NameList& before) + { + return this->listen_impl(name, listener, after, before); + } + std::string mName; protected: + virtual LLBoundListener listen_impl(const std::string& name, const LLEventListener&, + const NameList& after, + const NameList& before); + /// implement the dispatching boost::shared_ptr<LLStandardSignal> mSignal; @@ -586,10 +608,39 @@ public: }; /***************************************************************************** + * LLEventMailDrop + *****************************************************************************/ +/** + * LLEventMailDrop is a specialization of LLEventStream. Events are posted normally, + * however if no listeners return that they have handled the event it is placed in + * a queue. Subsequent attaching listeners will receive stored events from the queue + * until a listener indicates that the event has been handled. In order to receive + * multiple events from a mail drop the listener must disconnect and reconnect. + */ +class LL_COMMON_API LLEventMailDrop : public LLEventStream +{ +public: + LLEventMailDrop(const std::string& name, bool tweak = false) : LLEventStream(name, tweak) {} + virtual ~LLEventMailDrop() {} + + /// Post an event to all listeners + virtual bool post(const LLSD& event); + +protected: + virtual LLBoundListener listen_impl(const std::string& name, const LLEventListener&, + const NameList& after, + const NameList& before); + +private: + typedef std::list<LLSD> EventList; + EventList mEventHistory; +}; + +/***************************************************************************** * LLEventQueue *****************************************************************************/ /** - * LLEventQueue isa LLEventPump whose post() method defers calling registered + * LLEventQueue is a LLEventPump whose post() method defers calling registered * listeners until flush() is called. */ class LL_COMMON_API LLEventQueue: public LLEventPump diff --git a/indra/llcommon/tests/lleventcoro_test.cpp b/indra/llcommon/tests/lleventcoro_test.cpp index 1ee79e9eb6..da1439418f 100755 --- a/indra/llcommon/tests/lleventcoro_test.cpp +++ b/indra/llcommon/tests/lleventcoro_test.cpp @@ -240,7 +240,7 @@ namespace tut // ... do whatever preliminary stuff must happen ... // declare the future - boost::dcoroutines::future<LLSD> future(self); + boost::dcoroutines::future<llcoro::LLSD_consumed> future(self); // tell the future what to suspend for LLTempBoundListener connection( LLEventPumps::instance().obtain("source").listen("coro", voidlistener(boost::dcoroutines::make_callback(future)))); @@ -248,7 +248,7 @@ namespace tut // attempting to dereference ("resolve") the future causes the calling // coroutine to suspend for it debug("about to suspend"); - result = *future; + result = (*future).first; ensure("Got it", future); } END |