diff options
Diffstat (limited to 'indra')
-rw-r--r-- | indra/llcommon/CMakeLists.txt | 15 | ||||
-rw-r--r-- | indra/llcommon/lleventfilter.cpp | 245 | ||||
-rw-r--r-- | indra/llcommon/lleventfilter.h | 161 | ||||
-rw-r--r-- | indra/llcommon/tests/listener.h | 11 | ||||
-rw-r--r-- | indra/llcommon/tests/lleventfilter_test.cpp | 124 |
5 files changed, 548 insertions, 8 deletions
diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt index 3493f80556..aa76a57f1d 100644 --- a/indra/llcommon/CMakeLists.txt +++ b/indra/llcommon/CMakeLists.txt @@ -324,26 +324,27 @@ if (LL_TESTS) LL_ADD_INTEGRATION_TEST(lldeadmantimer "" "${test_libs}") LL_ADD_INTEGRATION_TEST(lldependencies "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llerror "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(lleventdispatcher "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(lleventcoro "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(lleventfilter "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llframetimer "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(llheteromap "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llinstancetracker "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(llleap "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(llpounceable "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(llprocess "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llprocessor "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llprocinfo "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llrand "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llsdserialize "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llsingleton "" "${test_libs}") + LL_ADD_INTEGRATION_TEST(llstreamqueue "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llstring "" "${test_libs}") LL_ADD_INTEGRATION_TEST(lltrace "" "${test_libs}") LL_ADD_INTEGRATION_TEST(lltreeiterators "" "${test_libs}") LL_ADD_INTEGRATION_TEST(lluri "" "${test_libs}") LL_ADD_INTEGRATION_TEST(llunits "" "${test_libs}") LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}") - LL_ADD_INTEGRATION_TEST(lleventdispatcher "" "${test_libs}") - LL_ADD_INTEGRATION_TEST(lleventcoro "" "${test_libs}") - LL_ADD_INTEGRATION_TEST(llprocess "" "${test_libs}") - LL_ADD_INTEGRATION_TEST(llleap "" "${test_libs}") - LL_ADD_INTEGRATION_TEST(llstreamqueue "" "${test_libs}") - LL_ADD_INTEGRATION_TEST(llpounceable "" "${test_libs}") - LL_ADD_INTEGRATION_TEST(llheteromap "" "${test_libs}") ## llexception_test.cpp isn't a regression test, and doesn't need to be run ## every build. It's to help a developer make implementation choices about diff --git a/indra/llcommon/lleventfilter.cpp b/indra/llcommon/lleventfilter.cpp index 64ab58adcd..9fb18dc67d 100644 --- a/indra/llcommon/lleventfilter.cpp +++ b/indra/llcommon/lleventfilter.cpp @@ -38,12 +38,18 @@ #include "llerror.h" // LL_ERRS #include "llsdutil.h" // llsd_matches() +/***************************************************************************** +* LLEventFilter +*****************************************************************************/ LLEventFilter::LLEventFilter(LLEventPump& source, const std::string& name, bool tweak): LLEventStream(name, tweak), mSource(source.listen(getName(), boost::bind(&LLEventFilter::post, this, _1))) { } +/***************************************************************************** +* LLEventMatching +*****************************************************************************/ LLEventMatching::LLEventMatching(const LLSD& pattern): LLEventFilter("matching"), mPattern(pattern) @@ -64,6 +70,9 @@ bool LLEventMatching::post(const LLSD& event) return LLEventStream::post(event); } +/***************************************************************************** +* LLEventTimeoutBase +*****************************************************************************/ LLEventTimeoutBase::LLEventTimeoutBase(): LLEventFilter("timeout") { @@ -148,6 +157,14 @@ bool LLEventTimeoutBase::tick(const LLSD&) return false; // show event to other listeners } +bool LLEventTimeoutBase::running() const +{ + return mMainloop.connected(); +} + +/***************************************************************************** +* LLEventTimeout +*****************************************************************************/ LLEventTimeout::LLEventTimeout() {} LLEventTimeout::LLEventTimeout(LLEventPump& source): @@ -164,3 +181,231 @@ bool LLEventTimeout::countdownElapsed() const { return mTimer.hasExpired(); } + +/***************************************************************************** +* LLEventBatch +*****************************************************************************/ +LLEventBatch::LLEventBatch(std::size_t size): + LLEventFilter("batch"), + mBatchSize(size) +{} + +LLEventBatch::LLEventBatch(LLEventPump& source, std::size_t size): + LLEventFilter(source, "batch"), + mBatchSize(size) +{} + +void LLEventBatch::flush() +{ + // copy and clear mBatch BEFORE posting to avoid weird circularity effects + LLSD batch(mBatch); + mBatch.clear(); + LLEventStream::post(batch); +} + +bool LLEventBatch::post(const LLSD& event) +{ + mBatch.append(event); + // calling setSize(same) performs the very check we want + setSize(mBatchSize); + return false; +} + +void LLEventBatch::setSize(std::size_t size) +{ + mBatchSize = size; + // changing the size might mean that we have to flush NOW + if (mBatch.size() >= mBatchSize) + { + flush(); + } +} + +/***************************************************************************** +* LLEventThrottleBase +*****************************************************************************/ +LLEventThrottleBase::LLEventThrottleBase(F32 interval): + LLEventFilter("throttle"), + mInterval(interval), + mPosts(0) +{} + +LLEventThrottleBase::LLEventThrottleBase(LLEventPump& source, F32 interval): + LLEventFilter(source, "throttle"), + mInterval(interval), + mPosts(0) +{} + +void LLEventThrottleBase::flush() +{ + // flush() is a no-op unless there's something pending. + // Don't test mPending because there's no requirement that the consumer + // post() anything but an isUndefined(). This is what mPosts is for. + if (mPosts) + { + mPosts = 0; + alarmCancel(); + // This is not to set our alarm; we are not yet requesting + // any notification. This is just to track whether subsequent post() + // calls fall within this mInterval or not. + timerSet(mInterval); + // copy and clear mPending BEFORE posting to avoid weird circularity + // effects + LLSD pending = mPending; + mPending.clear(); + LLEventStream::post(pending); + } +} + +LLSD LLEventThrottleBase::pending() const +{ + return mPending; +} + +bool LLEventThrottleBase::post(const LLSD& event) +{ + // Always capture most recent post() event data. If caller wants to + // aggregate multiple events, let them retrieve pending() and modify + // before calling post(). + mPending = event; + // Always increment mPosts. Unless we count this call, flush() does + // nothing. + ++mPosts; + // We reset mTimer on every flush() call to let us know if we're still + // within the same mInterval. So -- are we? + F32 timeRemaining = timerGetRemaining(); + if (! timeRemaining) + { + // more than enough time has elapsed, immediately flush() + flush(); + } + else + { + // still within mInterval of the last flush() call: have to defer + if (! alarmRunning()) + { + // timeRemaining tells us how much longer it will be until + // mInterval seconds since the last flush() call. At that time, + // flush() deferred events. + alarmActionAfter(timeRemaining, boost::bind(&LLEventThrottleBase::flush, this)); + } + } + return false; +} + +void LLEventThrottleBase::setInterval(F32 interval) +{ + F32 oldInterval = mInterval; + mInterval = interval; + // If we are not now within oldInterval of the last flush(), we're done: + // this will only affect behavior starting with the next flush(). + F32 timeRemaining = timerGetRemaining(); + if (timeRemaining) + { + // We are currently within oldInterval of the last flush(). Figure out + // how much time remains until (the new) mInterval of the last + // flush(). Bt we don't actually store a timestamp for the last + // flush(); it's implicit. There are timeRemaining seconds until what + // used to be the end of the interval. Move that endpoint by the + // difference between the new interval and the old. + timeRemaining += (mInterval - oldInterval); + // If we're called with a larger interval, the difference is positive + // and timeRemaining increases. + // If we're called with a smaller interval, the difference is negative + // and timeRemaining decreases. The interesting case is when it goes + // nonpositive: when the new interval means we can flush immediately. + if (timeRemaining <= 0.0f) + { + flush(); + } + else + { + // immediately reset mTimer + timerSet(timeRemaining); + // and if mAlarm is running, reset that too + if (alarmRunning()) + { + alarmActionAfter(timeRemaining, boost::bind(&LLEventThrottleBase::flush, this)); + } + } + } +} + +F32 LLEventThrottleBase::getDelay() const +{ + return timerGetRemaining(); +} + +/***************************************************************************** +* LLEventThrottle implementation +*****************************************************************************/ +LLEventThrottle::LLEventThrottle(F32 interval): + LLEventThrottleBase(interval) +{} + +LLEventThrottle::LLEventThrottle(LLEventPump& source, F32 interval): + LLEventThrottleBase(source, interval) +{} + +void LLEventThrottle::alarmActionAfter(F32 interval, const LLEventTimeoutBase::Action& action) +{ + mAlarm.actionAfter(interval, action); +} + +bool LLEventThrottle::alarmRunning() const +{ + return mAlarm.running(); +} + +void LLEventThrottle::alarmCancel() +{ + return mAlarm.cancel(); +} + +void LLEventThrottle::timerSet(F32 interval) +{ + mTimer.setTimerExpirySec(interval); +} + +F32 LLEventThrottle::timerGetRemaining() const +{ + return mTimer.getRemainingTimeF32(); +} + +/***************************************************************************** +* LLEventBatchThrottle +*****************************************************************************/ +LLEventBatchThrottle::LLEventBatchThrottle(F32 interval, std::size_t size): + LLEventThrottle(interval), + mBatchSize(size) +{} + +LLEventBatchThrottle::LLEventBatchThrottle(LLEventPump& source, F32 interval, std::size_t size): + LLEventThrottle(source, interval), + mBatchSize(size) +{} + +bool LLEventBatchThrottle::post(const LLSD& event) +{ + // simply retrieve pending value and append the new event to it + LLSD partial = pending(); + partial.append(event); + bool ret = LLEventThrottle::post(partial); + // The post() call above MIGHT have called flush() already. If it did, + // then pending() was reset to empty. If it did not, though, but the batch + // size has grown to the limit, flush() anyway. If there's a limit at all, + // of course. Calling setSize(same) performs the very check we want. + setSize(mBatchSize); + return ret; +} + +void LLEventBatchThrottle::setSize(std::size_t size) +{ + mBatchSize = size; + // Changing the size might mean that we have to flush NOW. Don't forget + // that 0 means unlimited. + if (mBatchSize && pending().size() >= mBatchSize) + { + flush(); + } +} diff --git a/indra/llcommon/lleventfilter.h b/indra/llcommon/lleventfilter.h index 66f3c14869..ccbbc8bd25 100644 --- a/indra/llcommon/lleventfilter.h +++ b/indra/llcommon/lleventfilter.h @@ -177,6 +177,9 @@ public: /// Cancel timer without event void cancel(); + /// Is this timer currently running? + bool running() const; + protected: virtual void setCountdown(F32 seconds) = 0; virtual bool countdownElapsed() const = 0; @@ -215,4 +218,162 @@ private: LLTimer mTimer; }; +/** + * LLEventBatch: accumulate post() events (LLSD blobs) into an LLSD Array + * until the array reaches a certain size, then call listeners with the Array + * and clear it back to empty. + */ +class LL_COMMON_API LLEventBatch: public LLEventFilter +{ +public: + // pass batch size + LLEventBatch(std::size_t size); + // construct and connect + LLEventBatch(LLEventPump& source, std::size_t size); + + // force out the pending batch + void flush(); + + // accumulate an event and flush() when big enough + virtual bool post(const LLSD& event); + + // query or reset batch size + std::size_t getSize() const { return mBatchSize; } + void setSize(std::size_t size); + +private: + LLSD mBatch; + std::size_t mBatchSize; +}; + +/** + * LLEventThrottleBase: construct with a time interval. Regardless of how + * frequently you call post(), LLEventThrottle will pass on an event to + * its listeners no more often than once per specified interval. + * + * A new event after more than the specified interval will immediately be + * passed along to listeners. But subsequent events will be delayed until at + * least one time interval since listeners were last called. Consider the + * sequence below. Suppose we have an LLEventThrottle constructed with an + * interval of 3 seconds. The numbers on the left are timestamps in seconds + * relative to an arbitrary reference point. + * + * 1: post(): event immediately passed to listeners, next no sooner than 4 + * 2: post(): deferred: waiting for 3 seconds to elapse + * 3: post(): deferred + * 4: no post() call, but event delivered to listeners; next no sooner than 7 + * 6: post(): deferred + * 7: no post() call, but event delivered; next no sooner than 10 + * 12: post(): immediately passed to listeners, next no sooner than 15 + * 17: post(): immediately passed to listeners, next no sooner than 20 + * + * For a deferred event, the LLSD blob delivered to listeners is from the most + * recent deferred post() call. However, a sender may obtain the previous + * event blob by calling pending(), modifying it as desired and post()ing the + * new value. (See LLEventBatchThrottle.) Each time an event is delivered to + * listeners, the pending() value is reset to isUndefined(). + * + * You may also call flush() to immediately pass along any deferred events to + * all listeners. + * + * @NOTE This is an abstract base class so that, for testing, we can use an + * alternate "timer" that doesn't actually consume real time. See + * LLEventThrottle. + */ +class LL_COMMON_API LLEventThrottleBase: public LLEventFilter +{ +public: + // pass time interval + LLEventThrottleBase(F32 interval); + // construct and connect + LLEventThrottleBase(LLEventPump& source, F32 interval); + + // force out any deferred events + void flush(); + + // retrieve (aggregate) deferred event since last event sent to listeners + LLSD pending() const; + + // register an event, may be either passed through or deferred + virtual bool post(const LLSD& event); + + // query or reset interval + F32 getInterval() const { return mInterval; } + void setInterval(F32 interval); + + // deferred posts + std::size_t getPostCount() const { return mPosts; } + + // time until next event would be passed through, 0.0 if now + F32 getDelay() const; + +protected: + // Implement these time-related methods for a valid LLEventThrottleBase + // subclass (see LLEventThrottle). For testing, we use a subclass that + // doesn't involve actual elapsed time. + virtual void alarmActionAfter(F32 interval, const LLEventTimeoutBase::Action& action) = 0; + virtual bool alarmRunning() const = 0; + virtual void alarmCancel() = 0; + virtual void timerSet(F32 interval) = 0; + virtual F32 timerGetRemaining() const = 0; + +private: + // remember throttle interval + F32 mInterval; + // count post() calls since last flush() + std::size_t mPosts; + // pending event data from most recent deferred event + LLSD mPending; +}; + +/** + * Production implementation of LLEventThrottle. + */ +class LLEventThrottle: public LLEventThrottleBase +{ +public: + LLEventThrottle(F32 interval); + LLEventThrottle(LLEventPump& source, F32 interval); + +private: + virtual void alarmActionAfter(F32 interval, const LLEventTimeoutBase::Action& action) override; + virtual bool alarmRunning() const override; + virtual void alarmCancel() override; + virtual void timerSet(F32 interval) override; + virtual F32 timerGetRemaining() const override; + + // use this to arrange a deferred flush() call + LLEventTimeout mAlarm; + // use this to track whether we're within mInterval of last flush() + LLTimer mTimer; +}; + +/** + * LLEventBatchThrottle: like LLEventThrottle, it's reluctant to pass events + * to listeners more often than once per specified time interval -- but only + * reluctant, since exceeding the specified batch size limit can cause it to + * deliver accumulated events sooner. Like LLEventBatch, it accumulates + * pending events into an LLSD Array, optionally flushing when the batch grows + * to a certain size. + */ +class LLEventBatchThrottle: public LLEventThrottle +{ +public: + // pass time interval and (optionally) max batch size; 0 means batch can + // grow arbitrarily large + LLEventBatchThrottle(F32 interval, std::size_t size = 0); + // construct and connect + LLEventBatchThrottle(LLEventPump& source, F32 interval, std::size_t size = 0); + + // append a new event to current batch + virtual bool post(const LLSD& event); + + // query or reset batch size + std::size_t getSize() const { return mBatchSize; } + void setSize(std::size_t size); + +private: + std::size_t mBatchSize; +}; + #endif /* ! defined(LL_LLEVENTFILTER_H) */ diff --git a/indra/llcommon/tests/listener.h b/indra/llcommon/tests/listener.h index 9c5c18a150..6072060bb6 100644 --- a/indra/llcommon/tests/listener.h +++ b/indra/llcommon/tests/listener.h @@ -138,4 +138,15 @@ struct Collect StringVec result; }; +struct Concat +{ + bool operator()(const LLSD& event) + { + result += event.asString(); + return false; + } + void clear() { result.clear(); } + std::string result; +}; + #endif /* ! defined(LL_LISTENER_H) */ diff --git a/indra/llcommon/tests/lleventfilter_test.cpp b/indra/llcommon/tests/lleventfilter_test.cpp index 2cdfb52f2f..712864bf63 100644 --- a/indra/llcommon/tests/lleventfilter_test.cpp +++ b/indra/llcommon/tests/lleventfilter_test.cpp @@ -70,6 +70,85 @@ private: bool mElapsed; }; +// Similar remarks about LLEventThrottle: we're actually testing the logic in +// LLEventThrottleBase, dummying out the LLTimer and LLEventTimeout used by +// the production LLEventThrottle class. +class TestEventThrottle: public LLEventThrottleBase +{ +public: + TestEventThrottle(F32 interval): + LLEventThrottleBase(interval), + mAlarmRemaining(-1), + mTimerRemaining(-1) + {} + TestEventThrottle(LLEventPump& source, F32 interval): + LLEventThrottleBase(source, interval), + mAlarmRemaining(-1), + mTimerRemaining(-1) + {} + + /*----- implementation of LLEventThrottleBase timing functionality -----*/ + virtual void alarmActionAfter(F32 interval, const LLEventTimeoutBase::Action& action) override + { + mAlarmRemaining = interval; + mAlarmAction = action; + } + + virtual bool alarmRunning() const override + { + // decrementing to exactly 0 should mean the alarm fires + return mAlarmRemaining > 0; + } + + virtual void alarmCancel() override + { + mAlarmRemaining = -1; + } + + virtual void timerSet(F32 interval) override + { + mTimerRemaining = interval; + } + + virtual F32 timerGetRemaining() const override + { + // LLTimer.getRemainingTimeF32() never returns negative; 0.0 means expired + return (mTimerRemaining > 0.0)? mTimerRemaining : 0.0; + } + + /*------------------- methods for manipulating time --------------------*/ + void alarmAdvance(F32 delta) + { + bool wasRunning = alarmRunning(); + mAlarmRemaining -= delta; + if (wasRunning && ! alarmRunning()) + { + mAlarmAction(); + } + } + + void timerAdvance(F32 delta) + { + // This simple implementation, like alarmAdvance(), completely ignores + // HOW negative mTimerRemaining might go. All that matters is whether + // it's negative. We trust that no test method in this source will + // drive it beyond the capacity of an F32. Seems like a safe assumption. + mTimerRemaining -= delta; + } + + void advance(F32 delta) + { + // Advance the timer first because it has no side effects. + // alarmAdvance() might call flush(), which will need to see the + // change in the timer. + timerAdvance(delta); + alarmAdvance(delta); + } + + F32 mAlarmRemaining, mTimerRemaining; + LLEventTimeoutBase::Action mAlarmAction; +}; + /***************************************************************************** * TUT *****************************************************************************/ @@ -116,7 +195,9 @@ namespace tut listener0.listenTo(driver)); // Construct a pattern LLSD: desired Event must have a key "foo" // containing string "bar" - LLEventMatching filter(driver, LLSD().insert("foo", "bar")); + LLSD pattern; + pattern.insert("foo", "bar"); + LLEventMatching filter(driver, pattern); listener1.reset(0); LLTempBoundListener temp2( listener1.listenTo(filter)); @@ -285,6 +366,47 @@ namespace tut mainloop.post(17); check_listener("no timeout 3", listener0, LLSD(0)); } + + template<> template<> + void filter_object::test<5>() + { + set_test_name("LLEventThrottle"); + TestEventThrottle throttle(3); + Concat cat; + throttle.listen("concat", boost::ref(cat)); + + // (sequence taken from LLEventThrottleBase Doxygen comments) + // 1: post(): event immediately passed to listeners, next no sooner than 4 + throttle.advance(1); + throttle.post("1"); + ensure_equals("1", cat.result, "1"); // delivered immediately + // 2: post(): deferred: waiting for 3 seconds to elapse + throttle.advance(1); + throttle.post("2"); + ensure_equals("2", cat.result, "1"); // "2" not yet delivered + // 3: post(): deferred + throttle.advance(1); + throttle.post("3"); + ensure_equals("3", cat.result, "1"); // "3" not yet delivered + // 4: no post() call, but event delivered to listeners; next no sooner than 7 + throttle.advance(1); + ensure_equals("4", cat.result, "13"); // "3" delivered + // 6: post(): deferred + throttle.advance(2); + throttle.post("6"); + ensure_equals("6", cat.result, "13"); // "6" not yet delivered + // 7: no post() call, but event delivered; next no sooner than 10 + throttle.advance(1); + ensure_equals("7", cat.result, "136"); // "6" delivered + // 12: post(): immediately passed to listeners, next no sooner than 15 + throttle.advance(5); + throttle.post(";12"); + ensure_equals("12", cat.result, "136;12"); // "12" delivered + // 17: post(): immediately passed to listeners, next no sooner than 20 + throttle.advance(5); + throttle.post(";17"); + ensure_equals("17", cat.result, "136;12;17"); // "17" delivered + } } // namespace tut /***************************************************************************** |