From 7af4a49835880fc92461882d5354c0ff12a9672a Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 23 Mar 2017 22:39:31 -0400 Subject: MAINT-6789: Add LLEventBatch, LLEventThrottle, LLEventBatchThrottle. These classes are as yet untested: they are straw people for API review, based on email conversations with Caladbolg and Rider. --- indra/llcommon/lleventfilter.cpp | 115 +++++++++++++++++++++++++++++++++++++++ indra/llcommon/lleventfilter.h | 104 +++++++++++++++++++++++++++++++++++ 2 files changed, 219 insertions(+) diff --git a/indra/llcommon/lleventfilter.cpp b/indra/llcommon/lleventfilter.cpp index 64ab58adcd..1fd29ea9e5 100644 --- a/indra/llcommon/lleventfilter.cpp +++ b/indra/llcommon/lleventfilter.cpp @@ -148,6 +148,11 @@ bool LLEventTimeoutBase::tick(const LLSD&) return false; // show event to other listeners } +bool LLEventTimeoutBase::running() const +{ + return mMainloop.connected(); +} + LLEventTimeout::LLEventTimeout() {} LLEventTimeout::LLEventTimeout(LLEventPump& source): @@ -164,3 +169,113 @@ bool LLEventTimeout::countdownElapsed() const { return mTimer.hasExpired(); } + +LLEventBatch::LLEventBatch(std::size_t size): + LLEventFilter("batch"), + mBatchSize(size) +{} + +LLEventBatch::LLEventBatch(LLEventPump& source, std::size_t size): + LLEventFilter(source, "batch"), + mBatchSize(size) +{} + +void LLEventBatch::flush() +{ + // copy and clear mBatch BEFORE posting to avoid weird circularity effects + LLSD batch(mBatch); + mBatch.clear(); + LLEventStream::post(batch); +} + +bool LLEventBatch::post(const LLSD& event) +{ + mBatch.append(event); + if (mBatch.size() >= mBatchSize) + { + flush(); + } + return false; +} + +LLEventThrottle::LLEventThrottle(F32 interval): + LLEventFilter("throttle"), + mInterval(interval), + mPosts(0) +{} + +LLEventThrottle::LLEventThrottle(LLEventPump& source, F32 interval): + LLEventFilter(source, "throttle"), + mInterval(interval), + mPosts(0) +{} + +void LLEventThrottle::flush() +{ + // flush() is a no-op unless there's something pending. + // Don't test mPending because there's no requirement that the consumer + // post() anything but an isUndefined(). This is what mPosts is for. + if (mPosts) + { + mPosts = 0; + mAlarm.cancel(); + // This is not to set our alarm; we are not yet requesting + // any notification. This is just to track whether subsequent post() + // calls fall within this mInterval or not. + mTimer.setTimerExpirySec(mInterval); + // copy and clear mPending BEFORE posting to avoid weird circularity + // effects + LLSD pending = mPending; + mPending.clear(); + LLEventStream::post(pending); + } +} + +LLSD LLEventThrottle::pending() const +{ + return mPending; +} + +bool LLEventThrottle::post(const LLSD& event) +{ + // Always capture most recent post() event data. If caller wants to + // aggregate multiple events, let them retrieve pending() and modify + // before calling post(). + mPending = event; + // Always increment mPosts. Unless we count this call, flush() does + // nothing. + ++mPosts; + // We reset mTimer on every flush() call to let us know if we're still + // within the same mInterval. So -- are we? + F32 timeRemaining = mTimer.getRemainingTimeF32(); + if (! timeRemaining) + { + // more than enough time has elapsed, immediately flush() + flush(); + } + else + { + // still within mInterval of the last flush() call: have to defer + if (! mAlarm.running()) + { + // timeRemaining tells us how much longer it will be until + // mInterval seconds since the last flush() call. At that time, + // flush() deferred events. + mAlarm.actionAfter(timeRemaining, boost::bind(&LLEventThrottle::flush, this)); + } + } +} + +LLEventBatchThrottle::LLEventBatchThrottle(F32 interval): + LLEventThrottle(interval) +{} + +LLEventBatchThrottle::LLEventBatchThrottle(LLEventPump& source, F32 interval): + LLEventThrottle(source, interval) +{} + +bool LLEventBatchThrottle::post(const LLSD& event) +{ + // simply retrieve pending value and append the new event to it + return LLEventThrottle::post(pending().append(event)); +} diff --git a/indra/llcommon/lleventfilter.h b/indra/llcommon/lleventfilter.h index 66f3c14869..1445b8a3b7 100644 --- a/indra/llcommon/lleventfilter.h +++ b/indra/llcommon/lleventfilter.h @@ -177,6 +177,9 @@ public: /// Cancel timer without event void cancel(); + /// Is this timer currently running? + bool running() const; + protected: virtual void setCountdown(F32 seconds) = 0; virtual bool countdownElapsed() const = 0; @@ -215,4 +218,105 @@ private: LLTimer mTimer; }; +/** + * LLEventBatch: accumulate post() events (LLSD blobs) into an LLSD Array + * until the array reaches a certain size, then call listeners with the Array + * and clear it back to empty. + */ +class LL_COMMON_API LLEventBatch: public LLEventFilter +{ +public: + // pass batch size + LLEventBatch(std::size_t size); + // construct and connect + LLEventBatch(LLEventPump& source, std::size_t size); + + // force out the pending batch + void flush(); + + // accumulate an event and flush() when big enough + virtual bool post(const LLSD& event); + +private: + LLSD mBatch; + std::size_t mBatchSize; +}; + +/** + * LLEventThrottle: construct with a time interval. Regardless of how + * frequently you call post(), LLEventThrottle will pass on an event to + * its listeners no more often than once per specified interval. + * + * A new event after more than the specified interval will immediately be + * passed along to listeners. But subsequent events will be delayed until at + * least one time interval since listeners were last called. Consider the + * sequence below. Suppose we have an LLEventThrottle constructed with an + * interval of 3 seconds. The numbers on the left are timestamps in seconds + * relative to an arbitrary reference point. + * + * 1: post(): event immediately passed to listeners, next no sooner than 4 + * 2: post(): deferred: waiting for 3 seconds to elapse + * 3: post(): deferred + * 4: no post() call, but event delivered to listeners; next no sooner than 7 + * 6: post(): deferred + * 7: no post() call, but event delivered; next no sooner than 10 + * 12: post(): immediately passed to listeners, next no sooner than 15 + * 17: post(): immediately passed to listeners, next no sooner than 20 + * + * For a deferred event, the LLSD blob delivered to listeners is from the most + * recent deferred post() call. However, you may obtain the previous event + * blob by calling pending(), modify it however you want and post() the new + * value. Each time an event is delivered to listeners, the pending() value is + * reset to isUndefined(). + * + * You may also call flush() to immediately pass along any deferred events to + * all listeners. + */ +class LL_COMMON_API LLEventThrottle: public LLEventFilter +{ +public: + // pass time interval + LLEventThrottle(F32 interval); + // construct and connect + LLEventThrottle(LLEventPump& source, F32 interval); + + // force out any deferred events + void flush(); + + // retrieve (aggregate) deferred event since last event sent to listeners + LLSD pending() const; + + // register an event, may be either passed through or deferred + virtual bool post(const LLSD& event); + +private: + // remember throttle interval + F32 mInterval; + // count post() calls since last flush() + std::size_t mPosts; + // pending event data from most recent deferred event + LLSD mPending; + // use this to arrange a deferred flush() call + LLEventTimeout mAlarm; + // use this to track whether we're within mInterval of last flush() + LLTimer mTimer; +}; + +/** + * LLEventBatchThrottle: like LLEventThrottle, it refuses to pass events to + * listeners more often than once per specified time interval. + * Like LLEventBatch, it accumulates pending events into an LLSD Array. + */ +class LLEventBatchThrottle: public LLEventThrottle +{ +public: + // pass time interval + LLEventBatchThrottle(F32 interval); + // construct and connect + LLEventBatchThrottle(LLEventPump& source, F32 interval); + + // append a new event to current batch + virtual bool post(const LLSD& event); +}; + #endif /* ! defined(LL_LLEVENTFILTER_H) */ -- cgit v1.2.3